Zion Boggan
repos/CTI Detection Automation/src/cti/feeds/threatfox.py
zionboggan.com ↗
59 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
from cti import mitre
4
from cti.feeds.base import Feed, load_json
5
from cti.models import Indicator
6
 
7
TYPE_MAP = {
8
    "ip:port": "ip",
9
    "domain": "domain",
10
    "url": "url",
11
    "md5_hash": "md5",
12
    "sha256_hash": "sha256",
13
    "sha1_hash": "sha1",
14
}
15
 
16
 
17
class ThreatFox(Feed):
18
    name = "threatfox"
19
    url = "https://threatfox-api.abuse.ch/api/v1/"
20
    method = "POST"
21
 
22
    def headers(self) -> dict:
23
        key = self.settings.get("auth_key")
24
        return {"Auth-Key": key} if key else {}
25
 
26
    def request_body(self) -> dict:
27
        return {"query": "get_iocs", "days": int(self.settings.get("days", 1))}
28
 
29
    def parse(self, raw: str) -> list[Indicator]:
30
        payload = load_json(raw)
31
        if payload.get("query_status") != "ok":
32
            return []
33
        indicators: list[Indicator] = []
34
        for entry in payload.get("data", []):
35
            ioc_type = TYPE_MAP.get(entry.get("ioc_type"))
36
            if ioc_type is None:
37
                continue
38
            value = entry.get("ioc", "")
39
            if ioc_type == "ip" and ":" in value:
40
                value = value.split(":", 1)[0]
41
            malware = entry.get("malware_printable") or entry.get("malware")
42
            threat_type = entry.get("threat_type", "unknown")
43
            techniques = mitre.techniques_for_malware(malware)
44
            techniques += mitre.techniques_for_threat_type(threat_type)
45
            indicators.append(
46
                Indicator(
47
                    type=ioc_type,
48
                    value=value,
49
                    source=self.name,
50
                    threat_type=threat_type,
51
                    confidence=int(entry.get("confidence_level", 50)),
52
                    malware=malware,
53
                    techniques=sorted(set(techniques)),
54
                    tags=entry.get("tags") or [],
55
                    reference=entry.get("reference"),
56
                    first_seen=entry.get("first_seen"),
57
                )
58
            )
59
        return indicators