Zion Boggan
repos/Oversight/cli/gui.py
zionboggan.com ↗
351 lines · python
History for this file →
1
"""Small Tkinter GUI for non-technical Oversight users."""
2
 
3
from __future__ import annotations
4
 
5
import json
6
from pathlib import Path
7
import tkinter as tk
8
from tkinter import filedialog, messagebox, ttk
9
from urllib.parse import urlparse
10
 
11
from oversight_core import (
12
    ClassicIdentity,
13
    Manifest,
14
    Recipient,
15
    WatermarkRef,
16
    beacon,
17
    content_hash,
18
    l3_policy,
19
    open_sealed,
20
    seal,
21
    watermark,
22
)
23
from oversight_core.fingerprint import ContentFingerprint
24
from oversight_core.safe_io import (
25
    atomic_write_bytes,
26
    atomic_write_private_json,
27
    atomic_write_text,
28
    is_private_key_file,
29
    is_windows_reserved_path,
30
    validate_output_path,
31
)
32
 
33
 
34
class OversightGui(tk.Tk):
35
    def __init__(self) -> None:
36
        super().__init__()
37
        self.title("Oversight Protocol")
38
        self.geometry("760x540")
39
        self._build()
40
 
41
    def _build(self) -> None:
42
        notebook = ttk.Notebook(self)
43
        notebook.pack(fill="both", expand=True, padx=12, pady=12)
44
        self._build_keygen(notebook)
45
        self._build_seal(notebook)
46
        self._build_open(notebook)
47
 
48
    def _row(self, parent, label: str, row: int, browse: bool = False):
49
        ttk.Label(parent, text=label).grid(row=row, column=0, sticky="w", pady=4)
50
        var = tk.StringVar()
51
        ent = ttk.Entry(parent, textvariable=var, width=72)
52
        ent.grid(row=row, column=1, sticky="ew", pady=4)
53
        if browse:
54
            ttk.Button(parent, text="Browse", command=lambda: self._browse(var)).grid(row=row, column=2, padx=4)
55
        parent.columnconfigure(1, weight=1)
56
        return var
57
 
58
    def _browse(self, var: tk.StringVar, save: bool = False) -> None:
59
        path = filedialog.asksaveasfilename() if save else filedialog.askopenfilename()
60
        if path:
61
            var.set(path)
62
 
63
    def _build_keygen(self, notebook) -> None:
64
        frame = ttk.Frame(notebook, padding=12)
65
        notebook.add(frame, text="Generate Keys")
66
        identity_id = self._row(frame, "Identity name", 0)
67
        identity_id.set("alice")
68
        out = self._row(frame, "Private key output", 1)
69
        ttk.Button(frame, text="Choose Output", command=lambda: self._browse(out, save=True)).grid(row=1, column=2, padx=4)
70
        ttk.Button(frame, text="Generate Keypair", command=lambda: self._keygen(identity_id.get(), out.get())).grid(row=2, column=1, sticky="e", pady=12)
71
 
72
    def _build_seal(self, notebook) -> None:
73
        frame = ttk.Frame(notebook, padding=12)
74
        notebook.add(frame, text="Seal File")
75
        self.seal_input = self._row(frame, "Input file", 0, True)
76
        self.seal_issuer = self._row(frame, "Issuer private key", 1, True)
77
        self.seal_recipient = self._row(frame, "Recipient public key", 2, True)
78
        self.seal_out = self._row(frame, "Sealed output", 3)
79
        ttk.Button(frame, text="Choose Output", command=lambda: self._browse(self.seal_out, save=True)).grid(row=3, column=2, padx=4)
80
        self.registry_url = self._row(frame, "Registry URL", 4)
81
        self.registry_url.set("https://registry.oversightprotocol.dev")
82
        self.content_type = self._row(frame, "Content type", 5)
83
        self.content_type.set("text/plain")
84
        self.l3_mode = tk.StringVar(value="auto")
85
        ttk.Label(frame, text="L3 mode").grid(row=6, column=0, sticky="w", pady=4)
86
        ttk.Combobox(frame, textvariable=self.l3_mode, values=["auto", "off", "boilerplate", "full"], state="readonly").grid(row=6, column=1, sticky="w")
87
        self.watermark_enabled = tk.BooleanVar(value=True)
88
        ttk.Checkbutton(frame, text="Embed L1/L2 watermarks", variable=self.watermark_enabled).grid(row=7, column=1, sticky="w")
89
        ttk.Button(frame, text="Seal", command=self._seal_file).grid(row=8, column=1, sticky="e", pady=12)
90
 
91
    def _build_open(self, notebook) -> None:
92
        frame = ttk.Frame(notebook, padding=12)
93
        notebook.add(frame, text="Open File")
94
        self.open_input = self._row(frame, "Sealed file", 0, True)
95
        self.open_identity = self._row(frame, "Recipient private key", 1, True)
96
        self.open_out = self._row(frame, "Plaintext output", 2)
97
        ttk.Button(frame, text="Choose Output", command=lambda: self._browse(self.open_out, save=True)).grid(row=2, column=2, padx=4)
98
        ttk.Button(frame, text="Open", command=self._open_file).grid(row=3, column=1, sticky="e", pady=12)
99
 
100
    def _keygen(self, identity_id: str, out_path: str) -> None:
101
        try:
102
            identity_id = (identity_id or "identity").strip()
103
            if not identity_id:
104
                raise ValueError("Please enter an identity name.")
105
            if len(identity_id) > 256:
106
                raise ValueError("Identity name must be 256 characters or fewer.")
107
            if not out_path:
108
                raise ValueError("Please choose a private key output path.")
109
            path = Path(out_path)
110
            pub_path = _public_key_path(path)
111
            self._prepare_output(path)
112
            self._prepare_output(pub_path, input_paths=[path])
113
            ident = ClassicIdentity.generate()
114
            out = {
115
                "id": identity_id,
116
                "x25519_priv": ident.x25519_priv.hex(),
117
                "x25519_pub": ident.x25519_pub.hex(),
118
                "ed25519_priv": ident.ed25519_priv.hex(),
119
                "ed25519_pub": ident.ed25519_pub.hex(),
120
            }
121
            _write_private_json(path, out)
122
            atomic_write_text(pub_path, json.dumps({
123
                "id": out["id"],
124
                "x25519_pub": out["x25519_pub"],
125
                "ed25519_pub": out["ed25519_pub"],
126
            }, indent=2))
127
            messagebox.showinfo("Oversight", "Keypair generated.")
128
        except Exception as exc:
129
            messagebox.showerror("Oversight", str(exc))
130
 
131
    def _seal_file(self) -> None:
132
        try:
133
            input_path = _require_file(self.seal_input.get(), "input file")
134
            issuer_path = _require_file(self.seal_issuer.get(), "issuer private key")
135
            recipient_path = _require_file(self.seal_recipient.get(), "recipient public key")
136
            raw_out = self.seal_out.get().strip()
137
            out_path = Path(raw_out) if raw_out else _default_sealed_path(input_path)
138
            self._prepare_output(out_path, input_paths=[input_path, issuer_path, recipient_path])
139
            plaintext = input_path.read_bytes()
140
            canonical_plaintext = plaintext
141
            issuer = _read_private_identity(issuer_path, "Issuer file")
142
            rec_pub = _read_public_identity(recipient_path, "Recipient file")
143
            watermarks: list[WatermarkRef] = []
144
            decision = None
145
 
146
            if self.watermark_enabled.get():
147
                try:
148
                    text = plaintext.decode("utf-8")
149
                except UnicodeDecodeError as exc:
150
                    raise ValueError(
151
                        "File is not UTF-8 text. Uncheck 'Embed L1/L2 watermarks' "
152
                        "to seal binary data."
153
                    ) from exc
154
                mark_id = watermark.new_mark_id()
155
                decision = l3_policy.decide_l3(
156
                    filename=str(input_path),
157
                    content_type=_validate_content_type(self.content_type.get()),
158
                    text=text,
159
                    requested_mode=self.l3_mode.get(),
160
                )
161
                if decision.enabled:
162
                    if not messagebox.askyesno(
163
                        "L3 disclosure",
164
                        "L3 semantic watermarking changes visible prose.\n\n"
165
                        f"Detected document class: {decision.document_class}\n"
166
                        f"Mode: {decision.mode}\n"
167
                        f"Reason: {decision.reason}\n\n"
168
                        "Continue?",
169
                    ):
170
                        return
171
                    text = l3_policy.apply_l3_safe(text, mark_id, mode=decision.mode)
172
                    watermarks.append(WatermarkRef(f"L3_semantic_{decision.mode}", mark_id.hex()))
173
                text = watermark.embed_ws(text, mark_id)
174
                text = watermark.embed_zw(text, mark_id)
175
                plaintext = text.encode("utf-8")
176
                watermarks.extend([
177
                    WatermarkRef("L1_zero_width", mark_id.hex()),
178
                    WatermarkRef("L2_whitespace", mark_id.hex()),
179
                ])
180
 
181
            registry_url = _validate_registry_url(self.registry_url.get())
182
            content_type = _validate_content_type(self.content_type.get())
183
            recipient = Recipient(rec_pub["id"], rec_pub["x25519_pub"], rec_pub.get("ed25519_pub"))
184
            manifest = Manifest.new(
185
                input_path.name,
186
                content_hash(plaintext),
187
                len(plaintext),
188
                issuer.get("id", "issuer"),
189
                issuer["ed25519_pub"],
190
                recipient,
191
                registry_url,
192
                content_type,
193
            )
194
            manifest.canonical_content_hash = content_hash(canonical_plaintext)
195
            manifest.watermarks = watermarks
196
            manifest.l3_policy = decision.to_dict() if decision else {}
197
            beacon_domain = _registry_domain(registry_url)
198
            manifest.beacons = [
199
                b.to_dict() for b in beacon.gen_beacons(beacon_domain, manifest.file_id, rec_pub["id"])
200
            ]
201
            blob = seal(plaintext, manifest, bytes.fromhex(issuer["ed25519_priv"]), bytes.fromhex(rec_pub["x25519_pub"]))
202
            atomic_write_bytes(out_path, blob)
203
            if watermarks:
204
                fp = ContentFingerprint.from_text(plaintext.decode("utf-8", errors="replace"))
205
                atomic_write_text(out_path.with_suffix(".fingerprint.json"), json.dumps({
206
                    "file_id": manifest.file_id,
207
                    "recipient_id": rec_pub["id"],
208
                    "canonical_content_hash": manifest.canonical_content_hash,
209
                    "l3_policy": manifest.l3_policy,
210
                    "fingerprint": fp.to_dict(),
211
                }, indent=2))
212
            messagebox.showinfo("Oversight", f"Sealed file written.\nfile_id={manifest.file_id}")
213
        except Exception as exc:
214
            messagebox.showerror("Oversight", str(exc))
215
 
216
    def _open_file(self) -> None:
217
        try:
218
            input_path = _require_file(self.open_input.get(), "sealed file")
219
            identity_path = _require_file(self.open_identity.get(), "recipient private key")
220
            out_path_raw = self.open_out.get().strip()
221
            if not out_path_raw:
222
                raise ValueError("Please choose a plaintext output path.")
223
            out_path = Path(out_path_raw)
224
            self._prepare_output(out_path, input_paths=[input_path, identity_path])
225
            ident = _read_private_identity(identity_path, "Recipient identity file")
226
            plaintext, _manifest = open_sealed(
227
                input_path.read_bytes(),
228
                bytes.fromhex(ident["x25519_priv"]),
229
            )
230
            atomic_write_bytes(out_path, plaintext)
231
            messagebox.showinfo("Oversight", "File opened.")
232
        except Exception as exc:
233
            messagebox.showerror("Oversight", str(exc))
234
 
235
    def _prepare_output(self, path: Path, input_paths: list[Path] | None = None) -> None:
236
        input_paths = input_paths or []
237
        if is_private_key_file(path):
238
            raise ValueError("Refusing to overwrite an Oversight private key file.")
239
        try:
240
            validate_output_path(path, input_paths=input_paths)
241
            return
242
        except FileExistsError:
243
            if not messagebox.askyesno("Overwrite file?", f"{path} already exists. Overwrite it?"):
244
                raise ValueError("Write cancelled; output file already exists.")
245
            validate_output_path(path, input_paths=input_paths, allow_existing=True)
246
 
247
 
248
def main() -> None:
249
    app = OversightGui()
250
    app.mainloop()
251
 
252
 
253
def _write_private_json(path: Path, data: dict) -> None:
254
    """Write private key material with restrictive permissions where supported."""
255
    atomic_write_private_json(path, data)
256
 
257
 
258
def _require_file(raw_path: str, label: str) -> Path:
259
    if not raw_path.strip():
260
        raise ValueError(f"Please choose a {label}.")
261
    path = Path(raw_path)
262
    if is_windows_reserved_path(path):
263
        raise ValueError(f"{label.capitalize()} uses a Windows reserved device name: {path.name}")
264
    if not path.exists() or not path.is_file():
265
        raise ValueError(f"{label.capitalize()} not found: {path}")
266
    return path
267
 
268
 
269
def _read_json(path: Path, label: str) -> dict:
270
    try:
271
        data = json.loads(path.read_text(encoding="utf-8"))
272
    except json.JSONDecodeError as exc:
273
        raise ValueError(f"{label} is not valid JSON.") from exc
274
    except UnicodeDecodeError as exc:
275
        raise ValueError(f"{label} is not UTF-8 JSON.") from exc
276
    if not isinstance(data, dict):
277
        raise ValueError(f"{label} must contain a JSON object.")
278
    return data
279
 
280
 
281
def _read_private_identity(path: Path, label: str) -> dict:
282
    data = _read_json(path, label)
283
    for key in ("x25519_priv", "x25519_pub", "ed25519_priv", "ed25519_pub"):
284
        if key not in data:
285
            raise ValueError(f"{label} does not contain `{key}`; did you select a public key by mistake?")
286
        _validate_hex_field(data[key], key, 32)
287
    if "id" not in data:
288
        raise ValueError(f"{label} does not contain `id`.")
289
    return data
290
 
291
 
292
def _read_public_identity(path: Path, label: str) -> dict:
293
    data = _read_json(path, label)
294
    for key in ("id", "x25519_pub"):
295
        if key not in data:
296
            raise ValueError(f"{label} does not contain `{key}`.")
297
    _validate_hex_field(data["x25519_pub"], "x25519_pub", 32)
298
    if "ed25519_pub" in data:
299
        _validate_hex_field(data["ed25519_pub"], "ed25519_pub", 32)
300
    return data
301
 
302
 
303
def _validate_hex_field(value: object, key: str, expected_len: int) -> None:
304
    if not isinstance(value, str):
305
        raise ValueError(f"`{key}` must be hex text.")
306
    try:
307
        raw = bytes.fromhex(value)
308
    except ValueError as exc:
309
        raise ValueError(f"`{key}` is not valid hex.") from exc
310
    if len(raw) != expected_len:
311
        raise ValueError(f"`{key}` must decode to {expected_len} bytes.")
312
 
313
 
314
def _validate_registry_url(raw_url: str) -> str:
315
    url = (raw_url or "").strip()
316
    parsed = urlparse(url)
317
    if parsed.scheme not in {"http", "https"} or not parsed.netloc:
318
        raise ValueError("Registry URL must be an http(s) URL with a host.")
319
    return url
320
 
321
 
322
def _registry_domain(registry_url: str) -> str:
323
    return urlparse(registry_url).netloc or "oversightprotocol.dev"
324
 
325
 
326
def _validate_content_type(raw_content_type: str) -> str:
327
    content_type = (raw_content_type or "application/octet-stream").strip()
328
    if any(ch in content_type for ch in "\r\n\"'<>"):
329
        raise ValueError("Content type contains unsafe characters.")
330
    if "/" not in content_type:
331
        raise ValueError("Content type must look like a MIME type, such as text/plain.")
332
    return content_type
333
 
334
 
335
def _public_key_path(private_path: Path) -> Path:
336
    name = private_path.name
337
    if name.lower().endswith(".pub.json"):
338
        raise ValueError("Private key output should not end with .pub.json.")
339
    if name.lower().endswith(".priv.json"):
340
        return private_path.with_name(name[:-10] + ".pub.json")
341
    return private_path.with_suffix(".pub.json")
342
 
343
 
344
def _default_sealed_path(input_path: Path) -> Path:
345
    if input_path.name.lower().endswith(".sealed"):
346
        return input_path.with_name(input_path.name + ".out.sealed")
347
    return Path(f"{input_path}.sealed")
348
 
349
 
350
if __name__ == "__main__":
351
    main()