Zion Boggan
repos/cti-detection-automation/tests/test_pipeline.py
zionboggan.com ↗
46 lines · python
History for this file →
1
import json
2
from pathlib import Path
3
 
4
from cti.pipeline import build_bundle, promote, run
5
 
6
 
7
def test_build_bundle_dedups_across_feeds(base_config):
    """Expect one `ip` indicator for 45.137.21.9, credited to both feeds.

    The indicator's `source` is treated as a comma-separated list and must
    name at least `threatfox` and `feodo`.
    """
    target_ip = "45.137.21.9"
    matches = [
        indicator
        for indicator in build_bundle(base_config).indicators
        if indicator.type == "ip" and indicator.value == target_ip
    ]

    # Exactly one entry: duplicates across feeds must have been merged.
    assert len(matches) == 1

    credited_feeds = set(matches[0].source.split(","))
    assert {"threatfox", "feodo"}.issubset(credited_feeds)
12
 
13
 
14
def test_low_confidence_dropped(base_config):
    """Expect every bundled indicator to have confidence of at least 60.

    Also checks that no indicator carries the value 20.50.13.7
    (presumably the fixture's low-confidence entry -- confirm in conftest).
    """
    bundle = build_bundle(base_config)

    for indicator in bundle.indicators:
        assert indicator.confidence >= 60

    leaked = [
        indicator
        for indicator in bundle.indicators
        if indicator.value == "20.50.13.7"
    ]
    assert not leaked
18
 
19
 
20
def test_run_writes_candidate_and_email(base_config):
    """Expect `run` to leave candidate artifacts, an HTML email and a review URL.

    Checks that the directory at `candidate_path` holds the manifest, the
    rules XML and the malicious-IP list, that at least one `*.html` file
    exists under `<output_dir>/emails`, and that `review_url` starts with
    the local review prefix.
    """
    outcome = run(base_config)

    candidate_dir = Path(outcome["candidate_path"])
    expected_artifacts = (
        candidate_dir / "manifest.json",
        candidate_dir / "local_cti_rules.xml",
        candidate_dir / "lists" / "cti-malicious-ip",
    )
    for artifact in expected_artifacts:
        assert artifact.exists()

    email_dir = Path(base_config["output_dir"]) / "emails"
    # Path objects are always truthy, so any() is true iff a match exists.
    assert any(email_dir.glob("*.html"))

    review_prefix = "http://localhost:8080/review/"
    assert outcome["review_url"].startswith(review_prefix)
29
 
30
 
31
def test_promote_moves_to_active(base_config):
    """Expect `promote` to populate `<output_dir>/active` for the run's bundle.

    After promotion, `active/state.json` must record the promoted
    `bundle_id` and `active/local_cti_rules.xml` must exist.
    """
    outcome = run(base_config)
    bundle_id = outcome["bundle_id"]
    out_root = Path(base_config["output_dir"])
    promote(bundle_id, out_root)

    active_dir = out_root / "active"
    state_text = (active_dir / "state.json").read_text()
    recorded_state = json.loads(state_text)

    assert recorded_state["bundle_id"] == bundle_id
    assert (active_dir / "local_cti_rules.xml").exists()
38
 
39
 
40
def test_diff_reflects_promotion(base_config):
    """Expect a re-run after promotion to report nothing added.

    The first run's bundle is promoted, then `run` is called again with the
    same config. The second result's diff must have `added == 0` and
    `unchanged` equal to `total`.
    """
    initial = run(base_config)
    promote(initial["bundle_id"], Path(base_config["output_dir"]))

    diff = run(base_config)["diff"]
    assert diff["added"] == 0
    assert diff["unchanged"] == diff["total"]