| 1 | from __future__ import annotations |
| 2 | |
| 3 | from cti.models import Indicator |
| 4 | |
| 5 | |
def deduplicate(indicators: list[Indicator]) -> list[Indicator]:
    """Collapse indicators that share the same ``key()`` into one record.

    The first indicator seen for a key is copied (input objects are never
    mutated) and every later duplicate is folded into that copy:

    * ``confidence`` becomes the highest value seen;
    * ``techniques`` and ``tags`` become the sorted union of all values;
    * ``malware``, ``reference`` and ``first_seen`` keep the first truthy value;
    * ``source`` becomes a sorted, comma-separated list of distinct names;
    * ``type``, ``value`` and ``threat_type`` come from the first occurrence.

    Args:
        indicators: Indicators to merge; may contain duplicates by key.

    Returns:
        One merged indicator per distinct key, in order of first appearance.
    """
    merged: dict[tuple[str, str], Indicator] = {}
    for indicator in indicators:
        key = indicator.key()
        existing = merged.get(key)
        if existing is None:
            # Store a copy so later merging never mutates the caller's objects.
            merged[key] = Indicator(
                type=indicator.type,
                value=indicator.value,
                source=indicator.source,
                threat_type=indicator.threat_type,
                confidence=indicator.confidence,
                malware=indicator.malware,
                techniques=sorted(set(indicator.techniques)),
                tags=sorted(set(indicator.tags)),
                reference=indicator.reference,
                first_seen=indicator.first_seen,
            )
            continue

        existing.confidence = max(existing.confidence, indicator.confidence)
        existing.techniques = sorted(set(existing.techniques) | set(indicator.techniques))
        existing.tags = sorted(set(existing.tags) | set(indicator.tags))
        existing.malware = existing.malware or indicator.malware
        existing.reference = existing.reference or indicator.reference
        # NOTE(review): this keeps the first truthy first_seen, not the
        # earliest one — confirm whether the minimum is what is wanted.
        existing.first_seen = existing.first_seen or indicator.first_seen

        # Treat both sides as comma-separated name lists and ignore empty
        # parts: "".split(",") is [""], which would otherwise leak a leading
        # comma, and an already-joined source such as "a,b" would otherwise
        # be added as one bogus name next to "a" and "b".
        known_sources = {name for name in existing.source.split(",") if name}
        incoming_sources = {name for name in indicator.source.split(",") if name}
        if not incoming_sources <= known_sources:
            existing.source = ",".join(sorted(known_sources | incoming_sources))
    return list(merged.values())
| 34 | |
| 35 | |
def filter_by_confidence(indicators: list[Indicator], minimum: int) -> list[Indicator]:
    """Return the indicators whose ``confidence`` is at least ``minimum``.

    Input order is preserved. The input list is left untouched and the
    result holds the same objects, not copies.
    """
    kept: list[Indicator] = []
    for candidate in indicators:
        if candidate.confidence >= minimum:
            kept.append(candidate)
    return kept