Zion Boggan
repos/Detection As Code/tools/convert.py
zionboggan.com ↗
84 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
import subprocess
4
import sys
5
from pathlib import Path
6
 
7
import yaml
8
 
9
ROOT = Path(__file__).resolve().parents[1]
10
RULES = ROOT / "rules"
11
DIST = ROOT / "dist"
12
 
13
BACKENDS = {
14
    "splunk": "conf",
15
    "esql": "esql",
16
    "kusto": "kql",
17
}
18
 
19
CATEGORY_PIPELINE = {
20
    "process_creation": "sysmon",
21
    "process_access": "sysmon",
22
    "image_load": "sysmon",
23
    "file_event": "sysmon",
24
    "network_connection": "sysmon",
25
    "dns_query": "sysmon",
26
}
27
SERVICE_PIPELINE = {
28
    "security": "windows-audit",
29
    "system": "windows-audit",
30
}
31
 
32
 
33
def pipeline_for(rule: dict) -> str | None:
34
    ls = rule.get("logsource", {})
35
    if ls.get("product") == "windows":
36
        if ls.get("category") in CATEGORY_PIPELINE:
37
            return CATEGORY_PIPELINE[ls["category"]]
38
        if ls.get("service") in SERVICE_PIPELINE:
39
            return SERVICE_PIPELINE[ls["service"]]
40
    return None
41
 
42
 
43
def convert(rule_path: Path, backend: str) -> tuple[bool, str]:
44
    try:
45
        rule = yaml.safe_load(rule_path.read_text())
46
    except yaml.YAMLError as exc:
47
        return False, f"yaml error: {exc}"
48
    cmd = ["sigma", "convert", "-t", backend, "-s"]
49
    pipeline = pipeline_for(rule or {})
50
    if pipeline:
51
        cmd += ["-p", pipeline]
52
    else:
53
        cmd += ["--without-pipeline"]
54
    cmd.append(str(rule_path))
55
    proc = subprocess.run(cmd, capture_output=True, text=True)
56
    if proc.returncode != 0:
57
        return False, proc.stderr.strip().splitlines()[-1] if proc.stderr else "convert failed"
58
    return True, proc.stdout.strip()
59
 
60
 
61
def main() -> int:
62
    rules = sorted(RULES.rglob("*.yml"))
63
    if not rules:
64
        print("no rules found", file=sys.stderr)
65
        return 1
66
    ok = skipped = 0
67
    for backend, ext in BACKENDS.items():
68
        for rule_path in rules:
69
            success, output = convert(rule_path, backend)
70
            rel = rule_path.relative_to(RULES).with_suffix(f".{ext}")
71
            if success and output:
72
                out = DIST / backend / rel
73
                out.parent.mkdir(parents=True, exist_ok=True)
74
                out.write_text(output + "\n")
75
                ok += 1
76
            else:
77
                skipped += 1
78
                print(f"  skip {backend}:{rule_path.name} - {output}")
79
    print(f"converted {ok}, skipped {skipped} (correlation rules skip on backends that don't support them)")
80
    return 0
81
 
82
 
83
if __name__ == "__main__":
84
    sys.exit(main())