Zion Boggan zionboggan.com ↗

Harden GUI writes and container parsing

Co-authored-by: Codex (GPT-5.4) <noreply@openai.com>
45c3bdc   Zion Boggan committed on Apr 21, 2026 (2 months ago)
CHANGELOG.md +9 -1
@@ -16,6 +16,12 @@ Review-driven hardening from `P:/Oversight/oversight-protocol-review.md`.
L3 application is opt-in; L1/L2 remain available by default.
- `cli/gui.py`: added a Tkinter desktop GUI for key generation, sealing, and
opening files (`oversight gui`) so non-technical users have a starter path.
+- GUI and CLI output writes now fail closed against private-key overwrites,
+ same-path writes, reserved Windows device names, malformed key files, and
+ non-UTF-8 watermark attempts. Private-key writes use atomic replacement and
+ restrictive permissions/ACL hardening where supported.
+- `.sealed` parsing now rejects tampered suite IDs, malformed manifest/wrapped-DEK
+ JSON, unknown manifest fields, and trailing bytes after ciphertext.
- `docs/security.md`: documented L3 collusion/canonicalization limits, layer
survival properties, passive beacon limits, jurisdiction-by-IP limits, and
RFC 3161 timestamp semantics.
@@ -26,7 +32,9 @@ Review-driven hardening from `P:/Oversight/oversight-protocol-review.md`.
before SOC 2 / ISO 27001 work.
- Raised vulnerable dependency floors flagged by Dependabot/PyPI advisory
checks: setuptools, cryptography, PyNaCl, pydantic, python-multipart,
- Pillow, and pypdf now require patched minimums.
+ Pillow, and pypdf now require patched minimums; Rust manifest floors
+ now pin patched minima for sqlx, tokio, rand_core, zip, chrono, regex,
+ once_cell, and tracing-subscriber.
- Added focused regression coverage in `tests/test_l3_policy_unit.py`.
## v0.4.4 - 2026-04-20 security hardening
README.md +12 -1
@@ -104,11 +104,18 @@ collusion/threat-model documentation in `docs/security.md`.
**GUI starter.** `oversight gui` launches a small desktop app for key
generation, sealing, and opening files so non-technical recipients are not
-forced through the CLI.
+forced through the CLI. The GUI and CLI now guard local writes so seal/open
+outputs cannot overwrite selected input files or Oversight private-key JSON;
+private-key generation uses atomic replacement and restrictive permissions or
+best-effort Windows ACL hardening.
**Registry federation draft.** `docs/spec/registry-v1.md` documents the
interoperability contract for compatible registry operators.
+**Public reference metadata.** `docs/META/public-reference.yaml` is the
+authoritative repo source for public version numbers, dependency floors,
+canonical links, writing rules, and website update contracts.
+
## What's new in v0.4.4
**Security hardening over v0.4.3.** This line starts from the v0.4.3 Python
@@ -153,6 +160,10 @@ These items are included in v0.4.4/v0.4.5 and current `main`:
DOCX keyword insertion, and PDF action screening.
- L3 semantic watermarking is opt-in for sensitive classes, requires
disclosure acknowledgement when enabled, and records `canonical_content_hash`.
+- `.sealed` parsing rejects suite-byte tamper, malformed manifest or wrapped-DEK
+ JSON, unknown manifest fields, and trailing bytes after ciphertext.
+- Dependency floors now exclude known vulnerable PyPI and Rust manifest ranges
+ flagged by Dependabot/advisory checks.
## Repository layout
cli/gui.py +166 -27
@@ -6,7 +6,7 @@ import json
from pathlib import Path
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
-import os
+from urllib.parse import urlparse
from oversight_core import (
ClassicIdentity,
@@ -21,6 +21,14 @@ from oversight_core import (
watermark,
)
from oversight_core.fingerprint import ContentFingerprint
+from oversight_core.safe_io import (
+ atomic_write_bytes,
+ atomic_write_private_json,
+ atomic_write_text,
+ is_private_key_file,
+ is_windows_reserved_path,
+ validate_output_path,
+)
class OversightGui(tk.Tk):
@@ -91,17 +99,27 @@ class OversightGui(tk.Tk):
def _keygen(self, identity_id: str, out_path: str) -> None:
try:
+ identity_id = (identity_id or "identity").strip()
+ if not identity_id:
+ raise ValueError("Please enter an identity name.")
+ if len(identity_id) > 256:
+ raise ValueError("Identity name must be 256 characters or fewer.")
+ if not out_path:
+ raise ValueError("Please choose a private key output path.")
+ path = Path(out_path)
+ pub_path = _public_key_path(path)
+ self._prepare_output(path)
+ self._prepare_output(pub_path, input_paths=[path])
ident = ClassicIdentity.generate()
out = {
- "id": identity_id or "identity",
+ "id": identity_id,
"x25519_priv": ident.x25519_priv.hex(),
"x25519_pub": ident.x25519_pub.hex(),
"ed25519_priv": ident.ed25519_priv.hex(),
"ed25519_pub": ident.ed25519_pub.hex(),
}
- path = Path(out_path)
_write_private_json(path, out)
- path.with_suffix(".pub.json").write_text(json.dumps({
+ atomic_write_text(pub_path, json.dumps({
"id": out["id"],
"x25519_pub": out["x25519_pub"],
"ed25519_pub": out["ed25519_pub"],
@@ -112,27 +130,42 @@ class OversightGui(tk.Tk):
def _seal_file(self) -> None:
try:
- input_path = Path(self.seal_input.get())
+ input_path = _require_file(self.seal_input.get(), "input file")
+ issuer_path = _require_file(self.seal_issuer.get(), "issuer private key")
+ recipient_path = _require_file(self.seal_recipient.get(), "recipient public key")
+ raw_out = self.seal_out.get().strip()
+ out_path = Path(raw_out) if raw_out else _default_sealed_path(input_path)
+ self._prepare_output(out_path, input_paths=[input_path, issuer_path, recipient_path])
plaintext = input_path.read_bytes()
canonical_plaintext = plaintext
- issuer = json.loads(Path(self.seal_issuer.get()).read_text())
- rec_pub = json.loads(Path(self.seal_recipient.get()).read_text())
+ issuer = _read_private_identity(issuer_path, "Issuer file")
+ rec_pub = _read_public_identity(recipient_path, "Recipient file")
watermarks: list[WatermarkRef] = []
decision = None
if self.watermark_enabled.get():
- text = plaintext.decode("utf-8")
+ try:
+ text = plaintext.decode("utf-8")
+ except UnicodeDecodeError as exc:
+ raise ValueError(
+ "File is not UTF-8 text. Uncheck 'Embed L1/L2 watermarks' "
+ "to seal binary data."
+ ) from exc
mark_id = watermark.new_mark_id()
decision = l3_policy.decide_l3(
filename=str(input_path),
- content_type=self.content_type.get(),
+ content_type=_validate_content_type(self.content_type.get()),
text=text,
requested_mode=self.l3_mode.get(),
)
if decision.enabled:
if not messagebox.askyesno(
"L3 disclosure",
- "L3 semantic watermarking changes visible prose. Continue?",
+ "L3 semantic watermarking changes visible prose.\n\n"
+ f"Detected document class: {decision.document_class}\n"
+ f"Mode: {decision.mode}\n"
+ f"Reason: {decision.reason}\n\n"
+ "Continue?",
):
return
text = l3_policy.apply_l3_safe(text, mark_id, mode=decision.mode)
@@ -145,6 +178,8 @@ class OversightGui(tk.Tk):
WatermarkRef("L2_whitespace", mark_id.hex()),
])
+ registry_url = _validate_registry_url(self.registry_url.get())
+ content_type = _validate_content_type(self.content_type.get())
recipient = Recipient(rec_pub["id"], rec_pub["x25519_pub"], rec_pub.get("ed25519_pub"))
manifest = Manifest.new(
input_path.name,
@@ -153,21 +188,21 @@ class OversightGui(tk.Tk):
issuer.get("id", "issuer"),
issuer["ed25519_pub"],
recipient,
- self.registry_url.get(),
- self.content_type.get(),
+ registry_url,
+ content_type,
)
manifest.canonical_content_hash = content_hash(canonical_plaintext)
manifest.watermarks = watermarks
manifest.l3_policy = decision.to_dict() if decision else {}
+ beacon_domain = _registry_domain(registry_url)
manifest.beacons = [
- b.to_dict() for b in beacon.gen_beacons("oversightprotocol.dev", "pending", rec_pub["id"])
+ b.to_dict() for b in beacon.gen_beacons(beacon_domain, manifest.file_id, rec_pub["id"])
]
- out_path = Path(self.seal_out.get() or f"{input_path}.sealed")
blob = seal(plaintext, manifest, bytes.fromhex(issuer["ed25519_priv"]), bytes.fromhex(rec_pub["x25519_pub"]))
- out_path.write_bytes(blob)
+ atomic_write_bytes(out_path, blob)
if watermarks:
fp = ContentFingerprint.from_text(plaintext.decode("utf-8", errors="replace"))
- out_path.with_suffix(".fingerprint.json").write_text(json.dumps({
+ atomic_write_text(out_path.with_suffix(".fingerprint.json"), json.dumps({
"file_id": manifest.file_id,
"recipient_id": rec_pub["id"],
"canonical_content_hash": manifest.canonical_content_hash,
@@ -180,16 +215,35 @@ class OversightGui(tk.Tk):
def _open_file(self) -> None:
try:
- ident = json.loads(Path(self.open_identity.get()).read_text())
+ input_path = _require_file(self.open_input.get(), "sealed file")
+ identity_path = _require_file(self.open_identity.get(), "recipient private key")
+ out_path_raw = self.open_out.get().strip()
+ if not out_path_raw:
+ raise ValueError("Please choose a plaintext output path.")
+ out_path = Path(out_path_raw)
+ self._prepare_output(out_path, input_paths=[input_path, identity_path])
+ ident = _read_private_identity(identity_path, "Recipient identity file")
plaintext, _manifest = open_sealed(
- Path(self.open_input.get()).read_bytes(),
+ input_path.read_bytes(),
bytes.fromhex(ident["x25519_priv"]),
)
- Path(self.open_out.get()).write_bytes(plaintext)
+ atomic_write_bytes(out_path, plaintext)
messagebox.showinfo("Oversight", "File opened.")
except Exception as exc:
messagebox.showerror("Oversight", str(exc))
+ def _prepare_output(self, path: Path, input_paths: list[Path] | None = None) -> None:
+ input_paths = input_paths or []
+ if is_private_key_file(path):
+ raise ValueError("Refusing to overwrite an Oversight private key file.")
+ try:
+ validate_output_path(path, input_paths=input_paths)
+ return
+ except FileExistsError:
+ if not messagebox.askyesno("Overwrite file?", f"{path} already exists. Overwrite it?"):
+ raise ValueError("Write cancelled; output file already exists.")
+ validate_output_path(path, input_paths=input_paths, allow_existing=True)
+
def main() -> None:
app = OversightGui()
@@ -198,14 +252,99 @@ def main() -> None:
def _write_private_json(path: Path, data: dict) -> None:
"""Write private key material with restrictive permissions where supported."""
- path.parent.mkdir(parents=True, exist_ok=True)
- payload = json.dumps(data, indent=2)
- if os.name == "posix":
- fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
- with os.fdopen(fd, "w", encoding="utf-8") as f:
- f.write(payload)
- else:
- path.write_text(payload, encoding="utf-8")
+ atomic_write_private_json(path, data)
+
+
+def _require_file(raw_path: str, label: str) -> Path:
+ if not raw_path.strip():
+ raise ValueError(f"Please choose a {label}.")
+ path = Path(raw_path)
+ if is_windows_reserved_path(path):
+ raise ValueError(f"{label.capitalize()} uses a Windows reserved device name: {path.name}")
+ if not path.exists() or not path.is_file():
+ raise ValueError(f"{label.capitalize()} not found: {path}")
+ return path
+
+
+def _read_json(path: Path, label: str) -> dict:
+ try:
+ data = json.loads(path.read_text(encoding="utf-8"))
+ except json.JSONDecodeError as exc:
+ raise ValueError(f"{label} is not valid JSON.") from exc
+ except UnicodeDecodeError as exc:
+ raise ValueError(f"{label} is not UTF-8 JSON.") from exc
+ if not isinstance(data, dict):
+ raise ValueError(f"{label} must contain a JSON object.")
+ return data
+
+
+def _read_private_identity(path: Path, label: str) -> dict:
+ data = _read_json(path, label)
+ for key in ("x25519_priv", "x25519_pub", "ed25519_priv", "ed25519_pub"):
+ if key not in data:
+ raise ValueError(f"{label} does not contain `{key}`; did you select a public key by mistake?")
+ _validate_hex_field(data[key], key, 32)
+ if "id" not in data:
+ raise ValueError(f"{label} does not contain `id`.")
+ return data
+
+
+def _read_public_identity(path: Path, label: str) -> dict:
+ data = _read_json(path, label)
+ for key in ("id", "x25519_pub"):
+ if key not in data:
+ raise ValueError(f"{label} does not contain `{key}`.")
+ _validate_hex_field(data["x25519_pub"], "x25519_pub", 32)
+ if "ed25519_pub" in data:
+ _validate_hex_field(data["ed25519_pub"], "ed25519_pub", 32)
+ return data
+
+
+def _validate_hex_field(value: object, key: str, expected_len: int) -> None:
+ if not isinstance(value, str):
+ raise ValueError(f"`{key}` must be hex text.")
+ try:
+ raw = bytes.fromhex(value)
+ except ValueError as exc:
+ raise ValueError(f"`{key}` is not valid hex.") from exc
+ if len(raw) != expected_len:
+ raise ValueError(f"`{key}` must decode to {expected_len} bytes.")
+
+
+def _validate_registry_url(raw_url: str) -> str:
+ url = (raw_url or "").strip()
+ parsed = urlparse(url)
+ if parsed.scheme not in {"http", "https"} or not parsed.netloc:
+ raise ValueError("Registry URL must be an http(s) URL with a host.")
+ return url
+
+
+def _registry_domain(registry_url: str) -> str:
+ return urlparse(registry_url).netloc or "oversightprotocol.dev"
+
+
+def _validate_content_type(raw_content_type: str) -> str:
+ content_type = (raw_content_type or "application/octet-stream").strip()
+ if any(ch in content_type for ch in "\r\n\"'<>"):
+ raise ValueError("Content type contains unsafe characters.")
+ if "/" not in content_type:
+ raise ValueError("Content type must look like a MIME type, such as text/plain.")
+ return content_type
+
+
+def _public_key_path(private_path: Path) -> Path:
+ name = private_path.name
+ if name.lower().endswith(".pub.json"):
+ raise ValueError("Private key output should not end with .pub.json.")
+ if name.lower().endswith(".priv.json"):
+ return private_path.with_name(name[:-10] + ".pub.json")
+ return private_path.with_suffix(".pub.json")
+
+
+def _default_sealed_path(input_path: Path) -> Path:
+ if input_path.name.lower().endswith(".sealed"):
+ return input_path.with_name(input_path.name + ".out.sealed")
+ return Path(f"{input_path}.sealed")
if __name__ == "__main__":
cli/oversight.py +46 -26
@@ -49,11 +49,21 @@ from oversight_core import (
from oversight_core.container import SealedFile
from oversight_core import semantic
from oversight_core.fingerprint import ContentFingerprint
+from oversight_core.safe_io import (
+ atomic_write_bytes,
+ atomic_write_private_json,
+ atomic_write_text,
+ validate_output_path,
+)
# ---------------- keygen ----------------
def cmd_keygen(args):
+ out_path = Path(args.out)
+ pub_path = out_path.with_suffix(".pub.json")
+ validate_output_path(out_path)
+ validate_output_path(pub_path, input_paths=[out_path])
ident = ClassicIdentity.generate()
out = {
"id": args.id or "identity",
@@ -62,10 +72,9 @@ def cmd_keygen(args):
"ed25519_priv": ident.ed25519_priv.hex(),
"ed25519_pub": ident.ed25519_pub.hex(),
}
- Path(args.out).write_text(json.dumps(out, indent=2))
+ atomic_write_private_json(out_path, out)
# also write a public-only sibling
- pub_path = Path(args.out).with_suffix(".pub.json")
- pub_path.write_text(json.dumps({
+ atomic_write_text(pub_path, json.dumps({
"id": out["id"],
"x25519_pub": out["x25519_pub"],
"ed25519_pub": out["ed25519_pub"],
@@ -77,9 +86,14 @@ def cmd_keygen(args):
# ---------------- seal ----------------
def cmd_seal(args):
- plaintext = Path(args.input).read_bytes()
- issuer = json.loads(Path(args.issuer_key).read_text())
- rec_pub = json.loads(Path(args.recipient_pub).read_text())
+ input_path = Path(args.input)
+ issuer_path = Path(args.issuer_key)
+ recipient_path = Path(args.recipient_pub)
+ out_path = Path(args.out)
+ validate_output_path(out_path, input_paths=[input_path, issuer_path, recipient_path])
+ plaintext = input_path.read_bytes()
+ issuer = json.loads(issuer_path.read_text())
+ rec_pub = json.loads(recipient_path.read_text())
canonical_plaintext = plaintext
@@ -142,14 +156,8 @@ def cmd_seal(args):
)
# Beacons
- beacons = beacon.gen_beacons(
- registry_domain=args.registry_domain,
- file_id="pending", # will be replaced after manifest.new assigns file_id
- recipient_id=rec_pub["id"],
- )
-
manifest = Manifest.new(
- original_filename=Path(args.input).name,
+ original_filename=input_path.name,
content_hash=content_hash(plaintext),
size_bytes=len(plaintext),
issuer_id=args.issuer_id,
@@ -161,6 +169,11 @@ def cmd_seal(args):
manifest.canonical_content_hash = content_hash(canonical_plaintext)
if l3_decision:
manifest.l3_policy = l3_decision.to_dict()
+ beacons = beacon.gen_beacons(
+ registry_domain=args.registry_domain,
+ file_id=manifest.file_id,
+ recipient_id=rec_pub["id"],
+ )
manifest.watermarks = watermarks_for_manifest
manifest.beacons = [b.to_dict() for b in beacons]
@@ -183,7 +196,7 @@ def cmd_seal(args):
recipient_x25519_pub=bytes.fromhex(rec_pub["x25519_pub"]),
)
- Path(args.out).write_bytes(blob)
+ atomic_write_bytes(out_path, blob)
print(f"[+] wrote {args.out} ({len(blob)} bytes)")
print(f"[+] file_id={manifest.file_id}")
print(f"[+] recipient={recipient.recipient_id}")
@@ -191,8 +204,8 @@ def cmd_seal(args):
# Store fingerprint alongside the sealed file
if fingerprint:
- fp_path = Path(args.out).with_suffix(".fingerprint.json")
- fp_path.write_text(json.dumps({
+ fp_path = out_path.with_suffix(".fingerprint.json")
+ atomic_write_text(fp_path, json.dumps({
"file_id": manifest.file_id,
"recipient_id": rec_pub["id"],
"mark_id": watermarks_for_manifest[0].mark_id if watermarks_for_manifest else None,
@@ -226,13 +239,17 @@ def cmd_seal(args):
# ---------------- open ----------------
def cmd_open(args):
- blob = Path(args.input).read_bytes()
- ident = json.loads(Path(args.identity).read_text())
+ input_path = Path(args.input)
+ identity_path = Path(args.identity)
+ out_path = Path(args.out)
+ validate_output_path(out_path, input_paths=[input_path, identity_path])
+ blob = input_path.read_bytes()
+ ident = json.loads(identity_path.read_text())
plaintext, manifest = open_sealed(
blob,
recipient_x25519_priv=bytes.fromhex(ident["x25519_priv"]),
)
- Path(args.out).write_bytes(plaintext)
+ atomic_write_bytes(out_path, plaintext)
print(f"[+] decrypted to {args.out}")
print(f"[+] file_id = {manifest.file_id}")
print(f"[+] issuer = {manifest.issuer_id}")
@@ -481,13 +498,16 @@ def main():
args = p.parse_args()
- {
- "keygen": cmd_keygen,
- "seal": cmd_seal,
- "open": cmd_open,
- "inspect": cmd_inspect,
- "attribute": cmd_attribute,
- }[args.cmd](args)
+ try:
+ {
+ "keygen": cmd_keygen,
+ "seal": cmd_seal,
+ "open": cmd_open,
+ "inspect": cmd_inspect,
+ "attribute": cmd_attribute,
+ }[args.cmd](args)
+ except (ValueError, FileExistsError, OSError, json.JSONDecodeError) as exc:
+ raise SystemExit(f"[!] {exc}") from exc
if __name__ == "__main__":
cli/oversight_rich.py +33 -12
@@ -50,6 +50,12 @@ from oversight_core import (
)
from oversight_core.container import SealedFile
from oversight_core.fingerprint import ContentFingerprint
+from oversight_core.safe_io import (
+ atomic_write_bytes,
+ atomic_write_private_json,
+ atomic_write_text,
+ validate_output_path,
+)
# ---------------------------------------------------------------------------
# Constants
@@ -263,6 +269,13 @@ def cmd_keys_generate(args):
"Use --force to overwrite."
)
sys.exit(1)
+ pub_path = out_path.with_suffix(".pub.json")
+ try:
+ validate_output_path(out_path, allow_existing=args.force)
+ validate_output_path(pub_path, input_paths=[out_path], allow_existing=args.force)
+ except (ValueError, FileExistsError) as exc:
+ error_panel(str(exc))
+ sys.exit(1)
with Progress(
SpinnerColumn(),
@@ -280,15 +293,14 @@ def cmd_keys_generate(args):
"ed25519_pub": ident.ed25519_pub.hex(),
}
- pub_path = out_path.with_suffix(".pub.json")
pub_data = {
"id": identity_name,
"x25519_pub": ident.x25519_pub.hex(),
"ed25519_pub": ident.ed25519_pub.hex(),
}
- out_path.write_text(json.dumps(priv_data, indent=2))
- pub_path.write_text(json.dumps(pub_data, indent=2))
+ atomic_write_private_json(out_path, priv_data)
+ atomic_write_text(pub_path, json.dumps(pub_data, indent=2))
# Update config if we have one
if config_dir and not args.out:
@@ -503,6 +515,11 @@ def cmd_seal(args):
# Determine output path
out_path = Path(args.out) if args.out else input_path.with_suffix(".sealed")
+ try:
+ validate_output_path(out_path, input_paths=[input_path, issuer_key_path, recipient_pub_path])
+ except (ValueError, FileExistsError) as exc:
+ error_panel(str(exc))
+ sys.exit(1)
# Resolve settings
registry_url = args.registry_url or cfg.get("registry_url", "http://localhost:8000")
@@ -596,12 +613,6 @@ def cmd_seal(args):
ed25519_pub=rec_pub.get("ed25519_pub"),
)
- beacons = beacon.gen_beacons(
- registry_domain=registry_domain,
- file_id="pending",
- recipient_id=rec_pub["id"],
- )
-
manifest = Manifest.new(
original_filename=input_path.name,
content_hash=content_hash(plaintext),
@@ -615,6 +626,11 @@ def cmd_seal(args):
manifest.canonical_content_hash = content_hash(canonical_plaintext)
if l3_decision:
manifest.l3_policy = l3_decision.to_dict()
+ beacons = beacon.gen_beacons(
+ registry_domain=registry_domain,
+ file_id=manifest.file_id,
+ recipient_id=rec_pub["id"],
+ )
manifest.watermarks = watermarks_for_manifest
manifest.beacons = [b.to_dict() for b in beacons]
progress.advance(task)
@@ -640,11 +656,11 @@ def cmd_seal(args):
# Step 7: Write output
progress.update(task, description="Writing sealed file...")
- out_path.write_bytes(blob)
+ atomic_write_bytes(out_path, blob)
if fingerprint:
fp_path = out_path.with_suffix(".fingerprint.json")
- fp_path.write_text(json.dumps({
+ atomic_write_text(fp_path, json.dumps({
"file_id": manifest.file_id,
"recipient_id": rec_pub["id"],
"mark_id": mark_id.hex() if mark_id else None,
@@ -734,6 +750,11 @@ def cmd_open(args):
sys.exit(1)
out_path = Path(args.out) if args.out else input_path.with_suffix("")
+ try:
+ validate_output_path(out_path, input_paths=[input_path, identity_path])
+ except (ValueError, FileExistsError) as exc:
+ error_panel(str(exc))
+ sys.exit(1)
ident = json.loads(identity_path.read_text())
@@ -750,7 +771,7 @@ def cmd_open(args):
blob,
recipient_x25519_priv=bytes.fromhex(ident["x25519_priv"]),
)
- out_path.write_bytes(plaintext)
+ atomic_write_bytes(out_path, plaintext)
except ValueError as e:
error_panel(
f"Decryption failed: {e}",
docs/security.md +17 -0
@@ -52,6 +52,23 @@ Until those mitigations land, issuers should treat L3 as attribution evidence
against ordinary leaks and low-to-medium effort stripping, not as a perfect
collusion-resistant watermark.
+## GUI and Local File Safety
+
+The desktop GUI and CLI treat private identity files as high-value key
+material. Seal and open operations MUST NOT overwrite selected input paths or
+files that parse as Oversight private keys. Existing non-key outputs require an
+explicit GUI confirmation, while CLI writes fail closed so operators must choose
+a new path or remove the old file deliberately. Private-key writes use atomic
+temporary files plus replacement; POSIX writes request `0600`, and Windows GUI
+key generation applies a best-effort ACL narrowing after replacement.
+
+Container parsing is intentionally strict. The unsigned suite byte in the binary
+header must match the signed manifest suite, malformed JSON is normalized to
+clean `ValueError` failures, unknown manifest fields are rejected, and trailing
+bytes after the ciphertext are not accepted. These checks keep audit tools and
+future consumers from trusting attacker-controlled side channels outside the
+signed manifest and AEAD-protected ciphertext.
+
## Passive Beacons
Passive beacons are forensic telemetry, not a detection guarantee. Absence of
oversight-rust/Cargo.toml +1 -1
@@ -32,7 +32,7 @@ ed25519-dalek = { version = "2", features = ["rand_core"] }
chacha20poly1305 = { version = "0.10", features = ["alloc"] }
hkdf = "0.12"
sha2 = "0.10"
-rand_core = "0.6"
+rand_core = "0.6.2"
# Serialization / encoding
serde = { version = "1", features = ["derive"] }
oversight-rust/oversight-formats/Cargo.toml +1 -1
@@ -20,7 +20,7 @@ hex.workspace = true
lopdf = { version = "0.34", optional = true }
# DOCX (OOXML): zip for container, quick-xml for XML parsing
-zip = { version = "2", default-features = false, features = ["deflate"], optional = true }
+zip = { version = "2.3.0", default-features = false, features = ["deflate"], optional = true }
quick-xml = { version = "0.36", features = ["serialize"], optional = true }
# Image pixel access
oversight-rust/oversight-registry/Cargo.toml +4 -4
@@ -29,16 +29,16 @@ rand_core = { workspace = true, features = ["getrandom"] }
# Web framework
axum = { version = "0.7", features = ["macros"] }
-tokio = { version = "1", features = ["full"] }
+tokio = { version = "1.44.2", features = ["full"] }
tower = { version = "0.4" }
tower-http = { version = "0.5", features = ["cors", "trace"] }
# Database
-sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] }
+sqlx = { version = "0.8.1", features = ["runtime-tokio", "sqlite"] }
# Utilities
clap = { workspace = true }
anyhow = "1"
-chrono = { version = "0.4", features = ["serde"] }
+chrono = { version = "0.4.20", features = ["serde"] }
tracing = "0.1"
-tracing-subscriber = { version = "0.3", features = ["env-filter"] }
+tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
oversight-rust/oversight-semantic/Cargo.toml +2 -2
@@ -7,6 +7,6 @@ license.workspace = true
description = "L3 semantic watermarking for Oversight - airgap-strip-survivor synonym rotation"
[dependencies]
-regex = "1"
-once_cell = "1"
+regex = "1.5.5"
+once_cell = "1.0.1"
sha2.workspace = true
oversight_core/container.py +17 -1
@@ -41,6 +41,10 @@ from .manifest import Manifest
MAGIC = b"OSGT\x01\x00"
SUITE_CLASSIC_V1_ID = 1
SUITE_HYBRID_V1_ID = 2
+SUITE_ID_TO_NAME = {
+ SUITE_CLASSIC_V1_ID: crypto.SUITE_CLASSIC_V1,
+ SUITE_HYBRID_V1_ID: crypto.SUITE_HYBRID_V1,
+}
# Hard caps to prevent DoS via attacker-controlled length fields.
@@ -105,17 +109,29 @@ class SealedFile:
raise ValueError(f"manifest too large: {mlen} > {MAX_MANIFEST_BYTES}")
manifest_json = _read_exact(buf, mlen, "manifest")
manifest = Manifest.from_json(manifest_json)
+ expected_suite = SUITE_ID_TO_NAME.get(suite_id)
+ if expected_suite is None:
+ raise ValueError(f"Unsupported suite id: {suite_id}")
+ if manifest.suite != expected_suite:
+ raise ValueError("Container suite id does not match signed manifest suite")
(wlen,) = struct.unpack(">I", _read_exact(buf, 4, "wrapped_dek_len"))
if wlen > MAX_WRAPPED_DEK_BYTES:
raise ValueError(f"wrapped_dek too large: {wlen} > {MAX_WRAPPED_DEK_BYTES}")
- wrapped_dek = json.loads(_read_exact(buf, wlen, "wrapped_dek").decode("utf-8"))
+ try:
+ wrapped_dek = json.loads(_read_exact(buf, wlen, "wrapped_dek").decode("utf-8"))
+ except (UnicodeDecodeError, json.JSONDecodeError) as exc:
+ raise ValueError("Malformed wrapped DEK JSON") from exc
+ if not isinstance(wrapped_dek, dict):
+ raise ValueError("Malformed wrapped DEK: expected JSON object")
aead_nonce = _read_exact(buf, 24, "aead_nonce")
(clen,) = struct.unpack(">I", _read_exact(buf, 4, "ciphertext_len"))
if clen > MAX_CIPHERTEXT_BYTES:
raise ValueError(f"ciphertext too large: {clen} > {MAX_CIPHERTEXT_BYTES}")
ciphertext = _read_exact(buf, clen, "ciphertext")
+ if buf.tell() != len(data):
+ raise ValueError("Trailing bytes after ciphertext")
return cls(
manifest=manifest,
oversight_core/manifest.py +36 -6
@@ -15,7 +15,7 @@ from __future__ import annotations
import json
import time
import uuid
-from dataclasses import dataclass, field, asdict
+from dataclasses import dataclass, field, asdict, fields
from typing import Optional
from .crypto import sign_manifest, verify_manifest, SUITE_CLASSIC_V1
@@ -156,13 +156,43 @@ class Manifest:
@classmethod
def from_json(cls, data: bytes) -> "Manifest":
- d = json.loads(data.decode("utf-8"))
+ try:
+ d = json.loads(data.decode("utf-8"))
+ except (UnicodeDecodeError, json.JSONDecodeError) as exc:
+ raise ValueError("Malformed manifest JSON") from exc
+ if not isinstance(d, dict):
+ raise ValueError("Malformed manifest: expected JSON object")
+
rec = d.pop("recipient", None)
wms = d.pop("watermarks", [])
- m = cls(**d)
- if rec:
- m.recipient = Recipient(**rec)
- m.watermarks = [WatermarkRef(**w) for w in wms]
+ allowed = {f.name for f in fields(cls)}
+ unknown = sorted(set(d) - allowed)
+ if unknown:
+ raise ValueError(f"Unknown manifest field: {unknown[0]}")
+ try:
+ m = cls(**d)
+ if rec:
+ if not isinstance(rec, dict):
+ raise ValueError("Malformed manifest recipient")
+ rec_allowed = {f.name for f in fields(Recipient)}
+ rec_unknown = sorted(set(rec) - rec_allowed)
+ if rec_unknown:
+ raise ValueError(f"Unknown recipient field: {rec_unknown[0]}")
+ m.recipient = Recipient(**rec)
+ if not isinstance(wms, list):
+ raise ValueError("Malformed manifest watermarks")
+ wm_allowed = {f.name for f in fields(WatermarkRef)}
+ watermarks = []
+ for w in wms:
+ if not isinstance(w, dict):
+ raise ValueError("Malformed manifest watermark")
+ wm_unknown = sorted(set(w) - wm_allowed)
+ if wm_unknown:
+ raise ValueError(f"Unknown watermark field: {wm_unknown[0]}")
+ watermarks.append(WatermarkRef(**w))
+ m.watermarks = watermarks
+ except TypeError as exc:
+ raise ValueError("Malformed manifest fields") from exc
return m
# ---- signing & verification ----
oversight_core/safe_io.py +127 -0
@@ -0,0 +1,127 @@
+"""Filesystem safety helpers for key and sealed-file writes."""
+
+from __future__ import annotations
+
+import json
+import os
+from pathlib import Path
+import subprocess
+import tempfile
+from typing import Iterable
+
+
+WINDOWS_RESERVED_NAMES = {
+ "CON", "PRN", "AUX", "NUL",
+ *(f"COM{i}" for i in range(1, 10)),
+ *(f"LPT{i}" for i in range(1, 10)),
+}
+
+
+def is_windows_reserved_path(path: Path) -> bool:
+ """Return True if the final path component targets a Windows device name."""
+ name = path.name.rstrip(" .")
+ if not name:
+ return False
+ return name.split(".", 1)[0].upper() in WINDOWS_RESERVED_NAMES
+
+
+def is_private_key_file(path: Path) -> bool:
+ """Best-effort detection for Oversight private identity JSON files."""
+ if not path.exists() or not path.is_file():
+ return False
+ try:
+ data = json.loads(path.read_text(encoding="utf-8"))
+ except (OSError, UnicodeDecodeError, json.JSONDecodeError):
+ return False
+ return is_private_key_dict(data)
+
+
+def is_private_key_dict(data: object) -> bool:
+ return (
+ isinstance(data, dict)
+ and isinstance(data.get("x25519_priv"), str)
+ and isinstance(data.get("ed25519_priv"), str)
+ )
+
+
+def same_path(a: Path, b: Path) -> bool:
+ try:
+ return a.resolve(strict=False) == b.resolve(strict=False)
+ except OSError:
+ return os.path.abspath(a) == os.path.abspath(b)
+
+
+def validate_output_path(
+ path: Path,
+ *,
+ input_paths: Iterable[Path] = (),
+ allow_existing: bool = False,
+ block_private_keys: bool = True,
+) -> None:
+ """Reject destructive or confusing output paths before writing."""
+ if not str(path) or not path.name:
+ raise ValueError("Please choose an output path.")
+ if is_windows_reserved_path(path):
+ raise ValueError(f"Refusing to write to Windows reserved device name: {path.name}")
+ for input_path in input_paths:
+ if input_path and same_path(path, input_path):
+ raise ValueError("Output path must be different from every input path.")
+ if block_private_keys and is_private_key_file(path):
+ raise ValueError("Refusing to overwrite an Oversight private key file.")
+ if path.exists() and not allow_existing:
+ raise FileExistsError(f"Refusing to overwrite existing file: {path}")
+
+
+def atomic_write_bytes(path: Path, data: bytes, *, mode: int | None = None) -> None:
+ """Write bytes via temp file + fsync + atomic replace in the same directory."""
+ path.parent.mkdir(parents=True, exist_ok=True)
+ fd, tmp_name = tempfile.mkstemp(prefix=f".{path.name}.", suffix=".tmp", dir=path.parent)
+ tmp_path = Path(tmp_name)
+ try:
+ if mode is not None and os.name == "posix":
+ os.fchmod(fd, mode)
+ with os.fdopen(fd, "wb") as f:
+ f.write(data)
+ f.flush()
+ os.fsync(f.fileno())
+ os.replace(tmp_path, path)
+ except Exception:
+ try:
+ tmp_path.unlink()
+ except OSError:
+ pass
+ raise
+
+
+def atomic_write_text(path: Path, text: str, *, mode: int | None = None) -> None:
+ atomic_write_bytes(path, text.encode("utf-8"), mode=mode)
+
+
+def atomic_write_private_json(path: Path, data: dict) -> None:
+ payload = json.dumps(data, indent=2)
+ atomic_write_text(path, payload, mode=0o600)
+ if os.name == "nt":
+ harden_windows_private_file_acl(path)
+
+
+def harden_windows_private_file_acl(path: Path) -> None:
+ """Best-effort Windows ACL narrowing for private key files."""
+ user = os.environ.get("USERNAME")
+ if not user:
+ return
+ domain = os.environ.get("USERDOMAIN")
+ principal = f"{domain}\\{user}" if domain else user
+ subprocess.run(
+ [
+ "icacls",
+ str(path),
+ "/inheritance:r",
+ "/grant:r",
+ f"{principal}:(R,W)",
+ "SYSTEM:(F)",
+ "Administrators:(F)",
+ ],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ check=False,
+ )
tests/test_gui_hardening_unit.py +145 -0
@@ -0,0 +1,145 @@
+"""
+test_gui_hardening_unit
+=======================
+
+Focused checks for GUI/CLI filesystem safety and container parser hardening.
+"""
+
+from __future__ import annotations
+
+import json
+import sys
+import tempfile
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(ROOT))
+
+from cli import gui
+from oversight_core import ClassicIdentity, Manifest, Recipient, content_hash, seal
+from oversight_core.container import SealedFile
+from oversight_core.safe_io import is_private_key_file, validate_output_path
+
+
+def _identity_dict(identity_id: str = "alice") -> dict:
+ ident = ClassicIdentity.generate()
+ return {
+ "id": identity_id,
+ "x25519_priv": ident.x25519_priv.hex(),
+ "x25519_pub": ident.x25519_pub.hex(),
+ "ed25519_priv": ident.ed25519_priv.hex(),
+ "ed25519_pub": ident.ed25519_pub.hex(),
+ }
+
+
+def _sealed_blob() -> bytes:
+ issuer = ClassicIdentity.generate()
+ recipient = ClassicIdentity.generate()
+ plaintext = b"hello oversight"
+ manifest = Manifest.new(
+ "hello.txt",
+ content_hash(plaintext),
+ len(plaintext),
+ "issuer",
+ issuer.ed25519_pub.hex(),
+ Recipient("alice", recipient.x25519_pub.hex(), recipient.ed25519_pub.hex()),
+ "https://registry.oversightprotocol.dev",
+ "text/plain",
+ )
+ return seal(plaintext, manifest, issuer.ed25519_priv, recipient.x25519_pub)
+
+
+def t1_private_key_outputs_are_blocked():
+ with tempfile.TemporaryDirectory() as td:
+ key_path = Path(td) / "alice.priv.json"
+ key_path.write_text(json.dumps(_identity_dict()), encoding="utf-8")
+ assert is_private_key_file(key_path), "fixture should parse as private key"
+ try:
+ validate_output_path(key_path)
+ except ValueError as exc:
+ assert "private key" in str(exc)
+ else:
+ raise AssertionError("private key overwrite was not blocked")
+ print(" [PASS] private key output targets are hard-blocked")
+
+
+def t2_same_path_outputs_are_blocked():
+ with tempfile.TemporaryDirectory() as td:
+ input_path = Path(td) / "source.txt"
+ input_path.write_text("source", encoding="utf-8")
+ try:
+ validate_output_path(input_path, input_paths=[input_path])
+ except ValueError as exc:
+ assert "different" in str(exc)
+ else:
+ raise AssertionError("same-path output was not blocked")
+ print(" [PASS] output paths cannot equal input paths")
+
+
+def t3_windows_reserved_names_are_rejected():
+ try:
+ validate_output_path(Path("NUL.priv.json"))
+ except ValueError as exc:
+ assert "reserved" in str(exc)
+ else:
+ raise AssertionError("Windows reserved output name was not blocked")
+ print(" [PASS] Windows reserved output names are rejected")
+
+
+def t4_gui_key_shape_errors_are_friendly():
+ with tempfile.TemporaryDirectory() as td:
+ pub_path = Path(td) / "alice.pub.json"
+ pub_path.write_text(json.dumps({"id": "alice", "x25519_pub": "00" * 32}), encoding="utf-8")
+ try:
+ gui._read_private_identity(pub_path, "Issuer file")
+ except ValueError as exc:
+ assert "public key" in str(exc) and "x25519_priv" in str(exc)
+ else:
+ raise AssertionError("public key accepted as private identity")
+ print(" [PASS] key-shape mistakes get actionable GUI errors")
+
+
+def t5_gui_registry_domain_uses_user_url():
+ assert gui._registry_domain("https://registry.example.test:8443/api") == "registry.example.test:8443"
+ print(" [PASS] GUI beacon domain derives from the configured registry URL")
+
+
+def t6_container_rejects_suite_id_tamper():
+ blob = bytearray(_sealed_blob())
+ blob[7] ^= 0x01
+ try:
+ SealedFile.from_bytes(bytes(blob))
+ except ValueError as exc:
+ assert "suite" in str(exc).lower()
+ else:
+ raise AssertionError("suite_id tamper was accepted")
+ print(" [PASS] unauthenticated suite_id tamper is rejected")
+
+
+def t7_container_rejects_trailing_bytes():
+ try:
+ SealedFile.from_bytes(_sealed_blob() + b"junk")
+ except ValueError as exc:
+ assert "Trailing bytes" in str(exc)
+ else:
+ raise AssertionError("trailing bytes were accepted")
+ print(" [PASS] trailing bytes after ciphertext are rejected")
+
+
+def main():
+ print("=" * 60)
+ print(" GUI/CLI hardening - focused unit tests")
+ print("=" * 60)
+ t1_private_key_outputs_are_blocked()
+ t2_same_path_outputs_are_blocked()
+ t3_windows_reserved_names_are_rejected()
+ t4_gui_key_shape_errors_are_friendly()
+ t5_gui_registry_domain_uses_user_url()
+ t6_container_rejects_suite_id_tamper()
+ t7_container_rejects_trailing_bytes()
+ print()
+ print(" ALL TESTS PASSED - 7/7")
+
+
+if __name__ == "__main__":
+ main()