| 1 | from __future__ import annotations |
| 2 | |
| 3 | from collections import Counter |
| 4 | from xml.sax.saxutils import escape |
| 5 | |
| 6 | from cti.models import Indicator |
| 7 | |
| 8 | LIST_DEFINITIONS = [ |
| 9 | ("cti-malicious-ip", {"ip"}), |
| 10 | ("cti-malicious-domain", {"domain"}), |
| 11 | ("cti-malicious-url", {"url"}), |
| 12 | ("cti-malware-hash", {"sha256", "md5", "sha1"}), |
| 13 | ("cti-leaked-email", {"email"}), |
| 14 | ] |
| 15 | |
| 16 | |
| 17 | def build_cdb_lists(indicators: list[Indicator]) -> dict[str, str]: |
| 18 | lists: dict[str, list[str]] = {name: [] for name, _ in LIST_DEFINITIONS} |
| 19 | seen: dict[str, set[str]] = {name: set() for name, _ in LIST_DEFINITIONS} |
| 20 | for indicator in indicators: |
| 21 | for name, types in LIST_DEFINITIONS: |
| 22 | if indicator.type in types: |
| 23 | value = indicator.value |
| 24 | if value in seen[name]: |
| 25 | break |
| 26 | seen[name].add(value) |
| 27 | label = indicator.malware or indicator.threat_type or "cti" |
| 28 | lists[name].append(f"{value}:{_sanitize(label)}") |
| 29 | break |
| 30 | return { |
| 31 | name: "\n".join(sorted(entries)) + ("\n" if entries else "") |
| 32 | for name, entries in lists.items() |
| 33 | } |
| 34 | |
| 35 | |
| 36 | def _sanitize(label: str) -> str: |
| 37 | cleaned = label.replace(":", "-").replace("\n", " ").strip() |
| 38 | return cleaned[:48] or "cti" |
| 39 | |
| 40 | |
| 41 | def _top_techniques(indicators: list[Indicator], limit: int = 4) -> list[str]: |
| 42 | counter: Counter[str] = Counter() |
| 43 | for indicator in indicators: |
| 44 | counter.update(indicator.techniques) |
| 45 | return [technique for technique, _ in counter.most_common(limit)] |
| 46 | |
| 47 | |
| 48 | def _mitre_block(techniques: list[str], indent: str) -> str: |
| 49 | if not techniques: |
| 50 | return "" |
| 51 | ids = "\n".join(f"{indent} <id>{escape(t)}</id>" for t in techniques) |
| 52 | return f"{indent}<mitre>\n{ids}\n{indent}</mitre>\n" |
| 53 | |
| 54 | |
| 55 | def build_rules_xml(indicators: list[Indicator], base_id: int = 100300) -> str: |
| 56 | by_type: dict[str, list[Indicator]] = {} |
| 57 | for indicator in indicators: |
| 58 | by_type.setdefault(indicator.type, []).append(indicator) |
| 59 | |
| 60 | ip = by_type.get("ip", []) |
| 61 | domain = by_type.get("domain", []) |
| 62 | url = by_type.get("url", []) |
| 63 | hashes = by_type.get("sha256", []) + by_type.get("md5", []) + by_type.get("sha1", []) |
| 64 | email = by_type.get("email", []) |
| 65 | |
| 66 | rules: list[str] = [] |
| 67 | rule_id = base_id |
| 68 | |
| 69 | if ip: |
| 70 | rules.append(_list_rule( |
| 71 | rule_id, 12, "dstip", "address_match_key", "cti-malicious-ip", |
| 72 | "Outbound connection to CTI-flagged IP", _top_techniques(ip))) |
| 73 | rule_id += 1 |
| 74 | rules.append(_list_rule( |
| 75 | rule_id, 10, "srcip", "address_match_key", "cti-malicious-ip", |
| 76 | "Inbound connection from CTI-flagged IP", _top_techniques(ip))) |
| 77 | rule_id += 1 |
| 78 | if domain: |
| 79 | rules.append(_list_rule( |
| 80 | rule_id, 12, "win.eventdata.queryName", "match_key", "cti-malicious-domain", |
| 81 | "DNS query for CTI-flagged domain", _top_techniques(domain))) |
| 82 | rule_id += 1 |
| 83 | if url: |
| 84 | rules.append(_url_rule(rule_id, 12, "cti-malicious-url", _top_techniques(url))) |
| 85 | rule_id += 1 |
| 86 | if hashes: |
| 87 | rules.append(_list_rule( |
| 88 | rule_id, 13, "win.eventdata.sha256", "match_key", "cti-malware-hash", |
| 89 | "Execution of CTI-flagged malware hash", _top_techniques(hashes))) |
| 90 | rule_id += 1 |
| 91 | if email: |
| 92 | rules.append(_list_rule( |
| 93 | rule_id, 9, "data.email", "match_key", "cti-leaked-email", |
| 94 | "Activity referencing a leaked credential", _top_techniques(email))) |
| 95 | rule_id += 1 |
| 96 | |
| 97 | body = "\n".join(rules) |
| 98 | return f'<group name="cti,threat-intel,auto-generated,">\n{body}\n</group>\n' |
| 99 | |
| 100 | |
| 101 | def _list_rule(rule_id, level, field, lookup, list_name, description, techniques): |
| 102 | mitre = _mitre_block(techniques, " ") |
| 103 | return ( |
| 104 | f' <rule id="{rule_id}" level="{level}">\n' |
| 105 | f' <list field="{field}" lookup="{lookup}">etc/lists/{list_name}</list>\n' |
| 106 | f' <description>{escape(description)}: $({field})</description>\n' |
| 107 | f'{mitre}' |
| 108 | f' </rule>' |
| 109 | ) |
| 110 | |
| 111 | |
| 112 | def _url_rule(rule_id, level, list_name, techniques): |
| 113 | mitre = _mitre_block(techniques, " ") |
| 114 | return ( |
| 115 | f' <rule id="{rule_id}" level="{level}">\n' |
| 116 | f' <list field="url" lookup="match_key">etc/lists/{list_name}</list>\n' |
| 117 | f' <description>Request to CTI-flagged URL: $(url)</description>\n' |
| 118 | f'{mitre}' |
| 119 | f' </rule>' |
| 120 | ) |