Zion Boggan zionboggan.com ↗

pytest suite + feed fixtures for offline runs

14a47e8   Zion Boggan committed on May 25, 2026 (4 weeks ago)
Makefile +19 -0
@@ -0,0 +1,19 @@
+.PHONY: install run demo serve test lint
+
+install:
+ pip install -r requirements-dev.txt
+
+run:
+ python -m cti.cli run -c config.yaml
+
+demo:
+ CTI_USE_FIXTURES=1 CTI_EMAIL_BACKEND=file CTI_APPROVAL_SECRET=demo-secret python -m cti.cli run -c config.example.yaml
+
+serve:
+ CTI_EMAIL_BACKEND=file CTI_APPROVAL_SECRET=demo-secret python -m cti.cli serve -c config.example.yaml --port 8080
+
+test:
+ pytest -q
+
+lint:
+ ruff check src tests
fixtures/feodo.json +34 -0
@@ -0,0 +1,34 @@
+[
+ {
+ "ip_address": "45.137.21.9",
+ "port": 443,
+ "status": "online",
+ "malware": "CobaltStrike",
+ "first_seen": "2026-05-20 00:00:00",
+ "last_online": "2026-05-27"
+ },
+ {
+ "ip_address": "91.211.88.34",
+ "port": 8443,
+ "status": "online",
+ "malware": "Emotet",
+ "first_seen": "2026-05-22 00:00:00",
+ "last_online": "2026-05-27"
+ },
+ {
+ "ip_address": "194.36.191.55",
+ "port": 443,
+ "status": "online",
+ "malware": "QakBot",
+ "first_seen": "2026-05-24 00:00:00",
+ "last_online": "2026-05-27"
+ },
+ {
+ "ip_address": "185.220.101.45",
+ "port": 9001,
+ "status": "online",
+ "malware": "Dridex",
+ "first_seen": "2026-05-19 00:00:00",
+ "last_online": "2026-05-27"
+ }
+]
fixtures/openphish.txt +4 -0
@@ -0,0 +1,4 @@
+http://billing-secure-portal.com/office365/login
+https://login-secure-update.com/verify/account
+https://account-verify-paypa1.com/signin
+https://m365-mailbox-quota.com/owa/auth
fixtures/otx.json +28 -0
@@ -0,0 +1,28 @@
+{
+ "results": [
+ {
+ "id": "664f1a2b9c1e4d0012a3bc77",
+ "name": "AgentTesla campaign targeting finance sector",
+ "attack_ids": ["T1566.001", "T1056.001", "T1041"],
+ "tags": ["agenttesla", "keylogger", "phishing"],
+ "malware_families": [{"display_name": "AgentTesla"}],
+ "indicators": [
+ {"type": "domain", "indicator": "cdn-jquery-min.net"},
+ {"type": "IPv4", "indicator": "45.137.21.9"},
+ {"type": "FileHash-SHA256", "indicator": "5d41402abc4b2a76b9719d911017c592e1b2c3d4f5a6978899aabbccddeeff00"}
+ ]
+ },
+ {
+ "id": "664f33ce7b2a1f0014ddee01",
+ "name": "Qakbot distribution infrastructure",
+ "attack_ids": ["T1566.001", "T1055"],
+ "tags": ["qakbot", "loader"],
+ "malware_families": [{"display_name": "QakBot"}],
+ "indicators": [
+ {"type": "IPv4", "indicator": "194.36.191.55"},
+ {"type": "domain", "indicator": "secure-doc-share.net"},
+ {"type": "URL", "indicator": "http://secure-doc-share.net/docs/view.php"}
+ ]
+ }
+ ]
+}
fixtures/threatfox.json +75 -0
@@ -0,0 +1,75 @@
+{
+ "query_status": "ok",
+ "data": [
+ {
+ "ioc": "45.137.21.9:443",
+ "ioc_type": "ip:port",
+ "threat_type": "botnet_cc",
+ "malware_printable": "Cobalt Strike",
+ "confidence_level": 100,
+ "first_seen": "2026-05-26 08:14:03 UTC",
+ "tags": ["CobaltStrike", "c2"],
+ "reference": "https://threatfox.abuse.ch/ioc/1287431/"
+ },
+ {
+ "ioc": "193.149.176.12:8080",
+ "ioc_type": "ip:port",
+ "threat_type": "botnet_cc",
+ "malware_printable": "AsyncRAT",
+ "confidence_level": 90,
+ "first_seen": "2026-05-26 11:02:55 UTC",
+ "tags": ["AsyncRAT"],
+ "reference": "https://threatfox.abuse.ch/ioc/1287510/"
+ },
+ {
+ "ioc": "cdn-jquery-min.net",
+ "ioc_type": "domain",
+ "threat_type": "botnet_cc",
+ "malware_printable": "AgentTesla",
+ "confidence_level": 85,
+ "first_seen": "2026-05-25 19:47:11 UTC",
+ "tags": ["AgentTesla", "exfil"],
+ "reference": "https://threatfox.abuse.ch/ioc/1286992/"
+ },
+ {
+ "ioc": "http://update-flashplayer.org/payload/load.php",
+ "ioc_type": "url",
+ "threat_type": "payload_delivery",
+ "malware_printable": "RedLine Stealer",
+ "confidence_level": 80,
+ "first_seen": "2026-05-26 02:31:40 UTC",
+ "tags": ["RedLineStealer", "exploit"],
+ "reference": "https://threatfox.abuse.ch/ioc/1287205/"
+ },
+ {
+ "ioc": "5d41402abc4b2a76b9719d911017c592e1b2c3d4f5a6978899aabbccddeeff00",
+ "ioc_type": "sha256_hash",
+ "threat_type": "payload_delivery",
+ "malware_printable": "AgentTesla",
+ "confidence_level": 95,
+ "first_seen": "2026-05-26 06:18:22 UTC",
+ "tags": ["AgentTesla"],
+ "reference": "https://threatfox.abuse.ch/ioc/1287388/"
+ },
+ {
+ "ioc": "9b74c9897bac770ffc029102a200c5de7f3b88a0a3f7f0d7c1f2e3d4c5b6a798",
+ "ioc_type": "sha256_hash",
+ "threat_type": "payload_delivery",
+ "malware_printable": "RedLine Stealer",
+ "confidence_level": 92,
+ "first_seen": "2026-05-26 07:55:09 UTC",
+ "tags": ["RedLineStealer"],
+ "reference": "https://threatfox.abuse.ch/ioc/1287402/"
+ },
+ {
+ "ioc": "20.50.13.7:80",
+ "ioc_type": "ip:port",
+ "threat_type": "scanner",
+ "malware_printable": null,
+ "confidence_level": 40,
+ "first_seen": "2026-05-26 09:12:00 UTC",
+ "tags": ["scanner"],
+ "reference": "https://threatfox.abuse.ch/ioc/1287455/"
+ }
+ ]
+}
fixtures/urlhaus.json +32 -0
@@ -0,0 +1,32 @@
+{
+ "841201": [
+ {
+ "url": "http://update-flashplayer.org/payload/load.php",
+ "url_status": "online",
+ "threat": "malware_download",
+ "tags": ["exploit", "RedLineStealer"],
+ "date_added": "2026-05-26 02:31:40 UTC",
+ "urlhaus_reference": "https://urlhaus.abuse.ch/url/841201/"
+ }
+ ],
+ "841244": [
+ {
+ "url": "http://178.62.91.14/inv/scan/EmotetBins.exe",
+ "url_status": "online",
+ "threat": "payload_delivery",
+ "tags": ["Emotet", "exe"],
+ "date_added": "2026-05-26 05:11:09 UTC",
+ "urlhaus_reference": "https://urlhaus.abuse.ch/url/841244/"
+ }
+ ],
+ "841290": [
+ {
+ "url": "http://billing-secure-portal.com/office365/login",
+ "url_status": "online",
+ "threat": "malware_download",
+ "tags": ["phishing"],
+ "date_added": "2026-05-26 07:40:55 UTC",
+ "urlhaus_reference": "https://urlhaus.abuse.ch/url/841290/"
+ }
+ ]
+}
requirements-dev.txt +4 -0
@@ -0,0 +1,4 @@
+-r requirements.txt
+pytest==8.3.3
+pytest-cov==5.0.0
+ruff==0.6.9
tests/conftest.py +37 -0
@@ -0,0 +1,37 @@
+from pathlib import Path
+
+import pytest
+
+ROOT = Path(__file__).resolve().parents[1]
+FIXTURES = ROOT / "fixtures"
+
+
+@pytest.fixture
+def fixtures_dir():
+ return FIXTURES
+
+
+@pytest.fixture
+def base_config(tmp_path):
+ return {
+ "min_confidence": 60,
+ "output_dir": str(tmp_path / "output"),
+ "fixtures_dir": str(FIXTURES),
+ "use_fixtures": True,
+ "feeds": {
+ "threatfox": {"enabled": True, "days": 1},
+ "feodo": {"enabled": True},
+ "urlhaus": {"enabled": True},
+ "otx": {"enabled": True},
+ "openphish": {"enabled": True},
+ "leaks": {"enabled": False},
+ },
+ "rules": {"base_id": 100300},
+ "approval": {
+ "base_url": "http://localhost:8080",
+ "token_ttl": 3600,
+ "analyst_email": "analyst@example.com",
+ "secret": "test-secret",
+ },
+ "email": {"backend": "file", "from_addr": "cti@lab.local"},
+ }
tests/test_approval.py +35 -0
@@ -0,0 +1,35 @@
+import time
+
+from cti.approval import make_token, render_email, verify_token
+
+
+def test_token_roundtrip():
+ token = make_token("secret", "cti-20260527-120000")
+ assert verify_token("secret", token, 60) == "cti-20260527-120000"
+
+
+def test_token_rejects_wrong_secret():
+ token = make_token("secret", "bundle")
+ assert verify_token("other", token, 60) is None
+
+
+def test_token_expires():
+ token = make_token("secret", "bundle")
+ time.sleep(1)
+ assert verify_token("secret", token, 0) is None
+
+
+def test_email_renders_summary():
+ html = render_email({
+ "bundle_id": "cti-20260527-120000",
+ "generated_at": "2026-05-27T12:00:00Z",
+ "counts": {"ip": 5, "domain": 3},
+ "diff": {"added": 6, "removed": 1, "total": 8},
+ "techniques": [{"technique_id": "T1071.001", "tactic": "command-and-control", "indicator_count": 4}],
+ "top_malware": [("Cobalt Strike", 2)],
+ "review_url": "http://localhost:8080/review/abc",
+ "list_sizes": {"cti-malicious-ip": 5},
+ })
+ assert "cti-20260527-120000" in html
+ assert "T1071.001" in html
+ assert "http://localhost:8080/review/abc" in html
tests/test_dedup.py +43 -0
@@ -0,0 +1,43 @@
+from cti.dedup import deduplicate, filter_by_confidence
+from cti.models import Indicator
+
+
+def make(value, source, confidence, techniques, malware=None):
+ return Indicator(
+ type="ip",
+ value=value,
+ source=source,
+ threat_type="botnet_cc",
+ confidence=confidence,
+ malware=malware,
+ techniques=techniques,
+ )
+
+
+def test_merges_same_indicator_across_sources():
+ merged = deduplicate([
+ make("45.137.21.9", "threatfox", 100, ["T1071.001"], "Cobalt Strike"),
+ make("45.137.21.9", "feodo", 90, ["T1573"]),
+ make("45.137.21.9", "otx", 60, ["T1059.001"]),
+ ])
+ assert len(merged) == 1
+ indicator = merged[0]
+ assert indicator.confidence == 100
+ assert set(indicator.techniques) == {"T1071.001", "T1573", "T1059.001"}
+ assert set(indicator.source.split(",")) == {"threatfox", "feodo", "otx"}
+ assert indicator.malware == "Cobalt Strike"
+
+
+def test_case_insensitive_key():
+ merged = deduplicate([
+ Indicator(type="domain", value="Evil.COM", source="a", threat_type="c2", confidence=70),
+ Indicator(type="domain", value="evil.com", source="b", threat_type="c2", confidence=80),
+ ])
+ assert len(merged) == 1
+ assert merged[0].confidence == 80
+
+
+def test_confidence_filter():
+ indicators = [make("1.1.1.1", "a", 40, []), make("2.2.2.2", "b", 80, [])]
+ kept = filter_by_confidence(indicators, 60)
+ assert [i.value for i in kept] == ["2.2.2.2"]
tests/test_feeds.py +44 -0
@@ -0,0 +1,44 @@
+from cti.feeds.feodo import FeodoTracker
+from cti.feeds.openphish import OpenPhish
+from cti.feeds.otx import OTX
+from cti.feeds.threatfox import ThreatFox
+from cti.feeds.urlhaus import URLhaus
+
+
+def test_threatfox_parses_types_and_strips_port(fixtures_dir):
+ indicators = ThreatFox().collect(fixtures_dir)
+ ips = [i for i in indicators if i.type == "ip"]
+ assert any(i.value == "45.137.21.9" for i in ips)
+ assert all(":" not in i.value for i in ips)
+ cobalt = next(i for i in ips if i.value == "45.137.21.9")
+ assert "T1071.001" in cobalt.techniques
+
+
+def test_threatfox_maps_hashes(fixtures_dir):
+ indicators = ThreatFox().collect(fixtures_dir)
+ hashes = [i for i in indicators if i.type == "sha256"]
+ assert len(hashes) == 2
+
+
+def test_feodo_marks_c2(fixtures_dir):
+ indicators = FeodoTracker().collect(fixtures_dir)
+ assert all(i.threat_type == "botnet_cc" for i in indicators)
+ assert any(i.malware == "Emotet" for i in indicators)
+
+
+def test_urlhaus_extracts_domain_from_url(fixtures_dir):
+ indicators = URLhaus().collect(fixtures_dir)
+ domains = {i.value for i in indicators if i.type == "domain"}
+ assert "update-flashplayer.org" in domains
+
+
+def test_otx_pulls_attack_ids(fixtures_dir):
+ indicators = OTX().collect(fixtures_dir)
+ assert any("T1566.001" in i.techniques for i in indicators)
+
+
+def test_openphish_classifies_phishing(fixtures_dir):
+ indicators = OpenPhish().collect(fixtures_dir)
+ assert indicators
+ assert all(i.threat_type == "phishing" for i in indicators)
+ assert all("T1566.002" in i.techniques for i in indicators)
tests/test_pipeline.py +46 -0
@@ -0,0 +1,46 @@
+import json
+from pathlib import Path
+
+from cti.pipeline import build_bundle, promote, run
+
+
+def test_build_bundle_dedups_across_feeds(base_config):
+ bundle = build_bundle(base_config)
+ ips = [i for i in bundle.indicators if i.type == "ip" and i.value == "45.137.21.9"]
+ assert len(ips) == 1
+ assert set(ips[0].source.split(",")) >= {"threatfox", "feodo"}
+
+
+def test_low_confidence_dropped(base_config):
+ bundle = build_bundle(base_config)
+ assert all(i.confidence >= 60 for i in bundle.indicators)
+ assert not any(i.value == "20.50.13.7" for i in bundle.indicators)
+
+
+def test_run_writes_candidate_and_email(base_config):
+ result = run(base_config)
+ candidate = Path(result["candidate_path"])
+ assert (candidate / "manifest.json").exists()
+ assert (candidate / "local_cti_rules.xml").exists()
+ assert (candidate / "lists" / "cti-malicious-ip").exists()
+ emails = list((Path(base_config["output_dir"]) / "emails").glob("*.html"))
+ assert emails
+ assert result["review_url"].startswith("http://localhost:8080/review/")
+
+
+def test_promote_moves_to_active(base_config):
+ result = run(base_config)
+ output_dir = Path(base_config["output_dir"])
+ promote(result["bundle_id"], output_dir)
+ state = json.loads((output_dir / "active" / "state.json").read_text())
+ assert state["bundle_id"] == result["bundle_id"]
+ assert (output_dir / "active" / "local_cti_rules.xml").exists()
+
+
+def test_diff_reflects_promotion(base_config):
+ first = run(base_config)
+ output_dir = Path(base_config["output_dir"])
+ promote(first["bundle_id"], output_dir)
+ second = run(base_config)
+ assert second["diff"]["added"] == 0
+ assert second["diff"]["unchanged"] == second["diff"]["total"]
tests/test_rules.py +50 -0
@@ -0,0 +1,50 @@
+from xml.etree import ElementTree
+
+from cti.models import Indicator
+from cti.rules import build_cdb_lists, build_rules_xml
+
+
+def sample():
+ return [
+ Indicator(type="ip", value="45.137.21.9", source="threatfox", threat_type="botnet_cc",
+ confidence=100, malware="Cobalt Strike", techniques=["T1071.001"]),
+ Indicator(type="domain", value="cdn-jquery-min.net", source="threatfox",
+ threat_type="botnet_cc", confidence=85, malware="AgentTesla",
+ techniques=["T1056.001"]),
+ Indicator(type="sha256", value="5d41402abc4b2a76b9719d911017c592e1b2c3d4f5a6978899aabbccddeeff00",
+ source="threatfox", threat_type="payload_delivery", confidence=95,
+ malware="AgentTesla", techniques=["T1204.002"]),
+ Indicator(type="url", value="http://update-flashplayer.org/payload/load.php",
+ source="urlhaus", threat_type="payload_delivery", confidence=80,
+ techniques=["T1105"]),
+ ]
+
+
+def test_cdb_lists_have_expected_buckets():
+ lists = build_cdb_lists(sample())
+ assert "45.137.21.9:Cobalt Strike" in lists["cti-malicious-ip"]
+ assert lists["cti-malicious-domain"].startswith("cdn-jquery-min.net")
+ assert "cti-malware-hash" in lists
+
+
+def test_cdb_label_is_sanitized():
+ indicators = [Indicator(type="ip", value="1.2.3.4", source="x",
+ threat_type="weird:type", confidence=90)]
+ lists = build_cdb_lists(indicators)
+ assert ":weird-type" in lists["cti-malicious-ip"]
+
+
+def test_rules_xml_is_wellformed_and_tagged():
+ xml = build_rules_xml(sample(), base_id=100300)
+ root = ElementTree.fromstring(xml)
+ assert root.tag == "group"
+ rule_ids = [r.get("id") for r in root.findall("rule")]
+ assert "100300" in rule_ids
+ techniques = [t.text for t in root.iter("id")]
+ assert "T1071.001" in techniques
+
+
+def test_rules_reference_generated_lists():
+ xml = build_rules_xml(sample(), base_id=100300)
+ assert "etc/lists/cti-malicious-ip" in xml
+ assert "etc/lists/cti-malware-hash" in xml
tests/test_ttp.py +36 -0
@@ -0,0 +1,36 @@
+from cti.models import Indicator
+from cti.ttp import coverage_report, extract_techniques
+
+
+def test_extract_dedups_and_counts():
+ indicators = [
+ Indicator(type="ip", value="1.1.1.1", source="threatfox", threat_type="c2",
+ confidence=90, techniques=["T1071.001", "T1059.001"]),
+ Indicator(type="ip", value="2.2.2.2", source="feodo", threat_type="c2",
+ confidence=90, techniques=["T1071.001"]),
+ ]
+ techniques = extract_techniques(indicators)
+ by_id = {t.technique_id: t for t in techniques}
+ assert by_id["T1071.001"].indicator_count == 2
+ assert by_id["T1059.001"].indicator_count == 1
+ assert techniques[0].technique_id == "T1071.001"
+
+
+def test_technique_names_resolved():
+ indicators = [
+ Indicator(type="url", value="http://x", source="openphish", threat_type="phishing",
+ confidence=80, techniques=["T1566.002"]),
+ ]
+ techniques = extract_techniques(indicators)
+ assert techniques[0].name.startswith("Phishing")
+ assert techniques[0].tactic == "initial-access"
+
+
+def test_coverage_report_renders_rows():
+ indicators = [
+ Indicator(type="ip", value="1.1.1.1", source="feodo", threat_type="c2",
+ confidence=90, techniques=["T1071.001"]),
+ ]
+ report = coverage_report(extract_techniques(indicators))
+ assert "T1071.001" in report
+ assert "| Technique |" in report
tests/test_web.py +56 -0
@@ -0,0 +1,56 @@
+import pytest
+
+from cti.approval import make_token
+from cti.pipeline import run
+from cti.web import create_app
+
+
+@pytest.fixture
+def app_and_bundle(base_config):
+ result = run(base_config)
+ app = create_app(base_config)
+ app.config["TESTING"] = True
+ return app, result
+
+
+def test_dashboard_lists_bundle(app_and_bundle):
+ app, result = app_and_bundle
+ client = app.test_client()
+ resp = client.get("/")
+ assert resp.status_code == 200
+ assert result["bundle_id"].encode() in resp.data
+
+
+def test_review_requires_valid_token(app_and_bundle):
+ app, _ = app_and_bundle
+ client = app.test_client()
+ assert client.get("/review/garbage").status_code == 403
+
+
+def test_review_renders_for_valid_token(app_and_bundle):
+ app, result = app_and_bundle
+ token = make_token("test-secret", result["bundle_id"])
+ client = app.test_client()
+ resp = client.get(f"/review/{token}")
+ assert resp.status_code == 200
+ assert b"Approve and deploy" in resp.data
+
+
+def test_approve_promotes(app_and_bundle):
+ app, result = app_and_bundle
+ token = make_token("test-secret", result["bundle_id"])
+ client = app.test_client()
+ resp = client.post(f"/approve/{token}")
+ assert resp.status_code == 200
+ assert b"approved" in resp.data
+ follow = client.get(f"/review/{token}")
+ assert b"already been approved" in follow.data
+
+
+def test_reject_marks_bundle(app_and_bundle):
+ app, result = app_and_bundle
+ token = make_token("test-secret", result["bundle_id"])
+ client = app.test_client()
+ resp = client.post(f"/reject/{token}", data={"reason": "false positives"})
+ assert resp.status_code == 200
+ assert b"rejected" in resp.data