Zion Boggan
repos/CTI Detection Automation/src/cti/feeds/base.py
zionboggan.com ↗
63 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
import json
4
from abc import ABC, abstractmethod
5
from pathlib import Path
6
 
7
import requests
8
 
9
from cti.models import Indicator
10
 
11
# Timeout, in seconds, passed to every ``requests`` call made by ``Feed.fetch_raw``.
DEFAULT_TIMEOUT = 20
12
 
13
 
14
class Feed(ABC):
    """Abstract base class for a feed that produces ``Indicator`` objects.

    A feed obtains raw text either over HTTP (``fetch_raw``) or from a
    fixture file on disk (``load_fixture``) and hands that text to the
    subclass-provided ``parse``.
    """

    # Identifier of the feed; also the stem of its fixture file name.
    name: str = "feed"
    # Endpoint requested by ``fetch_raw``.
    url: str = ""
    # HTTP verb: "POST" (any letter case) sends a JSON body; anything else
    # is issued as a GET.
    method: str = "GET"

    def __init__(self, settings: dict | None = None):
        """Store *settings*, substituting an empty dict when none are given."""
        self.settings = settings or {}

    def headers(self) -> dict:
        """Return the HTTP headers to send; the base implementation sends none."""
        return {}

    def request_body(self) -> dict | None:
        """Return the JSON payload for POST requests; ``None`` by default."""
        return None

    def fetch_raw(self) -> str:
        """Download the feed and return the response body as text.

        Raises:
            requests.HTTPError: if the server answers with an error status
                (raised by ``raise_for_status``).
        """
        shared = {"headers": self.headers(), "timeout": DEFAULT_TIMEOUT}
        # Compare case-insensitively so ``method = "post"`` is not silently
        # downgraded to a GET that drops the request body.
        if self.method.upper() == "POST":
            response = requests.post(
                self.url, json=self.request_body(), **shared
            )
        else:
            response = requests.get(self.url, **shared)
        response.raise_for_status()
        return response.text

    def load_fixture(self, fixtures_dir: Path) -> str:
        """Read this feed's fixture file from *fixtures_dir* as UTF-8 text."""
        path = fixtures_dir / self.fixture_name()
        return path.read_text(encoding="utf-8")

    def fixture_name(self) -> str:
        """Return the fixture file name, ``<name>.json``."""
        return f"{self.name}.json"

    @abstractmethod
    def parse(self, raw: str) -> list[Indicator]:
        """Convert the raw feed text into indicators; subclasses must implement."""
        ...

    def collect(self, fixtures_dir: Path | None = None) -> list[Indicator]:
        """Obtain the raw feed text and return the parsed indicators.

        Args:
            fixtures_dir: when given, the raw text is read from the fixture
                file in this directory instead of being fetched over HTTP.
        """
        if fixtures_dir is not None:
            raw = self.load_fixture(fixtures_dir)
        else:
            raw = self.fetch_raw()
        return self.parse(raw)
60
 
61
 
62
def load_json(raw: str):
    """Decode the JSON document in *raw* and return the resulting object.

    Raises:
        json.JSONDecodeError: if *raw* is not valid JSON.
    """
    return json.loads(raw)