| 1 | from __future__ import annotations |
| 2 | |
| 3 | from urllib.parse import urlparse |
| 4 | |
| 5 | from cti import mitre |
| 6 | from cti.feeds.base import Feed |
| 7 | from cti.models import Indicator |
| 8 | |
| 9 | |
| 10 | class OpenPhish(Feed): |
| 11 | name = "openphish" |
| 12 | url = "https://openphish.com/feed.txt" |
| 13 | |
| 14 | def fixture_name(self) -> str: |
| 15 | return "openphish.txt" |
| 16 | |
| 17 | def parse(self, raw: str) -> list[Indicator]: |
| 18 | techniques = sorted(set(mitre.techniques_for_threat_type("phishing"))) |
| 19 | indicators: list[Indicator] = [] |
| 20 | for line in raw.splitlines(): |
| 21 | url = line.strip() |
| 22 | if not url or not url.lower().startswith("http"): |
| 23 | continue |
| 24 | indicators.append( |
| 25 | Indicator( |
| 26 | type="url", |
| 27 | value=url, |
| 28 | source=self.name, |
| 29 | threat_type="phishing", |
| 30 | confidence=80, |
| 31 | techniques=techniques, |
| 32 | tags=["phishing"], |
| 33 | ) |
| 34 | ) |
| 35 | host = urlparse(url).hostname |
| 36 | if host: |
| 37 | indicators.append( |
| 38 | Indicator( |
| 39 | type="domain", |
| 40 | value=host, |
| 41 | source=self.name, |
| 42 | threat_type="phishing", |
| 43 | confidence=75, |
| 44 | techniques=techniques, |
| 45 | tags=["phishing"], |
| 46 | ) |
| 47 | ) |
| 48 | return indicators |