| 1 | from __future__ import annotations |
| 2 | |
| 3 | import json |
| 4 | from abc import ABC, abstractmethod |
| 5 | from pathlib import Path |
| 6 | |
| 7 | import requests |
| 8 | |
| 9 | from cti.models import Indicator |
| 10 | |
| 11 | DEFAULT_TIMEOUT = 20 |
| 12 | |
| 13 | |
| 14 | class Feed(ABC): |
| 15 | name: str = "feed" |
| 16 | url: str = "" |
| 17 | method: str = "GET" |
| 18 | |
| 19 | def __init__(self, settings: dict | None = None): |
| 20 | self.settings = settings or {} |
| 21 | |
| 22 | def headers(self) -> dict: |
| 23 | return {} |
| 24 | |
| 25 | def request_body(self) -> dict | None: |
| 26 | return None |
| 27 | |
| 28 | def fetch_raw(self) -> str: |
| 29 | if self.method == "POST": |
| 30 | response = requests.post( |
| 31 | self.url, |
| 32 | headers=self.headers(), |
| 33 | json=self.request_body(), |
| 34 | timeout=DEFAULT_TIMEOUT, |
| 35 | ) |
| 36 | else: |
| 37 | response = requests.get( |
| 38 | self.url, headers=self.headers(), timeout=DEFAULT_TIMEOUT |
| 39 | ) |
| 40 | response.raise_for_status() |
| 41 | return response.text |
| 42 | |
| 43 | def load_fixture(self, fixtures_dir: Path) -> str: |
| 44 | path = fixtures_dir / self.fixture_name() |
| 45 | return path.read_text(encoding="utf-8") |
| 46 | |
| 47 | def fixture_name(self) -> str: |
| 48 | return f"{self.name}.json" |
| 49 | |
| 50 | @abstractmethod |
| 51 | def parse(self, raw: str) -> list[Indicator]: |
| 52 | ... |
| 53 | |
| 54 | def collect(self, fixtures_dir: Path | None = None) -> list[Indicator]: |
| 55 | if fixtures_dir is not None: |
| 56 | raw = self.load_fixture(fixtures_dir) |
| 57 | else: |
| 58 | raw = self.fetch_raw() |
| 59 | return self.parse(raw) |
| 60 | |
| 61 | |
| 62 | def load_json(raw: str): |
| 63 | return json.loads(raw) |