| 1 | import json |
| 2 | from pathlib import Path |
| 3 | |
| 4 | from cti.pipeline import build_bundle, promote, run |
| 5 | |
| 6 | |
def test_build_bundle_dedups_across_feeds(base_config):
    """Check that the bundle holds a single ``ip`` indicator for 45.137.21.9.

    The one matching indicator's comma-separated ``source`` field must
    include both ``threatfox`` and ``feodo``.
    """
    bundle = build_bundle(base_config)
    matches = [
        indicator
        for indicator in bundle.indicators
        if indicator.type == "ip" and indicator.value == "45.137.21.9"
    ]
    assert len(matches) == 1

    # Superset check written from the other side: the required feeds must
    # all appear among the sources recorded on the surviving indicator.
    recorded_sources = set(matches[0].source.split(","))
    assert {"threatfox", "feodo"} <= recorded_sources
| 12 | |
| 13 | |
def test_low_confidence_dropped(base_config):
    """Check that no bundled indicator has confidence below 60.

    Also checks that no indicator in the bundle carries the value
    ``20.50.13.7``.
    """
    bundle = build_bundle(base_config)

    # Collect offenders instead of a bare all()/any() so a failure shows
    # which indicators slipped through.
    below_threshold = [
        indicator
        for indicator in bundle.indicators
        if not indicator.confidence >= 60
    ]
    assert not below_threshold

    leaked = [
        indicator
        for indicator in bundle.indicators
        if indicator.value == "20.50.13.7"
    ]
    assert not leaked
| 18 | |
| 19 | |
def test_run_writes_candidate_and_email(base_config):
    """Check the files and review URL produced by ``run``.

    Asserts that the candidate path contains ``manifest.json``,
    ``local_cti_rules.xml`` and ``lists/cti-malicious-ip``, that at least
    one ``*.html`` file exists under ``<output_dir>/emails``, and that the
    returned ``review_url`` starts with the localhost review prefix.
    """
    outcome = run(base_config)
    candidate_dir = Path(outcome["candidate_path"])

    expected_artifacts = (
        ("manifest.json",),
        ("local_cti_rules.xml",),
        ("lists", "cti-malicious-ip"),
    )
    for parts in expected_artifacts:
        assert candidate_dir.joinpath(*parts).exists()

    email_dir = Path(base_config["output_dir"]) / "emails"
    rendered_emails = list(email_dir.glob("*.html"))
    assert rendered_emails

    review_url = outcome["review_url"]
    assert review_url.startswith("http://localhost:8080/review/")
| 29 | |
| 30 | |
def test_promote_moves_to_active(base_config):
    """Check the ``active`` directory after promoting a freshly run bundle.

    Asserts that ``active/state.json`` records the promoted bundle id and
    that ``active/local_cti_rules.xml`` exists.
    """
    outcome = run(base_config)
    out_root = Path(base_config["output_dir"])
    bundle_id = outcome["bundle_id"]
    promote(bundle_id, out_root)

    active_dir = out_root / "active"
    state_text = (active_dir / "state.json").read_text()
    state = json.loads(state_text)
    assert state["bundle_id"] == bundle_id

    rules_file = active_dir / "local_cti_rules.xml"
    assert rules_file.exists()
| 38 | |
| 39 | |
def test_diff_reflects_promotion(base_config):
    """Check the diff reported by a second run after promoting the first.

    With the first bundle promoted, the second run's ``diff`` must report
    zero ``added`` entries and an ``unchanged`` count equal to ``total``.
    """
    initial = run(base_config)
    out_root = Path(base_config["output_dir"])
    promote(initial["bundle_id"], out_root)

    rerun_diff = run(base_config)["diff"]
    assert rerun_diff["added"] == 0
    assert rerun_diff["unchanged"] == rerun_diff["total"]