| 1 | import re |
| 2 | from pathlib import Path |
| 3 | |
| 4 | import pytest |
| 5 | import yaml |
| 6 | |
# Every *.yml file below the "rules" directory that sits one level above this
# file's own directory, searched recursively. Sorted so test parametrization
# order is stable from run to run.
RULES = sorted(
    Path(__file__).resolve().parents[1].joinpath("rules").rglob("*.yml")
)

# Tactic tags accepted in the "attack." tag namespace.
TACTIC_TAGS = {
    "attack.reconnaissance",
    "attack.resource_development",
    "attack.initial_access",
    "attack.execution",
    "attack.persistence",
    "attack.privilege_escalation",
    "attack.defense_evasion",
    "attack.credential_access",
    "attack.discovery",
    "attack.lateral_movement",
    "attack.collection",
    "attack.command_and_control",
    "attack.exfiltration",
    "attack.impact",
}

# Technique tag: "attack.t" + four digits, optionally followed by a
# three-digit sub-technique suffix (e.g. attack.t1059 or attack.t1059.001).
# Lower-case only.
TECHNIQUE_RE = re.compile(r"^attack\.t\d{4}(\.\d{3})?$")
| 16 | |
| 17 | |
def load(path):
    """Parse the YAML file at *path* and return the loaded document.

    The file is decoded explicitly as UTF-8 rather than with the platform's
    locale default, so rules containing non-ASCII text parse the same way on
    every machine.

    Args:
        path: A ``pathlib.Path`` (or any object with a compatible
            ``read_text``) pointing at the rule file.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file content; this is
        ``None`` for an empty document.

    Raises:
        OSError: If the file cannot be read.
        UnicodeDecodeError: If the file is not valid UTF-8.
        yaml.YAMLError: If the content is not valid YAML.
    """
    return yaml.safe_load(path.read_text(encoding="utf-8"))
| 20 | |
| 21 | |
def test_rules_exist():
    """Fail when rule discovery found no files at all.

    The other checks in this module iterate over ``RULES``; with an empty
    list they would have nothing to verify.
    """
    assert len(RULES) > 0, "no rules found"
| 24 | |
| 25 | |
@pytest.mark.parametrize("path", RULES, ids=[p.name for p in RULES])
def test_rule_schema(path):
    """Check that one rule file carries the mandatory metadata.

    A rule passes when every required field is present and truthy, it has a
    ``detection`` or ``correlation`` section, and ``level`` and ``status``
    each hold one of the accepted values.
    """
    required_fields = ("title", "id", "status", "description", "tags", "level")
    allowed_levels = {"informational", "low", "medium", "high", "critical"}
    allowed_statuses = {"experimental", "test", "stable", "deprecated", "unsupported"}

    rule = load(path)

    # Truthiness check: empty strings, empty lists and nulls count as
    # missing, not only absent keys.
    for field in required_fields:
        assert rule.get(field), f"{path.name} missing {field}"

    has_logic = any(section in rule for section in ("detection", "correlation"))
    assert has_logic, f"{path.name} has no detection/correlation"

    assert rule["level"] in allowed_levels
    assert rule["status"] in allowed_statuses
| 34 | |
| 35 | |
@pytest.mark.parametrize("path", RULES, ids=[p.name for p in RULES])
def test_attack_tags(path):
    """Check the ATT&CK tags of one rule file.

    The rule must carry at least one technique tag (``attack.tNNNN`` or
    ``attack.tNNNN.NNN``), and every tag in the ``attack.`` namespace must be
    either a known tactic or a well-formed technique tag. Tags outside that
    namespace are not inspected.
    """
    # A bare ``tags:`` key parses to None, which ``.get("tags", [])`` would
    # return unchanged. Fall back to an empty list so that case fails on the
    # assertion below instead of raising TypeError while iterating.
    tags = load(path).get("tags") or []

    assert any(TECHNIQUE_RE.match(t) for t in tags), (
        f"{path.name} has no ATT&CK technique tag"
    )

    for t in tags:
        if t.startswith("attack."):
            assert t in TACTIC_TAGS or TECHNIQUE_RE.match(t), f"{path.name} bad tag {t}"
| 44 | |
| 45 | |
def test_unique_ids():
    """Check that no two rule files declare the same ``id``.

    On failure the message maps each duplicated id to the names of the files
    that share it, so the offending rules can be located directly.

    Raises:
        KeyError: If a rule file has no ``id`` key at all.
    """
    # Group file names by id in a single pass, which keeps the check linear
    # in the number of rules.
    files_by_id = {}
    for path in RULES:
        files_by_id.setdefault(load(path)["id"], []).append(path.name)

    dupes = {
        rule_id: names for rule_id, names in files_by_id.items() if len(names) > 1
    }
    assert not dupes, f"duplicate rule ids: {dupes}"
| 50 | |
| 51 | |
def test_correlation_refs_resolve():
    """Check that every rule referenced by a correlation exists.

    Each entry in a rule's ``correlation.rules`` list must match the ``name``
    or the ``id`` of some rule file found in ``RULES``.
    """
    # Parse each file exactly once and reuse the result for both passes.
    loaded = [(path, load(path)) for path in RULES]

    # A correlation may point at a rule by name or by id, so both kinds of
    # identifier count as resolvable targets. Falsy values are skipped.
    known = set()
    for _, rule in loaded:
        for key in ("name", "id"):
            identifier = rule.get(key)
            if identifier:
                known.add(identifier)

    for path, rule in loaded:
        corr = rule.get("correlation")
        if not corr:
            continue
        # ``rules:`` with no value parses to None; treat it as "no references"
        # rather than failing with a TypeError.
        for ref in corr.get("rules") or []:
            assert ref in known, f"{path.name} references unknown rule '{ref}'"