| @@ -0,0 +1,12 @@ | ||
| + | __pycache__/ | |
| + | *.pyc | |
| + | .pytest_cache/ | |
| + | .coverage | |
| + | .ruff_cache/ | |
| + | .venv/ | |
| + | venv/ | |
| + | .env | |
| + | config.yaml | |
| + | output/ | |
| + | ||
| + | docs/screenshots/_raw/ |
| @@ -0,0 +1,18 @@ | ||
| + | [project] | |
| + | name = "cti-detection-automation" | |
| + | version = "0.1.0" | |
| + | requires-python = ">=3.10" | |
| + | ||
| + | [tool.pytest.ini_options] | |
| + | pythonpath = ["src"] | |
| + | testpaths = ["tests"] | |
| + | ||
| + | [tool.ruff] | |
| + | line-length = 100 | |
| + | src = ["src"] | |
| + | ||
| + | [tool.ruff.lint] | |
| + | select = ["E", "F", "I", "B"] | |
| + | ||
| + | [tool.ruff.lint.per-file-ignores] | |
| + | "tests/*" = ["E501"] |
| @@ -0,0 +1,6 @@ | ||
| + | Flask==3.0.3 | |
| + | requests==2.32.3 | |
| + | PyYAML==6.0.2 | |
| + | itsdangerous==2.2.0 | |
| + | Jinja2==3.1.4 | |
| + | gunicorn==23.0.0 |
| @@ -0,0 +1 @@ | ||
| + | __version__ = "0.1.0" |
| @@ -0,0 +1,84 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | import os | |
| + | from pathlib import Path | |
| + | ||
| + | import yaml | |
| + | ||
| + | DEFAULTS = { | |
| + | "min_confidence": 60, | |
| + | "output_dir": "output", | |
| + | "fixtures_dir": "fixtures", | |
| + | "use_fixtures": False, | |
| + | "feeds": { | |
| + | "threatfox": {"enabled": True, "days": 1}, | |
| + | "feodo": {"enabled": True}, | |
| + | "urlhaus": {"enabled": True}, | |
| + | "otx": {"enabled": True}, | |
| + | "openphish": {"enabled": True}, | |
| + | "leaks": {"enabled": False, "watch_domains": []}, | |
| + | }, | |
| + | "rules": {"base_id": 100300}, | |
| + | "approval": { | |
| + | "base_url": "http://localhost:8080", | |
| + | "token_ttl": 86400, | |
| + | "analyst_email": "analyst@example.com", | |
| + | }, | |
| + | "email": { | |
| + | "backend": "file", | |
| + | "from_addr": "cti-pipeline@lab.local", | |
| + | "smtp_host": "localhost", | |
| + | "smtp_port": 25, | |
| + | "use_tls": False, | |
| + | }, | |
| + | } | |
| + | ||
| + | ENV_SECRETS = { | |
| + | ("approval", "secret"): "CTI_APPROVAL_SECRET", | |
| + | ("email", "smtp_user"): "CTI_SMTP_USER", | |
| + | ("email", "smtp_password"): "CTI_SMTP_PASSWORD", | |
| + | } | |
| + | ||
| + | FEED_SECRET_ENV = { | |
| + | "threatfox": ("auth_key", "THREATFOX_AUTH_KEY"), | |
| + | "otx": ("api_key", "OTX_API_KEY"), | |
| + | "leaks": ("token", "CTI_LEAKS_TOKEN"), | |
| + | } | |
| + | ||
| + | ||
| + | def _merge(base: dict, override: dict) -> dict: | |
| + | result = dict(base) | |
| + | for key, value in override.items(): | |
| + | if isinstance(value, dict) and isinstance(result.get(key), dict): | |
| + | result[key] = _merge(result[key], value) | |
| + | else: | |
| + | result[key] = value | |
| + | return result | |
| + | ||
| + | ||
| + | def load_config(path: str | None = None) -> dict: | |
| + | config = DEFAULTS | |
| + | if path and Path(path).exists(): | |
| + | loaded = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {} | |
| + | config = _merge(DEFAULTS, loaded) | |
| + | else: | |
| + | config = _merge(DEFAULTS, {}) | |
| + | ||
| + | for (section, key), env in ENV_SECRETS.items(): | |
| + | value = os.environ.get(env) | |
| + | if value: | |
| + | config.setdefault(section, {})[key] = value | |
| + | ||
| + | for feed_name, (key, env) in FEED_SECRET_ENV.items(): | |
| + | value = os.environ.get(env) | |
| + | if value: | |
| + | config["feeds"].setdefault(feed_name, {})[key] = value | |
| + | ||
| + | if os.environ.get("CTI_USE_FIXTURES") == "1": | |
| + | config["use_fixtures"] = True | |
| + | ||
| + | backend = os.environ.get("CTI_EMAIL_BACKEND") | |
| + | if backend: | |
| + | config["email"]["backend"] = backend | |
| + | ||
| + | return config |
| @@ -0,0 +1,19 @@ | ||
| + | from cti.feeds.feodo import FeodoTracker | |
| + | from cti.feeds.leaks import LeakFeed | |
| + | from cti.feeds.openphish import OpenPhish | |
| + | from cti.feeds.otx import OTX | |
| + | from cti.feeds.threatfox import ThreatFox | |
| + | from cti.feeds.urlhaus import URLhaus | |
| + | ||
| + | FEEDS = { | |
| + | "threatfox": ThreatFox, | |
| + | "feodo": FeodoTracker, | |
| + | "urlhaus": URLhaus, | |
| + | "otx": OTX, | |
| + | "openphish": OpenPhish, | |
| + | "leaks": LeakFeed, | |
| + | } | |
| + | ||
| + | ||
| + | def build_feed(name, settings=None): | |
| + | return FEEDS[name](settings or {}) |
| @@ -0,0 +1,63 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | import json | |
| + | from abc import ABC, abstractmethod | |
| + | from pathlib import Path | |
| + | ||
| + | import requests | |
| + | ||
| + | from cti.models import Indicator | |
| + | ||
| + | DEFAULT_TIMEOUT = 20 | |
| + | ||
| + | ||
| + | class Feed(ABC): | |
| + | name: str = "feed" | |
| + | url: str = "" | |
| + | method: str = "GET" | |
| + | ||
| + | def __init__(self, settings: dict | None = None): | |
| + | self.settings = settings or {} | |
| + | ||
| + | def headers(self) -> dict: | |
| + | return {} | |
| + | ||
| + | def request_body(self) -> dict | None: | |
| + | return None | |
| + | ||
| + | def fetch_raw(self) -> str: | |
| + | if self.method == "POST": | |
| + | response = requests.post( | |
| + | self.url, | |
| + | headers=self.headers(), | |
| + | json=self.request_body(), | |
| + | timeout=DEFAULT_TIMEOUT, | |
| + | ) | |
| + | else: | |
| + | response = requests.get( | |
| + | self.url, headers=self.headers(), timeout=DEFAULT_TIMEOUT | |
| + | ) | |
| + | response.raise_for_status() | |
| + | return response.text | |
| + | ||
| + | def load_fixture(self, fixtures_dir: Path) -> str: | |
| + | path = fixtures_dir / self.fixture_name() | |
| + | return path.read_text(encoding="utf-8") | |
| + | ||
| + | def fixture_name(self) -> str: | |
| + | return f"{self.name}.json" | |
| + | ||
| + | @abstractmethod | |
| + | def parse(self, raw: str) -> list[Indicator]: | |
| + | ... | |
| + | ||
| + | def collect(self, fixtures_dir: Path | None = None) -> list[Indicator]: | |
| + | if fixtures_dir is not None: | |
| + | raw = self.load_fixture(fixtures_dir) | |
| + | else: | |
| + | raw = self.fetch_raw() | |
| + | return self.parse(raw) | |
| + | ||
| + | ||
| + | def load_json(raw: str): | |
| + | return json.loads(raw) |
| @@ -0,0 +1,35 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | from cti import mitre | |
| + | from cti.feeds.base import Feed, load_json | |
| + | from cti.models import Indicator | |
| + | ||
| + | ||
| + | class FeodoTracker(Feed): | |
| + | name = "feodo" | |
| + | url = "https://feodotracker.abuse.ch/downloads/ipblocklist.json" | |
| + | ||
| + | def parse(self, raw: str) -> list[Indicator]: | |
| + | entries = load_json(raw) | |
| + | indicators: list[Indicator] = [] | |
| + | for entry in entries: | |
| + | ip = entry.get("ip_address") | |
| + | if not ip: | |
| + | continue | |
| + | malware = entry.get("malware") | |
| + | techniques = mitre.techniques_for_malware(malware) | |
| + | techniques += mitre.techniques_for_threat_type("botnet_cc") | |
| + | indicators.append( | |
| + | Indicator( | |
| + | type="ip", | |
| + | value=ip, | |
| + | source=self.name, | |
| + | threat_type="botnet_cc", | |
| + | confidence=90, | |
| + | malware=malware, | |
| + | techniques=sorted(set(techniques)), | |
| + | tags=[entry.get("status", "")], | |
| + | first_seen=entry.get("first_seen"), | |
| + | ) | |
| + | ) | |
| + | return indicators |
| @@ -0,0 +1,47 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | from cti import mitre | |
| + | from cti.feeds.base import Feed, load_json | |
| + | from cti.models import Indicator | |
| + | ||
| + | ||
| + | class LeakFeed(Feed): | |
| + | name = "leaks" | |
| + | url = "" | |
| + | ||
| + | def fetch_raw(self) -> str: | |
| + | endpoint = self.settings.get("endpoint") | |
| + | if not endpoint: | |
| + | return "[]" | |
| + | self.url = endpoint | |
| + | return super().fetch_raw() | |
| + | ||
| + | def headers(self) -> dict: | |
| + | token = self.settings.get("token") | |
| + | return {"Authorization": f"Bearer {token}"} if token else {} | |
| + | ||
| + | def parse(self, raw: str) -> list[Indicator]: | |
| + | entries = load_json(raw) | |
| + | techniques = sorted(set(mitre.techniques_for_threat_type("leaked_credentials"))) | |
| + | watched = {d.lower() for d in self.settings.get("watch_domains", [])} | |
| + | indicators: list[Indicator] = [] | |
| + | for entry in entries: | |
| + | email = (entry.get("email") or "").strip().lower() | |
| + | if not email or "@" not in email: | |
| + | continue | |
| + | domain = email.split("@", 1)[1] | |
| + | if watched and domain not in watched: | |
| + | continue | |
| + | indicators.append( | |
| + | Indicator( | |
| + | type="email", | |
| + | value=email, | |
| + | source=self.name, | |
| + | threat_type="leaked_credentials", | |
| + | confidence=70, | |
| + | techniques=techniques, | |
| + | tags=[entry.get("breach", "unknown_breach")], | |
| + | first_seen=entry.get("breach_date"), | |
| + | ) | |
| + | ) | |
| + | return indicators |
| @@ -0,0 +1,48 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | from urllib.parse import urlparse | |
| + | ||
| + | from cti import mitre | |
| + | from cti.feeds.base import Feed | |
| + | from cti.models import Indicator | |
| + | ||
| + | ||
| + | class OpenPhish(Feed): | |
| + | name = "openphish" | |
| + | url = "https://openphish.com/feed.txt" | |
| + | ||
| + | def fixture_name(self) -> str: | |
| + | return "openphish.txt" | |
| + | ||
| + | def parse(self, raw: str) -> list[Indicator]: | |
| + | techniques = sorted(set(mitre.techniques_for_threat_type("phishing"))) | |
| + | indicators: list[Indicator] = [] | |
| + | for line in raw.splitlines(): | |
| + | url = line.strip() | |
| + | if not url or not url.lower().startswith("http"): | |
| + | continue | |
| + | indicators.append( | |
| + | Indicator( | |
| + | type="url", | |
| + | value=url, | |
| + | source=self.name, | |
| + | threat_type="phishing", | |
| + | confidence=80, | |
| + | techniques=techniques, | |
| + | tags=["phishing"], | |
| + | ) | |
| + | ) | |
| + | host = urlparse(url).hostname | |
| + | if host: | |
| + | indicators.append( | |
| + | Indicator( | |
| + | type="domain", | |
| + | value=host, | |
| + | source=self.name, | |
| + | threat_type="phishing", | |
| + | confidence=75, | |
| + | techniques=techniques, | |
| + | tags=["phishing"], | |
| + | ) | |
| + | ) | |
| + | return indicators |
| @@ -0,0 +1,56 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | from cti import mitre | |
| + | from cti.feeds.base import Feed, load_json | |
| + | from cti.models import Indicator | |
| + | ||
| + | TYPE_MAP = { | |
| + | "IPv4": "ip", | |
| + | "IPv6": "ip", | |
| + | "domain": "domain", | |
| + | "hostname": "domain", | |
| + | "URL": "url", | |
| + | "FileHash-SHA256": "sha256", | |
| + | "FileHash-MD5": "md5", | |
| + | "FileHash-SHA1": "sha1", | |
| + | } | |
| + | ||
| + | ||
| + | class OTX(Feed): | |
| + | name = "otx" | |
| + | url = "https://otx.alienvault.com/api/v1/pulses/subscribed?limit=50" | |
| + | ||
| + | def headers(self) -> dict: | |
| + | key = self.settings.get("api_key") | |
| + | return {"X-OTX-API-KEY": key} if key else {} | |
| + | ||
| + | def parse(self, raw: str) -> list[Indicator]: | |
| + | payload = load_json(raw) | |
| + | indicators: list[Indicator] = [] | |
| + | for pulse in payload.get("results", []): | |
| + | techniques = [ | |
| + | a for a in pulse.get("attack_ids", []) if isinstance(a, str) | |
| + | ] | |
| + | techniques = [t.upper() for t in techniques] | |
| + | malware_families = pulse.get("malware_families") or [] | |
| + | malware = malware_families[0].get("display_name") if malware_families else None | |
| + | tags = pulse.get("tags") or [] | |
| + | for raw_ind in pulse.get("indicators", []): | |
| + | ind_type = TYPE_MAP.get(raw_ind.get("type")) | |
| + | if ind_type is None: | |
| + | continue | |
| + | derived = list(techniques) + mitre.techniques_for_malware(malware) | |
| + | indicators.append( | |
| + | Indicator( | |
| + | type=ind_type, | |
| + | value=raw_ind.get("indicator", ""), | |
| + | source=self.name, | |
| + | threat_type=pulse.get("name", "otx_pulse"), | |
| + | confidence=60, | |
| + | malware=malware, | |
| + | techniques=sorted(set(derived)), | |
| + | tags=tags, | |
| + | reference=f"https://otx.alienvault.com/pulse/{pulse.get('id')}", | |
| + | ) | |
| + | ) | |
| + | return indicators |
| @@ -0,0 +1,59 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | from cti import mitre | |
| + | from cti.feeds.base import Feed, load_json | |
| + | from cti.models import Indicator | |
| + | ||
| + | TYPE_MAP = { | |
| + | "ip:port": "ip", | |
| + | "domain": "domain", | |
| + | "url": "url", | |
| + | "md5_hash": "md5", | |
| + | "sha256_hash": "sha256", | |
| + | "sha1_hash": "sha1", | |
| + | } | |
| + | ||
| + | ||
| + | class ThreatFox(Feed): | |
| + | name = "threatfox" | |
| + | url = "https://threatfox-api.abuse.ch/api/v1/" | |
| + | method = "POST" | |
| + | ||
| + | def headers(self) -> dict: | |
| + | key = self.settings.get("auth_key") | |
| + | return {"Auth-Key": key} if key else {} | |
| + | ||
| + | def request_body(self) -> dict: | |
| + | return {"query": "get_iocs", "days": int(self.settings.get("days", 1))} | |
| + | ||
| + | def parse(self, raw: str) -> list[Indicator]: | |
| + | payload = load_json(raw) | |
| + | if payload.get("query_status") != "ok": | |
| + | return [] | |
| + | indicators: list[Indicator] = [] | |
| + | for entry in payload.get("data", []): | |
| + | ioc_type = TYPE_MAP.get(entry.get("ioc_type")) | |
| + | if ioc_type is None: | |
| + | continue | |
| + | value = entry.get("ioc", "") | |
| + | if ioc_type == "ip" and ":" in value: | |
| + | value = value.split(":", 1)[0] | |
| + | malware = entry.get("malware_printable") or entry.get("malware") | |
| + | threat_type = entry.get("threat_type", "unknown") | |
| + | techniques = mitre.techniques_for_malware(malware) | |
| + | techniques += mitre.techniques_for_threat_type(threat_type) | |
| + | indicators.append( | |
| + | Indicator( | |
| + | type=ioc_type, | |
| + | value=value, | |
| + | source=self.name, | |
| + | threat_type=threat_type, | |
| + | confidence=int(entry.get("confidence_level", 50)), | |
| + | malware=malware, | |
| + | techniques=sorted(set(techniques)), | |
| + | tags=entry.get("tags") or [], | |
| + | reference=entry.get("reference"), | |
| + | first_seen=entry.get("first_seen"), | |
| + | ) | |
| + | ) | |
| + | return indicators |
| @@ -0,0 +1,60 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | from urllib.parse import urlparse | |
| + | ||
| + | from cti import mitre | |
| + | from cti.feeds.base import Feed, load_json | |
| + | from cti.models import Indicator | |
| + | ||
| + | ||
| + | class URLhaus(Feed): | |
| + | name = "urlhaus" | |
| + | url = "https://urlhaus.abuse.ch/downloads/json_recent/" | |
| + | ||
| + | def parse(self, raw: str) -> list[Indicator]: | |
| + | payload = load_json(raw) | |
| + | indicators: list[Indicator] = [] | |
| + | rows = payload.values() if isinstance(payload, dict) else payload | |
| + | for group in rows: | |
| + | entries = group if isinstance(group, list) else [group] | |
| + | for entry in entries: | |
| + | url = entry.get("url") | |
| + | if not url: | |
| + | continue | |
| + | threat = entry.get("threat", "malware_download") | |
| + | tags = entry.get("tags") or [] | |
| + | techniques = mitre.techniques_for_threat_type(threat) | |
| + | if any("exploit" in str(t).lower() for t in tags): | |
| + | techniques += mitre.techniques_for_threat_type("exploit_kit") | |
| + | indicators.append( | |
| + | Indicator( | |
| + | type="url", | |
| + | value=url, | |
| + | source=self.name, | |
| + | threat_type=threat, | |
| + | confidence=75, | |
| + | techniques=sorted(set(techniques)), | |
| + | tags=tags, | |
| + | reference=entry.get("urlhaus_reference"), | |
| + | first_seen=entry.get("date_added"), | |
| + | ) | |
| + | ) | |
| + | host = urlparse(url).hostname | |
| + | if host and not _is_ip(host): | |
| + | indicators.append( | |
| + | Indicator( | |
| + | type="domain", | |
| + | value=host, | |
| + | source=self.name, | |
| + | threat_type=threat, | |
| + | confidence=70, | |
| + | techniques=sorted(set(techniques)), | |
| + | tags=tags, | |
| + | ) | |
| + | ) | |
| + | return indicators | |
| + | ||
| + | ||
| + | def _is_ip(host: str) -> bool: | |
| + | parts = host.split(".") | |
| + | return len(parts) == 4 and all(p.isdigit() for p in parts) |
| @@ -0,0 +1,92 @@ | ||
| + | TECHNIQUES = { | |
| + | "T1003.001": ("OS Credential Dumping: LSASS Memory", "credential-access"), | |
| + | "T1005": ("Data from Local System", "collection"), | |
| + | "T1041": ("Exfiltration Over C2 Channel", "exfiltration"), | |
| + | "T1055": ("Process Injection", "defense-evasion"), | |
| + | "T1056.001": ("Input Capture: Keylogging", "collection"), | |
| + | "T1059": ("Command and Scripting Interpreter", "execution"), | |
| + | "T1059.001": ("Command and Scripting Interpreter: PowerShell", "execution"), | |
| + | "T1071": ("Application Layer Protocol", "command-and-control"), | |
| + | "T1071.001": ("Application Layer Protocol: Web Protocols", "command-and-control"), | |
| + | "T1071.004": ("Application Layer Protocol: DNS", "command-and-control"), | |
| + | "T1090": ("Proxy", "command-and-control"), | |
| + | "T1102": ("Web Service", "command-and-control"), | |
| + | "T1105": ("Ingress Tool Transfer", "command-and-control"), | |
| + | "T1110": ("Brute Force", "credential-access"), | |
| + | "T1204": ("User Execution", "execution"), | |
| + | "T1204.001": ("User Execution: Malicious Link", "execution"), | |
| + | "T1204.002": ("User Execution: Malicious File", "execution"), | |
| + | "T1486": ("Data Encrypted for Impact", "impact"), | |
| + | "T1547.001": ("Registry Run Keys / Startup Folder", "persistence"), | |
| + | "T1555": ("Credentials from Password Stores", "credential-access"), | |
| + | "T1566": ("Phishing", "initial-access"), | |
| + | "T1566.001": ("Phishing: Spearphishing Attachment", "initial-access"), | |
| + | "T1566.002": ("Phishing: Spearphishing Link", "initial-access"), | |
| + | "T1573": ("Encrypted Channel", "command-and-control"), | |
| + | "T1589.001": ("Gather Victim Identity Information: Credentials", "reconnaissance"), | |
| + | "T1588.001": ("Obtain Capabilities: Malware", "resource-development"), | |
| + | "T1608": ("Stage Capabilities", "resource-development"), | |
| + | } | |
| + | ||
| + | MALWARE_TECHNIQUES = { | |
| + | "cobaltstrike": ["T1071.001", "T1059.001", "T1055"], | |
| + | "agenttesla": ["T1056.001", "T1555", "T1041"], | |
| + | "redline": ["T1555", "T1005", "T1041"], | |
| + | "redlinestealer": ["T1555", "T1005", "T1041"], | |
| + | "emotet": ["T1566.001", "T1071.001", "T1105"], | |
| + | "dridex": ["T1566.001", "T1071.001", "T1059.001"], | |
| + | "qakbot": ["T1566.001", "T1055", "T1071.001"], | |
| + | "qbot": ["T1566.001", "T1055", "T1071.001"], | |
| + | "icedid": ["T1566.001", "T1105", "T1071.001"], | |
| + | "trickbot": ["T1071.001", "T1055", "T1105"], | |
| + | "lokibot": ["T1555", "T1056.001", "T1041"], | |
| + | "formbook": ["T1056.001", "T1055", "T1041"], | |
| + | "njrat": ["T1059.001", "T1056.001", "T1071"], | |
| + | "asyncrat": ["T1059.001", "T1071", "T1105"], | |
| + | "remcos": ["T1059.001", "T1056.001", "T1071"], | |
| + | } | |
| + | ||
| + | THREAT_TYPE_TECHNIQUES = { | |
| + | "phishing": ["T1566.002", "T1204.001"], | |
| + | "exploit_kit": ["T1204.001", "T1608"], | |
| + | "botnet_cc": ["T1071.001", "T1573"], | |
| + | "c2": ["T1071.001", "T1573"], | |
| + | "malware_download": ["T1105", "T1204.002"], | |
| + | "payload_delivery": ["T1105", "T1204.002"], | |
| + | "ransomware": ["T1486", "T1071.001"], | |
| + | "leaked_credentials": ["T1589.001"], | |
| + | } | |
| + | ||
| + | ||
| + | def name_for(technique_id: str) -> str: | |
| + | base = technique_id.split(".")[0] | |
| + | if technique_id in TECHNIQUES: | |
| + | return TECHNIQUES[technique_id][0] | |
| + | if base in TECHNIQUES: | |
| + | return TECHNIQUES[base][0] | |
| + | return technique_id | |
| + | ||
| + | ||
| + | def tactic_for(technique_id: str) -> str: | |
| + | base = technique_id.split(".")[0] | |
| + | if technique_id in TECHNIQUES: | |
| + | return TECHNIQUES[technique_id][1] | |
| + | if base in TECHNIQUES: | |
| + | return TECHNIQUES[base][1] | |
| + | return "unknown" | |
| + | ||
| + | ||
| + | def techniques_for_malware(malware: str | None) -> list[str]: | |
| + | if not malware: | |
| + | return [] | |
| + | key = malware.lower().replace(" ", "").replace("-", "").replace("_", "") | |
| + | for name, techniques in MALWARE_TECHNIQUES.items(): | |
| + | if name in key: | |
| + | return list(techniques) | |
| + | return [] | |
| + | ||
| + | ||
| + | def techniques_for_threat_type(threat_type: str | None) -> list[str]: | |
| + | if not threat_type: | |
| + | return [] | |
| + | return list(THREAT_TYPE_TECHNIQUES.get(threat_type.lower(), [])) |
| @@ -0,0 +1,76 @@ | ||
| + | from __future__ import annotations | |
| + | ||
| + | from dataclasses import asdict, dataclass, field | |
| + | from datetime import datetime, timezone | |
| + | from typing import Optional | |
| + | ||
| + | IndicatorType = str | |
| + | ||
| + | ||
| + | @dataclass | |
| + | class Indicator: | |
| + | type: IndicatorType | |
| + | value: str | |
| + | source: str | |
| + | threat_type: str | |
| + | confidence: int = 50 | |
| + | malware: Optional[str] = None | |
| + | techniques: list[str] = field(default_factory=list) | |
| + | tags: list[str] = field(default_factory=list) | |
| + | reference: Optional[str] = None | |
| + | first_seen: Optional[str] = None | |
| + | ||
| + | def key(self) -> tuple[str, str]: | |
| + | return (self.type, self.value.lower()) | |
| + | ||
| + | def to_dict(self) -> dict: | |
| + | return asdict(self) | |
| + | ||
| + | ||
| + | @dataclass | |
| + | class Technique: | |
| + | technique_id: str | |
| + | name: str | |
| + | tactic: str | |
| + | sources: set[str] = field(default_factory=set) | |
| + | indicator_count: int = 0 | |
| + | ||
| + | def to_dict(self) -> dict: | |
| + | return { | |
| + | "technique_id": self.technique_id, | |
| + | "name": self.name, | |
| + | "tactic": self.tactic, | |
| + | "sources": sorted(self.sources), | |
| + | "indicator_count": self.indicator_count, | |
| + | } | |
| + | ||
| + | ||
| + | @dataclass | |
| + | class RuleBundle: | |
| + | bundle_id: str | |
| + | generated_at: str | |
| + | indicators: list[Indicator] | |
| + | techniques: list[Technique] | |
| + | cdb_lists: dict[str, str] | |
| + | rules_xml: str | |
| + | ||
| + | @staticmethod | |
| + | def now_iso() -> str: | |
| + | return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") | |
| + | ||
| + | def counts_by_type(self) -> dict[str, int]: | |
| + | counts: dict[str, int] = {} | |
| + | for indicator in self.indicators: | |
| + | counts[indicator.type] = counts.get(indicator.type, 0) + 1 | |
| + | return counts | |
| + | ||
| + | def manifest(self) -> dict: | |
| + | return { | |
| + | "bundle_id": self.bundle_id, | |
| + | "generated_at": self.generated_at, | |
| + | "indicator_count": len(self.indicators), | |
| + | "counts_by_type": self.counts_by_type(), | |
| + | "technique_count": len(self.techniques), | |
| + | "techniques": [t.technique_id for t in self.techniques], | |
| + | "cdb_lists": {name: content.count("\n") for name, content in self.cdb_lists.items()}, | |
| + | } |