Zion Boggan zionboggan.com ↗
120 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
from collections import Counter
4
from xml.sax.saxutils import escape
5
 
6
from cti.models import Indicator
7
 
8
# (list name, indicator type strings stored in that list).
# build_cdb_lists walks these pairs in order and files an indicator under the
# first pair whose type set contains its type.
LIST_DEFINITIONS: list[tuple[str, set[str]]] = [
    # Network indicators.
    ("cti-malicious-ip", {"ip"}),
    ("cti-malicious-domain", {"domain"}),
    ("cti-malicious-url", {"url"}),
    # All three digest types share one list.
    ("cti-malware-hash", {"md5", "sha1", "sha256"}),
    # E-mail addresses.
    ("cti-leaked-email", {"email"}),
]
15
 
16
 
17
def build_cdb_lists(indicators: list[Indicator]) -> dict[str, str]:
    """Render indicators as the text of one CDB list per ``LIST_DEFINITIONS`` entry.

    Each indicator is filed under the first list whose type set contains
    ``indicator.type``; indicators of any other type are skipped. Within a
    list a value is written once, labelled by the first indicator that
    carried it (``malware``, else ``threat_type``, else ``"cti"``).

    Args:
        indicators: Indicators to export.

    Returns:
        Mapping of list name to file text. Every line is ``key:label``, lines
        are sorted, and each ends with a newline. A list with no entries maps
        to the empty string.
    """
    # One insertion-ordered dict per list: raw value -> rendered line.
    rendered: dict[str, dict[str, str]] = {name: {} for name, _ in LIST_DEFINITIONS}

    for indicator in indicators:
        for name, types in LIST_DEFINITIONS:
            if indicator.type not in types:
                continue
            bucket = rendered[name]
            value = indicator.value
            if value not in bucket:
                label = indicator.malware or indicator.threat_type or "cti"
                key = f"{value}"
                if ":" in key:
                    # A bare colon inside the key is indistinguishable from
                    # the key/label separator (every URL and IPv6 address has
                    # one), so such keys are wrapped in double quotes.
                    # NOTE(review): this follows Wazuh's documented CDB list
                    # syntax for keys containing ':' - confirm against the
                    # Wazuh version this output is deployed to.
                    key = f'"{key}"'
                bucket[value] = f"{key}:{_sanitize(label)}"
            # An indicator belongs to at most one list.
            break

    return {
        name: "".join(f"{line}\n" for line in sorted(bucket.values()))
        for name, bucket in rendered.items()
    }
34
 
35
 
36
def _sanitize(label: str) -> str:
37
    cleaned = label.replace(":", "-").replace("\n", " ").strip()
38
    return cleaned[:48] or "cti"
39
 
40
 
41
def _top_techniques(indicators: list[Indicator], limit: int = 4) -> list[str]:
42
    counter: Counter[str] = Counter()
43
    for indicator in indicators:
44
        counter.update(indicator.techniques)
45
    return [technique for technique, _ in counter.most_common(limit)]
46
 
47
 
48
def _mitre_block(techniques: list[str], indent: str) -> str:
49
    if not techniques:
50
        return ""
51
    ids = "\n".join(f"{indent}  <id>{escape(t)}</id>" for t in techniques)
52
    return f"{indent}<mitre>\n{ids}\n{indent}</mitre>\n"
53
 
54
 
55
def build_rules_xml(indicators: list[Indicator], base_id: int = 100300) -> str:
    """Build the rules ``<group>`` XML for the indicator types that are present.

    Rules are considered in a fixed order: IP as destination, IP as source,
    domain, URL, hash, e-mail. Ids are assigned consecutively from *base_id*
    to the rules that are actually emitted; an indicator type with no
    indicators yields no rule and uses no id.

    Args:
        indicators: Indicators to generate rules for, grouped by ``type``.
        base_id: Id given to the first emitted rule.

    Returns:
        The ``<group>`` element as text, ending in a newline.
    """
    grouped: dict[str, list[Indicator]] = {}
    for item in indicators:
        grouped.setdefault(item.type, []).append(item)

    ip_hits = grouped.get("ip", [])
    # Digest order is fixed: sha256, then md5, then sha1.
    hash_hits = [
        *grouped.get("sha256", []),
        *grouped.get("md5", []),
        *grouped.get("sha1", []),
    ]

    # (indicators, level, field, lookup, list name, description).
    # A description of None marks the URL rule, which has its own renderer.
    plan = [
        (ip_hits, 12, "dstip", "address_match_key", "cti-malicious-ip",
         "Outbound connection to CTI-flagged IP"),
        (ip_hits, 10, "srcip", "address_match_key", "cti-malicious-ip",
         "Inbound connection from CTI-flagged IP"),
        (grouped.get("domain", []), 12, "win.eventdata.queryName", "match_key",
         "cti-malicious-domain", "DNS query for CTI-flagged domain"),
        (grouped.get("url", []), 12, None, None, "cti-malicious-url", None),
        (hash_hits, 13, "win.eventdata.sha256", "match_key", "cti-malware-hash",
         "Execution of CTI-flagged malware hash"),
        (grouped.get("email", []), 9, "data.email", "match_key", "cti-leaked-email",
         "Activity referencing a leaked credential"),
    ]

    rendered: list[str] = []
    for hits, level, field, lookup, list_name, description in plan:
        if not hits:
            continue
        # The next id is the base plus the number of rules emitted so far.
        rule_id = base_id + len(rendered)
        techniques = _top_techniques(hits)
        if description is None:
            rendered.append(_url_rule(rule_id, level, list_name, techniques))
        else:
            rendered.append(_list_rule(
                rule_id, level, field, lookup, list_name, description, techniques))

    body = "\n".join(rendered)
    return f'<group name="cti,threat-intel,auto-generated,">\n{body}\n</group>\n'
99
 
100
 
101
def _list_rule(
    rule_id: int,
    level: int,
    field: str,
    lookup: str,
    list_name: str,
    description: str,
    techniques: list[str],
) -> str:
    """Render one ``<rule>`` element that looks *field* up in a list file.

    Every interpolated string is XML-escaped, so a field, lookup or list name
    containing markup characters cannot produce malformed XML.

    Args:
        rule_id: Value of the rule's ``id`` attribute.
        level: Value of the rule's ``level`` attribute.
        field: Event field to look up; also echoed in the description as
            ``$(field)``.
        lookup: Value of the ``lookup`` attribute.
        list_name: List file name, written as ``etc/lists/<list_name>``.
        description: Human-readable text placed before ``: $(field)``.
        techniques: Technique ids for the optional ``<mitre>`` block.

    Returns:
        The ``<rule>`` element as text, without a trailing newline.
    """
    # Attribute values additionally need the double quote escaped.
    attr_entities = {'"': "&quot;"}
    field_attr = escape(field, attr_entities)
    lookup_attr = escape(lookup, attr_entities)
    list_path = escape(f"etc/lists/{list_name}")
    summary = escape(f"{description}: $({field})")
    mitre = _mitre_block(techniques, "    ")
    return (
        f'  <rule id="{rule_id}" level="{level}">\n'
        f'    <list field="{field_attr}" lookup="{lookup_attr}">{list_path}</list>\n'
        f'    <description>{summary}</description>\n'
        f'{mitre}'
        f'  </rule>'
    )
110
 
111
 
112
def _url_rule(rule_id: int, level: int, list_name: str, techniques: list[str]) -> str:
    """Render the ``<rule>`` element that looks the ``url`` field up in a list.

    This is ``_list_rule`` with the field, lookup and description fixed, so
    the two rule kinds share one template.

    Args:
        rule_id: Value of the rule's ``id`` attribute.
        level: Value of the rule's ``level`` attribute.
        list_name: List file name, written as ``etc/lists/<list_name>``.
        techniques: Technique ids for the optional ``<mitre>`` block.

    Returns:
        The ``<rule>`` element as text, without a trailing newline.
    """
    return _list_rule(
        rule_id,
        level,
        "url",
        "match_key",
        list_name,
        "Request to CTI-flagged URL",
        techniques,
    )