| | @@ -0,0 +1,169 @@ |
| + | from __future__ import annotations |
| + | |
| + | import json |
| + | from datetime import datetime, timezone |
| + | from pathlib import Path |
| + | |
| + | from cti import approval, rules |
| + | from cti.dedup import deduplicate, filter_by_confidence |
| + | from cti.feeds import build_feed |
| + | from cti.models import Indicator, RuleBundle |
| + | from cti.ttp import coverage_report, extract_techniques |
| + | |
| + | CANDIDATES = "candidates" |
| + | ACTIVE = "active" |
| + | STATE_FILE = "state.json" |
| + | |
| + | |
| + | def collect_indicators(config: dict) -> list[Indicator]: |
| + | fixtures_dir = Path(config["fixtures_dir"]) if config.get("use_fixtures") else None |
| + | indicators: list[Indicator] = [] |
| + | for name, settings in config["feeds"].items(): |
| + | if not settings.get("enabled"): |
| + | continue |
| + | feed = build_feed(name, settings) |
| + | try: |
| + | indicators.extend(feed.collect(fixtures_dir)) |
| + | except FileNotFoundError: |
| + | continue |
| + | return indicators |
| + | |
| + | |
| + | def build_bundle(config: dict) -> RuleBundle: |
| + | raw = collect_indicators(config) |
| + | merged = deduplicate(raw) |
| + | kept = filter_by_confidence(merged, config["min_confidence"]) |
| + | techniques = extract_techniques(kept) |
| + | cdb_lists = rules.build_cdb_lists(kept) |
| + | rules_xml = rules.build_rules_xml(kept, base_id=config["rules"]["base_id"]) |
| + | bundle_id = datetime.now(timezone.utc).strftime("cti-%Y%m%d-%H%M%S") |
| + | return RuleBundle( |
| + | bundle_id=bundle_id, |
| + | generated_at=RuleBundle.now_iso(), |
| + | indicators=kept, |
| + | techniques=techniques, |
| + | cdb_lists=cdb_lists, |
| + | rules_xml=rules_xml, |
| + | ) |
| + | |
| + | |
| + | def _active_keys(output_dir: Path) -> set[str]: |
| + | state = output_dir / ACTIVE / STATE_FILE |
| + | if not state.exists(): |
| + | return set() |
| + | data = json.loads(state.read_text(encoding="utf-8")) |
| + | return set(data.get("keys", [])) |
| + | |
| + | |
| + | def compute_diff(bundle: RuleBundle, output_dir: Path) -> dict: |
| + | candidate_keys = {f"{i.type}:{i.value.lower()}" for i in bundle.indicators} |
| + | active_keys = _active_keys(output_dir) |
| + | added = candidate_keys - active_keys |
| + | removed = active_keys - candidate_keys |
| + | return { |
| + | "added": len(added), |
| + | "removed": len(removed), |
| + | "unchanged": len(candidate_keys & active_keys), |
| + | "total": len(candidate_keys), |
| + | } |
| + | |
| + | |
| + | def write_candidate(bundle: RuleBundle, output_dir: Path, diff: dict) -> Path: |
| + | target = output_dir / CANDIDATES / bundle.bundle_id |
| + | (target / "lists").mkdir(parents=True, exist_ok=True) |
| + | for name, content in bundle.cdb_lists.items(): |
| + | (target / "lists" / name).write_text(content, encoding="utf-8") |
| + | (target / "local_cti_rules.xml").write_text(bundle.rules_xml, encoding="utf-8") |
| + | (target / "ttp_coverage.md").write_text( |
| + | coverage_report(bundle.techniques), encoding="utf-8" |
| + | ) |
| + | manifest = bundle.manifest() |
| + | manifest["diff"] = diff |
| + | (target / "manifest.json").write_text( |
| + | json.dumps(manifest, indent=2), encoding="utf-8" |
| + | ) |
| + | keys = sorted(f"{i.type}:{i.value.lower()}" for i in bundle.indicators) |
| + | (target / "keys.json").write_text(json.dumps({"keys": keys}), encoding="utf-8") |
| + | return target |
| + | |
| + | |
| + | def email_context(bundle: RuleBundle, diff: dict, config: dict) -> dict: |
| + | secret = config["approval"].get("secret", "insecure-dev-secret") |
| + | token = approval.make_token(secret, bundle.bundle_id) |
| + | base = config["approval"]["base_url"].rstrip("/") |
| + | top_malware = _top_malware(bundle.indicators) |
| + | return { |
| + | "bundle_id": bundle.bundle_id, |
| + | "generated_at": bundle.generated_at, |
| + | "counts": bundle.counts_by_type(), |
| + | "diff": diff, |
| + | "techniques": [t.to_dict() for t in bundle.techniques[:12]], |
| + | "top_malware": top_malware, |
| + | "review_url": f"{base}/review/{token}", |
| + | "list_sizes": {name: content.count(chr(10)) for name, content in bundle.cdb_lists.items()}, |
| + | } |
| + | |
| + | |
| + | def _top_malware(indicators: list[Indicator], limit: int = 6) -> list[tuple[str, int]]: |
| + | counts: dict[str, int] = {} |
| + | for indicator in indicators: |
| + | if indicator.malware: |
| + | counts[indicator.malware] = counts.get(indicator.malware, 0) + 1 |
| + | return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:limit] |
| + | |
| + | |
| + | def promote(bundle_id: str, output_dir: Path, active_dir: Path | None = None) -> dict: |
| + | candidate = output_dir / CANDIDATES / bundle_id |
| + | if not candidate.exists(): |
| + | raise FileNotFoundError(bundle_id) |
| + | active = output_dir / ACTIVE |
| + | (active / "lists").mkdir(parents=True, exist_ok=True) |
| + | for list_file in (candidate / "lists").iterdir(): |
| + | (active / "lists" / list_file.name).write_text( |
| + | list_file.read_text(encoding="utf-8"), encoding="utf-8" |
| + | ) |
| + | (active / "local_cti_rules.xml").write_text( |
| + | (candidate / "local_cti_rules.xml").read_text(encoding="utf-8"), encoding="utf-8" |
| + | ) |
| + | keys = json.loads((candidate / "keys.json").read_text(encoding="utf-8"))["keys"] |
| + | (active / STATE_FILE).write_text( |
| + | json.dumps({"bundle_id": bundle_id, "keys": keys}), encoding="utf-8" |
| + | ) |
| + | if active_dir is not None: |
| + | _deploy_to_wazuh(candidate, active_dir) |
| + | return {"bundle_id": bundle_id, "deployed": active_dir is not None, "indicators": len(keys)} |
| + | |
| + | |
| + | def _deploy_to_wazuh(candidate: Path, active_dir: Path) -> None: |
| + | lists_dir = active_dir / "lists" |
| + | rules_dir = active_dir / "rules" |
| + | lists_dir.mkdir(parents=True, exist_ok=True) |
| + | rules_dir.mkdir(parents=True, exist_ok=True) |
| + | for list_file in (candidate / "lists").iterdir(): |
| + | (lists_dir / list_file.name).write_text( |
| + | list_file.read_text(encoding="utf-8"), encoding="utf-8" |
| + | ) |
| + | (rules_dir / "local_cti_rules.xml").write_text( |
| + | (candidate / "local_cti_rules.xml").read_text(encoding="utf-8"), encoding="utf-8" |
| + | ) |
| + | |
| + | |
| + | def run(config: dict) -> dict: |
| + | output_dir = Path(config["output_dir"]) |
| + | bundle = build_bundle(config) |
| + | diff = compute_diff(bundle, output_dir) |
| + | path = write_candidate(bundle, output_dir, diff) |
| + | context = email_context(bundle, diff, config) |
| + | html = approval.render_email(context) |
| + | target = approval.send_email( |
| + | config, f"CTI rule approval {bundle.bundle_id}", html, output_dir |
| + | ) |
| + | return { |
| + | "bundle_id": bundle.bundle_id, |
| + | "candidate_path": str(path), |
| + | "indicators": len(bundle.indicators), |
| + | "techniques": len(bundle.techniques), |
| + | "diff": diff, |
| + | "email_sent_to": target, |
| + | "review_url": context["review_url"], |
| + | } |