| 1 | """ |
| 2 | OVERSIGHT attribution registry - v0.2 (security-hardened) |
| 3 | |
| 4 | Upgrades over initial v0.2: |
| 5 | - Registry identity private key written with 0600 permissions. |
| 6 | - /register requires a valid Ed25519 signature from the issuer over the |
| 7 | canonical manifest; INSERT OR REPLACE is only permitted when the new |
| 8 | signature re-verifies for the SAME issuer pubkey already on file. |
| 9 | - Rate limiter supports X-Forwarded-For when TRUSTED_PROXY env is set. |
| 10 | - Rate limiter bounded with an LRU cap to prevent memory growth. |
| 11 | - SQLite opens with journal_mode=WAL for concurrency. |
| 12 | - FastAPI lifespan (not deprecated on_event). |
| 13 | """ |
| 14 | |
| 15 | from __future__ import annotations |
| 16 | |
| 17 | import json |
| 18 | import os |
| 19 | import sqlite3 |
| 20 | import sys |
| 21 | import threading |
| 22 | import time |
| 23 | import hmac |
| 24 | import ipaddress |
| 25 | from collections import OrderedDict |
| 26 | from contextlib import asynccontextmanager, contextmanager |
| 27 | from pathlib import Path |
| 28 | from typing import Optional |
| 29 | |
| 30 | from cryptography.hazmat.primitives.asymmetric.ed25519 import ( |
| 31 | Ed25519PrivateKey, Ed25519PublicKey, |
| 32 | ) |
| 33 | from cryptography.hazmat.primitives import serialization |
| 34 | from fastapi import FastAPI, Request, HTTPException |
| 35 | from fastapi.exceptions import RequestValidationError |
| 36 | from fastapi.middleware.cors import CORSMiddleware |
| 37 | from fastapi.responses import Response, JSONResponse |
| 38 | from pydantic import BaseModel |
| 39 | |
| 40 | sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) |
| 41 | from oversight_core.tlog import TransparencyLog |
| 42 | from oversight_core.manifest import Manifest |
| 43 | from oversight_core import rekor as rekor_mod |
| 44 | from oversight_core.jcs import jcs_dumps |
| 45 | |
| 46 | |
# Runtime configuration, read once from the environment at import time.

# SQLite database file backing the registry tables.
DB_PATH = Path(os.environ.get("OVERSIGHT_DB", "/tmp/oversight-registry.sqlite"))
# Root directory for on-disk state; the transparency-log directory and the
# registry identity file are both derived from it.
# NOTE(review): the default is under /tmp and the identity file stored here
# contains the registry private key - presumably overridden in deployment.
DATA_DIR = Path(os.environ.get("OVERSIGHT_DATA", "/tmp/oversight-data"))
TLOG_DIR = DATA_DIR / "tlog"
IDENTITY_PATH = DATA_DIR / "registry-identity.json"
# Parsed with int(): the value must be an integer literal such as "0" or "1";
# anything else raises ValueError at import. When true, X-Forwarded-For is
# consulted when deriving the rate-limit client key.
TRUSTED_PROXY = bool(int(os.environ.get("TRUSTED_PROXY", "0")))
# Shared secret for /dns_event callbacks; when empty only loopback callers
# are accepted.
DNS_EVENT_SECRET = os.environ.get("OVERSIGHT_DNS_EVENT_SECRET", "")
# Operator token checked on /register and /attribute (whitespace-stripped).
OPERATOR_TOKEN = os.environ.get("OVERSIGHT_OPERATOR_TOKEN", "").strip()
# Only the exact value "1" waives the boot-time operator-token requirement.
AUTH_DISABLED = os.environ.get("OVERSIGHT_AUTH_DISABLED", "").strip() == "1"

# Optional attestation of registrations to a Rekor log (integer flag, parsed
# the same way as TRUSTED_PROXY).
REKOR_ENABLED = bool(int(os.environ.get("OVERSIGHT_REKOR_ENABLED", "0")))
REKOR_URL = os.environ.get("OVERSIGHT_REKOR_URL", rekor_mod.DEFAULT_REKOR_URL)
| 58 | |
| 59 | |
# DDL executed by init_db(). Every statement uses IF NOT EXISTS, so running
# it against an already-initialised database is harmless.
#   beacons    - one row per beacon token, keyed by token_id
#   watermarks - one row per (mark_id, layer)
#   manifests  - the stored manifest JSON and issuer key, keyed by file_id
#   events     - beacon hits, with the local tlog index of each
#   corpus     - content hashes per file, indexed by (hash_kind, hash_value)
SCHEMA = """
CREATE TABLE IF NOT EXISTS beacons (
token_id TEXT PRIMARY KEY,
file_id TEXT NOT NULL,
recipient_id TEXT NOT NULL,
issuer_id TEXT NOT NULL,
kind TEXT NOT NULL,
registered_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS watermarks (
mark_id TEXT NOT NULL,
layer TEXT NOT NULL,
file_id TEXT NOT NULL,
recipient_id TEXT NOT NULL,
issuer_id TEXT NOT NULL,
registered_at INTEGER NOT NULL,
PRIMARY KEY (mark_id, layer)
);
CREATE TABLE IF NOT EXISTS manifests (
file_id TEXT PRIMARY KEY,
recipient_id TEXT NOT NULL,
issuer_id TEXT NOT NULL,
issuer_ed25519_pub TEXT NOT NULL,
manifest_json TEXT NOT NULL,
registered_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token_id TEXT NOT NULL,
file_id TEXT,
recipient_id TEXT,
issuer_id TEXT,
kind TEXT NOT NULL,
source_ip TEXT,
user_agent TEXT,
extra TEXT,
timestamp INTEGER NOT NULL,
qualified_timestamp TEXT,
tlog_index INTEGER
);
CREATE TABLE IF NOT EXISTS corpus (
file_id TEXT NOT NULL,
hash_kind TEXT NOT NULL,
hash_value TEXT NOT NULL,
metadata TEXT,
registered_at INTEGER NOT NULL,
PRIMARY KEY (file_id, hash_kind, hash_value)
);
CREATE INDEX IF NOT EXISTS idx_events_token ON events(token_id);
CREATE INDEX IF NOT EXISTS idx_events_file ON events(file_id);
CREATE INDEX IF NOT EXISTS idx_corpus_hash ON corpus(hash_kind, hash_value);
"""
| 112 | |
| 113 | |
def load_or_create_identity() -> dict:
    """Return the registry's Ed25519 identity, generating it on first use.

    If IDENTITY_PATH already exists its JSON content is returned unchanged.
    Otherwise a fresh key pair is generated, serialised as hex strings
    (``ed25519_priv``, ``ed25519_pub``) together with a ``created_at`` epoch
    second, and written to IDENTITY_PATH through a descriptor opened with
    mode 0600.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if IDENTITY_PATH.exists():
        return json.loads(IDENTITY_PATH.read_text())

    private_key = Ed25519PrivateKey.generate()
    public_key = private_key.public_key()
    private_hex = private_key.private_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    ).hex()
    public_hex = public_key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    ).hex()
    identity = {
        "ed25519_priv": private_hex,
        "ed25519_pub": public_hex,
        "created_at": int(time.time()),
    }

    # Open via os.open so the permission bits are requested at creation time.
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    descriptor = os.open(str(IDENTITY_PATH), open_flags, 0o600)
    with os.fdopen(descriptor, "w") as handle:
        handle.write(json.dumps(identity, indent=2))
    return identity
| 136 | |
| 137 | |
# Process-wide singletons. Both stay None until the FastAPI lifespan hook
# assigns them at startup.
IDENTITY: Optional[dict] = None
TLOG: Optional[TransparencyLog] = None
| 140 | |
| 141 | |
@contextmanager
def db():
    """Yield a SQLite connection to DB_PATH with Row access and WAL enabled.

    The transaction is committed when the ``with`` body finishes normally.
    If the body raises, nothing is committed. The connection is closed in
    both cases.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL"):
        connection.execute(pragma)
    try:
        yield connection
        # Only reached when the caller's block did not raise.
        connection.commit()
    finally:
        connection.close()
| 153 | |
| 154 | |
def init_db():
    """Run the SCHEMA script so all tables and indexes exist."""
    with db() as connection:
        connection.executescript(SCHEMA)
| 158 | |
| 159 | |
def timestamp_stub() -> str:
    """Return the registry clock's current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    Used as the self-asserted fallback when no TSA timestamp is available.
    """
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", utc_now)
| 163 | |
| 164 | |
def qualified_timestamp_or_stub(data: bytes) -> tuple[str, Optional[dict]]:
    """Timestamp *data* via the TSA helper, else fall back to the local clock.

    Returns ``(iso_string, details)``. When ``oversight_core.timestamp``
    yields a timestamp object, ``iso_string`` is its ``gen_time_iso`` and
    ``details`` is its ``to_dict()`` form. When the helper returns ``None``
    or an ImportError is raised, the result is ``(timestamp_stub(), None)``.
    Any other exception raised by the helper propagates to the caller.
    """
    try:
        # Imported lazily so a missing timestamp module degrades to the stub.
        from oversight_core.timestamp import qualified_timestamp

        stamped = qualified_timestamp(data)
        if stamped is not None:
            return stamped.gen_time_iso, stamped.to_dict()
    except ImportError:
        pass
    fallback = timestamp_stub()
    return fallback, None
| 185 | |
| 186 | |
| 187 | |
class TokenBucket:
    """Thread-safe per-key token bucket whose key table is capped LRU-style.

    Each key holds ``(tokens, last_seen)``. Tokens refill at ``rate`` per
    second up to ``burst``. When more than ``max_keys`` keys are tracked,
    the least recently touched keys are dropped first.
    """

    def __init__(self, rate: float = 10.0, burst: int = 30, max_keys: int = 100_000):
        self.rate = rate
        self.burst = burst
        self.max_keys = max_keys
        # Insertion order doubles as recency: every touch re-inserts the key.
        self._state: "OrderedDict[str, tuple[float, float]]" = OrderedDict()
        self._lock = threading.Lock()

    def allow(self, key: str) -> bool:
        """Consume one token for *key*; return False when none is available."""
        now = time.monotonic()
        with self._lock:
            # Unknown keys start with a full bucket.
            tokens, last = self._state.pop(key, (float(self.burst), now))
            tokens = min(self.burst, tokens + (now - last) * self.rate)
            granted = not tokens < 1.0
            remaining = tokens - 1.0 if granted else tokens
            self._state[key] = (remaining, now)
            self._evict_if_needed()
            return granted

    def _evict_if_needed(self):
        """Drop oldest entries until at most ``max_keys`` remain."""
        surplus = len(self._state) - self.max_keys
        for _ in range(surplus):
            self._state.popitem(last=False)
| 217 | |
| 218 | |
# Shared limiter used by _rate_limit(): 10 tokens/second refill, bursts of
# 30, and at most 100,000 tracked client keys.
BUCKET = TokenBucket(rate=10.0, burst=30, max_keys=100_000)
| 220 | |
| 221 | |
| 222 | def _xff_client(xff: str) -> str | None: |
| 223 | """Return the trusted client IP from an X-Forwarded-For header value. |
| 224 | |
| 225 | The directly-connected proxy (Caddy) appends the real client as the |
| 226 | RIGHTMOST entry. Entries to its left are attacker-controlled: a client |
| 227 | may send any XFF header and the proxy appends rather than replaces, so |
| 228 | the leftmost entry must never be trusted for rate-limit bucketing or for |
| 229 | the source_ip written into beacon events. Taking the leftmost let an |
| 230 | attacker pick their rate-limit bucket and forge attribution. |
| 231 | """ |
| 232 | parts = [p.strip() for p in xff.split(",") if p.strip()] |
| 233 | return parts[-1] if parts else None |
| 234 | |
| 235 | |
def _client_key(request: Request) -> str:
    """Return the identifier under which *request* is rate limited.

    With TRUSTED_PROXY set, the rightmost X-Forwarded-For entry is used when
    present. Otherwise the socket peer address is used, or the literal
    ``"unknown"`` when the request carries no client information.
    """
    if TRUSTED_PROXY:
        forwarded = request.headers.get("x-forwarded-for", "")
        if forwarded:
            proxied_client = _xff_client(forwarded)
            if proxied_client:
                return proxied_client
    if request.client:
        return request.client.host
    return "unknown"
| 244 | |
| 245 | |
| 246 | |
def _enforce_auth_config():
    """Refuse to boot without operator authentication unless opted out.

    With no operator token the write-side endpoints (/register, /attribute)
    would accept requests from anyone. Startup therefore fails with
    RuntimeError unless OVERSIGHT_AUTH_DISABLED=1 was set explicitly, in
    which case a warning is emitted instead. That opt-out is intended for
    isolated local testing only.
    """
    if OPERATOR_TOKEN:
        # A token is configured: nothing to enforce.
        return
    if not AUTH_DISABLED:
        raise RuntimeError(
            "OVERSIGHT_OPERATOR_TOKEN is required to start the registry. "
            "Set it to a strong random value, or set OVERSIGHT_AUTH_DISABLED=1 "
            "only for isolated local testing."
        )
    import warnings

    warnings.warn(
        "OVERSIGHT_AUTH_DISABLED=1: registry is running without operator "
        "authentication. Do NOT do this in production.",
        stacklevel=2,
    )
| 269 | |
| 270 | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup hook: check auth config, init the DB, load identity, open tlog.

    Populates the module-level IDENTITY and TLOG singletons before the app
    begins serving. No shutdown work is performed after the yield.
    """
    global IDENTITY, TLOG
    _enforce_auth_config()
    init_db()
    identity = load_or_create_identity()
    IDENTITY = identity
    # The transparency log signs with the registry's own private key.
    TLOG = TransparencyLog(TLOG_DIR, signing_key_hex=identity["ed25519_priv"])
    yield
| 279 | |
| 280 | |
app = FastAPI(title="OVERSIGHT Registry", version="0.2.1", lifespan=lifespan)

# Browser origins allowed to read from the registry by default: the project
# sites plus local development ports.
_default_cors_origins = [
    "https://oversight-protocol.github.io",
    "https://oversightprotocol.dev",
    "https://www.oversightprotocol.dev",
    "http://localhost:8000",
    "http://127.0.0.1:8000",
    "http://localhost:8787",
    "http://127.0.0.1:8787",
]
# Additional origins from a comma-separated OVERSIGHT_CORS_ORIGINS value;
# blank entries are ignored.
_extra_origins = [
    o.strip()
    for o in os.environ.get("OVERSIGHT_CORS_ORIGINS", "").split(",")
    if o.strip()
]
# Cross-origin access is limited to credential-less GET/OPTIONS requests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_default_cors_origins + _extra_origins,
    allow_credentials=False,
    allow_methods=["GET", "OPTIONS"],
    allow_headers=["Accept", "Content-Type"],
    max_age=3600,
)
| 305 | |
| 306 | |
| 307 | def _registry_error_code(status_code: int, message: str) -> str: |
| 308 | text = message.lower() |
| 309 | if status_code == 401: |
| 310 | return "auth_required" |
| 311 | if status_code == 404: |
| 312 | return "not_found" |
| 313 | if status_code == 409: |
| 314 | return "issuer_mismatch" |
| 315 | if status_code == 429: |
| 316 | return "rate_limited" |
| 317 | if status_code >= 500: |
| 318 | return "server_error" |
| 319 | if "signature" in text: |
| 320 | return "signature_invalid" |
| 321 | if "beacons do not match" in text or "watermarks do not match" in text: |
| 322 | return "sidecar_mismatch" |
| 323 | return "missing_field" |
| 324 | |
| 325 | |
| 326 | def _error_envelope(code: str, message: str) -> dict: |
| 327 | return {"error": {"code": code, "message": message}} |
| 328 | |
| 329 | |
@app.exception_handler(HTTPException)
async def _http_exception_handler(_request: Request, exc: HTTPException):
    """Render an HTTPException as the registry error envelope.

    The status code and headers of the exception are preserved. The error
    code is derived from the status and the stringified detail.
    """
    detail_text = str(exc.detail)
    error_code = _registry_error_code(exc.status_code, detail_text)
    body = _error_envelope(error_code, detail_text)
    return JSONResponse(
        status_code=exc.status_code,
        content=body,
        headers=exc.headers,
    )
| 338 | |
| 339 | |
@app.exception_handler(RequestValidationError)
async def _validation_exception_handler(_request: Request, exc: RequestValidationError):
    """Report request-body validation failures as 400 ``missing_field`` errors."""
    explanation = f"request validation failed: {exc}"
    body = _error_envelope("missing_field", explanation)
    return JSONResponse(status_code=400, content=body)
| 346 | |
| 347 | |
# Request body for POST /register.
class RegistrationRequest(BaseModel):
    # Signed manifest; its embedded signature is verified on registration.
    manifest: dict
    # Sidecar copies that must match the manifest's own beacons/watermarks.
    beacons: list[dict]
    watermarks: list[dict]
    # Optional mapping of hash kind -> hash value stored in the corpus table.
    corpus: Optional[dict] = None
| 353 | |
| 354 | |
# Request body for POST /attribute. The handler uses the first populated
# selector in this order: token_id, mark_id (optionally narrowed by layer),
# perceptual_hash.
class AttributionQuery(BaseModel):
    token_id: Optional[str] = None
    mark_id: Optional[str] = None
    layer: Optional[str] = None
    perceptual_hash: Optional[str] = None
| 360 | |
| 361 | |
def _append_tlog(event: dict) -> int:
    """Append *event* to the transparency log and return its index.

    Returns -1 when the log has not been initialised.
    """
    if not TLOG:
        return -1
    return TLOG.append(event)
| 364 | |
| 365 | |
def _tlog_proofs_for_events(events: list[dict]) -> list[dict]:
    """Collect inclusion proofs for the events that carry a usable tlog index.

    Rows whose ``tlog_index`` is missing, non-numeric or negative are skipped,
    as are indexes for which the log returns no proof. Each result records the
    position of the event in *events* (``event_row``), the index, and the
    proof. Returns an empty list when the log is not initialised.
    """
    if not TLOG:
        return []
    collected = []
    for row_number, record in enumerate(events):
        raw_index = record.get("tlog_index")
        if raw_index is None:
            continue
        try:
            leaf_index = int(raw_index)
        except (TypeError, ValueError):
            continue
        if leaf_index >= 0:
            inclusion = TLOG.inclusion_proof(leaf_index)
            if inclusion is not None:
                collected.append({
                    "event_row": row_number,
                    "tlog_index": leaf_index,
                    "proof": inclusion,
                })
    return collected
| 389 | |
| 390 | |
def _attest_to_rekor(
    file_id: str,
    issuer_pub_hex: str,
    recipient_id: str,
    recipient_pubkey_hex: Optional[str],
    suite: str,
    content_hash_sha256_hex: str,
    watermarks: list[dict],
    mark_id_hex: str,
) -> Optional[dict]:
    """Publish a registry-signed registration attestation to the Rekor log.

    A registration predicate is built, wrapped in a statement, signed as a
    DSSE envelope with the registry identity key, and uploaded to REKOR_URL.

    Returns:
        ``None`` when Rekor is disabled or the identity is not loaded.
        On success, a JSON-serialisable summary with ``log_url``,
        ``log_index``, ``log_id``, ``integrated_time``, ``tlog_kind`` and
        ``bundle_schema``. On any failure, a dict with an ``error`` string
        and ``tlog_kind`` (no ``log_index``), which callers treat as
        non-fatal.
    """
    if not (REKOR_ENABLED and IDENTITY is not None):
        return None
    try:
        # A missing recipient key is represented by an all-zero digest.
        if recipient_pubkey_hex:
            recipient_digest = rekor_mod.hash_recipient_pubkey(recipient_pubkey_hex)
        else:
            recipient_digest = "0" * 64
        registered_at = timestamp_stub()
        # Only watermarks that carry a mark_id are attested; entries without
        # a layer get a positional placeholder name.
        marks_by_layer = {
            entry.get("layer", f"layer_{position}"): entry.get("mark_id", "")
            for position, entry in enumerate(watermarks)
            if entry.get("mark_id")
        }
        predicate = rekor_mod.OversightRegistrationPredicate(
            file_id=file_id,
            issuer_pubkey_ed25519=issuer_pub_hex,
            recipient_id=recipient_id,
            recipient_pubkey_sha256=recipient_digest,
            suite=suite,
            registered_at=registered_at,
            watermarks=marks_by_layer,
        )
        statement = rekor_mod.build_statement(
            mark_id_hex=mark_id_hex,
            content_hash_sha256_hex=content_hash_sha256_hex,
            predicate=predicate,
        )
        signing_key = bytes.fromhex(IDENTITY["ed25519_priv"])
        envelope = rekor_mod.sign_dsse(
            statement=statement,
            issuer_ed25519_priv=signing_key,
        )
        # The upload carries the registry public key in PEM form.
        verifying_key = Ed25519PublicKey.from_public_bytes(
            bytes.fromhex(IDENTITY["ed25519_pub"])
        )
        verifying_key_pem = verifying_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        ).decode("ascii")
        uploaded = rekor_mod.upload_dsse(
            envelope=envelope,
            issuer_ed25519_pub_pem=verifying_key_pem,
            log_url=REKOR_URL,
        )
        return {
            "log_url": uploaded.log_url,
            "log_index": uploaded.log_index,
            "log_id": uploaded.log_id,
            "integrated_time": uploaded.integrated_time,
            "tlog_kind": rekor_mod.TLOG_KIND,
            "bundle_schema": rekor_mod.BUNDLE_SCHEMA,
        }
    except Exception as exc:
        # Deliberately best-effort: a Rekor failure must not fail registration.
        return {"error": f"{type(exc).__name__}: {exc}", "tlog_kind": rekor_mod.TLOG_KIND}
| 462 | |
| 463 | |
def _rate_limit(request: Request):
    """Raise HTTP 429 when the caller's token bucket is exhausted."""
    permitted = BUCKET.allow(_client_key(request))
    if permitted:
        return
    raise HTTPException(429, "rate limit exceeded")
| 467 | |
| 468 | |
| 469 | def _bearer_or_header_token(request: Request, header_name: str) -> str: |
| 470 | supplied = request.headers.get(header_name, "") |
| 471 | if supplied: |
| 472 | return supplied.strip() |
| 473 | auth = request.headers.get("authorization", "") |
| 474 | if auth.lower().startswith("bearer "): |
| 475 | return auth[7:].strip() |
| 476 | return "" |
| 477 | |
| 478 | |
def _require_operator_auth(request: Request):
    """Require the operator token on write-side APIs when one is configured.

    The credential is read from ``x-oversight-operator-token`` or an
    ``Authorization: Bearer`` header. When OPERATOR_TOKEN is empty the check
    is skipped entirely.

    Raises:
        HTTPException: 401 when the supplied credential does not match.
    """
    if not OPERATOR_TOKEN:
        return
    supplied = _bearer_or_header_token(request, "x-oversight-operator-token")
    # hmac.compare_digest raises TypeError for str arguments containing
    # non-ASCII characters, which would turn a garbage header into a 500.
    # Comparing UTF-8 bytes keeps the check constant-time and total.
    if hmac.compare_digest(supplied.encode("utf-8"), OPERATOR_TOKEN.encode("utf-8")):
        return
    raise HTTPException(401, "operator authentication required")
| 487 | |
| 488 | |
| 489 | def _is_loopback_host(host: Optional[str]) -> bool: |
| 490 | if not host: |
| 491 | return False |
| 492 | try: |
| 493 | return ipaddress.ip_address(host).is_loopback |
| 494 | except ValueError: |
| 495 | return host in {"localhost", "testclient"} |
| 496 | |
| 497 | |
def _verify_dns_event_auth(request: Request):
    """Authenticate DNS bridge callbacks before trusting client_ip in the body.

    When DNS_EVENT_SECRET is configured, the caller must present it via
    ``x-oversight-dns-secret`` or an ``Authorization: Bearer`` header.
    Without a configured secret, only loopback peers are accepted.

    Raises:
        HTTPException: 401 when a secret is configured and does not match;
            503 when no secret is configured and the peer is not loopback.
    """
    if DNS_EVENT_SECRET:
        supplied = _bearer_or_header_token(request, "x-oversight-dns-secret")
        # Compare as UTF-8 bytes: hmac.compare_digest raises TypeError for
        # str arguments with non-ASCII characters, which a hostile header
        # could use to force a 500 instead of a clean 401.
        if hmac.compare_digest(
            supplied.encode("utf-8"), DNS_EVENT_SECRET.encode("utf-8")
        ):
            return
        raise HTTPException(401, "invalid DNS event secret")

    host = request.client.host if request.client else None
    if _is_loopback_host(host):
        return
    raise HTTPException(
        503,
        "OVERSIGHT_DNS_EVENT_SECRET is required for non-loopback DNS event callbacks",
    )
| 513 | |
| 514 | |
def _verify_manifest_signature(manifest_dict: dict) -> tuple[bool, str]:
    """Check the Ed25519 signature embedded in a manifest.

    The dict is canonicalised with JCS and parsed into a Manifest. Returns
    ``(ok, issuer_pub_hex)``, where ``issuer_pub_hex`` is the issuer key the
    manifest claims. A manifest that cannot be parsed yields ``(False, "")``.
    """
    try:
        parsed = Manifest.from_json(jcs_dumps(manifest_dict))
    except Exception:
        # Any parse failure is reported as an invalid signature.
        return False, ""
    verified = parsed.verify()
    return verified, parsed.issuer_ed25519_pub
| 525 | |
| 526 | |
def _canonical_items(items: list[dict]) -> list[str]:
    """Return the JCS encoding of each item, sorted.

    This makes two sidecar lists comparable regardless of element order or
    key order.
    """
    encoded = [jcs_dumps(item).decode("utf-8") for item in items]
    encoded.sort()
    return encoded
| 533 | |
| 534 | |
def _signed_registration_artifacts(
    manifest_dict: dict,
    req_beacons: list[dict],
    req_watermarks: list[dict],
) -> tuple[list[dict], list[dict]]:
    """Return the manifest's own beacons and watermarks after a sidecar check.

    The lists sent alongside the manifest must equal the signed ones once
    canonicalised. The signed lists are what gets returned and stored.

    Raises:
        HTTPException: 400 when either sidecar list differs from the manifest.
    """
    manifest_beacons = manifest_dict.get("beacons") or []
    manifest_watermarks = manifest_dict.get("watermarks") or []

    beacons_agree = _canonical_items(req_beacons) == _canonical_items(manifest_beacons)
    if not beacons_agree:
        raise HTTPException(400, "request beacons do not match signed manifest")

    watermarks_agree = (
        _canonical_items(req_watermarks) == _canonical_items(manifest_watermarks)
    )
    if not watermarks_agree:
        raise HTTPException(400, "request watermarks do not match signed manifest")

    return manifest_beacons, manifest_watermarks
| 548 | |
| 549 | |
@app.post("/register")
def register(req: RegistrationRequest, request: Request):
    """
    Register a sealed file's beacons + watermarks.

    Security requirements:
      - The manifest's embedded Ed25519 signature MUST verify.
      - If the file_id already exists in our DB, the re-registration's issuer
        pubkey MUST match the original. This prevents hostile overwrites of
        another issuer's attribution record.
      - A per-client rate limit applies.

    Structurally malformed manifests (non-object recipient, beacon or
    watermark entries lacking their key fields) are rejected with 400 before
    any row is written, instead of surfacing as an unhandled 500.

    Returns:
        A dict with ``ok``, ``file_id``, ``registered_beacons``,
        ``tlog_index`` and the ``rekor`` attestation summary (or ``None``).

    Raises:
        HTTPException: 401 (operator auth), 429 (rate limit), 400 (missing
            file_id, bad signature, sidecar mismatch, malformed entries),
            409 (file_id already registered under a different issuer key).
    """
    _require_operator_auth(request)
    _rate_limit(request)

    m = req.manifest
    file_id = m.get("file_id")
    recipient = m.get("recipient") or {}
    if not isinstance(recipient, dict):
        # A truthy non-dict would otherwise fail on .get() with a 500.
        raise HTTPException(400, "manifest recipient must be an object")
    recipient_id = recipient.get("recipient_id", "unknown")
    issuer_id = m.get("issuer_id", "unknown")

    if not file_id:
        raise HTTPException(400, "manifest missing file_id")

    sig_ok, issuer_pub = _verify_manifest_signature(m)
    if not sig_ok:
        raise HTTPException(400, "manifest signature invalid")
    if not issuer_pub:
        raise HTTPException(400, "manifest missing issuer_ed25519_pub")
    signed_beacons, signed_watermarks = _signed_registration_artifacts(
        m,
        req.beacons,
        req.watermarks,
    )

    # Validate the fields the INSERT statements index into, so bad entries
    # are a client error rather than a KeyError/TypeError mid-transaction.
    for b in signed_beacons:
        if (
            not isinstance(b, dict)
            or b.get("token_id") is None
            or b.get("kind") is None
        ):
            raise HTTPException(400, "manifest beacon missing token_id or kind")
    for w in signed_watermarks:
        if (
            not isinstance(w, dict)
            or w.get("mark_id") is None
            or w.get("layer") is None
        ):
            raise HTTPException(400, "manifest watermark missing mark_id or layer")

    now = int(time.time())
    with db() as con:
        existing = con.execute(
            "SELECT issuer_ed25519_pub FROM manifests WHERE file_id=?",
            (file_id,),
        ).fetchone()
        if existing and existing["issuer_ed25519_pub"] != issuer_pub:
            raise HTTPException(
                409,
                f"file_id already registered under a different issuer pubkey "
                f"(claimed={issuer_pub[:16]}..., existing={existing['issuer_ed25519_pub'][:16]}...)",
            )

        con.execute(
            "INSERT OR REPLACE INTO manifests VALUES (?,?,?,?,?,?)",
            (file_id, recipient_id, issuer_id, issuer_pub, json.dumps(m), now),
        )
        # NOTE(review): beacons are keyed by token_id alone and watermarks by
        # (mark_id, layer), so INSERT OR REPLACE here can overwrite a row that
        # belongs to a different file_id; the issuer check above only guards
        # the manifests table. Confirm whether that is acceptable.
        for b in signed_beacons:
            con.execute(
                "INSERT OR REPLACE INTO beacons VALUES (?,?,?,?,?,?)",
                (b["token_id"], file_id, recipient_id, issuer_id, b["kind"], now),
            )
        for w in signed_watermarks:
            con.execute(
                "INSERT OR REPLACE INTO watermarks VALUES (?,?,?,?,?,?)",
                (w["mark_id"], w["layer"], file_id, recipient_id, issuer_id, now),
            )
        if req.corpus:
            for hash_kind, hash_value in req.corpus.items():
                if hash_value:
                    con.execute(
                        "INSERT OR REPLACE INTO corpus VALUES (?,?,?,?,?)",
                        (file_id, hash_kind, str(hash_value), None, now),
                    )

    # NOTE(review): the reviewed copy of this file had lost its indentation;
    # the tlog/Rekor steps are placed after the DB transaction commits -
    # confirm this matches the intended nesting.
    tlog_idx = _append_tlog({
        "event": "register",
        "file_id": file_id,
        "recipient_id": recipient_id,
        "issuer_id": issuer_id,
        "issuer_pub": issuer_pub,
        "n_beacons": len(signed_beacons),
        "n_watermarks": len(signed_watermarks),
        "timestamp": timestamp_stub(),
    })

    # Best-effort public attestation; failures come back as an error dict.
    rekor_result = _attest_to_rekor(
        file_id=file_id,
        issuer_pub_hex=issuer_pub,
        recipient_id=recipient_id,
        recipient_pubkey_hex=recipient.get("x25519_pub"),
        suite=m.get("suite", "classic"),
        content_hash_sha256_hex=m.get("content_hash", "0" * 64),
        watermarks=signed_watermarks,
        # First non-empty mark_id, falling back to the file_id.
        mark_id_hex=next(
            (w["mark_id"] for w in signed_watermarks if w.get("mark_id")),
            file_id,
        ),
    )

    return {
        "ok": True,
        "file_id": file_id,
        "registered_beacons": len(signed_beacons),
        "tlog_index": tlog_idx,
        "rekor": rekor_result,
    }
| 652 | |
| 653 | |
# Static PNG payload returned by the image beacon endpoint.
ONE_PX_PNG = bytes.fromhex(
    "89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c489"
    "0000000d49444154789c626000000000050001a5f645400000000049454e44ae426082"
)
| 658 | |
| 659 | |
def _record_event(request: Request, token_id: str, kind: str) -> int:
    """Log a beacon hit to the transparency log and the events table.

    The token is looked up in ``beacons`` to attach file/recipient/issuer
    ids; unknown tokens are still recorded with those fields null.

    The recorded ``source_ip`` is the socket peer, unless TRUSTED_PROXY is
    set and X-Forwarded-For supplies an entry, in which case the rightmost
    entry is used. This is the same rule the rate limiter applies, so events
    behind the proxy carry the client address rather than the proxy's.

    Returns:
        The tlog index of the appended entry (-1 if the log is unavailable).
    """
    with db() as con:
        row = con.execute(
            "SELECT file_id, recipient_id, issuer_id FROM beacons WHERE token_id=?",
            (token_id,),
        ).fetchone()
        file_id = row["file_id"] if row else None
        recipient_id = row["recipient_id"] if row else None
        issuer_id = row["issuer_id"] if row else None

        client_ip = request.client.host if request.client else None
        if TRUSTED_PROXY:
            forwarded = request.headers.get("x-forwarded-for", "")
            # _xff_client returns None for an empty or blank header, in which
            # case the socket peer is kept.
            client_ip = _xff_client(forwarded) or client_ip
        ua = request.headers.get("user-agent", "")
        qts = timestamp_stub()

        tlog_idx = _append_tlog({
            "event": "beacon",
            "kind": kind,
            "token_id": token_id,
            "file_id": file_id,
            "recipient_id": recipient_id,
            "source_ip": client_ip,
            "user_agent": ua,
            "timestamp": qts,
        })

        con.execute(
            "INSERT INTO events (token_id,file_id,recipient_id,issuer_id,kind,"
            "source_ip,user_agent,extra,timestamp,qualified_timestamp,tlog_index) "
            "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
            (token_id, file_id, recipient_id, issuer_id, kind,
             client_ip, ua, "{}", int(time.time()), qts, tlog_idx),
        )
    return tlog_idx
| 693 | |
| 694 | |
@app.get("/p/{token_id}.png")
async def beacon_png(token_id: str, request: Request):
    """Image beacon: record an ``http_img`` hit and serve the static PNG."""
    _rate_limit(request)
    _record_event(request, token_id, "http_img")
    pixel = Response(content=ONE_PX_PNG, media_type="image/png")
    return pixel
| 700 | |
| 701 | |
@app.api_route("/ocsp/r/{token_id}", methods=["GET", "POST"])
@app.api_route("/r/{token_id}", methods=["GET", "POST"])
async def beacon_ocsp(token_id: str, request: Request):
    """OCSP-style beacon: record an ``ocsp`` hit and answer with an empty 200."""
    _rate_limit(request)
    _record_event(request, token_id, "ocsp")
    empty_ok = Response(status_code=200)
    return empty_ok
| 708 | |
| 709 | |
@app.get("/lic/v/{token_id}")
@app.get("/v/{token_id}")
async def beacon_license(token_id: str, request: Request):
    """License-check beacon: record a ``license`` hit and reply ``valid``."""
    _rate_limit(request)
    _record_event(request, token_id, "license")
    verdict = {"valid": True}
    return JSONResponse(verdict)
| 716 | |
| 717 | |
@app.post("/attribute")
def attribute(q: AttributionQuery, request: Request):
    """Resolve a beacon token, watermark or perceptual hash to its file.

    The first populated selector wins, in the order token_id, mark_id with
    layer, mark_id alone, perceptual_hash. On a match the response carries
    the file/recipient/issuer ids, the stored manifest (or ``None``) and up
    to 50 of the most recent events for the file.

    Raises:
        HTTPException: 401 when operator auth fails; 400 when no selector
            is supplied.
    """
    _require_operator_auth(request)
    with db() as con:
        if q.token_id:
            sql = "SELECT * FROM beacons WHERE token_id=?"
            params = (q.token_id,)
        elif q.mark_id and q.layer:
            sql = "SELECT * FROM watermarks WHERE mark_id=? AND layer=?"
            params = (q.mark_id, q.layer)
        elif q.mark_id:
            sql = "SELECT * FROM watermarks WHERE mark_id=?"
            params = (q.mark_id,)
        elif q.perceptual_hash:
            sql = (
                "SELECT c.file_id as file_id, b.recipient_id as recipient_id, "
                "b.issuer_id as issuer_id "
                "FROM corpus c LEFT JOIN beacons b ON c.file_id = b.file_id "
                "WHERE c.hash_kind='perceptual' AND c.hash_value=? LIMIT 1"
            )
            params = (q.perceptual_hash,)
        else:
            raise HTTPException(400, "provide token_id, mark_id, or perceptual_hash")

        match = con.execute(sql, params).fetchone()
        if not match:
            return {"found": False}

        file_id = match["file_id"]
        stored = con.execute(
            "SELECT manifest_json FROM manifests WHERE file_id=?", (file_id,)
        ).fetchone()
        if stored:
            manifest = json.loads(stored["manifest_json"])
        else:
            manifest = None
        event_rows = con.execute(
            "SELECT kind, source_ip, user_agent, timestamp, qualified_timestamp, tlog_index "
            "FROM events WHERE file_id=? ORDER BY timestamp DESC LIMIT 50",
            (file_id,),
        ).fetchall()

        return {
            "found": True,
            "file_id": file_id,
            "recipient_id": match["recipient_id"],
            "issuer_id": match["issuer_id"],
            "manifest": manifest,
            "recent_events": [dict(event_row) for event_row in event_rows],
        }
| 769 | |
| 770 | |
@app.get("/evidence/{file_id}")
def evidence_bundle(file_id: str):
    """Assemble and sign the evidence bundle for *file_id*.

    The bundle gathers the stored manifest, its beacons and watermarks, all
    events in chronological order, the signed tlog head and inclusion proofs
    for the events. The JCS encoding of the bundle is signed with the
    registry key and the hex signature is added as
    ``bundle_signature_ed25519``.

    Raises:
        HTTPException: 404 when no manifest is stored for *file_id*.
    """
    with db() as con:
        manifest_row = con.execute(
            "SELECT manifest_json FROM manifests WHERE file_id=?", (file_id,)
        ).fetchone()
        if not manifest_row:
            raise HTTPException(404, "unknown file_id")
        event_rows = con.execute(
            "SELECT * FROM events WHERE file_id=? ORDER BY timestamp ASC", (file_id,),
        ).fetchall()
        beacon_rows = con.execute(
            "SELECT * FROM beacons WHERE file_id=?", (file_id,)
        ).fetchall()
        watermark_rows = con.execute(
            "SELECT * FROM watermarks WHERE file_id=?", (file_id,)
        ).fetchall()

    event_dicts = [dict(event_row) for event_row in event_rows]
    bundle = {
        "file_id": file_id,
        "bundle_generated_at": timestamp_stub(),
        "registry_pub": IDENTITY["ed25519_pub"],
        "manifest": json.loads(manifest_row["manifest_json"]),
        "beacons": [dict(beacon_row) for beacon_row in beacon_rows],
        "watermarks": [dict(watermark_row) for watermark_row in watermark_rows],
        "events": event_dicts,
        "tlog_head": TLOG.signed_head() if TLOG else None,
        "tlog_proofs": _tlog_proofs_for_events(event_dicts),
        "disclaimer": (
            "This bundle is a provenance record, not a legal finding. For court use, "
            "supplement with RFC 3161 qualified timestamps and ISO/IEC 27037 chain-of-custody."
        ),
    }
    # The signature covers the bundle as it stands before the signature field
    # itself is added.
    signing_key = Ed25519PrivateKey.from_private_bytes(
        bytes.fromhex(IDENTITY["ed25519_priv"])
    )
    canonical_bytes = jcs_dumps(bundle)
    bundle["bundle_signature_ed25519"] = signing_key.sign(canonical_bytes).hex()
    return bundle
| 809 | |
| 810 | |
@app.get("/tlog/head")
def tlog_head():
    """Return the signed head of the transparency log (503 if unavailable)."""
    if TLOG:
        return TLOG.signed_head()
    raise HTTPException(503, "tlog not initialized")
| 816 | |
| 817 | |
@app.get("/tlog/proof/{index}")
def tlog_proof(index: int):
    """Return the inclusion proof for leaf *index*.

    Raises:
        HTTPException: 503 when the log is not initialised; 404 when the log
            has no proof for that index.
    """
    if not TLOG:
        raise HTTPException(503, "tlog not initialized")
    inclusion = TLOG.inclusion_proof(index)
    if inclusion is not None:
        return inclusion
    raise HTTPException(404, "index out of range")
| 826 | |
| 827 | |
@app.get("/tlog/range")
def tlog_range(start: int = 0, limit: int = 500):
    """Return tlog leaf entries in [start, start+limit). For CanaryKeeper polling.

    ``limit`` is clamped to the range 1..1000.

    Raises:
        HTTPException: 503 when the log is not initialised; 400 for a
            negative ``start``; 500 when the log rejects the range.
    """
    if not TLOG:
        raise HTTPException(503, "tlog not initialized")
    if start < 0:
        raise HTTPException(400, "start must be non-negative")
    page_size = max(1, limit)
    page_size = min(page_size, 1000)
    try:
        records = TLOG.range_records(start, page_size)
    except ValueError as exc:
        raise HTTPException(500, f"tlog range validation failed: {exc}") from exc
    return {"start": start, "count": len(records), "entries": records}
| 841 | |
| 842 | |
# Request body for POST /dns_event, sent by the DNS bridge.
class DnsEvent(BaseModel):
    token_id: str
    # Resolver/client address as reported by the bridge; stored as source_ip.
    client_ip: Optional[str] = None
    # Query type and name; stored in the event's ``extra`` JSON.
    qtype: Optional[str] = None
    qname: Optional[str] = None
| 848 | |
| 849 | |
@app.post("/dns_event")
def dns_event(evt: DnsEvent, request: Request):
    """Called by the oversight_dns server when a beacon DNS query arrives.

    After rate limiting and callback authentication, the hit is appended to
    the transparency log and stored in the events table with kind ``dns``.
    The query type and name are kept in the event's ``extra`` JSON.
    """
    _rate_limit(request)
    _verify_dns_event_auth(request)
    with db() as con:
        beacon = con.execute(
            "SELECT file_id, recipient_id, issuer_id FROM beacons WHERE token_id=?",
            (evt.token_id,),
        ).fetchone()
        # Unknown tokens are still recorded, with null attribution fields.
        if beacon:
            file_id = beacon["file_id"]
            recipient_id = beacon["recipient_id"]
            issuer_id = beacon["issuer_id"]
        else:
            file_id = recipient_id = issuer_id = None

        stamp = timestamp_stub()
        log_entry = {
            "event": "beacon",
            "kind": "dns",
            "token_id": evt.token_id,
            "file_id": file_id,
            "recipient_id": recipient_id,
            "source_ip": evt.client_ip,
            "qname": evt.qname,
            "qtype": evt.qtype,
            "timestamp": stamp,
        }
        tlog_idx = _append_tlog(log_entry)
        extra_json = json.dumps({"qtype": evt.qtype, "qname": evt.qname})
        con.execute(
            "INSERT INTO events (token_id,file_id,recipient_id,issuer_id,kind,"
            "source_ip,user_agent,extra,timestamp,qualified_timestamp,tlog_index) "
            "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
            (evt.token_id, file_id, recipient_id, issuer_id, "dns",
             evt.client_ip, "", extra_json,
             int(time.time()), stamp, tlog_idx),
        )
    return {"ok": True, "tlog_index": tlog_idx}
| 885 | |
| 886 | |
@app.get("/candidates/semantic")
def candidates_semantic(limit: int = 1000, since: Optional[int] = None):
    """
    Flywheel-friendly endpoint: list recent L3 semantic mark_ids, newest
    first, so the scraper can check leaked text against them without pulling
    the whole watermark table each time.

    ``limit`` is clamped to 1..10000. A truthy ``since`` restricts results to
    rows registered at or after that value.
    """
    limit = min(max(1, limit), 10_000)
    base_sql = (
        "SELECT mark_id, file_id, recipient_id, registered_at FROM watermarks "
        "WHERE layer='L3_semantic' "
    )
    if since:
        sql = base_sql + "AND registered_at>=? ORDER BY registered_at DESC LIMIT ?"
        params = (since, limit)
    else:
        sql = base_sql + "ORDER BY registered_at DESC LIMIT ?"
        params = (limit,)
    with db() as con:
        rows = con.execute(sql, params).fetchall()
    candidates = [dict(row) for row in rows]
    return {
        "generated_at": timestamp_stub(),
        "count": len(rows),
        "candidates": candidates,
    }
| 914 | |
| 915 | |
@app.get("/health")
def health():
    """Liveness probe: service name, version and current tlog size."""
    current_size = TLOG.size() if TLOG else 0
    return {
        "status": "ok",
        "service": "oversight-registry",
        "version": "0.2.1",
        "tlog_size": current_size,
    }
| 924 | |
| 925 | |
@app.get("/.well-known/oversight-registry")
def well_known():
    """Publish the registry's public key, version, jurisdiction and tlog size.

    ``ed25519_pub`` is ``None`` and ``tlog_size`` is 0 until startup has
    populated the identity and the log.
    """
    public_key_hex = IDENTITY["ed25519_pub"] if IDENTITY else None
    jurisdiction = os.environ.get("OVERSIGHT_JURISDICTION", "GLOBAL")
    current_size = TLOG.size() if TLOG else 0
    return {
        "ed25519_pub": public_key_hex,
        "version": "0.2.1",
        "jurisdiction": jurisdiction,
        "tlog_size": current_size,
    }