Source code for workbench.workers.vt_query

''' VTQuery worker '''
import os
import requests
import collections
import ConfigParser
import pprint

class VTQuery(object):
    ''' This worker queries VirusTotal; an API key needs to be provided.

        The key is read from the 'vt_apikey' option of the 'workbench' section
        in ../server/config.ini (path relative to this file).
    '''
    dependencies = ['meta']

    # VirusTotal public API v2 file-report endpoint; it takes the 'apikey',
    # 'resource' and 'allinfo' parameters that execute() sends.
    VT_REPORT_URL = 'https://www.virustotal.com/vtapi/v2/file/report'

    def __init__(self):
        ''' VTQuery Init: load the API key and set the fields to exclude.

            Raises:
                RuntimeError: if the configured key is still the dummy value '123'.
                ConfigParser errors propagate when the section/option is missing.
        '''

        # Grab API key from configuration file
        config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                   '../server/config.ini')
        conf = ConfigParser.ConfigParser()
        # A fresh parser is empty: the file has to be read before get() can
        # find the 'workbench' section.
        conf.read(config_path)
        self.apikey = conf.get('workbench', 'vt_apikey')

        # Make sure key isn't the dummy value
        if self.apikey == '123':
            raise RuntimeError('VTQuery: Invalid api_key, put your VT api key in the config.ini file.')

        # Change this if you want these fields
        self.exclude = ['scan_id', 'md5', 'sha1', 'sha256', 'resource', 'response_code',
                        'permalink', 'verbose_msg', 'scans']

    def execute(self, input_data):
        ''' Execute the VTQuery worker.

            Args:
                input_data: dict with a 'meta' entry providing 'md5' and 'file_type'.

            Returns:
                dict: the filtered VirusTotal report plus 'file_type' and either
                'scan_results' or 'not_found'; or {'vt_error': ...} when the
                response body is not valid JSON.
        '''
        meta = input_data['meta']
        response = requests.get(self.VT_REPORT_URL,
                                params={'apikey': self.apikey, 'resource': meta['md5'], 'allinfo': 1})

        # Make sure we got a json blob back
        try:
            vt_output = response.json()
        except ValueError:
            return {'vt_error': 'VirusTotal Query Error, no valid response... past per min quota?'}

        return self._summarize(vt_output, meta['file_type'])

    def _summarize(self, vt_output, file_type):
        ''' Reduce a decoded VirusTotal report to the worker output dict. '''

        # Just pull some of the fields
        output = {field: value for field, value in vt_output.items()
                  if field not in self.exclude}

        # Nothing left after filtering means VirusTotal had no report for the sample
        not_found = not output

        # Add in file_type
        output['file_type'] = file_type

        # Toss back a not found
        if not_found:
            output['not_found'] = True
            return output

        # Organize the scans fields: count each non-empty result label.
        # A report without a 'scans' entry simply yields no scan results.
        scan_results = collections.Counter()
        for scan in vt_output.get('scans', {}).values():
            if scan.get('result'):
                scan_results[scan['result']] += 1
        output['scan_results'] = scan_results.most_common(5)
        return output
# Unit test: Create the class, the proper input and run the execute() method for a test
def test():
    ''' -- test --

        Stores a sample PDF on the workbench server, runs the worker directly
        on its 'meta' output, then runs it again through the server.
    '''

    # This worker test requires a local server running
    import zerorpc
    workbench = zerorpc.Client(timeout=300, heartbeat=60)
    # NOTE(review): this endpoint carries no host/port -- confirm the server address
    workbench.connect("tcp://")

    # Generate input for the worker
    data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                             '../data/pdf/bad/067b3929f096768e864f6a04f04d4e54')
    # Context manager so the sample file handle is closed once it has been read
    with open(data_path, 'rb') as sample_file:
        md5 = workbench.store_sample(sample_file.read(), 'bad_pdf', 'pdf')
    input_data = workbench.work_request('meta', md5)

    # Execute the worker (unit test)
    worker = VTQuery()
    output = worker.execute(input_data)
    print('\n<<< Unit Test >>>')
    pprint.pprint(output)

    # Execute the worker (server test)
    output = workbench.work_request('vt_query', md5)
    print('\n<<< Server Test >>>')
    pprint.pprint(output)


if __name__ == "__main__":
    test()