| 1 | from __future__ import annotations |
| 2 | |
| 3 | from urllib.parse import urlparse |
| 4 | |
| 5 | from cti import mitre |
| 6 | from cti.feeds.base import Feed, load_json |
| 7 | from cti.models import Indicator |
| 8 | |
| 9 | |
class URLhaus(Feed):
    """Feed parser for the URLhaus "recent URLs" JSON download.

    Every entry carrying a ``url`` yields a ``url`` indicator. When the URL's
    host is not classified as an IP address by ``_is_ip``, a ``domain``
    indicator for that host is emitted right after it.
    """

    name = "urlhaus"
    url = "https://urlhaus.abuse.ch/downloads/json_recent/"

    def parse(self, raw: str) -> list[Indicator]:
        """Turn the raw feed body into a list of indicators.

        Args:
            raw: Feed body as text; decoded with ``load_json``.

        Returns:
            Indicators in feed order. ``url`` indicators get confidence 75 and
            carry the entry's ``urlhaus_reference`` and ``date_added``;
            ``domain`` indicators get confidence 70 and share the entry's
            threat type, techniques and tags.
        """
        payload = load_json(raw)
        indicators: list[Indicator] = []
        # A dict payload is walked by its values; anything else is iterated
        # directly. Each item may be a list of entries or a single entry.
        groups = payload.values() if isinstance(payload, dict) else payload
        for group in groups:
            entries = group if isinstance(group, list) else [group]
            for entry in entries:
                # Skip rows that are not objects instead of crashing on
                # ``.get`` and losing the rest of the feed.
                if not isinstance(entry, dict):
                    continue
                url = entry.get("url")
                if not url:
                    continue
                # ``or`` (not a ``get`` default) so a null/empty "threat"
                # falls back as well, mirroring the handling of "tags".
                threat = entry.get("threat") or "malware_download"
                tags = entry.get("tags") or []
                # Copy before extending: the helper's return value may be a
                # shared object, and ``+=`` would mutate it in place.
                techniques = list(mitre.techniques_for_threat_type(threat))
                if any("exploit" in str(tag).lower() for tag in tags):
                    techniques.extend(
                        mitre.techniques_for_threat_type("exploit_kit")
                    )
                techniques = sorted(set(techniques))
                indicators.append(
                    Indicator(
                        type="url",
                        value=url,
                        source=self.name,
                        threat_type=threat,
                        confidence=75,
                        techniques=techniques,
                        tags=tags,
                        reference=entry.get("urlhaus_reference"),
                        first_seen=entry.get("date_added"),
                    )
                )
                # urlparse raises ValueError on malformed netlocs (e.g. an
                # unbalanced "["); keep the url indicator and just skip the
                # domain for that entry.
                try:
                    host = urlparse(url).hostname
                except ValueError:
                    host = None
                if host and not _is_ip(host):
                    indicators.append(
                        Indicator(
                            type="domain",
                            value=host,
                            source=self.name,
                            threat_type=threat,
                            confidence=70,
                            # Separate list object per indicator, as before.
                            techniques=list(techniques),
                            tags=tags,
                        )
                    )
        return indicators
| 56 | |
| 57 | |
| 58 | def _is_ip(host: str) -> bool: |
| 59 | parts = host.split(".") |
| 60 | return len(parts) == 4 and all(p.isdigit() for p in parts) |