Zion Boggan
repos/CTI Detection Automation/src/cti/feeds/otx.py
zionboggan.com ↗
56 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
from cti import mitre
4
from cti.feeds.base import Feed, load_json
5
from cti.models import Indicator
6
 
7
# Translation table from the ``type`` string carried by a raw OTX indicator to
# the indicator type stored on ``Indicator`` records. Several OTX types share
# one target (both IP versions -> "ip", hostnames -> "domain"); a type that is
# not listed here has no mapping and is skipped by the parser.
TYPE_MAP: dict[str, str] = {
    # Network addresses and names.
    **dict.fromkeys(("IPv4", "IPv6"), "ip"),
    **dict.fromkeys(("domain", "hostname"), "domain"),
    "URL": "url",
    # File hashes, keyed by digest algorithm.
    "FileHash-SHA256": "sha256",
    "FileHash-MD5": "md5",
    "FileHash-SHA1": "sha1",
}
17
 
18
 
19
class OTX(Feed):
    """Feed for AlienVault OTX subscribed pulses.

    Every pulse in the response's ``results`` list yields one ``Indicator``
    per raw indicator whose ``type`` has an entry in ``TYPE_MAP``. Pulse-level
    context (name, first malware family, technique IDs, tags, pulse URL) is
    copied onto each indicator produced from that pulse.
    """

    name = "otx"
    url = "https://otx.alienvault.com/api/v1/pulses/subscribed?limit=50"

    def headers(self) -> dict:
        """Return the API-key header, or an empty dict when no key is set."""
        key = self.settings.get("api_key")
        return {"X-OTX-API-KEY": key} if key else {}

    @staticmethod
    def _first_malware_name(families) -> str | None:
        """Return the name of the first malware family entry, or ``None``.

        Accepts the entry either as an object with a ``display_name`` field
        or as a bare string; any other shape (or an empty/missing list)
        yields ``None`` instead of raising.
        """
        if not isinstance(families, list) or not families:
            return None
        first = families[0]
        if isinstance(first, dict):
            return first.get("display_name")
        if isinstance(first, str):
            return first or None
        return None

    def parse(self, raw: str) -> list[Indicator]:
        """Convert a raw OTX pulses response into a flat list of indicators.

        Args:
            raw: Response body, decoded by ``load_json``.

        Returns:
            One ``Indicator`` per raw indicator with a mapped type, in
            response order. Indicators with an unmapped type are skipped.
        """
        payload = load_json(raw)
        indicators: list[Indicator] = []
        # ``.get(key) or []`` instead of ``.get(key, [])``: the default only
        # applies when the key is absent, so a key present with JSON null
        # would otherwise hand ``None`` to the loop and raise TypeError.
        for pulse in payload.get("results") or []:
            malware = self._first_malware_name(pulse.get("malware_families"))
            tags = pulse.get("tags") or []
            threat_type = pulse.get("name", "otx_pulse")
            reference = f"https://otx.alienvault.com/pulse/{pulse.get('id')}"

            # Technique IDs depend only on the pulse, so derive them once here
            # rather than once per indicator. Non-string entries are ignored;
            # IDs are upper-cased so the set de-duplicates case variants.
            reported = [
                a.upper()
                for a in pulse.get("attack_ids") or []
                if isinstance(a, str)
            ]
            techniques = sorted(
                set(reported + mitre.techniques_for_malware(malware))
            )

            for raw_ind in pulse.get("indicators") or []:
                ind_type = TYPE_MAP.get(raw_ind.get("type"))
                if ind_type is None:
                    continue
                indicators.append(
                    Indicator(
                        type=ind_type,
                        value=raw_ind.get("indicator", ""),
                        source=self.name,
                        threat_type=threat_type,
                        confidence=60,
                        malware=malware,
                        # Copy so each indicator owns its own list.
                        techniques=list(techniques),
                        tags=tags,
                        reference=reference,
                    )
                )
        return indicators