Zion Boggan zionboggan.com ↗

generate wazuh cdb lists and detection rules from indicators

e2e7ff8   Zion Boggan committed on May 18, 2026 (1 month ago)
src/cti/rules.py +122 -0
@@ -0,0 +1,122 @@
+from __future__ import annotations
+
+from collections import Counter
+from xml.sax.saxutils import escape
+
+from cti.models import Indicator
+
+LIST_DEFINITIONS = [
+ ("cti-malicious-ip", {"ip"}),
+ ("cti-malicious-domain", {"domain"}),
+ ("cti-malicious-url", {"url"}),
+ ("cti-malware-hash", {"sha256", "md5", "sha1"}),
+ ("cti-leaked-email", {"email"}),
+]
+
+
+def build_cdb_lists(indicators: list[Indicator]) -> dict[str, str]:
+ lists: dict[str, list[str]] = {name: [] for name, _ in LIST_DEFINITIONS}
+ seen: dict[str, set[str]] = {name: set() for name, _ in LIST_DEFINITIONS}
+ for indicator in indicators:
+ for name, types in LIST_DEFINITIONS:
+ if indicator.type in types:
+ value = indicator.value
+ if value in seen[name]:
+ break
+ seen[name].add(value)
+ label = indicator.malware or indicator.threat_type or "cti"
+ lists[name].append(f"{value}:{_sanitize(label)}")
+ break
+ return {
+ name: "\n".join(sorted(entries)) + ("\n" if entries else "")
+ for name, entries in lists.items()
+ }
+
+
+def _sanitize(label: str) -> str:
+ cleaned = label.replace(":", "-").replace("\n", " ").strip()
+ return cleaned[:48] or "cti"
+
+
+def _top_techniques(indicators: list[Indicator], limit: int = 4) -> list[str]:
+ counter: Counter[str] = Counter()
+ for indicator in indicators:
+ counter.update(indicator.techniques)
+ return [technique for technique, _ in counter.most_common(limit)]
+
+
+def _mitre_block(techniques: list[str], indent: str) -> str:
+ if not techniques:
+ return ""
+ ids = "\n".join(f"{indent} <id>{escape(t)}</id>" for t in techniques)
+ return f"{indent}<mitre>\n{ids}\n{indent}</mitre>\n"
+
+
+def build_rules_xml(indicators: list[Indicator], base_id: int = 100300) -> str:
+ by_type: dict[str, list[Indicator]] = {}
+ for indicator in indicators:
+ by_type.setdefault(indicator.type, []).append(indicator)
+
+ ip = by_type.get("ip", [])
+ domain = by_type.get("domain", [])
+ url = by_type.get("url", [])
+ hashes = by_type.get("sha256", []) + by_type.get("md5", []) + by_type.get("sha1", [])
+ email = by_type.get("email", [])
+
+ rules: list[str] = []
+ rule_id = base_id
+
+ if ip:
+ rules.append(_list_rule(
+ rule_id, 12, "dstip", "address_match_key", "cti-malicious-ip",
+ "Outbound connection to CTI-flagged IP", _top_techniques(ip)))
+ rule_id += 1
+ rules.append(_list_rule(
+ rule_id, 10, "srcip", "address_match_key", "cti-malicious-ip",
+ "Inbound connection from CTI-flagged IP", _top_techniques(ip)))
+ rule_id += 1
+ if domain:
+ rules.append(_list_rule(
+ rule_id, 12, "win.eventdata.queryName", "match_key", "cti-malicious-domain",
+ "DNS query for CTI-flagged domain", _top_techniques(domain)))
+ rule_id += 1
+ if url:
+ rules.append(_url_rule(rule_id, 12, "cti-malicious-url", _top_techniques(url)))
+ rule_id += 1
+ if hashes:
+ rules.append(_list_rule(
+ rule_id, 13, "win.eventdata.sha256", "match_key", "cti-malware-hash",
+ "Execution of CTI-flagged malware hash", _top_techniques(hashes)))
+ rule_id += 1
+ if email:
+ rules.append(_list_rule(
+ rule_id, 9, "data.email", "match_key", "cti-leaked-email",
+ "Activity referencing a leaked credential", _top_techniques(email)))
+ rule_id += 1
+
+ body = "\n".join(rules)
+ return f'<group name="cti,threat-intel,auto-generated,">\n{body}\n</group>\n'
+
+
+def _list_rule(rule_id, level, field, lookup, list_name, description, techniques):
+ mitre = _mitre_block(techniques, " ")
+ return (
+ f' <rule id="{rule_id}" level="{level}">\n'
+ f' <field name="{field}" type="pcre2">\\S+</field>\n'
+ f' <list field="{field}" lookup="{lookup}">etc/lists/{list_name}</list>\n'
+ f' <description>{escape(description)}: $({field})</description>\n'
+ f'{mitre}'
+ f' </rule>'
+ )
+
+
+def _url_rule(rule_id, level, list_name, techniques):
+ mitre = _mitre_block(techniques, " ")
+ return (
+ f' <rule id="{rule_id}" level="{level}">\n'
+ f' <field name="url" type="pcre2">\\S+</field>\n'
+ f' <list field="url" lookup="match_key">etc/lists/{list_name}</list>\n'
+ f' <description>Request to CTI-flagged URL: $(url)</description>\n'
+ f'{mitre}'
+ f' </rule>'
+ )