Zion Boggan zionboggan.com ↗
84 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
import copy
import os
from pathlib import Path

import yaml
7
 
8
# Baseline configuration. load_config() layers the YAML file and then
# environment variables on top of these values. No secrets are listed here;
# load_config() injects them from the environment (see ENV_SECRETS and
# FEED_SECRET_ENV).
DEFAULTS = {
    "min_confidence": 60,
    "output_dir": "output",
    "fixtures_dir": "fixtures",
    # Can also be switched on with CTI_USE_FIXTURES=1.
    "use_fixtures": False,
    # One entry per feed, keyed by feed name.
    "feeds": {
        "threatfox": {"enabled": True, "days": 1},
        "feodo": {"enabled": True},
        "urlhaus": {"enabled": True},
        "otx": {"enabled": True},
        "openphish": {"enabled": True},
        "leaks": {"enabled": False, "watch_domains": []},
    },
    "rules": {"base_id": 100300},
    "approval": {
        "base_url": "http://localhost:8080",
        # NOTE(review): presumably seconds (86400 s = 24 h) - confirm with the consumer.
        "token_ttl": 86400,
        "analyst_email": "analyst@example.com",
    },
    "email": {
        # Can be overridden with CTI_EMAIL_BACKEND.
        "backend": "file",
        "from_addr": "cti-pipeline@lab.local",
        "smtp_host": "localhost",
        "smtp_port": 25,
        "use_tls": False,
    },
}
35
 
36
# (section, key) -> environment variable name. When the variable is set and
# non-empty, load_config() stores its value at config[section][key].
ENV_SECRETS = {
    ("approval", "secret"): "CTI_APPROVAL_SECRET",
    ("email", "smtp_user"): "CTI_SMTP_USER",
    ("email", "smtp_password"): "CTI_SMTP_PASSWORD",
}
41
 
42
# feed name -> (config key, environment variable name). When the variable is
# set and non-empty, load_config() stores its value at
# config["feeds"][feed_name][config key].
FEED_SECRET_ENV = {
    "threatfox": ("auth_key", "THREATFOX_AUTH_KEY"),
    "otx": ("api_key", "OTX_API_KEY"),
    "leaks": ("token", "CTI_LEAKS_TOKEN"),
}
47
 
48
 
49
def _merge(base: dict, override: dict) -> dict:
50
    result = dict(base)
51
    for key, value in override.items():
52
        if isinstance(value, dict) and isinstance(result.get(key), dict):
53
            result[key] = _merge(result[key], value)
54
        else:
55
            result[key] = value
56
    return result
57
 
58
 
59
def load_config(path: str | None = None) -> dict:
    """Build the effective configuration.

    Layers, from lowest to highest precedence:

    1. ``DEFAULTS``.
    2. The YAML file at *path*, when *path* is given and exists.
    3. Environment variables: the names listed in ``ENV_SECRETS`` and
       ``FEED_SECRET_ENV``, plus ``CTI_USE_FIXTURES`` (only the exact value
       ``"1"`` enables fixtures) and ``CTI_EMAIL_BACKEND``. Unset or empty
       variables are ignored.

    Args:
        path: Location of a YAML config file. ``None``, an empty string, or a
            path that does not exist means no file layer is applied.

    Returns:
        A new config dict. ``DEFAULTS`` is never modified, so values taken
        from the environment in one call cannot leak into a later call.
    """
    loaded: dict = {}
    if path:
        config_file = Path(path)
        if config_file.exists():
            # An empty YAML document parses to None; treat it as "no overrides".
            loaded = yaml.safe_load(config_file.read_text(encoding="utf-8")) or {}

    # Merge onto a private deep copy: the environment overrides below write
    # into nested sections, and those must not be the dicts held by DEFAULTS.
    config = _merge(copy.deepcopy(DEFAULTS), loaded)

    for (section, key), env in ENV_SECRETS.items():
        value = os.environ.get(env)
        if value:
            config.setdefault(section, {})[key] = value

    for feed_name, (key, env) in FEED_SECRET_ENV.items():
        value = os.environ.get(env)
        if value:
            config["feeds"].setdefault(feed_name, {})[key] = value

    if os.environ.get("CTI_USE_FIXTURES") == "1":
        config["use_fixtures"] = True

    backend = os.environ.get("CTI_EMAIL_BACKEND")
    if backend:
        config["email"]["backend"] = backend

    return config