workbench.workers package

Submodules

workbench.workers.evel_knievel_all module

EvelKnievelAll worker

class workbench.workers.evel_knievel_all.EvelKnievelAll[source]

Bases: object

This worker depends on two workers that throw TypeError and KeyError exceptions. It is a good test case because the dependencies will sometimes both succeed, sometimes fail individually, and sometimes both fail. It’s a fail fest!

Initialization

dependencies = ['evel_knievel_key', 'evel_knievel_type']
execute(input_data)[source]

This worker depends on two workers that throw TypeError and KeyError Exceptions
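The workers on this page share a common shape: a `dependencies` class attribute naming the workers whose output is needed, and an `execute(input_data)` method. The following is a minimal sketch of a worker that tolerates failing dependencies. It assumes that `input_data` is a dict keyed by dependency name and that a failed dependency is simply absent from it; neither assumption is confirmed by this page, and `TolerantWorker` is a hypothetical name.

```python
class TolerantWorker(object):
    """Hypothetical worker that depends on two workers that may fail."""

    # Same dependency list as documented for EvelKnievelAll
    dependencies = ['evel_knievel_key', 'evel_knievel_type']

    def execute(self, input_data):
        # ASSUMPTION: input_data maps dependency name -> output dict,
        # and a dependency that raised is missing from the mapping.
        succeeded = [name for name in self.dependencies if name in input_data]
        failed = [name for name in self.dependencies if name not in input_data]
        return {'succeeded': succeeded, 'failed': failed}


# Only the KeyError worker produced output in this example
result = TolerantWorker().execute({'evel_knievel_key': {'ok': True}})
print(result)
```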

workbench.workers.evel_knievel_key module

EvelKnievelKey worker

class workbench.workers.evel_knievel_key.EvelKnievelKey[source]

Bases: object

This worker pseudo-randomly throws a KeyError exception. The logic is deterministic: given a pile of md5s, about 8% will fail, but it will always be the same ones.

Initialization

dependencies = ['meta']
execute(input_data)[source]

This worker pseudo-randomly throws a KeyError Exception.
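One way to get failures that look random but are repeatable is to derive the decision from the md5 itself. The sketch below is illustrative only: the modulo-100 bucketing, the `should_fail` helper, and the `input_data['meta']['md5']` layout are assumptions, not the worker's actual logic.

```python
import hashlib


def should_fail(md5_hex, percent=8):
    """Deterministically pick roughly `percent`% of md5s to fail."""
    # ASSUMPTION: bucket the md5 by its integer value modulo 100
    return int(md5_hex, 16) % 100 < percent


def execute(input_data):
    # ASSUMPTION: the md5 is available at input_data['meta']['md5']
    md5_hex = input_data['meta']['md5']
    if should_fail(md5_hex):
        raise KeyError('simulated failure for %s' % md5_hex)
    return {'md5': md5_hex}


# The same md5s give the same pass/fail answers on every run
md5s = [hashlib.md5(str(i).encode('utf-8')).hexdigest() for i in range(1000)]
first_pass = [should_fail(m) for m in md5s]
second_pass = [should_fail(m) for m in md5s]
```

Because the decision depends only on the md5, rerunning the worker on the same samples reproduces the same failures, which makes the failure path easy to test.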

workbench.workers.evel_knievel_type module

EvelKnievelType worker

class workbench.workers.evel_knievel_type.EvelKnievelType[source]

Bases: object

This worker pseudo-randomly throws a TypeError exception. The logic is deterministic: given a pile of md5s, about 7% will fail, but it will always be the same ones.

Initialization

dependencies = ['meta']
execute(input_data)[source]

This worker pseudo-randomly throws a TypeError Exception.

workbench.workers.help_base module

HelpBase worker

class workbench.workers.help_base.HelpBase[source]

Bases: object

This worker computes help for any ‘info’ object

dependencies = ['info']
execute(input_data)[source]

Info objects all have a type_tag of ‘help’, ‘worker’, ‘command’, or ‘other’.

workbench.workers.help_base.test()[source]

help_base.py: Unit test

workbench.workers.help_formatter module

HelpFormatter worker

class workbench.workers.help_formatter.HelpFormatter[source]

Bases: object

This worker does CLI formatting and coloring for any help object

dependencies = ['help_base']
execute(input_data)[source]

Do CLI formatting and coloring based on the type_tag
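As a rough illustration of formatting driven by the type_tag, the sketch below maps each documented type_tag value to an ANSI color. The color choices, the `format_help` helper, and the `name` and `help` keys are assumptions made for the example and may not match the real help object.

```python
# ASSUMPTION: colors are chosen arbitrarily for each documented type_tag
COLORS = {
    'help': '\033[33m',     # yellow
    'worker': '\033[32m',   # green
    'command': '\033[36m',  # cyan
    'other': '\033[0m',     # terminal default
}
RESET = '\033[0m'


def format_help(help_obj):
    """Color the title line according to the object's type_tag."""
    # ASSUMPTION: help_obj carries 'type_tag', 'name' and 'help' keys
    color = COLORS.get(help_obj.get('type_tag'), COLORS['other'])
    return '%s%s%s\n%s' % (color, help_obj['name'], RESET, help_obj['help'])


formatted = format_help(
    {'type_tag': 'worker', 'name': 'meta', 'help': 'Computes meta data'})
print(formatted)
```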

workbench.workers.help_formatter.test()[source]

help_formatter.py: Unit test

workbench.workers.json_meta module

JSON Meta worker

class workbench.workers.json_meta.JSONMetaData[source]

Bases: object

This worker computes meta-data for json files.

Initialization

dependencies = ['sample', 'meta']
execute(input_data)[source]
workbench.workers.json_meta.test()[source]

json_meta.py: Test

workbench.workers.log_meta module

Logfile Meta worker

class workbench.workers.log_meta.LogMetaData[source]

Bases: object

This worker computes meta-data for log files.

Initialization

dependencies = ['sample', 'meta']
execute(input_data)[source]
workbench.workers.log_meta.test()[source]

log_meta.py: Unit test

workbench.workers.mem_connscan module

workbench.workers.mem_dlllist module

workbench.workers.mem_meta module

workbench.workers.mem_procdump module

workbench.workers.mem_pslist module

workbench.workers.meta module

Meta worker

class workbench.workers.meta.MetaData[source]

Bases: object

This worker computes meta data for any file type.

Initialization

dependencies = ['sample', 'tags']
execute(input_data)[source]

This worker computes meta data for any file type.

workbench.workers.meta.test()[source]

meta.py: Unit test

workbench.workers.meta_deep module

MetaDeep worker

class workbench.workers.meta_deep.MetaDeepData[source]

Bases: object

This worker computes deeper meta-data

Initialization

dependencies = ['sample', 'meta']
execute(input_data)[source]
workbench.workers.meta_deep.test()[source]

meta_deep.py: Unit test

workbench.workers.pcap_bro module

PcapBro worker

workbench.workers.pcap_bro.gsleep()[source]

Convenience method for gevent.sleep

class workbench.workers.pcap_bro.PcapBro[source]

Bases: object

This worker runs Bro scripts on a pcap file

dependencies = ['sample']
sample_set_input = True
setup_pcap_inputs(input_data)[source]

Write the PCAPs to disk for Bro to process and return the pcap filenames

execute(input_data)[source]

Execute

subprocess_manager(exec_args)[source]

Bro subprocess manager

goto_temp_directory(*args, **kwds)[source]
__del__()[source]

Class Cleanup

workbench.workers.pcap_bro.test()[source]

pcap_bro.py: Unit test

workbench.workers.pcap_graph module

pcap_graph worker

workbench.workers.pcap_graph.gsleep()[source]

Convenience method for gevent.sleep

class workbench.workers.pcap_graph.PcapGraph[source]

Bases: object

This worker generates a graph from a PCAP (depends on Bro)

Initialization

dependencies = ['pcap_bro']
add_node(node_id, name, labels)[source]

Cache aware add_node

add_rel(source_id, target_id, rel)[source]

Cache aware add_rel
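"Cache aware" here presumably means that a node or relationship already seen is not sent to the graph backend again. A minimal sketch of that idea follows; `GraphCache` is a hypothetical class, and the `writes` counter stands in for whatever backend call the real worker makes.

```python
class GraphCache(object):
    """Hypothetical helper that skips nodes and relationships already added."""

    def __init__(self):
        self.nodes = {}      # node_id -> (name, labels)
        self.rels = set()    # (source_id, target_id, rel)
        self.writes = 0      # counts calls that would reach the backend

    def add_node(self, node_id, name, labels):
        if node_id in self.nodes:
            return  # already cached, skip the backend call
        self.nodes[node_id] = (name, tuple(labels))
        self.writes += 1

    def add_rel(self, source_id, target_id, rel):
        key = (source_id, target_id, rel)
        if key in self.rels:
            return  # already cached, skip the backend call
        self.rels.add(key)
        self.writes += 1


graph = GraphCache()
graph.add_node('10.0.0.1', '10.0.0.1', ['host'])
graph.add_node('10.0.0.1', '10.0.0.1', ['host'])   # duplicate, ignored
graph.add_rel('10.0.0.1', '10.0.0.2', 'connects')
graph.add_rel('10.0.0.1', '10.0.0.2', 'connects')  # duplicate, ignored
```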

execute(input_data)[source]

This worker builds graphs from PCAP Bro output logs

conn_log_graph(stream)[source]

Build up a graph (nodes and edges) from a Bro conn.log

http_log_graph(stream)[source]

Build up a graph (nodes and edges) from a Bro http.log

dns_log_graph(stream)[source]

Build up a graph (nodes and edges) from a Bro dns.log

weird_log_graph(stream)[source]

Build up a graph (nodes and edges) from a Bro weird.log

files_log_graph(stream)[source]

Build up a graph (nodes and edges) from a Bro files.log
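To make the log-to-graph step concrete, here is a small sketch that turns conn.log rows into host nodes and connection edges. It assumes `stream` yields one dict per log row; `id.orig_h`, `id.resp_h` and `proto` are standard Bro conn.log field names, while the `conn_log_to_graph` helper and the node/edge representation are illustrative only.

```python
def conn_log_to_graph(stream):
    """Collect host nodes and connection edges from conn.log rows."""
    nodes = set()
    edges = set()
    for row in stream:
        src = row['id.orig_h']   # originating host
        dst = row['id.resp_h']   # responding host
        nodes.update([src, dst])
        edges.add((src, dst, row.get('proto', 'unknown')))
    return nodes, edges


# ASSUMPTION: rows arrive as dicts keyed by Bro field name
rows = [
    {'id.orig_h': '192.168.1.5', 'id.resp_h': '8.8.8.8', 'proto': 'udp'},
    {'id.orig_h': '192.168.1.5', 'id.resp_h': '93.184.216.34', 'proto': 'tcp'},
]
nodes, edges = conn_log_to_graph(rows)
```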

__del__()[source]

Class Cleanup

workbench.workers.pcap_graph.test()[source]

pcap_graph.py: Unit test

workbench.workers.pcap_http_graph module

pcap_http_graph worker

workbench.workers.pcap_http_graph.gsleep()[source]

Convenience method for gevent.sleep

class workbench.workers.pcap_http_graph.PcapHTTPGraph[source]

Bases: object

This worker generates a graph from a PCAP (depends on Bro)

Initialization

dependencies = ['pcap_bro']
add_node(node_id, name, labels)[source]

Cache aware add_node

add_rel(source_id, target_id, rel)[source]

Cache aware add_rel

execute(input_data)[source]

This worker builds graphs from PCAP Bro output logs

http_log_graph(stream)[source]

Build up a graph (nodes and edges) from a Bro http.log

weird_log_graph(stream)[source]

Build up a graph (nodes and edges) from a Bro weird.log

files_log_graph(stream)[source]

Build up a graph (nodes and edges) from a Bro files.log

__del__()[source]

Class Cleanup

workbench.workers.pcap_http_graph.test()[source]

pcap_http_graph.py: Unit test

workbench.workers.pe_classifier module

PEClassifier worker (just a placeholder, not a real classifier at this point)

class workbench.workers.pe_classifier.PEClassifier[source]

Bases: object

This worker classifies PE files as Evil or AOK (a toy, not a real classifier at this point)

Initialization

dependencies = ['pe_features', 'pe_indicators']
execute(input_data)[source]

This worker classifies PE files as Evil or AOK (a toy, not a real classifier at this point)

workbench.workers.pe_classifier.test()[source]

pe_classifier.py: Unit test

workbench.workers.pe_deep_sim module

PE SSDeep Similarity worker

class workbench.workers.pe_deep_sim.PEDeepSim[source]

Bases: object

This worker computes fuzzy matches between samples with ssdeep

dependencies = ['meta_deep']
execute(input_data)[source]

Execute method

__del__()[source]

Class Cleanup

workbench.workers.pe_deep_sim.test()[source]

pe_deep_sim.py: Unit test

workbench.workers.pe_features module

PE Features worker. This class pulls static features out of a PE file using the python pefile module.

class workbench.workers.pe_features.PEFileWorker(verbose=False)[source]

Bases: object

Create instance of PEFileWorker class. This class pulls static features out of a PE file using the python pefile module.

Init method

dependencies = ['sample', 'tags']
execute(input_data)[source]

Process the input bytes with pefile

set_dense_features(dense_feature_list)[source]

Set the dense feature list that the Python pefile module should extract. This is really just sanity-check functionality: these are the features you expect to get, and a warning is emitted if some of them are missing.

get_dense_features()[source]

Get the dense feature list that the Python pefile module should extract.

set_sparse_features(sparse_feature_list)[source]

Set the sparse feature list that the Python pefile module should extract. This is really just sanity-check functionality: these are the features you expect to get, and a warning is emitted if some of them are missing.

get_sparse_features()[source]

Get the sparse feature list that the Python pefile module should extract.
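The sanity check described for the dense and sparse feature lists can be pictured as comparing the expected names against what was actually extracted. This is a sketch under assumptions: `check_expected_features` is a hypothetical helper, the feature names are made up for the example, and the real worker may report missing features differently.

```python
import warnings


def check_expected_features(expected, extracted):
    """Warn about expected feature names that were not extracted."""
    missing = [name for name in expected if name not in extracted]
    for name in missing:
        warnings.warn('Expected feature not found: %s' % name)
    return missing


# Feature names here are illustrative only
missing = check_expected_features(
    ['number_of_sections', 'entry_point'],
    {'number_of_sections': 4},
)
```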

static open_using_pefile(input_name, input_bytes)[source]

Open the PE File using the Python pefile module.

extract_features_using_pefile(pef)[source]

Process the PE File using the Python pefile module.

workbench.workers.pe_features.convert_to_utf8(string)[source]

Convert string to UTF8

workbench.workers.pe_features.convert_to_ascii_null_term(string)[source]

Convert string to Null terminated ascii
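One plausible reading of "null terminated ascii" is: keep everything before the first NUL byte and drop any non-ASCII characters. PE section names, for example, are padded with NULs. The sketch below follows that reading; `to_ascii_null_term` is a hypothetical name and the handling of bytes input via latin-1 is an assumption.

```python
def to_ascii_null_term(value):
    """Cut at the first NUL and drop non-ASCII characters."""
    # ASSUMPTION: input is either text or raw bytes
    if isinstance(value, bytes):
        value = value.decode('latin-1')
    value = value.split('\x00', 1)[0]
    return value.encode('ascii', 'ignore').decode('ascii')


section_name = to_ascii_null_term(b'.text\x00\x00\x00')
print(section_name)
```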

workbench.workers.pe_features.test()[source]

pe_features.py: Test

workbench.workers.pe_indicators module

This Python class codifies a set of rules around suspicious static features in a PE file. The rules don’t indicate malicious behavior; they simply flag things that may be used by a malicious binary. Many of the indicators were inspired by the material in the ‘Practical Malware Analysis’ book by Sikorski and Honig, ISBN-13: 978-1593272906 (available on Amazon).

Description:

PE_WARNINGS = PE module warnings verbatim
MALFORMED = the PE file is malformed
COMMUNICATION = network activities
CREDENTIALS = activities associated with elevating or attaining new privileges
KEYLOGGING = activities associated with keylogging
SYSTEM_STATE = file system or registry activities
SYSTEM_PROBE = getting information from the local system (file system, OS config)
SYSTEM_INTEGRITY = compromises the security state of the local system
PROCESS_MANIPULATION = indicators associated with process manipulation/injection
PROCESS_SPAWN = indicators associated with creating a new process
STEALTH_LOAD = indicators associated with loading libraries, resources, etc in a sneaky way
ENCRYPTION = any indicators related to encryption
COM_SERVICES = COM functionality or running as a service
ANTI_DEBUG = anti-debugging indicators

class workbench.workers.pe_indicators.PEIndicators[source]

Bases: object

Create instance of Indicators class. This class uses the static features from the pefile module to look for weird stuff.

Note: All methods that start with ‘check’ will be automatically included as part of the checks that happen when ‘execute’ is called.

Init method of the Indicators class.

dependencies = ['sample']
execute(input_data)[source]

Execute the PEIndicators worker
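The note about methods that start with ‘check’ suggests name-based discovery at execute time. A minimal sketch of that pattern follows; `IndicatorChecks` is a hypothetical class, and the shape of the returned indicator dicts and the `features` layout are assumptions rather than the worker's real output format.

```python
class IndicatorChecks(object):
    """Hypothetical class showing name-based discovery of check methods."""

    def __init__(self, features):
        self.features = features

    def execute(self):
        results = []
        # Run every callable attribute whose name starts with 'check'
        for name in sorted(dir(self)):
            if not name.startswith('check'):
                continue
            method = getattr(self, name)
            if callable(method):
                hit = method()
                if hit:
                    results.append(hit)
        return results

    def check_checksum_is_zero(self):
        if self.features.get('checksum') == 0:
            return {'description': 'Checksum of zero', 'category': 'MALFORMED'}
        return None

    def check_empty_section_name(self):
        if '' in self.features.get('section_names', []):
            return {'description': 'Empty section name', 'category': 'MALFORMED'}
        return None


hits = IndicatorChecks({'checksum': 0, 'section_names': ['.text']}).execute()
```

With this pattern, adding a new indicator only requires defining another `check_...` method; nothing has to be registered separately.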

check_corrupted_imports()[source]

Various ways the imports table might be corrupted.

check_checksum_is_zero()[source]

Checking for a checksum of zero

check_checksum_mismatch()[source]

Checking for a checksum that doesn’t match the generated checksum

check_empty_section_name()[source]

Checking for an empty section name

check_nonstandard_section_name()[source]

Checking for a non-standard section name

check_image_size_incorrect()[source]

Checking if the reported image size matches the actual image size

check_overlapping_headers()[source]

Checking if the pefile module reported overlapping headers

check_section_unaligned()[source]

Checking if any of the sections are unaligned

check_section_oversized()[source]

Checking if any of the sections go past the total size of the image

check_dll_with_no_exports()[source]

Checking if the PE is a DLL with no exports

check_communication_imports()[source]

Checking if the PE imports known communication methods

check_elevating_privs_imports()[source]

Checking if the PE imports known methods associated with elevating or attaining new privileges

check_keylogging_imports()[source]

Checking if the PE imports known methods associated with keylogging

check_system_state_imports()[source]

Checking if the PE imports known methods associated with changing system state

check_system_probe_imports()[source]

Checking if the PE imports known methods associated with probing the system

check_system_integrity_imports()[source]

Checking if the PE imports known methods associated with system security or integrity

check_crypto_imports()[source]

Checking if the PE imports known methods associated with encryption

check_anti_debug_imports()[source]

Checking if the PE imports known methods associated with anti-debug

check_com_service_imports()[source]

Checking if the PE imports known methods associated with COM or services

check_process_manipulation()[source]

Checking if the PE imports known methods associated with process manipulation/injection

check_process_spawn()[source]

Checking if the PE imports known methods associated with spawning a new process

check_stealth_load()[source]

Checking if the PE imports known methods associated with loading libraries, resources, etc in a sneaky way

check_invalid_entry_point()[source]

Checking the PE File warning for an invalid entry point

check_exports()[source]

This is just a stub function right now, might be useful later

workbench.workers.pe_indicators.convert_to_ascii_null_term(string)[source]

Convert string to null terminated ascii string

workbench.workers.pe_indicators.test()[source]

pe_indicators.py: Unit test

workbench.workers.pe_peid module

PE peid worker, uses the peid_userdb.txt database of signatures

workbench.workers.pe_peid.get_peid_db()[source]

Grab the peid_userdb.txt file from local disk

class workbench.workers.pe_peid.PEIDWorker[source]

Bases: object

This worker looks up pe_id signatures for a PE file.

dependencies = ['sample']
execute(input_data)[source]

Execute the PEIDWorker

peid_features(pefile_handle)[source]

Get features from PEid signature database

workbench.workers.pe_peid.test()[source]

pe_peid.py: Unit test

workbench.workers.strings module

Strings worker

class workbench.workers.strings.Strings[source]

Bases: object

This worker extracts all the strings from any type of file

Initialize the Strings worker

dependencies = ['sample']
execute(input_data)[source]

Execute the Strings worker
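String extraction is commonly done by scanning for runs of printable ASCII bytes, in the spirit of the Unix `strings` tool. The sketch below assumes that approach with a minimum length of 4; the worker's actual pattern and threshold are not stated on this page, and `extract_strings` is a hypothetical name.

```python
import re


def extract_strings(raw_bytes, min_len=4):
    """Return runs of printable ASCII at least min_len bytes long."""
    # ASSUMPTION: "printable" means bytes 0x20 through 0x7e
    pattern = ('[\\x20-\\x7e]{%d,}' % min_len).encode('ascii')
    return [match.decode('ascii') for match in re.findall(pattern, raw_bytes)]


sample = (b'\x00\x01MZ\x90\x00This program cannot be run\x00'
          b'\xff\xfeabc\x00http://example.com\x00')
found = extract_strings(sample)
print(found)
```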

workbench.workers.strings.test()[source]

strings.py: Unit test

workbench.workers.swf_meta module

SWFMeta worker: This is a stub; the real class (under the experimental directory) has too many dependencies

class workbench.workers.swf_meta.SWFMeta[source]

Bases: object

This worker computes a bunch of meta-data about a SWF file

dependencies = ['sample', 'meta']
execute(input_data)[source]

Execute the SWFMeta worker

workbench.workers.swf_meta.test()[source]

swf_meta.py: Unit test

workbench.workers.unzip module

Unzip worker

class workbench.workers.unzip.Unzip[source]

Bases: object

This worker unzips a zipped file

dependencies = ['sample']
execute(input_data)[source]

Execute the Unzip worker
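Unzipping a sample held in memory can be sketched with the standard library `zipfile` module. The `unzip_bytes` helper and its return format are illustrative; how the real worker stores or returns the extracted members is not described here.

```python
import io
import zipfile


def unzip_bytes(raw_bytes):
    """Return {member_name: member_bytes} for a zip archive held in memory."""
    with zipfile.ZipFile(io.BytesIO(raw_bytes)) as archive:
        return {name: archive.read(name) for name in archive.namelist()}


# Build a small archive in memory to exercise the function
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w') as archive:
    archive.writestr('hello.txt', 'hello workbench')
members = unzip_bytes(buffer.getvalue())
```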

__del__()[source]

Class Cleanup

workbench.workers.unzip.test()[source]

unzip.py: Unit test

workbench.workers.url module

URLS worker: Tries to extract URLs from strings output

class workbench.workers.url.URLS[source]

Bases: object

This worker looks for url patterns in strings output

Initialize the URL worker

dependencies = ['strings']
execute(input_data)[source]

Execute the URL worker
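Looking for URL patterns in a list of extracted strings can be approximated with a regular expression. The pattern below is deliberately simple and is an assumption, not the worker's own expression; `find_urls` is a hypothetical helper.

```python
import re

# Deliberately simple pattern; robust URL matching is more involved
URL_PATTERN = re.compile(r'https?://[^\s"\'<>]+')


def find_urls(string_list):
    """Return the unique URLs found in a list of strings, in first-seen order."""
    seen = []
    for text in string_list:
        for url in URL_PATTERN.findall(text):
            if url not in seen:
                seen.append(url)
    return seen


urls = find_urls([
    'GET http://example.com/a.exe HTTP/1.1',
    'no url here',
    'see https://example.org and http://example.com/a.exe',
])
print(urls)
```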

workbench.workers.url.test()[source]

url.py: Unit test

workbench.workers.view module

view worker

class workbench.workers.view.View[source]

Bases: object

View: Generates a view for any file type

dependencies = ['meta']
execute(input_data)[source]
__del__()[source]

Class Cleanup

workbench.workers.view.test()[source]

view.py: Unit test

workbench.workers.view_customer module

view_customer worker

class workbench.workers.view_customer.ViewCustomer[source]

Bases: object

ViewCustomer: Generates a customer usage view.

dependencies = ['meta']
execute(input_data)[source]

Execute Method

workbench.workers.view_customer.test()[source]

view_customer.py: Unit test

workbench.workers.view_deep module

view_deep worker

class workbench.workers.view_deep.ViewDeep[source]

Bases: object

ViewDeep: Generates a view_deep for any file type

dependencies = ['meta']
execute(input_data)[source]
__del__()[source]

Class Cleanup

workbench.workers.view_deep.test()[source]

view_deep.py: Unit test

workbench.workers.view_log_meta module

view_log_meta worker

class workbench.workers.view_log_meta.ViewLogMeta[source]

Bases: object

ViewLogMeta: Generates a view for meta data on the sample

dependencies = ['log_meta']
execute(input_data)[source]

Execute the ViewLogMeta worker

workbench.workers.view_log_meta.test()[source]

view_log_meta.py: Unit test

workbench.workers.view_memory module

view_memory worker

class workbench.workers.view_memory.ViewMemory[source]

Bases: object

ViewMemory: Generates a view for meta data on the sample

dependencies = ['mem_connscan', 'mem_meta', 'mem_procdump', 'mem_pslist']
execute(input_data)[source]

Execute the ViewMemory worker

static file_to_pid(filename)[source]
workbench.workers.view_memory.test()[source]

view_memory.py: Unit test

workbench.workers.view_memory_deep module

view_memory_deep worker

class workbench.workers.view_memory_deep.ViewMemoryDeep[source]

Bases: object

ViewMemoryDeep: Generates a view for meta data on the sample

dependencies = ['view_memory', 'mem_connscan', 'mem_meta', 'mem_procdump', 'mem_pslist']
execute(input_data)[source]

Execute the ViewMemoryDeep worker

workbench.workers.view_memory_deep.test()[source]

view_memory_deep.py: Unit test

workbench.workers.view_pcap module

view_pcap worker

class workbench.workers.view_pcap.ViewPcap[source]

Bases: object

ViewPcap: Generates a view for a pcap sample (depends on Bro)

dependencies = ['pcap_bro']
execute(input_data)[source]

Execute

__del__()[source]

Class Cleanup

workbench.workers.view_pcap.test()[source]

view_pcap.py: Unit test

workbench.workers.view_pcap_deep module

view_pcap_deep worker

class workbench.workers.view_pcap_deep.ViewPcapDeep[source]

Bases: object

ViewPcapDeep: Generates a view for a pcap sample (depends on Bro)

Initialization of ViewPcapDeep

dependencies = ['view_pcap']
execute(input_data)[source]

ViewPcapDeep execute method

__del__()[source]

Class Cleanup

workbench.workers.view_pcap_deep.test()[source]

view_pcap_deep.py: Unit test

workbench.workers.view_pdf module

view_pdf worker

class workbench.workers.view_pdf.ViewPDF[source]

Bases: object

ViewPDF: Generates a view for PDF files

dependencies = ['meta', 'strings']
execute(input_data)[source]

Execute the ViewPDF worker

workbench.workers.view_pdf.test()[source]

view_pdf.py: Unit test

workbench.workers.view_pdf_deep module

view_pdf_deep worker

class workbench.workers.view_pdf_deep.ViewPDFDeep[source]

Bases: object

ViewPDFDeep: Generates a view for PDF files

dependencies = ['meta', 'strings']
execute(input_data)[source]

Execute the ViewPDFDeep worker

workbench.workers.view_pdf_deep.test()[source]

view_pdf_deep.py: Unit test

workbench.workers.view_pe module

view_pe worker

class workbench.workers.view_pe.ViewPE[source]

Bases: object

Generates a high level summary view for PE files that incorporates a large set of workers

dependencies = ['meta', 'strings', 'pe_peid', 'pe_indicators', 'pe_classifier', 'yara_sigs']
execute(input_data)[source]

Execute the ViewPE worker

static safe_get(data, key_list)[source]

Safely access dictionary keys when plugin may have failed
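Because a view combines output from many workers, any of which may have failed, nested lookups need a guard. A sketch of such a guard is shown below; returning `None` for a missing key is an assumption about the behavior, and this standalone function only mirrors the documented signature.

```python
def safe_get(data, key_list):
    """Walk nested dicts by key_list; return None if any step is missing."""
    for key in key_list:
        # ASSUMPTION: a missing key or a non-dict value yields None
        if not isinstance(data, dict) or key not in data:
            return None
        data = data[key]
    return data


worker_output = {'pe_peid': {'match_list': ['UPX']}, 'pe_classifier': None}
matches = safe_get(worker_output, ['pe_peid', 'match_list'])
label = safe_get(worker_output, ['pe_classifier', 'classification'])
```

Here `matches` holds the list from the worker that succeeded, while `label` is `None` because the classifier output is missing, so the view can still be built.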

workbench.workers.view_pe.test()[source]

view_pe.py: Unit test

workbench.workers.view_pe_deep module

view_pe_deep worker

class workbench.workers.view_pe_deep.ViewPEDeep[source]

Bases: object

Generates a high level summary view for PE files that incorporates a large set of workers

dependencies = ['view_pe', 'pe_indicators']
execute(input_data)[source]

Execute the ViewPEDeep worker

workbench.workers.view_pe_deep.test()[source]

view_pe_deep.py: Unit test

workbench.workers.view_swf module

view_swf worker

class workbench.workers.view_swf.ViewSWF[source]

Bases: object

ViewSWF: Generates a view for SWF files

dependencies = ['swf_meta', 'strings']
execute(input_data)[source]

Execute the ViewSWF worker

workbench.workers.view_swf.test()[source]

view_swf.py: Unit test

workbench.workers.view_swf_deep module

view_swf_deep worker

class workbench.workers.view_swf_deep.ViewSWFDeep[source]

Bases: object

ViewSWFDeep: Generates a view for SWF files

dependencies = ['view_swf']
execute(input_data)[source]

Execute the ViewSWFDeep worker

workbench.workers.view_swf_deep.test()[source]

view_swf_deep.py: Unit test

workbench.workers.view_zip module

view_zip worker

class workbench.workers.view_zip.ViewZip[source]

Bases: object

ViewZip: Generates a view for Zip files

dependencies = ['meta', 'unzip', 'yara_sigs']
execute(input_data)[source]

Execute the ViewZip worker

__del__()[source]

Class Cleanup

workbench.workers.view_zip.test()[source]

– view_zip.py test –

workbench.workers.view_zip_deep module

view_zip_deep worker

class workbench.workers.view_zip_deep.ViewZipDeep[source]

Bases: object

ViewZipDeep: Generates a view for Zip files

dependencies = ['view_zip']
execute(input_data)[source]

Execute the ViewZipDeep worker

__del__()[source]

Class Cleanup

workbench.workers.view_zip_deep.test()[source]

– view_zip_deep.py test –

workbench.workers.vt_query module

VTQuery worker

class workbench.workers.vt_query.VTQuery[source]

Bases: object

This worker queries VirusTotal; an API key needs to be provided

VTQuery Init

dependencies = ['meta']
execute(input_data)[source]

Execute the VTQuery worker

workbench.workers.vt_query.test()[source]

– vt_query.py test –

workbench.workers.yara_sigs module

Yara worker

workbench.workers.yara_sigs.get_rules_from_disk()[source]

Recursively traverse the yara/rules directory for rules
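A recursive traversal for rule files can be sketched with `os.walk`. The file extensions used to recognize rules and the `find_rule_files` helper are assumptions for illustration; the real function may select files differently and also compiles the rules, which is not shown here.

```python
import os
import tempfile


def find_rule_files(root_dir, extensions=('.yar', '.yara')):
    """Recursively collect rule file paths under root_dir."""
    found = []
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        for filename in filenames:
            # ASSUMPTION: rule files are identified by extension
            if filename.endswith(extensions):
                found.append(os.path.join(dirpath, filename))
    return sorted(found)


# Exercise the walk on a throwaway directory tree
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, 'rules', 'packers'))
for relative in ('rules/a.yar', 'rules/packers/b.yara', 'rules/readme.txt'):
    with open(os.path.join(root, *relative.split('/')), 'w') as handle:
        handle.write('// placeholder\n')
rule_files = find_rule_files(root)
```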

class workbench.workers.yara_sigs.YaraSigs[source]

Bases: object

This worker checks for matches against yara sigs. Output keys: [matches:list of matches]

dependencies = ['sample']
execute(input_data)[source]

yara worker execute method

workbench.workers.yara_sigs.test()[source]

yara_sigs.py: Unit test

Module contents

Workbench Workers