| 1 | from cti.dedup import deduplicate, filter_by_confidence |
| 2 | from cti.models import Indicator |
| 3 | |
| 4 | |
| 5 | def make(value, source, confidence, techniques, malware=None): |
| 6 | return Indicator( |
| 7 | type="ip", |
| 8 | value=value, |
| 9 | source=source, |
| 10 | threat_type="botnet_cc", |
| 11 | confidence=confidence, |
| 12 | malware=malware, |
| 13 | techniques=techniques, |
| 14 | ) |
| 15 | |
| 16 | |
| 17 | def test_merges_same_indicator_across_sources(): |
| 18 | merged = deduplicate([ |
| 19 | make("45.137.21.9", "threatfox", 100, ["T1071.001"], "Cobalt Strike"), |
| 20 | make("45.137.21.9", "feodo", 90, ["T1573"]), |
| 21 | make("45.137.21.9", "otx", 60, ["T1059.001"]), |
| 22 | ]) |
| 23 | assert len(merged) == 1 |
| 24 | indicator = merged[0] |
| 25 | assert indicator.confidence == 100 |
| 26 | assert set(indicator.techniques) == {"T1071.001", "T1573", "T1059.001"} |
| 27 | assert set(indicator.source.split(",")) == {"threatfox", "feodo", "otx"} |
| 28 | assert indicator.malware == "Cobalt Strike" |
| 29 | |
| 30 | |
| 31 | def test_case_insensitive_key(): |
| 32 | merged = deduplicate([ |
| 33 | Indicator(type="domain", value="Evil.COM", source="a", threat_type="c2", confidence=70), |
| 34 | Indicator(type="domain", value="evil.com", source="b", threat_type="c2", confidence=80), |
| 35 | ]) |
| 36 | assert len(merged) == 1 |
| 37 | assert merged[0].confidence == 80 |
| 38 | |
| 39 | |
| 40 | def test_confidence_filter(): |
| 41 | indicators = [make("1.1.1.1", "a", 40, []), make("2.2.2.2", "b", 80, [])] |
| 42 | kept = filter_by_confidence(indicators, 60) |
| 43 | assert [i.value for i in kept] == ["2.2.2.2"] |