| 1 | from __future__ import annotations |
| 2 | |
| 3 | import smtplib |
| 4 | from email.mime.multipart import MIMEMultipart |
| 5 | from email.mime.text import MIMEText |
| 6 | from pathlib import Path |
| 7 | |
| 8 | from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer |
| 9 | from jinja2 import Environment, FileSystemLoader, select_autoescape |
| 10 | |
# Template directory: the "templates" folder that sits two levels above the
# directory containing this module.
TEMPLATES_DIR = Path(__file__).resolve().parents[2] / "templates"

# Shared Jinja2 environment. Autoescaping is switched on for templates whose
# name ends in ".html", so context values are HTML-escaped when rendered.
_env = Environment(
    loader=FileSystemLoader(str(TEMPLATES_DIR)),
    autoescape=select_autoescape(enabled_extensions=["html"]),
)
| 17 | |
| 18 | |
def serializer(secret: str) -> URLSafeTimedSerializer:
    """Build the timed, URL-safe serializer used for approval tokens.

    Args:
        secret: Secret key used to sign and verify tokens.

    Returns:
        A ``URLSafeTimedSerializer`` bound to ``secret`` and to the fixed
        salt ``"cti-rule-approval"``.
    """
    approval_salt = "cti-rule-approval"
    return URLSafeTimedSerializer(secret, salt=approval_salt)
| 21 | |
| 22 | |
def make_token(secret: str, bundle_id: str) -> str:
    """Create a signed, timestamped token that carries ``bundle_id``.

    Args:
        secret: Secret key handed to ``serializer``.
        bundle_id: Identifier stored in the token payload under the
            ``"bundle_id"`` key.

    Returns:
        The serialized token string.
    """
    payload = {"bundle_id": bundle_id}
    signer = serializer(secret)
    return signer.dumps(payload)
| 25 | |
| 26 | |
def verify_token(secret: str, token: str, max_age: int) -> str | None:
    """Validate an approval token and return the bundle id it carries.

    Args:
        secret: Secret key handed to ``serializer``.
        token: Token string to check.
        max_age: Maximum accepted token age, passed to ``loads`` as
            ``max_age``.

    Returns:
        The value stored under ``"bundle_id"`` in the payload, or ``None``
        when the signature is invalid, the token has expired, the payload is
        not a dict, or the payload has no ``"bundle_id"`` key.
    """
    try:
        data = serializer(secret).loads(token, max_age=max_age)
    except (BadSignature, SignatureExpired):
        return None
    # A correctly signed token may still carry a payload that is not a dict;
    # treat it as invalid instead of failing with AttributeError on ``.get``.
    if not isinstance(data, dict):
        return None
    return data.get("bundle_id")
| 33 | |
| 34 | |
def render_email(context: dict) -> str:
    """Render the ``approval_email.html`` template.

    Args:
        context: Mapping whose items are passed to the template as keyword
            arguments.

    Returns:
        The rendered template as a string.
    """
    approval_template = _env.get_template("approval_email.html")
    rendered = approval_template.render(**context)
    return rendered
| 38 | |
| 39 | |
def _safe_filename(subject: str) -> str:
    """Turn an e-mail subject into a file-name stem without unsafe characters."""
    # Keep the historical mapping first so plain subjects keep their names.
    stem = subject.replace(" ", "_").replace("/", "-")
    # Everything else that is not alphanumeric or one of "._-" (backslashes,
    # colons, wildcards, control characters, ...) becomes an underscore.
    stem = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in stem)
    # An empty subject would otherwise yield the bare name ".html".
    return stem or "email"


def send_email(config: dict, subject: str, html: str, output_dir: Path) -> str:
    """Deliver an HTML e-mail through the configured backend.

    The backend is read from ``config["email"]["backend"]`` (default
    ``"file"``):

    * ``"console"`` prints ``html`` to stdout.
    * ``"file"`` writes ``html`` to ``<output_dir>/emails/<subject>.html``,
      where the subject is sanitised into a safe file name.
    * any other value sends the message over SMTP using ``smtp_host``,
      ``smtp_port`` and the optional ``use_tls``, ``smtp_user``,
      ``smtp_password`` and ``smtp_timeout`` (seconds, default 30) settings.

    Args:
        config: Configuration with an ``"email"`` section and an
            ``"approval"`` section containing ``"analyst_email"``.
        subject: Subject line; also used to derive the file name for the
            ``"file"`` backend.
        html: HTML body of the message.
        output_dir: Base directory for the ``"file"`` backend.

    Returns:
        ``"console"`` for the console backend, the written file path for the
        file backend, or the recipient address for SMTP.

    Raises:
        KeyError: If ``config["email"]`` or
            ``config["approval"]["analyst_email"]`` is missing, or if
            ``smtp_host`` / ``smtp_port`` is missing for the SMTP backend.
    """
    email_cfg = config["email"]
    backend = email_cfg.get("backend", "file")
    to_addr = config["approval"]["analyst_email"]
    from_addr = email_cfg.get("from_addr", "cti-pipeline@lab.local")

    if backend == "console":
        print(html)
        return "console"

    if backend == "file":
        out = output_dir / "emails"
        out.mkdir(parents=True, exist_ok=True)
        path = out / f"{_safe_filename(subject)}.html"
        path.write_text(html, encoding="utf-8")
        return str(path)

    message = MIMEMultipart("alternative")
    message["Subject"] = subject
    message["From"] = from_addr
    message["To"] = to_addr
    message.attach(MIMEText(html, "html"))

    # Bound every blocking socket operation so an unreachable or stalled
    # server cannot hang the caller indefinitely.
    timeout = float(email_cfg.get("smtp_timeout", 30))
    with smtplib.SMTP(
        email_cfg["smtp_host"], int(email_cfg["smtp_port"]), timeout=timeout
    ) as server:
        if email_cfg.get("use_tls"):
            # NOTE(review): no explicit SSLContext is supplied here; pass
            # ssl.create_default_context() if the server certificate must be
            # verified -- confirm against the deployment.
            server.starttls()
        user = email_cfg.get("smtp_user")
        password = email_cfg.get("smtp_password")
        # Authenticate only when both credentials are configured.
        if user and password:
            server.login(user, password)
        server.sendmail(from_addr, [to_addr], message.as_string())
    return to_addr