Zion Boggan
repos/CTI Detection Automation/src/cti/feeds/urlhaus.py
zionboggan.com ↗
60 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
from urllib.parse import urlparse
4
 
5
from cti import mitre
6
from cti.feeds.base import Feed, load_json
7
from cti.models import Indicator
8
 
9
 
10
class URLhaus(Feed):
    """Feed that turns a URLhaus JSON download into ``Indicator`` objects.

    Each usable entry produces a ``url`` indicator; when the URL's host is
    not an IP address, a second ``domain`` indicator is produced for the host.
    """

    name = "urlhaus"
    url = "https://urlhaus.abuse.ch/downloads/json_recent/"

    def parse(self, raw: str) -> list[Indicator]:
        """Parse raw feed text into a list of indicators.

        Args:
            raw: Feed body, decoded with ``load_json``.

        Returns:
            Indicators in feed order. For every entry with a non-empty string
            ``url`` there is one ``url`` indicator, immediately followed by a
            ``domain`` indicator when the URL has a host that ``_is_ip``
            rejects. Entries that are not dicts, or that lack a usable
            ``url``, are skipped.
        """
        payload = load_json(raw)
        indicators: list[Indicator] = []
        # A dict payload is walked by its values; any other payload is
        # iterated as-is. Each item may be a list of entries or a single entry.
        rows = payload.values() if isinstance(payload, dict) else payload
        for group in rows:
            entries = group if isinstance(group, list) else [group]
            for entry in entries:
                # Skip malformed rows instead of failing the whole feed on
                # an AttributeError from ``entry.get``.
                if not isinstance(entry, dict):
                    continue
                entry_url = entry.get("url")
                if not entry_url or not isinstance(entry_url, str):
                    continue
                threat = entry.get("threat", "malware_download")
                tags = entry.get("tags") or []
                # Copy before extending: the helper's return value may be a
                # shared object, and extending it in place would leak the
                # extra techniques into later lookups.
                techniques = list(mitre.techniques_for_threat_type(threat))
                if any("exploit" in str(t).lower() for t in tags):
                    techniques.extend(
                        mitre.techniques_for_threat_type("exploit_kit")
                    )
                # De-duplicate and order once; both indicators use the result.
                techniques = sorted(set(techniques))
                indicators.append(
                    Indicator(
                        type="url",
                        value=entry_url,
                        source=self.name,
                        threat_type=threat,
                        confidence=75,
                        techniques=techniques,
                        tags=tags,
                        reference=entry.get("urlhaus_reference"),
                        first_seen=entry.get("date_added"),
                    )
                )
                # urlparse raises ValueError for malformed netlocs (e.g. an
                # unterminated IPv6 bracket); keep the URL indicator and just
                # skip the derived domain in that case.
                try:
                    host = urlparse(entry_url).hostname
                except ValueError:
                    host = None
                if host and not _is_ip(host):
                    indicators.append(
                        Indicator(
                            type="domain",
                            value=host,
                            source=self.name,
                            threat_type=threat,
                            confidence=70,
                            # Separate list so the two indicators do not
                            # alias the same techniques object.
                            techniques=list(techniques),
                            tags=tags,
                        )
                    )
        return indicators
56
 
57
 
58
def _is_ip(host: str) -> bool:
59
    parts = host.split(".")
60
    return len(parts) == 4 and all(p.isdigit() for p in parts)