| 1 | """Small Tkinter GUI for non-technical Oversight users.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import json |
| 6 | from pathlib import Path |
| 7 | import tkinter as tk |
| 8 | from tkinter import filedialog, messagebox, ttk |
| 9 | from urllib.parse import urlparse |
| 10 | |
| 11 | from oversight_core import ( |
| 12 | ClassicIdentity, |
| 13 | Manifest, |
| 14 | Recipient, |
| 15 | WatermarkRef, |
| 16 | beacon, |
| 17 | content_hash, |
| 18 | l3_policy, |
| 19 | open_sealed, |
| 20 | seal, |
| 21 | watermark, |
| 22 | ) |
| 23 | from oversight_core.fingerprint import ContentFingerprint |
| 24 | from oversight_core.safe_io import ( |
| 25 | atomic_write_bytes, |
| 26 | atomic_write_private_json, |
| 27 | atomic_write_text, |
| 28 | is_private_key_file, |
| 29 | is_windows_reserved_path, |
| 30 | validate_output_path, |
| 31 | ) |
| 32 | |
| 33 | |
| 34 | class OversightGui(tk.Tk): |
| 35 | def __init__(self) -> None: |
| 36 | super().__init__() |
| 37 | self.title("Oversight Protocol") |
| 38 | self.geometry("760x540") |
| 39 | self._build() |
| 40 | |
| 41 | def _build(self) -> None: |
| 42 | notebook = ttk.Notebook(self) |
| 43 | notebook.pack(fill="both", expand=True, padx=12, pady=12) |
| 44 | self._build_keygen(notebook) |
| 45 | self._build_seal(notebook) |
| 46 | self._build_open(notebook) |
| 47 | |
| 48 | def _row(self, parent, label: str, row: int, browse: bool = False): |
| 49 | ttk.Label(parent, text=label).grid(row=row, column=0, sticky="w", pady=4) |
| 50 | var = tk.StringVar() |
| 51 | ent = ttk.Entry(parent, textvariable=var, width=72) |
| 52 | ent.grid(row=row, column=1, sticky="ew", pady=4) |
| 53 | if browse: |
| 54 | ttk.Button(parent, text="Browse", command=lambda: self._browse(var)).grid(row=row, column=2, padx=4) |
| 55 | parent.columnconfigure(1, weight=1) |
| 56 | return var |
| 57 | |
| 58 | def _browse(self, var: tk.StringVar, save: bool = False) -> None: |
| 59 | path = filedialog.asksaveasfilename() if save else filedialog.askopenfilename() |
| 60 | if path: |
| 61 | var.set(path) |
| 62 | |
| 63 | def _build_keygen(self, notebook) -> None: |
| 64 | frame = ttk.Frame(notebook, padding=12) |
| 65 | notebook.add(frame, text="Generate Keys") |
| 66 | identity_id = self._row(frame, "Identity name", 0) |
| 67 | identity_id.set("alice") |
| 68 | out = self._row(frame, "Private key output", 1) |
| 69 | ttk.Button(frame, text="Choose Output", command=lambda: self._browse(out, save=True)).grid(row=1, column=2, padx=4) |
| 70 | ttk.Button(frame, text="Generate Keypair", command=lambda: self._keygen(identity_id.get(), out.get())).grid(row=2, column=1, sticky="e", pady=12) |
| 71 | |
| 72 | def _build_seal(self, notebook) -> None: |
| 73 | frame = ttk.Frame(notebook, padding=12) |
| 74 | notebook.add(frame, text="Seal File") |
| 75 | self.seal_input = self._row(frame, "Input file", 0, True) |
| 76 | self.seal_issuer = self._row(frame, "Issuer private key", 1, True) |
| 77 | self.seal_recipient = self._row(frame, "Recipient public key", 2, True) |
| 78 | self.seal_out = self._row(frame, "Sealed output", 3) |
| 79 | ttk.Button(frame, text="Choose Output", command=lambda: self._browse(self.seal_out, save=True)).grid(row=3, column=2, padx=4) |
| 80 | self.registry_url = self._row(frame, "Registry URL", 4) |
| 81 | self.registry_url.set("https://registry.oversightprotocol.dev") |
| 82 | self.content_type = self._row(frame, "Content type", 5) |
| 83 | self.content_type.set("text/plain") |
| 84 | self.l3_mode = tk.StringVar(value="auto") |
| 85 | ttk.Label(frame, text="L3 mode").grid(row=6, column=0, sticky="w", pady=4) |
| 86 | ttk.Combobox(frame, textvariable=self.l3_mode, values=["auto", "off", "boilerplate", "full"], state="readonly").grid(row=6, column=1, sticky="w") |
| 87 | self.watermark_enabled = tk.BooleanVar(value=True) |
| 88 | ttk.Checkbutton(frame, text="Embed L1/L2 watermarks", variable=self.watermark_enabled).grid(row=7, column=1, sticky="w") |
| 89 | ttk.Button(frame, text="Seal", command=self._seal_file).grid(row=8, column=1, sticky="e", pady=12) |
| 90 | |
| 91 | def _build_open(self, notebook) -> None: |
| 92 | frame = ttk.Frame(notebook, padding=12) |
| 93 | notebook.add(frame, text="Open File") |
| 94 | self.open_input = self._row(frame, "Sealed file", 0, True) |
| 95 | self.open_identity = self._row(frame, "Recipient private key", 1, True) |
| 96 | self.open_out = self._row(frame, "Plaintext output", 2) |
| 97 | ttk.Button(frame, text="Choose Output", command=lambda: self._browse(self.open_out, save=True)).grid(row=2, column=2, padx=4) |
| 98 | ttk.Button(frame, text="Open", command=self._open_file).grid(row=3, column=1, sticky="e", pady=12) |
| 99 | |
| 100 | def _keygen(self, identity_id: str, out_path: str) -> None: |
| 101 | try: |
| 102 | identity_id = (identity_id or "identity").strip() |
| 103 | if not identity_id: |
| 104 | raise ValueError("Please enter an identity name.") |
| 105 | if len(identity_id) > 256: |
| 106 | raise ValueError("Identity name must be 256 characters or fewer.") |
| 107 | if not out_path: |
| 108 | raise ValueError("Please choose a private key output path.") |
| 109 | path = Path(out_path) |
| 110 | pub_path = _public_key_path(path) |
| 111 | self._prepare_output(path) |
| 112 | self._prepare_output(pub_path, input_paths=[path]) |
| 113 | ident = ClassicIdentity.generate() |
| 114 | out = { |
| 115 | "id": identity_id, |
| 116 | "x25519_priv": ident.x25519_priv.hex(), |
| 117 | "x25519_pub": ident.x25519_pub.hex(), |
| 118 | "ed25519_priv": ident.ed25519_priv.hex(), |
| 119 | "ed25519_pub": ident.ed25519_pub.hex(), |
| 120 | } |
| 121 | _write_private_json(path, out) |
| 122 | atomic_write_text(pub_path, json.dumps({ |
| 123 | "id": out["id"], |
| 124 | "x25519_pub": out["x25519_pub"], |
| 125 | "ed25519_pub": out["ed25519_pub"], |
| 126 | }, indent=2)) |
| 127 | messagebox.showinfo("Oversight", "Keypair generated.") |
| 128 | except Exception as exc: |
| 129 | messagebox.showerror("Oversight", str(exc)) |
| 130 | |
| 131 | def _seal_file(self) -> None: |
| 132 | try: |
| 133 | input_path = _require_file(self.seal_input.get(), "input file") |
| 134 | issuer_path = _require_file(self.seal_issuer.get(), "issuer private key") |
| 135 | recipient_path = _require_file(self.seal_recipient.get(), "recipient public key") |
| 136 | raw_out = self.seal_out.get().strip() |
| 137 | out_path = Path(raw_out) if raw_out else _default_sealed_path(input_path) |
| 138 | self._prepare_output(out_path, input_paths=[input_path, issuer_path, recipient_path]) |
| 139 | plaintext = input_path.read_bytes() |
| 140 | canonical_plaintext = plaintext |
| 141 | issuer = _read_private_identity(issuer_path, "Issuer file") |
| 142 | rec_pub = _read_public_identity(recipient_path, "Recipient file") |
| 143 | watermarks: list[WatermarkRef] = [] |
| 144 | decision = None |
| 145 | |
| 146 | if self.watermark_enabled.get(): |
| 147 | try: |
| 148 | text = plaintext.decode("utf-8") |
| 149 | except UnicodeDecodeError as exc: |
| 150 | raise ValueError( |
| 151 | "File is not UTF-8 text. Uncheck 'Embed L1/L2 watermarks' " |
| 152 | "to seal binary data." |
| 153 | ) from exc |
| 154 | mark_id = watermark.new_mark_id() |
| 155 | decision = l3_policy.decide_l3( |
| 156 | filename=str(input_path), |
| 157 | content_type=_validate_content_type(self.content_type.get()), |
| 158 | text=text, |
| 159 | requested_mode=self.l3_mode.get(), |
| 160 | ) |
| 161 | if decision.enabled: |
| 162 | if not messagebox.askyesno( |
| 163 | "L3 disclosure", |
| 164 | "L3 semantic watermarking changes visible prose.\n\n" |
| 165 | f"Detected document class: {decision.document_class}\n" |
| 166 | f"Mode: {decision.mode}\n" |
| 167 | f"Reason: {decision.reason}\n\n" |
| 168 | "Continue?", |
| 169 | ): |
| 170 | return |
| 171 | text = l3_policy.apply_l3_safe(text, mark_id, mode=decision.mode) |
| 172 | watermarks.append(WatermarkRef(f"L3_semantic_{decision.mode}", mark_id.hex())) |
| 173 | text = watermark.embed_ws(text, mark_id) |
| 174 | text = watermark.embed_zw(text, mark_id) |
| 175 | plaintext = text.encode("utf-8") |
| 176 | watermarks.extend([ |
| 177 | WatermarkRef("L1_zero_width", mark_id.hex()), |
| 178 | WatermarkRef("L2_whitespace", mark_id.hex()), |
| 179 | ]) |
| 180 | |
| 181 | registry_url = _validate_registry_url(self.registry_url.get()) |
| 182 | content_type = _validate_content_type(self.content_type.get()) |
| 183 | recipient = Recipient(rec_pub["id"], rec_pub["x25519_pub"], rec_pub.get("ed25519_pub")) |
| 184 | manifest = Manifest.new( |
| 185 | input_path.name, |
| 186 | content_hash(plaintext), |
| 187 | len(plaintext), |
| 188 | issuer.get("id", "issuer"), |
| 189 | issuer["ed25519_pub"], |
| 190 | recipient, |
| 191 | registry_url, |
| 192 | content_type, |
| 193 | ) |
| 194 | manifest.canonical_content_hash = content_hash(canonical_plaintext) |
| 195 | manifest.watermarks = watermarks |
| 196 | manifest.l3_policy = decision.to_dict() if decision else {} |
| 197 | beacon_domain = _registry_domain(registry_url) |
| 198 | manifest.beacons = [ |
| 199 | b.to_dict() for b in beacon.gen_beacons(beacon_domain, manifest.file_id, rec_pub["id"]) |
| 200 | ] |
| 201 | blob = seal(plaintext, manifest, bytes.fromhex(issuer["ed25519_priv"]), bytes.fromhex(rec_pub["x25519_pub"])) |
| 202 | atomic_write_bytes(out_path, blob) |
| 203 | if watermarks: |
| 204 | fp = ContentFingerprint.from_text(plaintext.decode("utf-8", errors="replace")) |
| 205 | atomic_write_text(out_path.with_suffix(".fingerprint.json"), json.dumps({ |
| 206 | "file_id": manifest.file_id, |
| 207 | "recipient_id": rec_pub["id"], |
| 208 | "canonical_content_hash": manifest.canonical_content_hash, |
| 209 | "l3_policy": manifest.l3_policy, |
| 210 | "fingerprint": fp.to_dict(), |
| 211 | }, indent=2)) |
| 212 | messagebox.showinfo("Oversight", f"Sealed file written.\nfile_id={manifest.file_id}") |
| 213 | except Exception as exc: |
| 214 | messagebox.showerror("Oversight", str(exc)) |
| 215 | |
| 216 | def _open_file(self) -> None: |
| 217 | try: |
| 218 | input_path = _require_file(self.open_input.get(), "sealed file") |
| 219 | identity_path = _require_file(self.open_identity.get(), "recipient private key") |
| 220 | out_path_raw = self.open_out.get().strip() |
| 221 | if not out_path_raw: |
| 222 | raise ValueError("Please choose a plaintext output path.") |
| 223 | out_path = Path(out_path_raw) |
| 224 | self._prepare_output(out_path, input_paths=[input_path, identity_path]) |
| 225 | ident = _read_private_identity(identity_path, "Recipient identity file") |
| 226 | plaintext, _manifest = open_sealed( |
| 227 | input_path.read_bytes(), |
| 228 | bytes.fromhex(ident["x25519_priv"]), |
| 229 | ) |
| 230 | atomic_write_bytes(out_path, plaintext) |
| 231 | messagebox.showinfo("Oversight", "File opened.") |
| 232 | except Exception as exc: |
| 233 | messagebox.showerror("Oversight", str(exc)) |
| 234 | |
| 235 | def _prepare_output(self, path: Path, input_paths: list[Path] | None = None) -> None: |
| 236 | input_paths = input_paths or [] |
| 237 | if is_private_key_file(path): |
| 238 | raise ValueError("Refusing to overwrite an Oversight private key file.") |
| 239 | try: |
| 240 | validate_output_path(path, input_paths=input_paths) |
| 241 | return |
| 242 | except FileExistsError: |
| 243 | if not messagebox.askyesno("Overwrite file?", f"{path} already exists. Overwrite it?"): |
| 244 | raise ValueError("Write cancelled; output file already exists.") |
| 245 | validate_output_path(path, input_paths=input_paths, allow_existing=True) |
| 246 | |
| 247 | |
| 248 | def main() -> None: |
| 249 | app = OversightGui() |
| 250 | app.mainloop() |
| 251 | |
| 252 | |
| 253 | def _write_private_json(path: Path, data: dict) -> None: |
| 254 | """Write private key material with restrictive permissions where supported.""" |
| 255 | atomic_write_private_json(path, data) |
| 256 | |
| 257 | |
| 258 | def _require_file(raw_path: str, label: str) -> Path: |
| 259 | if not raw_path.strip(): |
| 260 | raise ValueError(f"Please choose a {label}.") |
| 261 | path = Path(raw_path) |
| 262 | if is_windows_reserved_path(path): |
| 263 | raise ValueError(f"{label.capitalize()} uses a Windows reserved device name: {path.name}") |
| 264 | if not path.exists() or not path.is_file(): |
| 265 | raise ValueError(f"{label.capitalize()} not found: {path}") |
| 266 | return path |
| 267 | |
| 268 | |
| 269 | def _read_json(path: Path, label: str) -> dict: |
| 270 | try: |
| 271 | data = json.loads(path.read_text(encoding="utf-8")) |
| 272 | except json.JSONDecodeError as exc: |
| 273 | raise ValueError(f"{label} is not valid JSON.") from exc |
| 274 | except UnicodeDecodeError as exc: |
| 275 | raise ValueError(f"{label} is not UTF-8 JSON.") from exc |
| 276 | if not isinstance(data, dict): |
| 277 | raise ValueError(f"{label} must contain a JSON object.") |
| 278 | return data |
| 279 | |
| 280 | |
| 281 | def _read_private_identity(path: Path, label: str) -> dict: |
| 282 | data = _read_json(path, label) |
| 283 | for key in ("x25519_priv", "x25519_pub", "ed25519_priv", "ed25519_pub"): |
| 284 | if key not in data: |
| 285 | raise ValueError(f"{label} does not contain `{key}`; did you select a public key by mistake?") |
| 286 | _validate_hex_field(data[key], key, 32) |
| 287 | if "id" not in data: |
| 288 | raise ValueError(f"{label} does not contain `id`.") |
| 289 | return data |
| 290 | |
| 291 | |
| 292 | def _read_public_identity(path: Path, label: str) -> dict: |
| 293 | data = _read_json(path, label) |
| 294 | for key in ("id", "x25519_pub"): |
| 295 | if key not in data: |
| 296 | raise ValueError(f"{label} does not contain `{key}`.") |
| 297 | _validate_hex_field(data["x25519_pub"], "x25519_pub", 32) |
| 298 | if "ed25519_pub" in data: |
| 299 | _validate_hex_field(data["ed25519_pub"], "ed25519_pub", 32) |
| 300 | return data |
| 301 | |
| 302 | |
| 303 | def _validate_hex_field(value: object, key: str, expected_len: int) -> None: |
| 304 | if not isinstance(value, str): |
| 305 | raise ValueError(f"`{key}` must be hex text.") |
| 306 | try: |
| 307 | raw = bytes.fromhex(value) |
| 308 | except ValueError as exc: |
| 309 | raise ValueError(f"`{key}` is not valid hex.") from exc |
| 310 | if len(raw) != expected_len: |
| 311 | raise ValueError(f"`{key}` must decode to {expected_len} bytes.") |
| 312 | |
| 313 | |
| 314 | def _validate_registry_url(raw_url: str) -> str: |
| 315 | url = (raw_url or "").strip() |
| 316 | parsed = urlparse(url) |
| 317 | if parsed.scheme not in {"http", "https"} or not parsed.netloc: |
| 318 | raise ValueError("Registry URL must be an http(s) URL with a host.") |
| 319 | return url |
| 320 | |
| 321 | |
| 322 | def _registry_domain(registry_url: str) -> str: |
| 323 | return urlparse(registry_url).netloc or "oversightprotocol.dev" |
| 324 | |
| 325 | |
| 326 | def _validate_content_type(raw_content_type: str) -> str: |
| 327 | content_type = (raw_content_type or "application/octet-stream").strip() |
| 328 | if any(ch in content_type for ch in "\r\n\"'<>"): |
| 329 | raise ValueError("Content type contains unsafe characters.") |
| 330 | if "/" not in content_type: |
| 331 | raise ValueError("Content type must look like a MIME type, such as text/plain.") |
| 332 | return content_type |
| 333 | |
| 334 | |
| 335 | def _public_key_path(private_path: Path) -> Path: |
| 336 | name = private_path.name |
| 337 | if name.lower().endswith(".pub.json"): |
| 338 | raise ValueError("Private key output should not end with .pub.json.") |
| 339 | if name.lower().endswith(".priv.json"): |
| 340 | return private_path.with_name(name[:-10] + ".pub.json") |
| 341 | return private_path.with_suffix(".pub.json") |
| 342 | |
| 343 | |
| 344 | def _default_sealed_path(input_path: Path) -> Path: |
| 345 | if input_path.name.lower().endswith(".sealed"): |
| 346 | return input_path.with_name(input_path.name + ".out.sealed") |
| 347 | return Path(f"{input_path}.sealed") |
| 348 | |
| 349 | |
| 350 | if __name__ == "__main__": |
| 351 | main() |