| 1 | from __future__ import annotations |
| 2 | |
| 3 | import json |
| 4 | import os |
| 5 | from pathlib import Path |
| 6 | |
| 7 | from flask import Flask, abort, render_template, request |
| 8 | |
| 9 | from cti import approval, pipeline |
| 10 | from cti.config import load_config |
| 11 | |
| 12 | |
def create_app(config: dict | None = None) -> Flask:
    """Build the Flask application for reviewing candidate bundles.

    Args:
        config: Configuration mapping. When omitted (or empty), the
            configuration is obtained from ``load_config`` using the value of
            the ``CTI_CONFIG`` environment variable.

    Returns:
        A Flask app exposing the dashboard (``/``), the token-gated
        ``/review``, ``/approve`` and ``/reject`` endpoints, and ``/healthz``.

    Raises:
        KeyError: If ``output_dir``, ``approval`` or ``approval.token_ttl``
            is missing from the configuration.
        ValueError: If ``approval.token_ttl`` cannot be converted to ``int``.
    """
    config = config or load_config(os.environ.get("CTI_CONFIG"))
    output_dir = Path(config["output_dir"])
    candidates_dir = output_dir / pipeline.CANDIDATES
    default_secret = "insecure-dev-secret"
    secret = config["approval"].get("secret", default_secret)
    ttl = int(config["approval"]["token_ttl"])

    app = Flask(__name__, template_folder=str(_templates_dir()))

    if secret == default_secret:
        # The fallback value is visible in the source, so tokens checked
        # against it should not be trusted outside local development.
        app.logger.warning(
            "approval.secret is not configured; using the built-in "
            "development secret. Set a real secret before exposing this app."
        )

    def read_json_object(path: Path) -> dict | None:
        """Return the JSON object stored at *path*, or ``None`` if unusable.

        A missing file is silent; an unreadable, malformed or non-object
        file is logged and skipped so one bad file cannot break a request.
        """
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (FileNotFoundError, NotADirectoryError):
            return None
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            app.logger.warning("Ignoring unreadable JSON file %s: %s", path, exc)
            return None
        if not isinstance(data, dict):
            app.logger.warning("Ignoring %s: expected a JSON object", path)
            return None
        return data

    def load_manifest(bundle_id: str) -> dict | None:
        """Return the candidate bundle's manifest, or ``None`` if unavailable."""
        return read_json_object(candidates_dir / bundle_id / "manifest.json")

    def load_coverage(bundle_id: str) -> str:
        """Return the bundle's ``ttp_coverage.md`` text, or ``""`` if absent."""
        try:
            return (candidates_dir / bundle_id / "ttp_coverage.md").read_text(
                encoding="utf-8"
            )
        except (FileNotFoundError, NotADirectoryError):
            return ""

    def bundle_status(bundle_id: str) -> str:
        """Return ``"rejected"``, ``"approved"`` or ``"pending"`` for a bundle.

        A ``REJECTED`` marker in the candidate directory takes precedence
        over the active state file naming this bundle.
        """
        if (candidates_dir / bundle_id / "REJECTED").exists():
            return "rejected"
        state = read_json_object(output_dir / pipeline.ACTIVE / pipeline.STATE_FILE)
        if state and state.get("bundle_id") == bundle_id:
            return "approved"
        return "pending"

    @app.get("/")
    def index():
        """Render the dashboard listing every candidate that has a manifest."""
        bundles = []
        if candidates_dir.is_dir():
            # Reverse name order, as in the original listing.
            for path in sorted(candidates_dir.iterdir(), reverse=True):
                manifest = load_manifest(path.name)
                if manifest:
                    manifest["status"] = bundle_status(path.name)
                    bundles.append(manifest)
        return render_template("dashboard.html", bundles=bundles)

    @app.get("/review/<token>")
    def review(token):
        """Render the review page for the bundle identified by *token*.

        Responds 403 when the token does not verify and 404 when the bundle
        has no usable manifest.
        """
        bundle_id = approval.verify_token(secret, token, ttl)
        if not bundle_id:
            abort(403)
        manifest = load_manifest(bundle_id)
        if not manifest:
            abort(404)
        return render_template(
            "review.html",
            manifest=manifest,
            coverage=load_coverage(bundle_id),
            status=bundle_status(bundle_id),
            token=token,
        )

    @app.post("/approve/<token>")
    def approve(token):
        """Promote the bundle identified by *token*; 403 on an invalid token."""
        bundle_id = approval.verify_token(secret, token, ttl)
        if not bundle_id:
            abort(403)
        active_dir = config.get("wazuh_etc_dir")
        result = pipeline.promote(
            bundle_id, output_dir, Path(active_dir) if active_dir else None
        )
        return render_template("result.html", action="approved", result=result)

    @app.post("/reject/<token>")
    def reject(token):
        """Mark the bundle identified by *token* as rejected.

        Writes the optional ``reason`` form field into a ``REJECTED`` marker
        file. Responds 403 on an invalid token and 404 when the candidate
        directory does not exist.
        """
        bundle_id = approval.verify_token(secret, token, ttl)
        if not bundle_id:
            abort(403)
        bundle_dir = candidates_dir / bundle_id
        if not bundle_dir.is_dir():
            # Without this check the write below fails with an unhandled
            # FileNotFoundError (HTTP 500) for an unknown bundle.
            abort(404)
        reason = request.form.get("reason", "").strip()
        (bundle_dir / "REJECTED").write_text(
            reason or "rejected by analyst", encoding="utf-8"
        )
        return render_template(
            "result.html", action="rejected", result={"bundle_id": bundle_id}
        )

    @app.get("/healthz")
    def healthz():
        """Liveness probe returning a static JSON payload."""
        return {"status": "ok"}

    return app
| 98 | |
| 99 | |
def _templates_dir() -> Path:
    """Return the ``templates`` directory three levels above this file."""
    module_path = Path(__file__).resolve()
    # parents[0] is this file's directory; parents[2] is two levels above it.
    base_dir = module_path.parents[2]
    return base_dir.joinpath("templates")
| 102 | |
| 103 | |
# Module-level application object, built once at import time. No config is
# passed, so create_app() falls back to its own configuration loading.
# NOTE(review): presumably the object a WSGI server imports — confirm.
app = create_app(config=None)