Source code for workbench.workers.yara_sigs

''' Yara worker '''
import os
import yara
import pprint


class YaraSigs(object):
[docs] ''' This worker check for matches against yara sigs. Output keys: [matches:list of matches] ''' dependencies = ['sample'] def __init__(self): self.rules = self.get_yara_rules() def get_yara_rules(self):
[docs] ''' Recursively traverse the yara/rules directory for rules ''' # Try to find the yara rules directory relative to the worker my_dir = os.path.dirname(os.path.realpath(__file__)) yara_rule_path = os.path.join(my_dir, 'yara/rules') if not os.path.exists(yara_rule_path): raise RuntimeError('yara could not find yara rules directory under: %s' % my_dir) # Okay load in all the rules under the yara rule path self.rules = yara.load_rules(rules_rootpath=yara_rule_path) return self.rules def execute(self, input_data):
[docs] ''' yara worker execute method ''' raw_bytes = input_data['sample']['raw_bytes'] matches = self.rules.match_data(raw_bytes) return {'matches': matches} # Unit test: Create the class, the proper input and run the execute() method for a test def test():
[docs] ''' yara.py: Unit test''' # This worker test requires a local server running import zerorpc workbench = zerorpc.Client(timeout=300, heartbeat=60) workbench.connect("tcp://127.0.0.1:4242") # Store all the files in directory and make an md5 list data_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)),'../data/pe/bad') file_list = [os.path.join(data_dir, child) for child in os.listdir(data_dir)] md5_list = [] for filename in file_list: # Skip OS generated files if '.DS_Store' in filename: continue with open(filename,'rb') as pe_file: base_name = os.path.basename(filename) md5_list.append(workbench.store_sample(base_name, pe_file.read(), 'exe')) # Store the md5 list on the server as a sample set workbench.store_sample_set(md5_list) # Grab one of the sample for input to the local unit test input_data = workbench.get_sample(md5_list[0]) # Execute the worker (unit test) worker = YaraSigs() output = worker.execute(input_data) print '\n<<< Unit Test >>>' pprint.pprint(output) # Execute the worker (server test) output = workbench.batch_work_request('yara_sigs', {'md5_list': md5_list}) get_all_output = list(output) print '\n<<< Server Test >>>' pprint.pprint(get_all_output) if __name__ == "__main__":
test()