Zion Boggan
repos/CTI Detection Automation/src/cti/pipeline.py
zionboggan.com ↗
169 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
import json
4
from datetime import datetime, timezone
5
from pathlib import Path
6
 
7
from cti import approval, rules
8
from cti.dedup import deduplicate, filter_by_confidence
9
from cti.feeds import build_feed
10
from cti.models import Indicator, RuleBundle
11
from cti.ttp import coverage_report, extract_techniques
12
 
13
# Name of the sub-directory of the output directory that holds one folder per
# generated candidate bundle (written by write_candidate, read by promote).
CANDIDATES = "candidates"
14
# Name of the sub-directory of the output directory that receives the lists,
# rules file and state file of the bundle most recently passed to promote().
ACTIVE = "active"
15
# JSON file inside the active directory; promote() stores the promoted bundle
# id and its indicator keys there, and _active_keys() reads the keys back.
STATE_FILE = "state.json"
16
 
17
 
18
def collect_indicators(config: dict) -> list[Indicator]:
    """Gather indicators from every enabled feed named in ``config``.

    Args:
        config: Pipeline configuration. ``config["feeds"]`` maps a feed name
            to its settings dict; only entries whose ``enabled`` value is
            truthy are collected. When ``config["use_fixtures"]`` is truthy,
            ``config["fixtures_dir"]`` is passed to each feed as a ``Path``;
            otherwise feeds receive ``None``.

    Returns:
        The indicators of all feeds that could be collected, concatenated in
        the iteration order of ``config["feeds"]``.

    Raises:
        KeyError: If ``feeds`` is missing, or if ``fixtures_dir`` is missing
            while ``use_fixtures`` is set.
    """
    # Function-scope import so this block does not depend on a file-level one.
    import logging

    log = logging.getLogger(__name__)
    fixtures_dir = Path(config["fixtures_dir"]) if config.get("use_fixtures") else None
    indicators: list[Indicator] = []
    for name, settings in config["feeds"].items():
        if not settings.get("enabled"):
            continue
        feed = build_feed(name, settings)
        try:
            indicators.extend(feed.collect(fixtures_dir))
        except FileNotFoundError as exc:
            # Still best-effort: a missing file must not abort the whole run.
            # It is logged because a silently dropped feed shrinks the bundle
            # and would show up downstream as "removed" indicators.
            log.warning("Skipping feed %r: %s", name, exc)
    return indicators
30
 
31
 
32
def build_bundle(config: dict) -> RuleBundle:
    """Collect, reduce and package indicators into a ``RuleBundle``.

    Indicators are collected, de-duplicated and filtered against
    ``config["min_confidence"]``. Techniques, CDB lists and the rules XML
    (numbered from ``config["rules"]["base_id"]``) are all derived from the
    indicators that survive the filter. The bundle id is the current UTC time
    formatted as ``cti-YYYYMMDD-HHMMSS``.
    """
    indicators = filter_by_confidence(
        deduplicate(collect_indicators(config)),
        config["min_confidence"],
    )
    # Everything below is computed from the filtered indicators only.
    artefacts = {
        "techniques": extract_techniques(indicators),
        "cdb_lists": rules.build_cdb_lists(indicators),
        "rules_xml": rules.build_rules_xml(
            indicators, base_id=config["rules"]["base_id"]
        ),
    }
    stamp = datetime.now(timezone.utc)
    return RuleBundle(
        bundle_id=stamp.strftime("cti-%Y%m%d-%H%M%S"),
        generated_at=RuleBundle.now_iso(),
        indicators=indicators,
        **artefacts,
    )
48
 
49
 
50
def _active_keys(output_dir: Path) -> set[str]:
    """Return the indicator keys stored in the active state file.

    The state file is ``output_dir / ACTIVE / STATE_FILE``. An empty set is
    returned when that file does not exist or its JSON object has no
    ``"keys"`` entry.
    """
    state_path = output_dir.joinpath(ACTIVE, STATE_FILE)
    if state_path.exists():
        payload = json.loads(state_path.read_text(encoding="utf-8"))
        return set(payload.get("keys", []))
    return set()
56
 
57
 
58
def compute_diff(bundle: RuleBundle, output_dir: Path) -> dict:
    """Summarise how ``bundle`` differs from the active indicator set.

    Each indicator is reduced to the key ``"<type>:<lower-cased value>"`` and
    compared with the keys returned by ``_active_keys(output_dir)``.

    Returns:
        A dict of counts: ``added`` (only in the bundle), ``removed`` (only
        active), ``unchanged`` (in both) and ``total`` (distinct bundle keys).
    """
    proposed = set()
    for indicator in bundle.indicators:
        proposed.add(f"{indicator.type}:{indicator.value.lower()}")
    current = _active_keys(output_dir)
    return {
        "added": len(proposed.difference(current)),
        "removed": len(current.difference(proposed)),
        "unchanged": len(proposed.intersection(current)),
        "total": len(proposed),
    }
69
 
70
 
71
def write_candidate(bundle: RuleBundle, output_dir: Path, diff: dict) -> Path:
    """Write ``bundle`` to disk as a candidate and return its directory.

    The directory is ``output_dir / CANDIDATES / bundle.bundle_id`` and ends
    up containing:

    * ``lists/<name>`` for each entry of ``bundle.cdb_lists``
    * ``local_cti_rules.xml`` with the bundle's rules XML
    * ``ttp_coverage.md`` with the coverage report of the bundle's techniques
    * ``manifest.json`` with the bundle manifest plus ``diff`` under ``"diff"``
    * ``keys.json`` with the sorted ``"<type>:<lower-cased value>"`` keys

    All files are written as UTF-8; existing files are overwritten.
    """
    bundle_dir = output_dir / CANDIDATES / bundle.bundle_id
    lists_dir = bundle_dir / "lists"
    lists_dir.mkdir(parents=True, exist_ok=True)

    def emit(path, text):
        # Single place that fixes the on-disk encoding.
        path.write_text(text, encoding="utf-8")

    for list_name, list_body in bundle.cdb_lists.items():
        emit(lists_dir / list_name, list_body)
    emit(bundle_dir / "local_cti_rules.xml", bundle.rules_xml)
    emit(bundle_dir / "ttp_coverage.md", coverage_report(bundle.techniques))

    manifest = bundle.manifest()
    manifest["diff"] = diff
    emit(bundle_dir / "manifest.json", json.dumps(manifest, indent=2))

    # promote() later copies these keys into the active state file.
    key_list = [f"{ind.type}:{ind.value.lower()}" for ind in bundle.indicators]
    key_list.sort()
    emit(bundle_dir / "keys.json", json.dumps({"keys": key_list}))
    return bundle_dir
88
 
89
 
90
def email_context(bundle: RuleBundle, diff: dict, config: dict) -> dict:
    """Build the template context for the approval e-mail of ``bundle``.

    A review token is derived from the approval secret and the bundle id and
    appended to ``config["approval"]["base_url"]`` (trailing slashes removed)
    as ``<base>/review/<token>``.

    NOTE(review): when ``config["approval"]`` has no ``"secret"`` key the
    literal ``"insecure-dev-secret"`` is used to make the token -- confirm
    that production configs always set a secret.

    Returns:
        A dict with the bundle id and generation time, per-type counts, the
        given ``diff``, at most the first 12 techniques as dicts, the top
        malware names, the review URL and, per CDB list, its newline count.
    """
    approval_cfg = config["approval"]
    signing_secret = approval_cfg.get("secret", "insecure-dev-secret")
    review_token = approval.make_token(signing_secret, bundle.bundle_id)
    review_base = approval_cfg["base_url"].rstrip("/")
    malware_ranking = _top_malware(bundle.indicators)

    context = {
        "bundle_id": bundle.bundle_id,
        "generated_at": bundle.generated_at,
        "counts": bundle.counts_by_type(),
        "diff": diff,
    }
    context["techniques"] = [
        technique.to_dict() for technique in bundle.techniques[:12]
    ]
    context["top_malware"] = malware_ranking
    context["review_url"] = f"{review_base}/review/{review_token}"
    # Size of a list is taken as the number of newline characters it holds.
    context["list_sizes"] = {
        list_name: list_body.count("\n")
        for list_name, list_body in bundle.cdb_lists.items()
    }
    return context
105
 
106
 
107
def _top_malware(indicators: list[Indicator], limit: int = 6) -> list[tuple[str, int]]:
108
    counts: dict[str, int] = {}
109
    for indicator in indicators:
110
        if indicator.malware:
111
            counts[indicator.malware] = counts.get(indicator.malware, 0) + 1
112
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:limit]
113
 
114
 
115
def promote(bundle_id: str, output_dir: Path, active_dir: Path | None = None) -> dict:
    """Make the candidate ``bundle_id`` the active rule set.

    Copies the candidate's list files and ``local_cti_rules.xml`` into
    ``output_dir / ACTIVE``, then records the bundle id and the candidate's
    keys in the state file there. Files already present in the active
    ``lists`` directory that the candidate does not contain are left in place.
    When ``active_dir`` is given, the candidate is additionally copied there
    via ``_deploy_to_wazuh``.

    Returns:
        ``{"bundle_id", "deployed", "indicators"}`` where ``deployed`` tells
        whether ``active_dir`` was given and ``indicators`` is the key count.

    Raises:
        FileNotFoundError: If the candidate directory does not exist (the
            exception argument is ``bundle_id``).
    """
    source = output_dir / CANDIDATES / bundle_id
    if not source.exists():
        raise FileNotFoundError(bundle_id)

    live = output_dir / ACTIVE
    live_lists = live / "lists"
    live_lists.mkdir(parents=True, exist_ok=True)

    def mirror(src, dst):
        # Text copy, decoded and re-encoded as UTF-8.
        dst.write_text(src.read_text(encoding="utf-8"), encoding="utf-8")

    for entry in (source / "lists").iterdir():
        mirror(entry, live_lists / entry.name)
    rules_name = "local_cti_rules.xml"
    mirror(source / rules_name, live / rules_name)

    key_doc = json.loads((source / "keys.json").read_text(encoding="utf-8"))
    promoted_keys = key_doc["keys"]
    state = {"bundle_id": bundle_id, "keys": promoted_keys}
    (live / STATE_FILE).write_text(json.dumps(state), encoding="utf-8")

    deployed = active_dir is not None
    if deployed:
        _deploy_to_wazuh(source, active_dir)
    return {
        "bundle_id": bundle_id,
        "deployed": deployed,
        "indicators": len(promoted_keys),
    }
135
 
136
 
137
def _deploy_to_wazuh(candidate: Path, active_dir: Path) -> None:
138
    lists_dir = active_dir / "lists"
139
    rules_dir = active_dir / "rules"
140
    lists_dir.mkdir(parents=True, exist_ok=True)
141
    rules_dir.mkdir(parents=True, exist_ok=True)
142
    for list_file in (candidate / "lists").iterdir():
143
        (lists_dir / list_file.name).write_text(
144
            list_file.read_text(encoding="utf-8"), encoding="utf-8"
145
        )
146
    (rules_dir / "local_cti_rules.xml").write_text(
147
        (candidate / "local_cti_rules.xml").read_text(encoding="utf-8"), encoding="utf-8"
148
    )
149
 
150
 
151
def run(config: dict) -> dict:
    """Run one pipeline pass: build, diff, write candidate, e-mail for approval.

    The bundle is built from ``config``, compared with the active set under
    ``config["output_dir"]``, written there as a candidate, and an approval
    e-mail with the subject ``CTI rule approval <bundle id>`` is rendered and
    sent.

    Returns:
        A summary dict with the bundle id, the candidate path as a string,
        indicator and technique counts, the diff, the value returned by
        ``approval.send_email`` and the review URL.
    """
    out_root = Path(config["output_dir"])
    bundle = build_bundle(config)
    diff = compute_diff(bundle, out_root)
    candidate_path = write_candidate(bundle, out_root, diff)

    context = email_context(bundle, diff, config)
    body_html = approval.render_email(context)
    subject = f"CTI rule approval {bundle.bundle_id}"
    recipient = approval.send_email(config, subject, body_html, out_root)

    summary = {
        "bundle_id": bundle.bundle_id,
        "candidate_path": str(candidate_path),
        "indicators": len(bundle.indicators),
        "techniques": len(bundle.techniques),
    }
    summary["diff"] = diff
    summary["email_sent_to"] = recipient
    summary["review_url"] = context["review_url"]
    return summary