Zion Boggan zionboggan.com ↗

convert tool: compile rules to splunk / elastic / sentinel

382d0fb   Zion Boggan committed on May 29, 2026 (3 weeks ago)
Makefile +21 -0
@@ -0,0 +1,21 @@
+.PHONY: install lint test convert correlations all clean
+
+install:
+ pip install -r requirements.txt
+
+lint:
+ sigma check rules/
+
+test:
+ pytest -q
+
+convert:
+ python tools/convert.py
+
+correlations:
+ sigma convert -t splunk --without-pipeline rules/linux/credential-access/
+
+all: lint test convert
+
+clean:
+ rm -rf dist .pytest_cache
tools/convert.py +84 -0
@@ -0,0 +1,84 @@
+from __future__ import annotations
+
+import subprocess
+import sys
+from pathlib import Path
+
+import yaml
+
+ROOT = Path(__file__).resolve().parents[1]
+RULES = ROOT / "rules"
+DIST = ROOT / "dist"
+
+BACKENDS = {
+ "splunk": "conf",
+ "esql": "esql",
+ "kusto": "kql",
+}
+
+CATEGORY_PIPELINE = {
+ "process_creation": "sysmon",
+ "process_access": "sysmon",
+ "image_load": "sysmon",
+ "file_event": "sysmon",
+ "network_connection": "sysmon",
+ "dns_query": "sysmon",
+}
+SERVICE_PIPELINE = {
+ "security": "windows-audit",
+ "system": "windows-audit",
+}
+
+
+def pipeline_for(rule: dict) -> str | None:
+ ls = rule.get("logsource", {})
+ if ls.get("product") == "windows":
+ if ls.get("category") in CATEGORY_PIPELINE:
+ return CATEGORY_PIPELINE[ls["category"]]
+ if ls.get("service") in SERVICE_PIPELINE:
+ return SERVICE_PIPELINE[ls["service"]]
+ return None
+
+
+def convert(rule_path: Path, backend: str) -> tuple[bool, str]:
+ try:
+ rule = yaml.safe_load(rule_path.read_text())
+ except yaml.YAMLError as exc:
+ return False, f"yaml error: {exc}"
+ cmd = ["sigma", "convert", "-t", backend, "-s"]
+ pipeline = pipeline_for(rule or {})
+ if pipeline:
+ cmd += ["-p", pipeline]
+ else:
+ cmd += ["--without-pipeline"]
+ cmd.append(str(rule_path))
+ proc = subprocess.run(cmd, capture_output=True, text=True)
+ if proc.returncode != 0:
+ return False, proc.stderr.strip().splitlines()[-1] if proc.stderr else "convert failed"
+ return True, proc.stdout.strip()
+
+
+def main() -> int:
+ rules = sorted(RULES.rglob("*.yml"))
+ if not rules:
+ print("no rules found", file=sys.stderr)
+ return 1
+ ok = skipped = 0
+ for backend, ext in BACKENDS.items():
+ for rule_path in rules:
+ success, output = convert(rule_path, backend)
+ rel = rule_path.relative_to(RULES).with_suffix(f".{ext}")
+ if success and output:
+ out = DIST / backend / rel
+ out.parent.mkdir(parents=True, exist_ok=True)
+ out.write_text(output + "\n")
+ ok += 1
+ else:
+ skipped += 1
+ print(f" skip {backend}:{rule_path.name} - {output}")
+ print(f"converted {ok}, skipped {skipped} (correlation rules skip on backends that don't support them)")
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())