| 1 | from __future__ import annotations |
| 2 | |
| 3 | import json |
| 4 | from datetime import datetime, timezone |
| 5 | from pathlib import Path |
| 6 | |
| 7 | from cti import approval, rules |
| 8 | from cti.dedup import deduplicate, filter_by_confidence |
| 9 | from cti.feeds import build_feed |
| 10 | from cti.models import Indicator, RuleBundle |
| 11 | from cti.ttp import coverage_report, extract_techniques |
| 12 | |
# Subdirectory of the output root that holds one folder per candidate bundle
# awaiting review (written by write_candidate, read by promote).
CANDIDATES = "candidates"
# Subdirectory of the output root that holds the currently promoted bundle.
ACTIVE = "active"
# JSON file inside the active directory recording the promoted bundle id and
# its indicator keys (written by promote, read by _active_keys).
STATE_FILE = "state.json"
| 16 | |
| 17 | |
def collect_indicators(config: dict) -> list[Indicator]:
    """Collect raw indicators from every enabled feed in ``config["feeds"]``.

    When ``config["use_fixtures"]`` is truthy, each feed's ``collect`` is given
    ``Path(config["fixtures_dir"])``; otherwise it is given ``None``.

    A feed whose collection raises ``FileNotFoundError`` is skipped so that a
    single missing source does not abort the whole run. The skip is logged
    rather than silently swallowed, and the feed's output is materialised
    inside the ``try`` so a feed that fails part-way through iteration
    contributes nothing instead of a partial result.

    Args:
        config: Pipeline configuration; reads ``feeds``, ``use_fixtures`` and
            ``fixtures_dir``.

    Returns:
        The concatenated indicators of every feed that collected successfully.
    """
    import logging

    log = logging.getLogger(__name__)
    fixtures_dir = Path(config["fixtures_dir"]) if config.get("use_fixtures") else None
    indicators: list[Indicator] = []
    for name, settings in config["feeds"].items():
        if not settings.get("enabled"):
            continue
        feed = build_feed(name, settings)
        try:
            # list() forces lazy collectors to finish here, inside the guard,
            # so a late FileNotFoundError cannot leave half a feed behind.
            collected = list(feed.collect(fixtures_dir))
        except FileNotFoundError as exc:
            log.warning("skipping feed %r: source not found (%s)", name, exc)
            continue
        indicators.extend(collected)
    return indicators
| 30 | |
| 31 | |
def build_bundle(config: dict) -> RuleBundle:
    """Turn the configured feeds into a single reviewable ``RuleBundle``.

    Pipeline: collect -> deduplicate -> confidence filter -> technique
    extraction, CDB list generation and rules XML generation.

    Args:
        config: Pipeline configuration; reads ``min_confidence`` and
            ``rules.base_id`` here, plus whatever ``collect_indicators`` reads.

    Returns:
        A ``RuleBundle`` whose ``bundle_id`` is the current UTC time formatted
        as ``cti-YYYYmmdd-HHMMSS``.
    """
    survivors = filter_by_confidence(
        deduplicate(collect_indicators(config)), config["min_confidence"]
    )
    ttps = extract_techniques(survivors)
    lists_by_name = rules.build_cdb_lists(survivors)
    xml_text = rules.build_rules_xml(survivors, base_id=config["rules"]["base_id"])
    # The id and generation time are stamped only after all rule material exists.
    return RuleBundle(
        bundle_id=datetime.now(timezone.utc).strftime("cti-%Y%m%d-%H%M%S"),
        generated_at=RuleBundle.now_iso(),
        indicators=survivors,
        techniques=ttps,
        cdb_lists=lists_by_name,
        rules_xml=xml_text,
    )
| 48 | |
| 49 | |
def _active_keys(output_dir: Path) -> set[str]:
    """Return the indicator keys of the currently promoted bundle.

    Reads ``<output_dir>/active/state.json`` (the file ``promote`` writes) and
    returns its ``keys`` entry as a set. A missing state file or a missing
    ``keys`` entry both yield an empty set.
    """
    state_path = output_dir / ACTIVE / STATE_FILE
    if state_path.exists():
        payload = json.loads(state_path.read_text(encoding="utf-8"))
        return set(payload.get("keys", []))
    return set()
| 56 | |
| 57 | |
def compute_diff(bundle: RuleBundle, output_dir: Path) -> dict:
    """Summarise how a candidate bundle differs from the active state.

    Indicators are compared by the key ``"<type>:<lowercased value>"``.

    Returns:
        A dict of counts: ``added`` (candidate only), ``removed`` (active
        only), ``unchanged`` (in both) and ``total`` (all candidate keys).
    """
    proposed = {f"{ind.type}:{ind.value.lower()}" for ind in bundle.indicators}
    current = _active_keys(output_dir)
    return {
        "added": len(proposed.difference(current)),
        "removed": len(current.difference(proposed)),
        "unchanged": len(proposed.intersection(current)),
        "total": len(proposed),
    }
| 69 | |
| 70 | |
def write_candidate(bundle: RuleBundle, output_dir: Path, diff: dict) -> Path:
    """Stage a bundle under ``<output_dir>/candidates/<bundle_id>/`` for review.

    Files written, in order: ``lists/<name>`` for every CDB list,
    ``local_cti_rules.xml``, ``ttp_coverage.md``, ``manifest.json`` (the
    bundle manifest with ``diff`` added) and ``keys.json`` (sorted indicator
    keys, later read by ``promote``).

    Returns:
        The candidate directory.
    """
    candidate_dir = output_dir / CANDIDATES / bundle.bundle_id
    lists_dir = candidate_dir / "lists"
    lists_dir.mkdir(parents=True, exist_ok=True)

    def _emit(path: Path, text: str) -> None:
        path.write_text(text, encoding="utf-8")

    for list_name, body in bundle.cdb_lists.items():
        _emit(lists_dir / list_name, body)
    _emit(candidate_dir / "local_cti_rules.xml", bundle.rules_xml)
    _emit(candidate_dir / "ttp_coverage.md", coverage_report(bundle.techniques))

    manifest = bundle.manifest()
    manifest.update(diff=diff)
    _emit(candidate_dir / "manifest.json", json.dumps(manifest, indent=2))

    sorted_keys = sorted(
        f"{ind.type}:{ind.value.lower()}" for ind in bundle.indicators
    )
    _emit(candidate_dir / "keys.json", json.dumps({"keys": sorted_keys}))
    return candidate_dir
| 88 | |
| 89 | |
def email_context(bundle: RuleBundle, diff: dict, config: dict) -> dict:
    """Build the template context for the approval e-mail of ``bundle``.

    The review URL is ``<approval.base_url>/review/<token>``. The token comes
    from ``approval.make_token`` using ``approval.secret``. If no secret is
    configured, the built-in development secret is still used (for backward
    compatibility), but a warning is logged. Anyone who knows that default
    could presumably mint valid review tokens.

    ``list_sizes`` counts the non-blank lines of each CDB list. The previous
    ``count("\\n")`` missed the last entry of a list without a trailing
    newline and counted blank lines as entries.

    Args:
        bundle: The candidate bundle being announced.
        diff: Counts from ``compute_diff``.
        config: Pipeline configuration; reads ``approval.secret`` and
            ``approval.base_url``.

    Returns:
        A dict with ``bundle_id``, ``generated_at``, ``counts``, ``diff``,
        ``techniques`` (first 12, as dicts), ``top_malware``, ``review_url``
        and ``list_sizes``.
    """
    import logging

    approval_cfg = config["approval"]
    secret = approval_cfg.get("secret", "insecure-dev-secret")
    if not secret or secret == "insecure-dev-secret":
        logging.getLogger(__name__).warning(
            "approval.secret is not configured; review links for %s are signed "
            "with an insecure development secret",
            bundle.bundle_id,
        )
    token = approval.make_token(secret, bundle.bundle_id)
    base = approval_cfg["base_url"].rstrip("/")
    top_malware = _top_malware(bundle.indicators)
    return {
        "bundle_id": bundle.bundle_id,
        "generated_at": bundle.generated_at,
        "counts": bundle.counts_by_type(),
        "diff": diff,
        "techniques": [t.to_dict() for t in bundle.techniques[:12]],
        "top_malware": top_malware,
        "review_url": f"{base}/review/{token}",
        "list_sizes": {
            name: sum(1 for line in content.splitlines() if line.strip())
            for name, content in bundle.cdb_lists.items()
        },
    }
| 105 | |
| 106 | |
| 107 | def _top_malware(indicators: list[Indicator], limit: int = 6) -> list[tuple[str, int]]: |
| 108 | counts: dict[str, int] = {} |
| 109 | for indicator in indicators: |
| 110 | if indicator.malware: |
| 111 | counts[indicator.malware] = counts.get(indicator.malware, 0) + 1 |
| 112 | return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:limit] |
| 113 | |
| 114 | |
def promote(bundle_id: str, output_dir: Path, active_dir: Path | None = None) -> dict:
    """Promote a reviewed candidate bundle to the active set.

    Steps:
      1. Copy the candidate's ``lists/*`` files and ``local_cti_rules.xml``
         into ``<output_dir>/active/``.
      2. Record the bundle id and its indicator keys (from the candidate's
         ``keys.json``) in ``active/state.json``.
      3. When ``active_dir`` is given, deploy the same files there via
         ``_deploy_to_wazuh``.

    Args:
        bundle_id: Name of a directory directly under ``<output_dir>/candidates``.
        output_dir: Pipeline output root.
        active_dir: Optional deployment root handed to ``_deploy_to_wazuh``.

    Returns:
        ``{"bundle_id": ..., "deployed": bool, "indicators": <key count>}``.

    Raises:
        FileNotFoundError: if ``bundle_id`` is not a plain directory name, or
            no such candidate directory exists.
    """
    # bundle_id may originate from an approval link, so refuse anything that
    # could resolve outside the candidates directory ("", ".", "..", "a/b",
    # absolute paths) before it is joined into a filesystem path.
    if bundle_id in ("", ".", "..") or Path(bundle_id).name != bundle_id:
        raise FileNotFoundError(bundle_id)
    candidate = output_dir / CANDIDATES / bundle_id
    if not candidate.is_dir():
        raise FileNotFoundError(bundle_id)
    active = output_dir / ACTIVE
    (active / "lists").mkdir(parents=True, exist_ok=True)
    # NOTE(review): list files from a previously promoted bundle that are not
    # part of this candidate are left in place in active/lists.
    for list_file in (candidate / "lists").iterdir():
        if not list_file.is_file():
            continue  # read_text would raise on a nested directory
        (active / "lists" / list_file.name).write_text(
            list_file.read_text(encoding="utf-8"), encoding="utf-8"
        )
    (active / "local_cti_rules.xml").write_text(
        (candidate / "local_cti_rules.xml").read_text(encoding="utf-8"), encoding="utf-8"
    )
    keys = json.loads((candidate / "keys.json").read_text(encoding="utf-8"))["keys"]
    (active / STATE_FILE).write_text(
        json.dumps({"bundle_id": bundle_id, "keys": keys}), encoding="utf-8"
    )
    if active_dir is not None:
        _deploy_to_wazuh(candidate, active_dir)
    return {"bundle_id": bundle_id, "deployed": active_dir is not None, "indicators": len(keys)}
| 135 | |
| 136 | |
| 137 | def _deploy_to_wazuh(candidate: Path, active_dir: Path) -> None: |
| 138 | lists_dir = active_dir / "lists" |
| 139 | rules_dir = active_dir / "rules" |
| 140 | lists_dir.mkdir(parents=True, exist_ok=True) |
| 141 | rules_dir.mkdir(parents=True, exist_ok=True) |
| 142 | for list_file in (candidate / "lists").iterdir(): |
| 143 | (lists_dir / list_file.name).write_text( |
| 144 | list_file.read_text(encoding="utf-8"), encoding="utf-8" |
| 145 | ) |
| 146 | (rules_dir / "local_cti_rules.xml").write_text( |
| 147 | (candidate / "local_cti_rules.xml").read_text(encoding="utf-8"), encoding="utf-8" |
| 148 | ) |
| 149 | |
| 150 | |
def run(config: dict) -> dict:
    """Run one pipeline cycle: build, diff, stage, and send for approval.

    Args:
        config: Pipeline configuration; ``output_dir`` is read here and the
            whole dict is passed on to the individual stages.

    Returns:
        A summary with ``bundle_id``, ``candidate_path``, ``indicators`` and
        ``techniques`` counts, ``diff``, ``email_sent_to`` (whatever
        ``approval.send_email`` returned) and ``review_url``.
    """
    out_root = Path(config["output_dir"])
    bundle = build_bundle(config)
    changes = compute_diff(bundle, out_root)
    staged_at = write_candidate(bundle, out_root, changes)
    ctx = email_context(bundle, changes, config)
    body_html = approval.render_email(ctx)
    delivered_to = approval.send_email(
        config, f"CTI rule approval {bundle.bundle_id}", body_html, out_root
    )
    summary = {
        "bundle_id": bundle.bundle_id,
        "candidate_path": str(staged_at),
        "indicators": len(bundle.indicators),
        "techniques": len(bundle.techniques),
        "diff": changes,
        "email_sent_to": delivered_to,
        "review_url": ctx["review_url"],
    }
    return summary