Zion Boggan zionboggan.com ↗
104 lines · python
History for this file →
1
from __future__ import annotations
2
 
3
import json
4
import os
5
from pathlib import Path
6
 
7
from flask import Flask, abort, render_template, request
8
 
9
from cti import approval, pipeline
10
from cti.config import load_config
11
 
12
 
13
def create_app(config: dict | None = None) -> Flask:
14
    config = config or load_config(os.environ.get("CTI_CONFIG"))
15
    output_dir = Path(config["output_dir"])
16
    secret = config["approval"].get("secret", "insecure-dev-secret")
17
    ttl = int(config["approval"]["token_ttl"])
18
 
19
    app = Flask(__name__, template_folder=str(_templates_dir()))
20
 
21
    def load_manifest(bundle_id: str) -> dict | None:
22
        manifest = output_dir / pipeline.CANDIDATES / bundle_id / "manifest.json"
23
        if not manifest.exists():
24
            return None
25
        return json.loads(manifest.read_text(encoding="utf-8"))
26
 
27
    def load_coverage(bundle_id: str) -> str:
28
        path = output_dir / pipeline.CANDIDATES / bundle_id / "ttp_coverage.md"
29
        return path.read_text(encoding="utf-8") if path.exists() else ""
30
 
31
    def bundle_status(bundle_id: str) -> str:
32
        candidate = output_dir / pipeline.CANDIDATES / bundle_id
33
        if (candidate / "REJECTED").exists():
34
            return "rejected"
35
        state = output_dir / pipeline.ACTIVE / pipeline.STATE_FILE
36
        if state.exists():
37
            data = json.loads(state.read_text(encoding="utf-8"))
38
            if data.get("bundle_id") == bundle_id:
39
                return "approved"
40
        return "pending"
41
 
42
    @app.get("/")
43
    def index():
44
        candidates_dir = output_dir / pipeline.CANDIDATES
45
        bundles = []
46
        if candidates_dir.exists():
47
            for path in sorted(candidates_dir.iterdir(), reverse=True):
48
                manifest = load_manifest(path.name)
49
                if manifest:
50
                    manifest["status"] = bundle_status(path.name)
51
                    bundles.append(manifest)
52
        return render_template("dashboard.html", bundles=bundles)
53
 
54
    @app.get("/review/<token>")
55
    def review(token):
56
        bundle_id = approval.verify_token(secret, token, ttl)
57
        if not bundle_id:
58
            abort(403)
59
        manifest = load_manifest(bundle_id)
60
        if not manifest:
61
            abort(404)
62
        return render_template(
63
            "review.html",
64
            manifest=manifest,
65
            coverage=load_coverage(bundle_id),
66
            status=bundle_status(bundle_id),
67
            token=token,
68
        )
69
 
70
    @app.post("/approve/<token>")
71
    def approve(token):
72
        bundle_id = approval.verify_token(secret, token, ttl)
73
        if not bundle_id:
74
            abort(403)
75
        active_dir = config.get("wazuh_etc_dir")
76
        result = pipeline.promote(
77
            bundle_id, output_dir, Path(active_dir) if active_dir else None
78
        )
79
        return render_template("result.html", action="approved", result=result)
80
 
81
    @app.post("/reject/<token>")
82
    def reject(token):
83
        bundle_id = approval.verify_token(secret, token, ttl)
84
        if not bundle_id:
85
            abort(403)
86
        marker = output_dir / pipeline.CANDIDATES / bundle_id / "REJECTED"
87
        reason = request.form.get("reason", "").strip()
88
        marker.write_text(reason or "rejected by analyst", encoding="utf-8")
89
        return render_template(
90
            "result.html", action="rejected", result={"bundle_id": bundle_id}
91
        )
92
 
93
    @app.get("/healthz")
94
    def healthz():
95
        return {"status": "ok"}
96
 
97
    return app
98
 
99
 
100
def _templates_dir() -> Path:
101
    return Path(__file__).resolve().parents[2] / "templates"
102
 
103
 
104
app = create_app()