Source code for workbench.workers.pe_peid

''' PE peid worker, uses the peid_userdb.txt database of signatures '''
import peutils
import pefile
import pkg_resources
import pprint

# Fixme: We want to load this once per module load
PEID_SIGS = pkg_resources.resource_string(__name__, 'peid_userdb.txt')

[docs]class PEIDWorker(object): ''' This worker looks up pe_id signatures for a PE file. ''' dependencies = ['sample'] def __init__(self): self.peid_sigs = PEID_SIGS
[docs] def execute(self, input_data): ''' Execute the PEIDWorker ''' raw_bytes = input_data['sample']['raw_bytes'] # Have the PE File module process the file try: pefile_handle = pefile.PE(data=raw_bytes, fast_load=False) except (AttributeError, pefile.PEFormatError), error: return {'error': str(error), 'match_list': []} # Now get information from PEID module peid_match = self.peid_features(pefile_handle) return {'match_list': peid_match}
[docs] def peid_features(self, pefile_handle): ''' Get features from PEid signature database''' signatures = peutils.SignatureDatabase(data = self.peid_sigs) peid_match = signatures.match(pefile_handle) return peid_match if peid_match else [] # Unit test: Create the class, the proper input and run the execute() method for a test
[docs]def test(): ''' pe_peid.py: Unit test''' # This worker test requires a local server running import zerorpc workbench = zerorpc.Client(timeout=300, heartbeat=60) workbench.connect("tcp://127.0.0.1:4242") # Generate input for the worker import os data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../data/pe/bad/033d91aae8ad29ed9fbb858179271232') md5 = workbench.store_sample('bad', open(data_path, 'rb').read(), 'exe') input_data = workbench.get_sample(md5) # Execute the worker (unit test) worker = PEIDWorker() output = worker.execute(input_data) print '\n<<< Unit Test >>>' pprint.pprint(output) # Execute the worker (server test) output = workbench.work_request('pe_peid', md5) print '\n<<< Server Test >>>' pprint.pprint(output)
if __name__ == "__main__": test()