Zion Boggan
repos/cti-detection-automation/src/cti/feeds/leaks.py
zionboggan.com ↗
47 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
from cti import mitre
4
from cti.feeds.base import Feed, load_json
5
from cti.models import Indicator
6
 
7
 
8
class LeakFeed(Feed):
    """Feed that turns leaked-credential records into ``email`` indicators.

    Configuration is read from ``self.settings``:

    * ``endpoint`` -- URL to fetch; when unset the feed yields nothing.
    * ``token`` -- optional bearer token sent in the ``Authorization`` header.
    * ``watch_domains`` -- optional list of e-mail domains; when non-empty,
      only addresses on those domains are reported.
    """

    name = "leaks"
    # Placeholder only: the real URL is copied from settings in fetch_raw().
    url = ""

    def fetch_raw(self) -> str:
        """Return the raw feed payload, or ``"[]"`` if no endpoint is set.

        The configured endpoint is stored on ``self.url`` before delegating
        to the base class (presumably ``Feed.fetch_raw`` reads ``self.url``
        -- confirm against ``cti.feeds.base``).
        """
        endpoint = self.settings.get("endpoint")
        if not endpoint:
            # An empty JSON array so parse() produces no indicators.
            return "[]"
        self.url = endpoint
        return super().fetch_raw()

    def headers(self) -> dict:
        """Return a bearer ``Authorization`` header if a token is configured."""
        token = self.settings.get("token")
        return {"Authorization": f"Bearer {token}"} if token else {}

    def parse(self, raw: str) -> list[Indicator]:
        """Convert the raw payload into leaked-credential indicators.

        Each entry is expected to be a mapping with an ``email`` key and
        optional ``breach`` / ``breach_date`` keys. Entries are skipped when
        they are not dicts, when ``email`` is missing, not a string, or lacks
        a local part or domain, or when ``watch_domains`` is configured and
        the address's domain is not listed.

        Args:
            raw: Text payload as returned by ``fetch_raw``.

        Returns:
            One ``Indicator`` per accepted entry, in input order.
        """
        entries = load_json(raw)
        techniques = sorted(set(mitre.techniques_for_threat_type("leaked_credentials")))
        # `or []` also covers a setting that is present but null.
        watched = {d.lower() for d in (self.settings.get("watch_domains") or [])}
        indicators: list[Indicator] = []
        for entry in entries:
            # NOTE(review): assumes load_json yields plain dicts (json.loads
            # output); a malformed record is skipped rather than aborting the
            # whole feed.
            if not isinstance(entry, dict):
                continue
            raw_email = entry.get("email")
            if not isinstance(raw_email, str):
                continue
            email = raw_email.strip().lower()
            # The domain is whatever follows the LAST "@": a quoted local
            # part may itself contain "@".
            local, sep, domain = email.rpartition("@")
            if not sep or not local or not domain:
                continue
            if watched and domain not in watched:
                continue
            indicators.append(
                Indicator(
                    type="email",
                    value=email,
                    source=self.name,
                    threat_type="leaked_credentials",
                    confidence=70,
                    # Fresh copy per indicator so they never alias one list.
                    techniques=list(techniques),
                    # dict.get's default only applies to a missing key; `or`
                    # also replaces an explicit null or empty breach name.
                    tags=[entry.get("breach") or "unknown_breach"],
                    first_seen=entry.get("breach_date"),
                )
            )
        return indicators