Zion Boggan
repos/Oversight/registry/server.py
zionboggan.com ↗
933 lines · python
History for this file →
1
"""
2
OVERSIGHT attribution registry - v0.2 (security-hardened)
3
 
4
Upgrades over initial v0.2:
5
  - Registry identity private key written with 0600 permissions.
6
  - /register requires a valid Ed25519 signature from the issuer over the
7
    canonical manifest; INSERT OR REPLACE is only permitted when the new
8
    signature re-verifies for the SAME issuer pubkey already on file.
9
  - Rate limiter supports X-Forwarded-For when TRUSTED_PROXY env is set.
10
  - Rate limiter bounded with an LRU cap to prevent memory growth.
11
  - SQLite opens with journal_mode=WAL for concurrency.
12
  - FastAPI lifespan (not deprecated on_event).
13
"""
14
 
15
from __future__ import annotations
16
 
17
import json
18
import os
19
import sqlite3
20
import sys
21
import threading
22
import time
23
import hmac
24
import ipaddress
25
from collections import OrderedDict
26
from contextlib import asynccontextmanager, contextmanager
27
from pathlib import Path
28
from typing import Optional
29
 
30
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
31
    Ed25519PrivateKey, Ed25519PublicKey,
32
)
33
from cryptography.hazmat.primitives import serialization
34
from fastapi import FastAPI, Request, HTTPException
35
from fastapi.exceptions import RequestValidationError
36
from fastapi.middleware.cors import CORSMiddleware
37
from fastapi.responses import Response, JSONResponse
38
from pydantic import BaseModel
39
 
40
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
41
from oversight_core.tlog import TransparencyLog
42
from oversight_core.manifest import Manifest
43
from oversight_core import rekor as rekor_mod
44
from oversight_core.jcs import jcs_dumps
45
 
46
 
47
def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean feature flag from the environment.

    Accepts every integer string (non-zero is truthy), which is what the
    previous ``bool(int(...))`` parsing accepted, plus the common spellings
    true/false, yes/no and on/off (case-insensitive). An unset or blank
    variable yields *default* instead of crashing the import.

    Raises:
        ValueError: the variable is set to something that is not a flag;
            the message names the offending variable.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    text = raw.strip().lower()
    if text in {"true", "yes", "on"}:
        return True
    if text in {"false", "no", "off"}:
        return False
    try:
        return bool(int(text))
    except ValueError:
        raise ValueError(
            f"{name} must be a boolean flag (0/1/true/false), got {raw!r}"
        ) from None


# Storage locations: the SQLite database file and the data directory that
# holds the transparency log and the registry identity key.
DB_PATH = Path(os.environ.get("OVERSIGHT_DB", "/tmp/oversight-registry.sqlite"))
DATA_DIR = Path(os.environ.get("OVERSIGHT_DATA", "/tmp/oversight-data"))
TLOG_DIR = DATA_DIR / "tlog"
IDENTITY_PATH = DATA_DIR / "registry-identity.json"
# When set, the rightmost X-Forwarded-For entry is used as the client key.
TRUSTED_PROXY = _env_flag("TRUSTED_PROXY")
# Shared secret for /dns_event callbacks; empty means loopback-only.
DNS_EVENT_SECRET = os.environ.get("OVERSIGHT_DNS_EVENT_SECRET", "")
# Operator token for write-side APIs. The opt-out below is deliberately
# strict: only the exact value "1" disables authentication.
OPERATOR_TOKEN = os.environ.get("OVERSIGHT_OPERATOR_TOKEN", "").strip()
AUTH_DISABLED = os.environ.get("OVERSIGHT_AUTH_DISABLED", "").strip() == "1"

REKOR_ENABLED = _env_flag("OVERSIGHT_REKOR_ENABLED")
REKOR_URL = os.environ.get("OVERSIGHT_REKOR_URL", rekor_mod.DEFAULT_REKOR_URL)
58
 
59
 
60
# Registry schema, applied idempotently by init_db(). Every statement uses
# IF NOT EXISTS so it is safe to run against an existing database.
# idx_beacons_file / idx_watermarks_file back the per-file lookups done by
# /evidence and the perceptual-hash join in /attribute, which otherwise scan
# the whole table.
SCHEMA = """
CREATE TABLE IF NOT EXISTS beacons (
    token_id TEXT PRIMARY KEY,
    file_id TEXT NOT NULL,
    recipient_id TEXT NOT NULL,
    issuer_id TEXT NOT NULL,
    kind TEXT NOT NULL,
    registered_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS watermarks (
    mark_id TEXT NOT NULL,
    layer TEXT NOT NULL,
    file_id TEXT NOT NULL,
    recipient_id TEXT NOT NULL,
    issuer_id TEXT NOT NULL,
    registered_at INTEGER NOT NULL,
    PRIMARY KEY (mark_id, layer)
);
CREATE TABLE IF NOT EXISTS manifests (
    file_id TEXT PRIMARY KEY,
    recipient_id TEXT NOT NULL,
    issuer_id TEXT NOT NULL,
    issuer_ed25519_pub TEXT NOT NULL,
    manifest_json TEXT NOT NULL,
    registered_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    token_id TEXT NOT NULL,
    file_id TEXT,
    recipient_id TEXT,
    issuer_id TEXT,
    kind TEXT NOT NULL,
    source_ip TEXT,
    user_agent TEXT,
    extra TEXT,
    timestamp INTEGER NOT NULL,
    qualified_timestamp TEXT,
    tlog_index INTEGER
);
CREATE TABLE IF NOT EXISTS corpus (
    file_id TEXT NOT NULL,
    hash_kind TEXT NOT NULL,
    hash_value TEXT NOT NULL,
    metadata TEXT,
    registered_at INTEGER NOT NULL,
    PRIMARY KEY (file_id, hash_kind, hash_value)
);
CREATE INDEX IF NOT EXISTS idx_events_token ON events(token_id);
CREATE INDEX IF NOT EXISTS idx_events_file ON events(file_id);
CREATE INDEX IF NOT EXISTS idx_corpus_hash ON corpus(hash_kind, hash_value);
CREATE INDEX IF NOT EXISTS idx_beacons_file ON beacons(file_id);
CREATE INDEX IF NOT EXISTS idx_watermarks_file ON watermarks(file_id);
"""
112
 
113
 
114
def load_or_create_identity() -> dict:
    """Load the registry's Ed25519 identity, generating it on first start.

    The identity is a JSON file at IDENTITY_PATH holding the hex-encoded raw
    private key (``ed25519_priv``), public key (``ed25519_pub``) and a
    ``created_at`` Unix timestamp.

    The file is created with O_EXCL and mode 0600. If another process created
    it between the existence check and the open, that file is loaded instead
    of being truncated and overwritten, so concurrent workers cannot end up
    with different keys in memory and on disk.

    Returns:
        The identity dict, either loaded from disk or freshly generated.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    if IDENTITY_PATH.exists():
        return json.loads(IDENTITY_PATH.read_text(encoding="utf-8"))

    sk = Ed25519PrivateKey.generate()
    pk = sk.public_key()
    ident = {
        "ed25519_priv": sk.private_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PrivateFormat.Raw,
            encryption_algorithm=serialization.NoEncryption(),
        ).hex(),
        "ed25519_pub": pk.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        ).hex(),
        "created_at": int(time.time()),
    }
    try:
        fd = os.open(
            str(IDENTITY_PATH),
            os.O_WRONLY | os.O_CREAT | os.O_EXCL,
            0o600,
        )
    except FileExistsError:
        # Lost the creation race: adopt the winner's key and discard ours.
        # If the winner is still mid-write this raises a JSON decode error
        # and startup fails rather than running with a divergent key.
        return json.loads(IDENTITY_PATH.read_text(encoding="utf-8"))
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(ident, f, indent=2)
    return ident
136
 
137
 
138
# Process-wide registry state. Both stay None until the FastAPI lifespan
# hook has run and assigned them.
IDENTITY: Optional[dict] = None  # registry Ed25519 identity (hex-encoded keys)
TLOG: Optional[TransparencyLog] = None  # local transparency log handle
140
 
141
 
142
@contextmanager
def db():
    """Yield a SQLite connection to DB_PATH, committing on clean exit.

    Rows come back as ``sqlite3.Row``. The connection is switched to WAL
    journaling with ``synchronous=NORMAL``. If the body raises, nothing is
    committed and the connection is closed, which discards the open
    transaction.

    The PRAGMAs run inside the ``try`` so the connection is closed even when
    one of them fails (for example on a locked or corrupt database file).
    """
    con = sqlite3.connect(DB_PATH)
    try:
        con.row_factory = sqlite3.Row
        con.execute("PRAGMA journal_mode=WAL")
        con.execute("PRAGMA synchronous=NORMAL")
        yield con
        con.commit()
    finally:
        con.close()
153
 
154
 
155
def init_db():
    """Apply SCHEMA to the registry database (idempotent)."""
    with db() as connection:
        connection.executescript(SCHEMA)
158
 
159
 
160
def timestamp_stub() -> str:
    """Return the registry clock's current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    This is a self-asserted timestamp, used where no TSA timestamp is
    available.
    """
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", utc_now)
163
 
164
 
165
def qualified_timestamp_or_stub(data: bytes) -> tuple[str, Optional[dict]]:
    """Timestamp *data* via ``oversight_core.timestamp`` or fall back to the stub.

    ``qualified_timestamp(data)`` is asked for a timestamp. When it returns
    one, the result is ``(ts.gen_time_iso, ts.to_dict())``. When it returns
    None, or the import fails, the result is ``(timestamp_stub(), None)``.

    Returns:
        ``(iso_string, qualified_details_dict_or_None)``.
    """
    stamped = None
    try:
        from oversight_core.timestamp import qualified_timestamp
        token = qualified_timestamp(data)
        if token is not None:
            stamped = (token.gen_time_iso, token.to_dict())
    except ImportError:
        # Timestamp support is unavailable: use the self-timestamp below.
        stamped = None
    if stamped is None:
        return timestamp_stub(), None
    return stamped
185
 
186
 
187
 
188
class TokenBucket:
    """Thread-safe per-key token bucket whose key table is LRU-bounded.

    Each key starts with ``burst`` tokens and refills at ``rate`` tokens per
    second, capped at ``burst``. At most ``max_keys`` keys are remembered;
    the least recently used keys are dropped first.
    """

    def __init__(self, rate: float = 10.0, burst: int = 30, max_keys: int = 100_000):
        self.rate = rate
        self.burst = burst
        self.max_keys = max_keys
        # key -> (tokens remaining, monotonic time of last update)
        self._state: "OrderedDict[str, tuple[float, float]]" = OrderedDict()
        self._lock = threading.Lock()

    def allow(self, key: str) -> bool:
        """Consume one token for *key*; return False when none is available."""
        now = time.monotonic()
        with self._lock:
            # Popping and re-inserting moves the key to the recent end.
            entry = self._state.pop(key) if key in self._state else None
            if entry is None:
                tokens, last = float(self.burst), now
            else:
                tokens, last = entry
            tokens = min(self.burst, tokens + (now - last) * self.rate)
            denied = tokens < 1.0
            remaining = tokens if denied else tokens - 1.0
            self._state[key] = (remaining, now)
            self._evict_if_needed()
            return not denied

    def _evict_if_needed(self):
        """Drop least-recently-used keys until the table fits ``max_keys``."""
        while len(self._state) > self.max_keys:
            self._state.popitem(last=False)
217
 
218
 
219
# Shared rate limiter used by _rate_limit(): 10 tokens/second refill,
# bursts of 30, at most 100,000 tracked client keys.
BUCKET = TokenBucket(
    rate=10.0,
    burst=30,
    max_keys=100_000,
)
220
 
221
 
222
def _xff_client(xff: str) -> str | None:
223
    """Return the trusted client IP from an X-Forwarded-For header value.
224
 
225
    The directly-connected proxy (Caddy) appends the real client as the
226
    RIGHTMOST entry. Entries to its left are attacker-controlled: a client
227
    may send any XFF header and the proxy appends rather than replaces, so
228
    the leftmost entry must never be trusted for rate-limit bucketing or for
229
    the source_ip written into beacon events. Taking the leftmost let an
230
    attacker pick their rate-limit bucket and forge attribution.
231
    """
232
    parts = [p.strip() for p in xff.split(",") if p.strip()]
233
    return parts[-1] if parts else None
234
 
235
 
236
def _client_key(request: Request) -> str:
    """Return the identifier used to bucket this client for rate limiting.

    With TRUSTED_PROXY set, the rightmost X-Forwarded-For entry is preferred.
    Otherwise (or when the header yields nothing) the TCP peer address is
    used, or the literal "unknown" when the request has no client info.
    """
    if TRUSTED_PROXY:
        forwarded = request.headers.get("x-forwarded-for", "")
        proxied = _xff_client(forwarded) if forwarded else None
        if proxied:
            return proxied
    if request.client:
        return request.client.host
    return "unknown"
244
 
245
 
246
 
247
def _enforce_auth_config():
    """Refuse to boot without operator authentication (fail closed).

    Without an operator token the write endpoints (/register, /attribute)
    would accept requests from anyone. Startup is therefore aborted unless a
    token is configured or the operator explicitly opted out with
    OVERSIGHT_AUTH_DISABLED=1, which is meant for isolated local testing and
    triggers a warning.

    Raises:
        RuntimeError: no operator token is set and auth was not disabled.
    """
    if OPERATOR_TOKEN:
        return
    if not AUTH_DISABLED:
        raise RuntimeError(
            "OVERSIGHT_OPERATOR_TOKEN is required to start the registry. "
            "Set it to a strong random value, or set OVERSIGHT_AUTH_DISABLED=1 "
            "only for isolated local testing."
        )
    # No token, but the operator opted out explicitly: start, loudly.
    import warnings
    warnings.warn(
        "OVERSIGHT_AUTH_DISABLED=1: registry is running without operator "
        "authentication. Do NOT do this in production.",
        stacklevel=2,
    )
269
 
270
 
271
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: validate config, then initialise global state.

    In order: enforce the auth configuration, create the database schema,
    load (or create) the registry identity, and open the transparency log
    signed with that identity's private key. Nothing is torn down on exit.
    """
    global IDENTITY, TLOG
    _enforce_auth_config()
    init_db()
    identity = load_or_create_identity()
    IDENTITY = identity
    TLOG = TransparencyLog(TLOG_DIR, signing_key_hex=identity["ed25519_priv"])
    yield
279
 
280
 
281
app = FastAPI(
    title="OVERSIGHT Registry",
    version="0.2.1",
    lifespan=lifespan,
)

# Origins allowed to make cross-origin requests by default.
_default_cors_origins = [
    "https://oversight-protocol.github.io",
    "https://oversightprotocol.dev",
    "https://www.oversightprotocol.dev",
    "http://localhost:8000",
    "http://127.0.0.1:8000",
    "http://localhost:8787",
    "http://127.0.0.1:8787",
]
# Extra origins from the comma-separated OVERSIGHT_CORS_ORIGINS variable;
# blank entries are ignored.
_extra_origins = [
    origin
    for origin in (
        raw.strip()
        for raw in os.environ.get("OVERSIGHT_CORS_ORIGINS", "").split(",")
    )
    if origin
]
# Cross-origin access is limited to GET/OPTIONS and never carries credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_default_cors_origins + _extra_origins,
    allow_credentials=False,
    allow_methods=["GET", "OPTIONS"],
    allow_headers=["Accept", "Content-Type"],
    max_age=3600,
)
305
 
306
 
307
def _registry_error_code(status_code: int, message: str) -> str:
    """Map an HTTP status and error message to a stable machine-readable code.

    Specific statuses (401, 404, 409, 429) and any 5xx win over the message
    text. For the remaining statuses the lower-cased message is inspected,
    and "missing_field" is the catch-all.
    """
    by_status = {
        401: "auth_required",
        404: "not_found",
        409: "issuer_mismatch",
        429: "rate_limited",
    }
    if status_code in by_status:
        return by_status[status_code]
    if status_code >= 500:
        return "server_error"

    lowered = message.lower()
    if "signature" in lowered:
        return "signature_invalid"
    sidecar_markers = ("beacons do not match", "watermarks do not match")
    if any(marker in lowered for marker in sidecar_markers):
        return "sidecar_mismatch"
    return "missing_field"
324
 
325
 
326
def _error_envelope(code: str, message: str) -> dict:
    """Wrap an error code and message in the registry's JSON error shape."""
    detail = {"code": code, "message": message}
    return {"error": detail}
328
 
329
 
330
@app.exception_handler(HTTPException)
async def _http_exception_handler(_request: Request, exc: HTTPException):
    """Render every HTTPException as the registry's JSON error envelope.

    The status code and headers of the exception are preserved; the error
    code is derived from the status and the detail text.
    """
    message = str(exc.detail)
    code = _registry_error_code(exc.status_code, message)
    body = _error_envelope(code, message)
    return JSONResponse(
        status_code=exc.status_code,
        content=body,
        headers=exc.headers,
    )
338
 
339
 
340
@app.exception_handler(RequestValidationError)
async def _validation_exception_handler(_request: Request, exc: RequestValidationError):
    """Report request-validation failures as HTTP 400 with code "missing_field"."""
    message = f"request validation failed: {exc}"
    body = _error_envelope("missing_field", message)
    return JSONResponse(status_code=400, content=body)
346
 
347
 
348
class RegistrationRequest(BaseModel):
    """Body of POST /register."""

    # Manifest dict carrying the issuer's embedded signature.
    manifest: dict
    # Sidecar copies; /register requires them to match the manifest's own
    # "beacons" / "watermarks" lists.
    beacons: list[dict]
    watermarks: list[dict]
    # Optional mapping of hash kind -> hash value stored in the corpus table.
    corpus: Optional[dict] = None
353
 
354
 
355
class AttributionQuery(BaseModel):
    """Body of POST /attribute.

    The first populated selector wins, in the order token_id, mark_id
    (optionally narrowed by layer), perceptual_hash.
    """

    token_id: Optional[str] = None
    mark_id: Optional[str] = None
    layer: Optional[str] = None
    perceptual_hash: Optional[str] = None
360
 
361
 
362
def _append_tlog(event: dict) -> int:
    """Append *event* to the transparency log and return its index.

    Returns -1 when no transparency log is available.
    """
    if not TLOG:
        return -1
    return TLOG.append(event)
364
 
365
 
366
def _tlog_proofs_for_events(events: list[dict]) -> list[dict]:
    """Collect inclusion proofs for the event rows that carry a tlog index.

    Rows whose ``tlog_index`` is missing, not convertible to int, negative,
    or for which the log returns no proof are skipped.

    Returns:
        One ``{"event_row", "tlog_index", "proof"}`` dict per proven row,
        where ``event_row`` is the row's position in *events*. Empty when no
        transparency log is available.
    """
    if not TLOG:
        return []

    collected = []
    for row_number, row in enumerate(events):
        raw_index = row.get("tlog_index")
        if raw_index is None:
            continue
        try:
            leaf_index = int(raw_index)
        except (TypeError, ValueError):
            continue
        if leaf_index < 0:
            continue
        proof = TLOG.inclusion_proof(leaf_index)
        if proof is None:
            continue
        collected.append({
            "event_row": row_number,
            "tlog_index": leaf_index,
            "proof": proof,
        })
    return collected
389
 
390
 
391
def _attest_to_rekor(
    file_id: str,
    issuer_pub_hex: str,
    recipient_id: str,
    recipient_pubkey_hex: Optional[str],
    suite: str,
    content_hash_sha256_hex: str,
    watermarks: list[dict],
    mark_id_hex: str,
) -> Optional[dict]:
    """Sign a registration predicate with the registry key and upload it to Rekor.

    Returns:
        None when REKOR_ENABLED is false or the identity is not loaded.
        On success, a JSON-serializable summary (log_url, log_index, log_id,
        integrated_time, tlog_kind, bundle_schema).
        On any failure, a dict with an ``error`` field and ``tlog_kind`` but
        no log_index; the caller treats this as non-fatal.
    """
    if not REKOR_ENABLED or IDENTITY is None:
        return None
    try:
        if recipient_pubkey_hex:
            recipient_hash = rekor_mod.hash_recipient_pubkey(recipient_pubkey_hex)
        else:
            # No recipient key supplied: use an all-zero placeholder digest.
            recipient_hash = "0" * 64

        # layer -> mark_id, keeping only watermarks that have a mark_id.
        marks_by_layer = {}
        for position, mark in enumerate(watermarks):
            if mark.get("mark_id"):
                layer_name = mark.get("layer", f"layer_{position}")
                marks_by_layer[layer_name] = mark.get("mark_id", "")

        predicate = rekor_mod.OversightRegistrationPredicate(
            file_id=file_id,
            issuer_pubkey_ed25519=issuer_pub_hex,
            recipient_id=recipient_id,
            recipient_pubkey_sha256=recipient_hash,
            suite=suite,
            registered_at=timestamp_stub(),
            watermarks=marks_by_layer,
        )
        statement = rekor_mod.build_statement(
            mark_id_hex=mark_id_hex,
            content_hash_sha256_hex=content_hash_sha256_hex,
            predicate=predicate,
        )
        # The envelope is signed with the registry's own identity key.
        signing_key = bytes.fromhex(IDENTITY["ed25519_priv"])
        envelope = rekor_mod.sign_dsse(
            statement=statement,
            issuer_ed25519_priv=signing_key,
        )
        public_raw = bytes.fromhex(IDENTITY["ed25519_pub"])
        registry_pub = Ed25519PublicKey.from_public_bytes(public_raw)
        pub_pem_bytes = registry_pub.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        uploaded = rekor_mod.upload_dsse(
            envelope=envelope,
            issuer_ed25519_pub_pem=pub_pem_bytes.decode("ascii"),
            log_url=REKOR_URL,
        )
        return {
            "log_url": uploaded.log_url,
            "log_index": uploaded.log_index,
            "log_id": uploaded.log_id,
            "integrated_time": uploaded.integrated_time,
            "tlog_kind": rekor_mod.TLOG_KIND,
            "bundle_schema": rekor_mod.BUNDLE_SCHEMA,
        }
    except Exception as e:
        # Deliberately broad: a Rekor failure must never fail registration.
        return {"error": f"{type(e).__name__}: {e}", "tlog_kind": rekor_mod.TLOG_KIND}
462
 
463
 
464
def _rate_limit(request: Request):
    """Charge one token to the caller's bucket.

    Raises:
        HTTPException: 429 when the caller's bucket is empty.
    """
    caller = _client_key(request)
    if BUCKET.allow(caller):
        return
    raise HTTPException(429, "rate limit exceeded")
467
 
468
 
469
def _bearer_or_header_token(request: Request, header_name: str) -> str:
    """Extract a credential from a dedicated header or a Bearer authorization.

    The dedicated *header_name* header takes precedence whenever it is
    non-empty. Otherwise an ``Authorization: Bearer <token>`` header is used
    (the scheme is matched case-insensitively).

    Returns:
        The stripped credential, or "" when neither source provides one.
    """
    direct = request.headers.get(header_name, "")
    if direct:
        return direct.strip()

    scheme = "bearer "
    authorization = request.headers.get("authorization", "")
    if not authorization.lower().startswith(scheme):
        return ""
    return authorization[len(scheme):].strip()
477
 
478
 
479
def _require_operator_auth(request: Request):
    """Require the operator token on write-side APIs.

    When no OPERATOR_TOKEN is configured the check is skipped. Otherwise the
    credential is read from the ``x-oversight-operator-token`` header or a
    Bearer authorization header and compared in constant time.

    Both sides are compared as UTF-8 bytes: ``hmac.compare_digest`` raises
    TypeError when given ``str`` values containing non-ASCII characters, so
    a header with such a byte would otherwise surface as an unauthenticated
    500 instead of a 401.

    Raises:
        HTTPException: 401 when the credential is missing or wrong.
    """
    if not OPERATOR_TOKEN:
        return
    supplied = _bearer_or_header_token(request, "x-oversight-operator-token")
    if hmac.compare_digest(
        supplied.encode("utf-8"),
        OPERATOR_TOKEN.encode("utf-8"),
    ):
        return
    raise HTTPException(401, "operator authentication required")
487
 
488
 
489
def _is_loopback_host(host: Optional[str]) -> bool:
    """Tell whether *host* names the local machine.

    IP literals are checked with ``ipaddress``. Non-IP strings count only if
    they are exactly "localhost" or "testclient". Empty or None is False.
    """
    if not host:
        return False
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: fall back to the two recognised host names.
        return host in {"localhost", "testclient"}
    return address.is_loopback
496
 
497
 
498
def _verify_dns_event_auth(request: Request):
    """Authenticate DNS bridge callbacks before trusting client_ip in the body.

    With DNS_EVENT_SECRET configured, the caller must present it in the
    ``x-oversight-dns-secret`` header or as a Bearer token. Without a secret,
    only loopback callers are accepted.

    The secret is compared as UTF-8 bytes: ``hmac.compare_digest`` raises
    TypeError on ``str`` values containing non-ASCII characters, which would
    turn a malformed credential into a 500 instead of a 401.

    Raises:
        HTTPException: 401 for a wrong or missing secret; 503 when no secret
            is configured and the caller is not on loopback.
    """
    if DNS_EVENT_SECRET:
        supplied = _bearer_or_header_token(request, "x-oversight-dns-secret")
        if hmac.compare_digest(
            supplied.encode("utf-8"),
            DNS_EVENT_SECRET.encode("utf-8"),
        ):
            return
        raise HTTPException(401, "invalid DNS event secret")

    host = request.client.host if request.client else None
    if _is_loopback_host(host):
        return
    raise HTTPException(
        503,
        "OVERSIGHT_DNS_EVENT_SECRET is required for non-loopback DNS event callbacks",
    )
513
 
514
 
515
def _verify_manifest_signature(manifest_dict: dict) -> tuple[bool, str]:
    """Parse the manifest and check its embedded Ed25519 signature.

    Returns:
        ``(ok, issuer_pub_hex)`` where issuer_pub_hex is the issuer key the
        manifest claims. ``(False, "")`` when the manifest cannot be parsed.
    """
    try:
        parsed = Manifest.from_json(jcs_dumps(manifest_dict))
    except Exception:
        # Any parse or canonicalisation failure counts as "not verified".
        return False, ""
    verified = parsed.verify()
    return verified, parsed.issuer_ed25519_pub
525
 
526
 
527
def _canonical_items(items: list[dict]) -> list[str]:
    """Return the JCS encoding of every item, sorted.

    Sorting makes the comparison between the request sidecars and the signed
    manifest independent of list order.
    """
    encoded = [jcs_dumps(entry).decode("utf-8") for entry in items]
    encoded.sort()
    return encoded
533
 
534
 
535
def _signed_registration_artifacts(
    manifest_dict: dict,
    req_beacons: list[dict],
    req_watermarks: list[dict],
) -> tuple[list[dict], list[dict]]:
    """Return the manifest's own beacons and watermarks as the source of truth.

    The sidecar lists sent with the request must equal the lists inside the
    signed manifest (compared canonically, ignoring order).

    Raises:
        HTTPException: 400 when either sidecar list differs from the manifest.
    """
    signed_beacons = manifest_dict.get("beacons") or []
    signed_watermarks = manifest_dict.get("watermarks") or []

    beacons_match = _canonical_items(req_beacons) == _canonical_items(signed_beacons)
    if not beacons_match:
        raise HTTPException(400, "request beacons do not match signed manifest")

    watermarks_match = (
        _canonical_items(req_watermarks) == _canonical_items(signed_watermarks)
    )
    if not watermarks_match:
        raise HTTPException(400, "request watermarks do not match signed manifest")

    return signed_beacons, signed_watermarks
548
 
549
 
550
@app.post("/register")
def register(req: RegistrationRequest, request: Request):
    """
    Register a sealed file's beacons + watermarks.

    Security requirements:
      - The manifest's embedded Ed25519 signature MUST verify.
      - If the file_id already exists in our DB, the re-registration's issuer
        pubkey MUST match the original. This prevents hostile overwrites of
        another issuer's attribution record.
      - The same rule is applied per beacon token_id and per watermark
        (mark_id, layer): a row that currently belongs to a file registered
        under a different issuer pubkey cannot be replaced. Without this,
        INSERT OR REPLACE would let any registrant re-point someone else's
        beacon or watermark at their own file.
      - Beacons and watermarks must carry the keys the registry stores;
        otherwise the request is rejected with 400 before anything is
        written.
      - A per-client rate limit applies.

    Returns:
        ``{"ok", "file_id", "registered_beacons", "tlog_index", "rekor"}``.

    Raises:
        HTTPException: 400 for a missing file_id, an invalid signature, a
            sidecar mismatch or malformed sidecar entries; 401/429 from the
            auth and rate-limit checks; 409 on an issuer conflict.
    """
    _require_operator_auth(request)
    _rate_limit(request)

    m = req.manifest
    file_id = m.get("file_id")
    recipient = m.get("recipient") or {}
    recipient_id = recipient.get("recipient_id", "unknown")
    issuer_id = m.get("issuer_id", "unknown")

    if not file_id:
        raise HTTPException(400, "manifest missing file_id")

    sig_ok, issuer_pub = _verify_manifest_signature(m)
    if not sig_ok:
        raise HTTPException(400, "manifest signature invalid")
    if not issuer_pub:
        raise HTTPException(400, "manifest missing issuer_ed25519_pub")
    signed_beacons, signed_watermarks = _signed_registration_artifacts(
        m,
        req.beacons,
        req.watermarks,
    )

    # Reject malformed entries up front: the inserts below index these keys
    # directly and the columns are NOT NULL, so a missing or null value would
    # otherwise surface as an unhandled 500.
    for b in signed_beacons:
        if (
            not isinstance(b, dict)
            or b.get("token_id") is None
            or b.get("kind") is None
        ):
            raise HTTPException(400, "manifest beacon missing token_id or kind")
    for w in signed_watermarks:
        if (
            not isinstance(w, dict)
            or w.get("mark_id") is None
            or w.get("layer") is None
        ):
            raise HTTPException(400, "manifest watermark missing mark_id or layer")

    now = int(time.time())
    with db() as con:
        existing = con.execute(
            "SELECT issuer_ed25519_pub FROM manifests WHERE file_id=?",
            (file_id,),
        ).fetchone()
        if existing and existing["issuer_ed25519_pub"] != issuer_pub:
            raise HTTPException(
                409,
                f"file_id already registered under a different issuer pubkey "
                f"(claimed={issuer_pub[:16]}..., existing={existing['issuer_ed25519_pub'][:16]}...)",
            )

        # Cross-file ownership checks run before any write so a conflict
        # leaves the database untouched.
        for b in signed_beacons:
            owner = con.execute(
                "SELECT m.issuer_ed25519_pub AS pub FROM beacons b "
                "JOIN manifests m ON m.file_id = b.file_id "
                "WHERE b.token_id=? AND b.file_id<>?",
                (b["token_id"], file_id),
            ).fetchone()
            if owner and owner["pub"] != issuer_pub:
                raise HTTPException(
                    409,
                    "beacon token_id already registered under a different issuer pubkey",
                )
        for w in signed_watermarks:
            owner = con.execute(
                "SELECT m.issuer_ed25519_pub AS pub FROM watermarks w "
                "JOIN manifests m ON m.file_id = w.file_id "
                "WHERE w.mark_id=? AND w.layer=? AND w.file_id<>?",
                (w["mark_id"], w["layer"], file_id),
            ).fetchone()
            if owner and owner["pub"] != issuer_pub:
                raise HTTPException(
                    409,
                    "watermark already registered under a different issuer pubkey",
                )

        con.execute(
            "INSERT OR REPLACE INTO manifests VALUES (?,?,?,?,?,?)",
            (file_id, recipient_id, issuer_id, issuer_pub, json.dumps(m), now),
        )
        for b in signed_beacons:
            con.execute(
                "INSERT OR REPLACE INTO beacons VALUES (?,?,?,?,?,?)",
                (b["token_id"], file_id, recipient_id, issuer_id, b["kind"], now),
            )
        for w in signed_watermarks:
            con.execute(
                "INSERT OR REPLACE INTO watermarks VALUES (?,?,?,?,?,?)",
                (w["mark_id"], w["layer"], file_id, recipient_id, issuer_id, now),
            )
        if req.corpus:
            for hash_kind, hash_value in req.corpus.items():
                if hash_value:
                    con.execute(
                        "INSERT OR REPLACE INTO corpus VALUES (?,?,?,?,?)",
                        (file_id, hash_kind, str(hash_value), None, now),
                    )

    # The tlog entry is appended only after the database commit succeeded.
    tlog_idx = _append_tlog({
        "event": "register",
        "file_id": file_id,
        "recipient_id": recipient_id,
        "issuer_id": issuer_id,
        "issuer_pub": issuer_pub,
        "n_beacons": len(signed_beacons),
        "n_watermarks": len(signed_watermarks),
        "timestamp": timestamp_stub(),
    })

    # Rekor attestation is best-effort; a failure is reported in the response
    # but does not undo the registration.
    rekor_result = _attest_to_rekor(
        file_id=file_id,
        issuer_pub_hex=issuer_pub,
        recipient_id=recipient_id,
        recipient_pubkey_hex=recipient.get("x25519_pub"),
        suite=m.get("suite", "classic"),
        content_hash_sha256_hex=m.get("content_hash", "0" * 64),
        watermarks=signed_watermarks,
        mark_id_hex=next(
            (w["mark_id"] for w in signed_watermarks if w.get("mark_id")),
            file_id,
        ),
    )

    return {
        "ok": True,
        "file_id": file_id,
        "registered_beacons": len(signed_beacons),
        "tlog_index": tlog_idx,
        "rekor": rekor_result,
    }
652
 
653
 
654
# Body served by the image beacon endpoint, stored as hex. The IHDR chunk
# declares a 1x1 image.
# NOTE(review): the IDAT/IEND chunk lengths and CRCs were not verified here;
# confirm the image decodes in real clients.
ONE_PX_PNG = bytes.fromhex("".join((
    "89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c489",
    "0000000d49444154789c626000000000050001a5f645400000000049454e44ae426082",
)))
658
 
659
 
660
def _record_event(request: Request, token_id: str, kind: str) -> int:
    """Record one beacon callback in the tlog and the events table.

    The token is resolved against the beacons table; unknown tokens are still
    recorded, with null file/recipient/issuer.

    The source address follows the same rule as the rate limiter: with
    TRUSTED_PROXY set, the rightmost X-Forwarded-For entry is used, because
    the TCP peer is then the proxy and recording it would attribute every
    event to the proxy. Without a usable header the TCP peer address is
    recorded, or None when the request carries no client info.

    Returns:
        The tlog index of the appended entry (-1 when no tlog is available).
    """
    forwarded_ip = None
    if TRUSTED_PROXY:
        xff = request.headers.get("x-forwarded-for", "")
        forwarded_ip = _xff_client(xff) if xff else None
    peer_ip = request.client.host if request.client else None
    client_ip = forwarded_ip or peer_ip
    ua = request.headers.get("user-agent", "")

    with db() as con:
        row = con.execute(
            "SELECT file_id, recipient_id, issuer_id FROM beacons WHERE token_id=?",
            (token_id,),
        ).fetchone()
        file_id = row["file_id"] if row else None
        recipient_id = row["recipient_id"] if row else None
        issuer_id = row["issuer_id"] if row else None

        qts = timestamp_stub()

        tlog_idx = _append_tlog({
            "event": "beacon",
            "kind": kind,
            "token_id": token_id,
            "file_id": file_id,
            "recipient_id": recipient_id,
            "source_ip": client_ip,
            "user_agent": ua,
            "timestamp": qts,
        })

        con.execute(
            "INSERT INTO events (token_id,file_id,recipient_id,issuer_id,kind,"
            "source_ip,user_agent,extra,timestamp,qualified_timestamp,tlog_index) "
            "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
            (token_id, file_id, recipient_id, issuer_id, kind,
             client_ip, ua, "{}", int(time.time()), qts, tlog_idx),
        )
        return tlog_idx
693
 
694
 
695
@app.get("/p/{token_id}.png")
async def beacon_png(token_id: str, request: Request):
    """Image beacon: record an "http_img" event and serve ONE_PX_PNG.

    _record_event does synchronous SQLite work, so it is run in a worker
    thread; calling it directly from this coroutine would stall the event
    loop for every other request while the write completes.
    """
    import asyncio

    _rate_limit(request)
    await asyncio.to_thread(_record_event, request, token_id, "http_img")
    return Response(content=ONE_PX_PNG, media_type="image/png")
700
 
701
 
702
@app.api_route("/ocsp/r/{token_id}", methods=["GET", "POST"])
@app.api_route("/r/{token_id}", methods=["GET", "POST"])
async def beacon_ocsp(token_id: str, request: Request):
    """OCSP-style beacon: record an "ocsp" event and answer with an empty 200.

    _record_event does synchronous SQLite work, so it is run in a worker
    thread; calling it directly from this coroutine would stall the event
    loop for every other request while the write completes.
    """
    import asyncio

    _rate_limit(request)
    await asyncio.to_thread(_record_event, request, token_id, "ocsp")
    return Response(status_code=200)
708
 
709
 
710
@app.get("/lic/v/{token_id}")
@app.get("/v/{token_id}")
async def beacon_license(token_id: str, request: Request):
    """License-check beacon: record a "license" event and answer {"valid": true}.

    _record_event does synchronous SQLite work, so it is run in a worker
    thread; calling it directly from this coroutine would stall the event
    loop for every other request while the write completes.
    """
    import asyncio

    _rate_limit(request)
    await asyncio.to_thread(_record_event, request, token_id, "license")
    return JSONResponse({"valid": True})
716
 
717
 
718
@app.post("/attribute")
def attribute(q: AttributionQuery, request: Request):
    """Resolve a beacon token, watermark or perceptual hash to its registration.

    Selectors are tried in the order token_id, mark_id (+ optional layer),
    perceptual_hash; the first one present decides the lookup.

    The perceptual-hash lookup takes recipient and issuer from the manifests
    table. Every registered file has a manifest row, whereas a file
    registered without beacons has no beacons row, so joining beacons
    returned null recipient/issuer for such files.

    Returns:
        ``{"found": False}`` when nothing matches; otherwise the file_id,
        recipient_id, issuer_id, stored manifest and the 50 most recent
        events for that file.

    Raises:
        HTTPException: 400 when no selector is supplied; 401 from the
            operator-auth check.
    """
    _require_operator_auth(request)
    with db() as con:
        row = None
        if q.token_id:
            row = con.execute(
                "SELECT * FROM beacons WHERE token_id=?", (q.token_id,)
            ).fetchone()
        elif q.mark_id and q.layer:
            row = con.execute(
                "SELECT * FROM watermarks WHERE mark_id=? AND layer=?",
                (q.mark_id, q.layer),
            ).fetchone()
        elif q.mark_id:
            row = con.execute(
                "SELECT * FROM watermarks WHERE mark_id=?", (q.mark_id,)
            ).fetchone()
        elif q.perceptual_hash:
            row = con.execute(
                "SELECT c.file_id as file_id, m.recipient_id as recipient_id, "
                "m.issuer_id as issuer_id "
                "FROM corpus c LEFT JOIN manifests m ON c.file_id = m.file_id "
                "WHERE c.hash_kind='perceptual' AND c.hash_value=? LIMIT 1",
                (q.perceptual_hash,),
            ).fetchone()
        else:
            raise HTTPException(400, "provide token_id, mark_id, or perceptual_hash")

        if not row:
            return {"found": False}

        file_id = row["file_id"]
        manifest_row = con.execute(
            "SELECT manifest_json FROM manifests WHERE file_id=?", (file_id,)
        ).fetchone()
        manifest = json.loads(manifest_row["manifest_json"]) if manifest_row else None
        events = con.execute(
            "SELECT kind, source_ip, user_agent, timestamp, qualified_timestamp, tlog_index "
            "FROM events WHERE file_id=? ORDER BY timestamp DESC LIMIT 50",
            (file_id,),
        ).fetchall()

        return {
            "found": True,
            "file_id": file_id,
            "recipient_id": row["recipient_id"],
            "issuer_id": row["issuer_id"],
            "manifest": manifest,
            "recent_events": [dict(e) for e in events],
        }
769
 
770
 
771
@app.get("/evidence/{file_id}")
def evidence_bundle(file_id: str):
    """Build a signed evidence bundle for one registered file.

    The bundle contains the stored manifest, the file's beacons, watermarks
    and events (oldest first), the signed tlog head, and inclusion proofs
    for events that carry a tlog index. It is signed with the registry's
    Ed25519 key over the JCS encoding of the bundle without the signature
    field.

    Raises:
        HTTPException: 503 when the registry identity is not initialised
            (instead of failing with a TypeError on ``IDENTITY[...]``);
            404 when the file_id is unknown.
    """
    if IDENTITY is None:
        raise HTTPException(503, "registry identity not initialized")

    with db() as con:
        m = con.execute(
            "SELECT manifest_json FROM manifests WHERE file_id=?", (file_id,)
        ).fetchone()
        if not m:
            raise HTTPException(404, "unknown file_id")
        events = con.execute(
            "SELECT * FROM events WHERE file_id=? ORDER BY timestamp ASC", (file_id,),
        ).fetchall()
        beacons = con.execute(
            "SELECT * FROM beacons WHERE file_id=?", (file_id,)
        ).fetchall()
        watermarks = con.execute(
            "SELECT * FROM watermarks WHERE file_id=?", (file_id,)
        ).fetchall()

    event_dicts = [dict(e) for e in events]
    bundle = {
        "file_id": file_id,
        "bundle_generated_at": timestamp_stub(),
        "registry_pub": IDENTITY["ed25519_pub"],
        "manifest": json.loads(m["manifest_json"]),
        "beacons": [dict(b) for b in beacons],
        "watermarks": [dict(w) for w in watermarks],
        "events": event_dicts,
        "tlog_head": TLOG.signed_head() if TLOG else None,
        "tlog_proofs": _tlog_proofs_for_events(event_dicts),
        "disclaimer": (
            "This bundle is a provenance record, not a legal finding. For court use, "
            "supplement with RFC 3161 qualified timestamps and ISO/IEC 27037 chain-of-custody."
        ),
    }
    # The signature is computed before the signature field is added, so
    # verifiers must remove that field before re-canonicalising.
    sk = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(IDENTITY["ed25519_priv"]))
    msg = jcs_dumps(bundle)
    bundle["bundle_signature_ed25519"] = sk.sign(msg).hex()
    return bundle
809
 
810
 
811
@app.get("/tlog/head")
def tlog_head():
    """Return the transparency log's current signed head.

    Raises:
        HTTPException: 503 when the log is not available.
    """
    if TLOG:
        return TLOG.signed_head()
    raise HTTPException(503, "tlog not initialized")
816
 
817
 
818
@app.get("/tlog/proof/{index}")
def tlog_proof(index: int):
    """Return the inclusion proof for the tlog leaf at *index*.

    Negative indexes are rejected here rather than passed to the log,
    matching /tlog/range (which rejects a negative start) and the bundle
    proof collection (which skips negative indexes).

    Raises:
        HTTPException: 503 when the log is not available; 404 when the index
            is negative or the log has no proof for it.
    """
    if not TLOG:
        raise HTTPException(503, "tlog not initialized")
    if index < 0:
        raise HTTPException(404, "index out of range")
    proof = TLOG.inclusion_proof(index)
    if proof is None:
        raise HTTPException(404, "index out of range")
    return proof
826
 
827
 
828
@app.get("/tlog/range")
def tlog_range(start: int = 0, limit: int = 500):
    """Return tlog leaf entries in [start, start+limit). For CanaryKeeper polling.

    *limit* is clamped to the range 1..1000.

    Raises:
        HTTPException: 503 when the log is not available; 400 for a negative
            start; 500 when the log reports a validation error.
    """
    if not TLOG:
        raise HTTPException(503, "tlog not initialized")
    if start < 0:
        raise HTTPException(400, "start must be non-negative")

    page_size = min(max(1, limit), 1000)
    try:
        entries = TLOG.range_records(start, page_size)
    except ValueError as exc:
        raise HTTPException(500, f"tlog range validation failed: {exc}") from exc

    return {
        "start": start,
        "count": len(entries),
        "entries": entries,
    }
841
 
842
 
843
class DnsEvent(BaseModel):
    """Body of POST /dns_event, sent by the DNS bridge for a beacon query."""

    token_id: str
    # Resolver/client address as reported by the bridge; stored as source_ip.
    client_ip: Optional[str] = None
    # DNS query type and name; stored in the event's "extra" JSON.
    qtype: Optional[str] = None
    qname: Optional[str] = None
848
 
849
 
850
@app.post("/dns_event")
def dns_event(evt: DnsEvent, request: Request):
    """Called by the oversight_dns server when a beacon DNS query arrives.

    The caller is rate limited and authenticated first. The event is then
    appended to the tlog and stored in the events table with kind "dns",
    using the client_ip from the request body as the source address.

    Returns:
        ``{"ok": True, "tlog_index": <index>}``.
    """
    _rate_limit(request)
    _verify_dns_event_auth(request)

    with db() as con:
        beacon = con.execute(
            "SELECT file_id, recipient_id, issuer_id FROM beacons WHERE token_id=?",
            (evt.token_id,),
        ).fetchone()
        # Unknown tokens are still recorded, with null ownership fields.
        if beacon:
            file_id = beacon["file_id"]
            recipient_id = beacon["recipient_id"]
            issuer_id = beacon["issuer_id"]
        else:
            file_id = recipient_id = issuer_id = None

        qts = timestamp_stub()
        tlog_entry = {
            "event": "beacon",
            "kind": "dns",
            "token_id": evt.token_id,
            "file_id": file_id,
            "recipient_id": recipient_id,
            "source_ip": evt.client_ip,
            "qname": evt.qname,
            "qtype": evt.qtype,
            "timestamp": qts,
        }
        tlog_idx = _append_tlog(tlog_entry)

        extra_json = json.dumps({"qtype": evt.qtype, "qname": evt.qname})
        con.execute(
            "INSERT INTO events (token_id,file_id,recipient_id,issuer_id,kind,"
            "source_ip,user_agent,extra,timestamp,qualified_timestamp,tlog_index) "
            "VALUES (?,?,?,?,?,?,?,?,?,?,?)",
            (evt.token_id, file_id, recipient_id, issuer_id, "dns",
             evt.client_ip, "", extra_json,
             int(time.time()), qts, tlog_idx),
        )
    return {"ok": True, "tlog_index": tlog_idx}
885
 
886
 
887
@app.get("/candidates/semantic")
def candidates_semantic(limit: int = 1000, since: Optional[int] = None):
    """
    Flywheel-friendly endpoint: returns recent L3 semantic mark_ids so the
    scraper can verify them against leaked text without shipping the whole
    watermark table over the wire repeatedly.

    *limit* is clamped to 1..10,000. A truthy *since* restricts the result
    to rows registered at or after that Unix timestamp. Rows are returned
    newest first.
    """
    limit = min(max(1, limit), 10_000)

    if since:
        query = (
            "SELECT mark_id, file_id, recipient_id, registered_at FROM watermarks "
            "WHERE layer='L3_semantic' AND registered_at>=? "
            "ORDER BY registered_at DESC LIMIT ?"
        )
        params = (since, limit)
    else:
        query = (
            "SELECT mark_id, file_id, recipient_id, registered_at FROM watermarks "
            "WHERE layer='L3_semantic' ORDER BY registered_at DESC LIMIT ?"
        )
        params = (limit,)

    with db() as con:
        rows = con.execute(query, params).fetchall()

    candidates = [dict(r) for r in rows]
    return {
        "generated_at": timestamp_stub(),
        "count": len(rows),
        "candidates": candidates,
    }
914
 
915
 
916
@app.get("/health")
def health():
    """Liveness probe: service name, version and current tlog size."""
    log_size = TLOG.size() if TLOG else 0
    return {
        "status": "ok",
        "service": "oversight-registry",
        "version": "0.2.1",
        "tlog_size": log_size,
    }
924
 
925
 
926
@app.get("/.well-known/oversight-registry")
def well_known():
    """Publish the registry's public key, version, jurisdiction and tlog size.

    ``ed25519_pub`` is None until the identity has been loaded. The
    jurisdiction comes from OVERSIGHT_JURISDICTION and defaults to "GLOBAL".
    """
    public_key = IDENTITY["ed25519_pub"] if IDENTITY else None
    jurisdiction = os.environ.get("OVERSIGHT_JURISDICTION", "GLOBAL")
    log_size = TLOG.size() if TLOG else 0
    return {
        "ed25519_pub": public_key,
        "version": "0.2.1",
        "jurisdiction": jurisdiction,
        "tlog_size": log_size,
    }