Zion Boggan zionboggan.com ↗

cross-feed dedup and mitre att&ck extraction

182f54b   Zion Boggan committed on May 16, 2026 (1 month ago)
src/cti/dedup.py +37 -0
@@ -0,0 +1,37 @@
+from __future__ import annotations
+
+from cti.models import Indicator
+
+
def deduplicate(indicators: list[Indicator]) -> list[Indicator]:
    """Collapse indicators that share a key into one merged record per key.

    The first indicator seen for a key is copied into a fresh ``Indicator``
    so the caller's objects are never mutated. Later indicators with the
    same key are folded into that copy:

    * ``confidence`` becomes the highest value seen.
    * ``techniques`` and ``tags`` become the sorted union of all values.
    * ``threat_type``, ``malware``, ``reference`` and ``first_seen`` keep
      the first truthy value encountered.
    * ``source`` becomes a sorted, comma-joined list of distinct names.

    Args:
        indicators: Indicators to merge; ``source`` may already be a
            comma-joined list of names.

    Returns:
        One indicator per distinct ``key()``, in first-seen order.
    """
    merged: dict[tuple[str, str], Indicator] = {}
    for indicator in indicators:
        key = indicator.key()
        existing = merged.get(key)
        if existing is None:
            # Copy rather than alias so later merges do not touch the input.
            merged[key] = Indicator(
                type=indicator.type,
                value=indicator.value,
                source=indicator.source,
                threat_type=indicator.threat_type,
                confidence=indicator.confidence,
                malware=indicator.malware,
                techniques=sorted(set(indicator.techniques)),
                tags=sorted(set(indicator.tags)),
                reference=indicator.reference,
                first_seen=indicator.first_seen,
            )
            continue
        existing.confidence = max(existing.confidence, indicator.confidence)
        existing.techniques = sorted(set(existing.techniques) | set(indicator.techniques))
        existing.tags = sorted(set(existing.tags) | set(indicator.tags))
        # Back-fill scalar fields the first record left empty.
        existing.threat_type = existing.threat_type or indicator.threat_type
        existing.malware = existing.malware or indicator.malware
        existing.reference = existing.reference or indicator.reference
        existing.first_seen = existing.first_seen or indicator.first_seen
        # Split BOTH sides and drop empty fragments: "".split(",") yields
        # [""], which would otherwise produce a leading comma, and an
        # already-merged incoming source ("a,b") must not be treated as a
        # single name or the result would contain duplicates ("a,a,b").
        known = {name for name in existing.source.split(",") if name}
        incoming = {name for name in indicator.source.split(",") if name}
        if not incoming <= known:
            existing.source = ",".join(sorted(known | incoming))
    return list(merged.values())
+
+
def filter_by_confidence(indicators: list[Indicator], minimum: int) -> list[Indicator]:
    """Return the indicators whose confidence is at least *minimum*.

    Input order is preserved and the input list is not modified.
    """
    kept = []
    for candidate in indicators:
        if candidate.confidence >= minimum:
            kept.append(candidate)
    return kept
src/cti/ttp.py +37 -0
@@ -0,0 +1,37 @@
+from __future__ import annotations
+
+from cti import mitre
+from cti.models import Indicator, Technique
+
+
def extract_techniques(indicators: list[Indicator]) -> list[Technique]:
    """Aggregate the technique IDs referenced by *indicators*.

    One ``Technique`` is created per distinct technique ID, with its name
    and tactic looked up through ``mitre.name_for`` / ``mitre.tactic_for``.
    For every indicator that references the technique, the indicator's
    source names are added to ``Technique.sources`` and
    ``Technique.indicator_count`` is incremented once.

    Args:
        indicators: Indicators whose ``techniques`` hold technique IDs and
            whose ``source`` is a comma-joined list of names.

    Returns:
        Techniques ordered by descending indicator count, ties broken by
        ascending technique ID.
    """
    table: dict[str, Technique] = {}
    for indicator in indicators:
        # "".split(",") yields [""]; drop empty fragments so a blank source
        # never shows up as a nameless entry in the technique's sources.
        sources = [name for name in indicator.source.split(",") if name]
        # dict.fromkeys de-duplicates while keeping order, so an indicator
        # that lists the same technique twice is still counted once.
        for technique_id in dict.fromkeys(indicator.techniques):
            technique = table.get(technique_id)
            if technique is None:
                technique = Technique(
                    technique_id=technique_id,
                    name=mitre.name_for(technique_id),
                    tactic=mitre.tactic_for(technique_id),
                )
                table[technique_id] = technique
            technique.sources.update(sources)
            technique.indicator_count += 1
    return sorted(table.values(), key=lambda t: (-t.indicator_count, t.technique_id))
+
+
def coverage_report(techniques: list[Technique]) -> str:
    """Render *techniques* as a Markdown table under a "TTP coverage" heading.

    Each technique becomes one row holding its ID, tactic, name, indicator
    count and its source names sorted and joined with ", ". Rows follow the
    order of the input list. The result always ends with a newline.
    """
    header = [
        "# TTP coverage",
        "",
        "| Technique | Tactic | Name | Indicators | Sources |",
        "|---|---|---|---|---|",
    ]
    rows = []
    for entry in techniques:
        feeds = ", ".join(sorted(entry.sources))
        rows.append(
            f"| {entry.technique_id} | {entry.tactic} | {entry.name} "
            f"| {entry.indicator_count} | {feeds} |"
        )
    return "".join(line + "\n" for line in header + rows)