spamtrap_backend package
Subpackages
- spamtrap_backend.core package
- Subpackages
- Submodules
- spamtrap_backend.core.database module
DatabaseHandler
DatabaseHandler.collection_map
DatabaseHandler.connect_db()
DatabaseHandler.ensure_index()
DatabaseHandler.find_file_by_sha512()
DatabaseHandler.init_db()
DatabaseHandler.insert_dm()
DatabaseHandler.insert_file()
DatabaseHandler.insert_generic()
DatabaseHandler.insert_gridfs()
DatabaseHandler.insert_network_entity()
DatabaseHandler.insert_url()
DatabaseHandler.is_database_up()
DatabaseHandler.retrieve_file()
- spamtrap_backend.core.mediator module
- spamtrap_backend.core.message_ingestor module
- Module contents
Submodules
spamtrap_backend.datamodels module
- class spamtrap_backend.datamodels.Address(address: str, domain: str = None, top_level_domain: str = None, subdomain: str = None)
Bases:
object
- address: str
- domain: str = None
- subdomain: str = None
- top_level_domain: str = None
- class spamtrap_backend.datamodels.CollectionEnum(value)
Bases: str, Enum
Enum of strings specifying the names of the MongoDB collections.
- email = 'Email'
- events = 'Event'
- file = 'File'
- network_entity = 'NetworkEntity'
- raw = 'Raw'
- url = 'Url'
- class spamtrap_backend.datamodels.Email(attachment_count: int, attachments: List[spamtrap_backend.datamodels.Extraction], cc: List[spamtrap_backend.datamodels.Address], destination: spamtrap_backend.datamodels.NetworkEntity, domains: List[str], message: str, message_id: str, observer: spamtrap_backend.datamodels.Observer, recipients: List[spamtrap_backend.datamodels.Address], related: List[netaddr.ip.IPAddress], reply_to: spamtrap_backend.datamodels.Address, return_path: spamtrap_backend.datamodels.Address, sender: spamtrap_backend.datamodels.Address, hash: spamtrap_backend.datamodels.Hash, size: int, source: spamtrap_backend.datamodels.NetworkEntity, subject: str, to: List[spamtrap_backend.datamodels.Address], timestamp: datetime.datetime, urls: List[str], data: bytes, is_enriched: bool = True, _id: str = None)
Bases:
object
- attachment_count: int
- attachments: List[Extraction]
- data: bytes
- destination: NetworkEntity
- domains: List[str]
- is_enriched: bool = True
- message: str
- message_id: str
- size: int
- source: NetworkEntity
- subject: str
- timestamp: datetime
- urls: List[str]
- class spamtrap_backend.datamodels.EntityEnum(value)
Bases: str, Enum
Enum of strings specifying particular roles of network infrastructure.
- c2_server = 'c2_server'
- dns_query = 'dns_query'
- exploit_landing_page = 'exploit_landing_page'
- honeypot = 'honeypot'
- malware_distribution_site = 'malware_distribution_site'
- malware_infrastructure = 'malware_infrastructure'
- smtp_server = 'smtp_server'
- unspecified = 'unspecified'
- victim = 'victim'
- website = 'website'
- class spamtrap_backend.datamodels.Extraction(description: str, hash: spamtrap_backend.datamodels.Hash, content_guess: str, extension: str)
Bases:
object
- content_guess: str
- description: str
- extension: str
- class spamtrap_backend.datamodels.FeedMsg(identifier: str, channel: str, payload: bytes, timestamp: datetime.datetime = <factory>, _id: bson.objectid.ObjectId = None)
Bases:
object
- channel: str
- identifier: str
- payload: bytes
- timestamp: datetime
- class spamtrap_backend.datamodels.File(content_guess: str, extension: str, filename: str, hash: spamtrap_backend.datamodels.Hash, data: bytes, timestamp: datetime.datetime, is_enriched: bool = False, parent: spamtrap_backend.datamodels.Parent = None, file_id: bson.objectid.ObjectId = None, encoding: str = 'application/octet-stream', analysis_id: bson.objectid.ObjectId = None, mal_score: float = 0.0, analysis_timestamp: datetime.datetime = None, extractions: List[spamtrap_backend.datamodels.Hash] = <factory>, family: str = 'Unkown', password: str = None, entropy: float = None, _id: str = None)
Bases:
object
- ARCHIVE_EXTS: ClassVar[list] = ['zip', 'rar', 'tar']
- analysis_id: ObjectId = None
- analysis_timestamp: datetime = None
- content_guess: str
- data: bytes
- encoding: str = 'application/octet-stream'
- entropy: float = None
- extension: str
- family: str = 'Unkown'
- file_id: ObjectId = None
- filename: str
- is_enriched: bool = False
- mal_score: float = 0.0
- password: str = None
- timestamp: datetime
- class spamtrap_backend.datamodels.Geo(city_name: str, continent_name: str, country_iso_code: str, country_name: str, location: dict)
Bases:
object
- city_name: str
- continent_name: str
- country_iso_code: str
- country_name: str
- location: dict
- class spamtrap_backend.datamodels.Hash(md5: str, sha1: str, sha256: str, sha512: str)
Bases:
object
- md5: str
- sha1: str
- sha256: str
- sha512: str
- class spamtrap_backend.datamodels.HashFactory
Bases:
object
- classmethod get_hashstruct_from_bytes(buffer)
- class spamtrap_backend.datamodels.Network(protocol: str, type: spamtrap_backend.datamodels.NetworkTypeEnum = 'ipv4', transport: spamtrap_backend.datamodels.NetworkTransportEnum = 'tcp')
Bases:
object
- protocol: str
- transport: NetworkTransportEnum = 'tcp'
- type: NetworkTypeEnum = 'ipv4'
- class spamtrap_backend.datamodels.NetworkEntity(ip: str, port: int, category: spamtrap_backend.datamodels.EntityEnum = <EntityEnum.unspecified: 'unspecified'>, geo: spamtrap_backend.datamodels.Geo = None, is_enriched: bool = False, timestamp: datetime.datetime = datetime.datetime(2023, 10, 23, 5, 9, 9, 781695), parent: spamtrap_backend.datamodels.Parent = None, hostname: str = None, _id: str = None)
Bases:
object
- category: EntityEnum = 'unspecified'
- hostname: str = None
- ip: str
- is_enriched: bool = False
- port: int
- timestamp: datetime = datetime.datetime(2023, 10, 23, 5, 9, 9, 781695)
- class spamtrap_backend.datamodels.NetworkEntityFactory
Bases:
object
- GEO_DB = None
- geoip_reader = None
- classmethod get_from_hostname(hostname, type, timestamp=datetime.datetime(2023, 10, 23, 5, 9, 9, 782543))
- classmethod get_from_ip(ip, port, type, hostname=None, timestamp=datetime.datetime(2023, 10, 23, 5, 9, 9, 782540))
- classmethod get_geo(ip_addr)
- classmethod get_ip(hostname)
- classmethod get_rdns(ip)
- ip_pattern = re.compile('^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$')
- class spamtrap_backend.datamodels.NetworkEvent(timestamp: datetime.datetime, source: spamtrap_backend.datamodels.NetworkEntity, destination: spamtrap_backend.datamodels.NetworkEntity, related: List[netaddr.ip.IPAddress], observer: spamtrap_backend.datamodels.Observer, category: spamtrap_backend.datamodels.Network, urls: List[str] = <factory>, kind: str = 'event', type: str = 'creation')
Bases:
object
- destination: NetworkEntity
- kind: str = 'event'
- source: NetworkEntity
- timestamp: datetime
- type: str = 'creation'
- urls: List[str]
- class spamtrap_backend.datamodels.NetworkTransportEnum(value)
Bases: str, Enum
Enum of strings specifying the network transport protocol (TCP or UDP).
- tcp = 'tcp'
- udp = 'udp'
- class spamtrap_backend.datamodels.NetworkTypeEnum(value)
Bases: str, Enum
Enum of strings specifying the network type (IPv4 or IPv6).
- ipv4 = 'ipv4'
- ipv6 = 'ipv6'
- class spamtrap_backend.datamodels.Observer(name: str, type: str = 'spamtrap')
Bases:
object
- name: str
- type: str = 'spamtrap'
- class spamtrap_backend.datamodels.Parent(parent_id: str, parent_type: spamtrap_backend.datamodels.CollectionEnum)
Bases:
object
- parent_id: str
- parent_type: CollectionEnum
- class spamtrap_backend.datamodels.Session(timestamp: datetime.datetime, source_ip: str, honeypot: str, protocol: str, parent: bson.objectid.ObjectId, attachments: List[spamtrap_backend.datamodels.Hash] = <factory>, source_port: int = 0, destination_port: int = 0, source_country: str = '')
Bases:
object
- destination_port: int = 0
- honeypot: str
- parent: ObjectId
- protocol: str
- source_country: str = ''
- source_ip: str
- source_port: int = 0
- timestamp: datetime
- class spamtrap_backend.datamodels.Url(url: str, timestamp: datetime.datetime, parent: spamtrap_backend.datamodels.Parent = None, is_enriched: bool = False, scheme: str = None, domain: str = None, tld: str = None, credential: str = None, fragment: str = None, subdomain: str = None, resource_path: str = None, query_string: str = None, extractions: List[spamtrap_backend.datamodels.Extraction] = <factory>, exploits: List[dict] = <factory>, analysis_timestamp: datetime.datetime = None, category: str = <EntityEnum.website: 'website'>, _id: str = None)
Bases:
object
- analysis_timestamp: datetime = None
- category: str = 'website'
- credential: str = None
- domain: str = None
- exploits: List[dict]
- extractions: List[Extraction]
- fragment: str = None
- is_enriched: bool = False
- query_string: str = None
- resource_path: str = None
- scheme: str = None
- subdomain: str = None
- timestamp: datetime
- tld: str = None
- url: str
- spamtrap_backend.datamodels.asdict(o, skip_empty=True)
Inspired by https://stackoverflow.com/a/56839195