Zion Boggan
repos/CTI Detection Automation/src/cti/approval.py
zionboggan.com ↗
71 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
import smtplib
4
from email.mime.multipart import MIMEMultipart
5
from email.mime.text import MIMEText
6
from pathlib import Path
7
 
8
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
9
from jinja2 import Environment, FileSystemLoader, select_autoescape
10
 
11
TEMPLATES_DIR = Path(__file__).resolve().parents[2] / "templates"
12
 
13
_env = Environment(
14
    loader=FileSystemLoader(str(TEMPLATES_DIR)),
15
    autoescape=select_autoescape(["html"]),
16
)
17
 
18
 
19
def serializer(secret: str) -> URLSafeTimedSerializer:
20
    return URLSafeTimedSerializer(secret, salt="cti-rule-approval")
21
 
22
 
23
def make_token(secret: str, bundle_id: str) -> str:
24
    return serializer(secret).dumps({"bundle_id": bundle_id})
25
 
26
 
27
def verify_token(secret: str, token: str, max_age: int) -> str | None:
28
    try:
29
        data = serializer(secret).loads(token, max_age=max_age)
30
    except (BadSignature, SignatureExpired):
31
        return None
32
    return data.get("bundle_id")
33
 
34
 
35
def render_email(context: dict) -> str:
36
    template = _env.get_template("approval_email.html")
37
    return template.render(**context)
38
 
39
 
40
def send_email(config: dict, subject: str, html: str, output_dir: Path) -> str:
41
    email_cfg = config["email"]
42
    backend = email_cfg.get("backend", "file")
43
    to_addr = config["approval"]["analyst_email"]
44
    from_addr = email_cfg.get("from_addr", "cti-pipeline@lab.local")
45
 
46
    if backend == "console":
47
        print(html)
48
        return "console"
49
 
50
    if backend == "file":
51
        out = output_dir / "emails"
52
        out.mkdir(parents=True, exist_ok=True)
53
        path = out / f"{subject.replace(' ', '_').replace('/', '-')}.html"
54
        path.write_text(html, encoding="utf-8")
55
        return str(path)
56
 
57
    message = MIMEMultipart("alternative")
58
    message["Subject"] = subject
59
    message["From"] = from_addr
60
    message["To"] = to_addr
61
    message.attach(MIMEText(html, "html"))
62
 
63
    with smtplib.SMTP(email_cfg["smtp_host"], int(email_cfg["smtp_port"])) as server:
64
        if email_cfg.get("use_tls"):
65
            server.starttls()
66
        user = email_cfg.get("smtp_user")
67
        password = email_cfg.get("smtp_password")
68
        if user and password:
69
            server.login(user, password)
70
        server.sendmail(from_addr, [to_addr], message.as_string())
71
    return to_addr