| 1 | from __future__ import annotations |
| 2 | |
| 3 | import subprocess |
| 4 | import sys |
| 5 | from pathlib import Path |
| 6 | |
| 7 | import yaml |
| 8 | |
| 9 | ROOT = Path(__file__).resolve().parents[1] |
| 10 | RULES = ROOT / "rules" |
| 11 | DIST = ROOT / "dist" |
| 12 | |
| 13 | BACKENDS = { |
| 14 | "splunk": "conf", |
| 15 | "esql": "esql", |
| 16 | "kusto": "kql", |
| 17 | } |
| 18 | |
| 19 | CATEGORY_PIPELINE = { |
| 20 | "process_creation": "sysmon", |
| 21 | "process_access": "sysmon", |
| 22 | "image_load": "sysmon", |
| 23 | "file_event": "sysmon", |
| 24 | "network_connection": "sysmon", |
| 25 | "dns_query": "sysmon", |
| 26 | } |
| 27 | SERVICE_PIPELINE = { |
| 28 | "security": "windows-audit", |
| 29 | "system": "windows-audit", |
| 30 | } |
| 31 | |
| 32 | |
| 33 | def pipeline_for(rule: dict) -> str | None: |
| 34 | ls = rule.get("logsource", {}) |
| 35 | if ls.get("product") == "windows": |
| 36 | if ls.get("category") in CATEGORY_PIPELINE: |
| 37 | return CATEGORY_PIPELINE[ls["category"]] |
| 38 | if ls.get("service") in SERVICE_PIPELINE: |
| 39 | return SERVICE_PIPELINE[ls["service"]] |
| 40 | return None |
| 41 | |
| 42 | |
| 43 | def convert(rule_path: Path, backend: str) -> tuple[bool, str]: |
| 44 | try: |
| 45 | rule = yaml.safe_load(rule_path.read_text()) |
| 46 | except yaml.YAMLError as exc: |
| 47 | return False, f"yaml error: {exc}" |
| 48 | cmd = ["sigma", "convert", "-t", backend, "-s"] |
| 49 | pipeline = pipeline_for(rule or {}) |
| 50 | if pipeline: |
| 51 | cmd += ["-p", pipeline] |
| 52 | else: |
| 53 | cmd += ["--without-pipeline"] |
| 54 | cmd.append(str(rule_path)) |
| 55 | proc = subprocess.run(cmd, capture_output=True, text=True) |
| 56 | if proc.returncode != 0: |
| 57 | return False, proc.stderr.strip().splitlines()[-1] if proc.stderr else "convert failed" |
| 58 | return True, proc.stdout.strip() |
| 59 | |
| 60 | |
| 61 | def main() -> int: |
| 62 | rules = sorted(RULES.rglob("*.yml")) |
| 63 | if not rules: |
| 64 | print("no rules found", file=sys.stderr) |
| 65 | return 1 |
| 66 | ok = skipped = 0 |
| 67 | for backend, ext in BACKENDS.items(): |
| 68 | for rule_path in rules: |
| 69 | success, output = convert(rule_path, backend) |
| 70 | rel = rule_path.relative_to(RULES).with_suffix(f".{ext}") |
| 71 | if success and output: |
| 72 | out = DIST / backend / rel |
| 73 | out.parent.mkdir(parents=True, exist_ok=True) |
| 74 | out.write_text(output + "\n") |
| 75 | ok += 1 |
| 76 | else: |
| 77 | skipped += 1 |
| 78 | print(f" skip {backend}:{rule_path.name} - {output}") |
| 79 | print(f"converted {ok}, skipped {skipped} (correlation rules skip on backends that don't support them)") |
| 80 | return 0 |
| 81 | |
| 82 | |
| 83 | if __name__ == "__main__": |
| 84 | sys.exit(main()) |