Source code for workbench.workers.mem_procdump
''' Memory Image ProcDump worker. This worker utilizes the Rekall Memory Forensic Framework.
See Google Github: http://github.com/google/rekall
All credit for good stuff goes to them, all credit for bad stuff goes to us. :)
'''
from rekall_adapter.rekall_adapter import RekallAdapter
import zerorpc
import contextlib
import tempfile
import shutil
import glob, os
import hashlib
class MemoryImageProcDump(object):
[docs] ''' This worker dumps process pe files from memory image files. '''
dependencies = ['sample']
def __init__(self):
''' Initialization '''
self.output = {}
self.plugin_name = 'procdump'
self.orig_dir = os.getcwd()
# Spin up workbench connection
self.c = zerorpc.Client(timeout=300, heartbeat=60)
self.c.connect("tcp://127.0.0.1:4242")
def execute(self, input_data):
[docs] ''' Execute method '''
# Grab the raw bytes of the sample
raw_bytes = input_data['sample']['raw_bytes']
# Spin up the rekall adapter
adapter = RekallAdapter(raw_bytes)
session = adapter.get_session()
renderer = adapter.get_renderer()
# Create a temporary directory
with self.make_temp_directory() as temp_dir:
os.chdir(temp_dir)
# Here we can grab any plugin
try:
plugin = session.plugins.procdump(dump_dir=temp_dir)
except KeyError:
print 'Could not load the %s Rekall Plugin.. Failing with Error.' % self.plugin_name
return {'Error': 'Could not load the %s Rekall Plugin' % self.plugin_name}
# Render the plugin and grab all the dumped files
renderer.render(plugin)
# Scrape any extracted files
print 'mem_procdump: Scraping dumped files...'
self.output['dumped_files'] = []
for output_file in glob.glob('*'):
# Store the output into workbench, put md5s in the 'dumped_files' field
output_name = os.path.basename(output_file)
output_name = output_name.replace('executable.', '')
with open(output_file, 'rb') as dumped_file:
raw_bytes = dumped_file.read()
md5 = self.c.store_sample(output_name, raw_bytes, 'exe')
# Remove some columns from meta data
meta = self.c.work_request('meta', md5)['meta']
del meta['customer']
del meta['encoding']
del meta['import_time']
del meta['mime_type']
self.output['dumped_files'].append(meta)
# Organize the output a bit
self.output['tables'] = ['dumped_files']
return self.output
@contextlib.contextmanager
def make_temp_directory(self):
[docs] temp_dir = tempfile.mkdtemp()
try:
yield temp_dir
finally:
# Remove the directory/files
shutil.rmtree(temp_dir)
# Change back to original directory
os.chdir(self.orig_dir)
def __del__(self):
[docs] ''' Class Cleanup '''
# Close zeroRPC client
self.c.close()
# Unit test: Create the class, the proper input and run the execute() method for a test
import pytest
@pytest.mark.rekall
def test():
''' mem_procdump.py: Test '''
# This worker test requires a local server running
import zerorpc
workbench = zerorpc.Client(timeout=300, heartbeat=60)
workbench.connect("tcp://127.0.0.1:4242")
# Store the sample
data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../data/memory_images/exemplar4.vmem')
with open(data_path, 'rb') as mem_file:
raw_bytes = mem_file.read()
md5 = hashlib.md5(raw_bytes).hexdigest()
if not workbench.has_sample(md5):
md5 = workbench.store_sample('exemplar4.vmem', open(data_path, 'rb').read(), 'mem')
# Execute the worker (unit test)
worker = MemoryImageProcDump()
output = worker.execute({'sample':{'raw_bytes':raw_bytes}})
print '\n<<< Unit Test >>>'
import pprint
pprint.pprint(output)
assert 'Error' not in output
# Execute the worker (server test)
output = workbench.work_request('mem_procdump', md5)
print '\n<<< Server Test >>>'
import pprint
pprint.pprint(output)
assert 'Error' not in output
if __name__ == "__main__":
test()