Source code for workbench.workers.pe_deep_sim

''' PE SSDeep Similarity worker '''

import os
import ssdeep as ssd
import zerorpc
import pprint
from operator import itemgetter

class PEDeepSim(object):
[docs] ''' This worker computes fuzzy matches between samples with ssdeep ''' dependencies = ['meta_deep'] def __init__(self): self.workbench = zerorpc.Client(timeout=300, heartbeat=60) self.workbench.connect("tcp://") def execute(self, input_data):
[docs] ''' Execute method ''' my_ssdeep = input_data['meta_deep']['ssdeep'] my_md5 = input_data['meta_deep']['md5'] # For every PE sample in the database compute my ssdeep fuzzy match sample_set = self.workbench.generate_sample_set('exe') results = self.workbench.set_work_request('meta_deep', sample_set, ['md5','ssdeep']) sim_list = [] for result in results: if result['md5'] != my_md5: sim_list.append({'md5':result['md5'], 'sim', result['ssdeep'])}) # Sort and return the sim_list (with some logic for threshold) sim_list.sort(key=itemgetter('sim'), reverse=True) output_list = [sim for sim in sim_list if sim['sim'] > 0] return {'md5': my_md5, 'sim_list':output_list} def __del__(self):
[docs] ''' Class Cleanup ''' # Close zeroRPC client self.workbench.close() # Unit test: Create the class, the proper input and run the execute() method for a test def test():
[docs] ''' Unit test ''' # This worker test requires a local server running workbench = zerorpc.Client(timeout=300, heartbeat=60) workbench.connect("tcp://") # Generate input for the worker data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../data/pe/bad/033d91aae8ad29ed9fbb858179271232') md5 = workbench.store_sample(open(data_path, 'rb').read(), 'bad_pe', 'exe') input_data = workbench.work_request('meta_deep', md5) # Execute the worker (unit test) worker = PEDeepSim() output = worker.execute(input_data) print '\n<<< Unit Test >>>' pprint.pprint(output) # Execute the worker (server test) output = workbench.work_request('pe_deep_sim', md5) print '\n<<< Server Test >>>' pprint.pprint(output) if __name__ == "__main__":