| 1 | """Registry v1 federation conformance harness. |
| 2 | |
| 3 | Exercises every endpoint in ``docs/spec/registry-v1.md`` against a |
| 4 | running registry. Two modes: |
| 5 | |
| 6 | - **In-process.** With no ``OVERSIGHT_REGISTRY_URL`` environment |
| 7 | variable, the harness stands the reference Python registry up inside |
| 8 | a FastAPI ``TestClient`` against a fresh SQLite database in a temp |
| 9 | directory and runs every check there. This is the CI path. |
| 10 | |
| 11 | - **Live operator URL.** When ``OVERSIGHT_REGISTRY_URL`` is set, the |
| 12 | harness points an ``httpx.Client`` at that URL and runs the same |
| 13 | checks. This is the acceptance gate an independent operator uses to |
| 14 | claim v1 conformance. |
| 15 | |
| 16 | The script fails loudly on any divergence from the spec. Each check |
| 17 | has a short name so a run log is a compact conformance report. |
| 18 | """ |
| 19 | |
| 20 | from __future__ import annotations |
| 21 | |
| 22 | import base64 |
| 23 | import json |
| 24 | import os |
| 25 | import shutil |
| 26 | import sys |
| 27 | import tempfile |
| 28 | import time |
| 29 | import uuid |
| 30 | from dataclasses import asdict |
| 31 | from pathlib import Path |
| 32 | from typing import Any, Optional |
| 33 | |
| 34 | ROOT = Path(__file__).resolve().parent.parent |
| 35 | sys.path.insert(0, str(ROOT)) |
| 36 | |
| 37 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 38 | from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey |
| 39 | from cryptography.hazmat.primitives import serialization |
| 40 | |
| 41 | from oversight_core.manifest import Manifest, Recipient, WatermarkRef |
| 42 | |
| 43 | |
| 44 | PASS = "[PASS]" |
| 45 | FAIL = "[FAIL]" |
| 46 | PASSED: list[str] = [] |
| 47 | FAILED: list[tuple[str, str]] = [] |
| 48 | |
| 49 | |
| 50 | def check(name: str, condition: bool, detail: str = "") -> None: |
| 51 | if condition: |
| 52 | PASSED.append(name) |
| 53 | print(f" {PASS} {name}") |
| 54 | else: |
| 55 | FAILED.append((name, detail)) |
| 56 | print(f" {FAIL} {name} ({detail})") |
| 57 | |
| 58 | |
| 59 | def check_error_envelope(name: str, response, expected_status: int, expected_code: str) -> None: |
| 60 | try: |
| 61 | body = response.json() |
| 62 | except Exception: |
| 63 | body = {} |
| 64 | error = body.get("error") if isinstance(body, dict) else None |
| 65 | ok = ( |
| 66 | response.status_code == expected_status |
| 67 | and isinstance(error, dict) |
| 68 | and error.get("code") == expected_code |
| 69 | and isinstance(error.get("message"), str) |
| 70 | and bool(error.get("message")) |
| 71 | ) |
| 72 | check(name, ok, f"status={response.status_code} body={body}") |
| 73 | |
| 74 | |
| 75 | |
| 76 | |
| 77 | class Client: |
| 78 | """Thin wrapper that presents the same get/post surface over a |
| 79 | FastAPI TestClient or a live httpx.Client.""" |
| 80 | |
| 81 | def __init__(self, impl, base_url: str = "", default_headers: Optional[dict[str, str]] = None): |
| 82 | self._impl = impl |
| 83 | self._base = base_url.rstrip("/") |
| 84 | self._headers = default_headers or {} |
| 85 | |
| 86 | def _merge_headers(self, kwargs: dict[str, Any]) -> dict[str, Any]: |
| 87 | if not self._headers: |
| 88 | return kwargs |
| 89 | merged = dict(self._headers) |
| 90 | merged.update(kwargs.pop("headers", {}) or {}) |
| 91 | return {**kwargs, "headers": merged} |
| 92 | |
| 93 | def get(self, path: str, **kwargs): |
| 94 | kwargs = self._merge_headers(kwargs) |
| 95 | return self._impl.get(self._base + path, **kwargs) if self._base else self._impl.get(path, **kwargs) |
| 96 | |
| 97 | def post(self, path: str, **kwargs): |
| 98 | kwargs = self._merge_headers(kwargs) |
| 99 | return self._impl.post(self._base + path, **kwargs) if self._base else self._impl.post(path, **kwargs) |
| 100 | |
| 101 | |
| 102 | def operator_headers() -> dict[str, str]: |
| 103 | token = os.environ.get("OVERSIGHT_OPERATOR_TOKEN", "").strip() |
| 104 | return {"Authorization": f"Bearer {token}"} if token else {} |
| 105 | |
| 106 | |
| 107 | def build_in_process_client(): |
| 108 | """Spin up the reference registry in a fresh temp data dir.""" |
| 109 | from fastapi.testclient import TestClient |
| 110 | |
| 111 | tmp = tempfile.mkdtemp(prefix="oversight-conformance-") |
| 112 | os.environ["OVERSIGHT_DATA_DIR"] = tmp |
| 113 | os.environ.setdefault("OVERSIGHT_REKOR_ENABLED", "0") |
| 114 | os.environ.setdefault("OVERSIGHT_AUTH_DISABLED", "1") |
| 115 | os.environ["OVERSIGHT_DNS_EVENT_SECRET"] = "test-dns-secret-123" |
| 116 | |
| 117 | for mod in [m for m in list(sys.modules) if m.startswith("registry.")]: |
| 118 | del sys.modules[mod] |
| 119 | |
| 120 | import registry.server as server |
| 121 | server.DATA_DIR = Path(tmp) |
| 122 | server.DB_PATH = Path(tmp) / "registry.sqlite" |
| 123 | server.TLOG_DIR = Path(tmp) / "tlog" |
| 124 | server.IDENTITY_PATH = Path(tmp) / "identity.json" |
| 125 | server.DNS_EVENT_SECRET = "test-dns-secret-123" |
| 126 | server.IDENTITY = server.load_or_create_identity() |
| 127 | server.init_db() |
| 128 | from oversight_core.tlog import TransparencyLog |
| 129 | server.TLOG = TransparencyLog(server.TLOG_DIR, signing_key_hex=server.IDENTITY["ed25519_priv"]) |
| 130 | |
| 131 | tc = TestClient(server.app) |
| 132 | return Client(tc, default_headers=operator_headers()), tmp, server.IDENTITY["ed25519_pub"] |
| 133 | |
| 134 | |
| 135 | def build_live_client(url: str): |
| 136 | import httpx |
| 137 | return Client(httpx.Client(timeout=15.0), base_url=url, default_headers=operator_headers()), None, None |
| 138 | |
| 139 | |
| 140 | |
| 141 | |
| 142 | def build_signed_manifest() -> tuple[dict, list[dict], list[dict], bytes]: |
| 143 | """Return (manifest_dict, beacons, watermarks, issuer_priv_raw).""" |
| 144 | issuer_sk = Ed25519PrivateKey.generate() |
| 145 | issuer_pub_hex = ( |
| 146 | issuer_sk.public_key() |
| 147 | .public_bytes( |
| 148 | encoding=serialization.Encoding.Raw, |
| 149 | format=serialization.PublicFormat.Raw, |
| 150 | ) |
| 151 | .hex() |
| 152 | ) |
| 153 | issuer_priv_raw = issuer_sk.private_bytes( |
| 154 | encoding=serialization.Encoding.Raw, |
| 155 | format=serialization.PrivateFormat.Raw, |
| 156 | encryption_algorithm=serialization.NoEncryption(), |
| 157 | ) |
| 158 | |
| 159 | recipient_x25519 = X25519PrivateKey.generate().public_key().public_bytes( |
| 160 | encoding=serialization.Encoding.Raw, |
| 161 | format=serialization.PublicFormat.Raw, |
| 162 | ).hex() |
| 163 | |
| 164 | recipient = Recipient( |
| 165 | recipient_id="conformance-recipient", |
| 166 | x25519_pub=recipient_x25519, |
| 167 | ) |
| 168 | beacons = [ |
| 169 | {"token_id": uuid.uuid4().hex, "kind": "dns"}, |
| 170 | {"token_id": uuid.uuid4().hex, "kind": "http"}, |
| 171 | ] |
| 172 | watermarks = [ |
| 173 | WatermarkRef(layer="L1_zero_width", mark_id="10" * 16), |
| 174 | WatermarkRef(layer="L2_whitespace", mark_id="20" * 16), |
| 175 | ] |
| 176 | |
| 177 | m = Manifest.new( |
| 178 | original_filename="conformance.txt", |
| 179 | content_hash="ab" * 32, |
| 180 | size_bytes=4096, |
| 181 | issuer_id="conformance-issuer", |
| 182 | issuer_ed25519_pub_hex=issuer_pub_hex, |
| 183 | recipient=recipient, |
| 184 | registry_url="https://registry.example.org", |
| 185 | ) |
| 186 | m.beacons = list(beacons) |
| 187 | m.watermarks = list(watermarks) |
| 188 | m.sign(issuer_priv_raw) |
| 189 | |
| 190 | manifest_dict = json.loads(m.to_json().decode("utf-8")) |
| 191 | sidecar_beacons = list(beacons) |
| 192 | sidecar_watermarks = [asdict(w) for w in watermarks] |
| 193 | return manifest_dict, sidecar_beacons, sidecar_watermarks, issuer_priv_raw |
| 194 | |
| 195 | |
| 196 | |
| 197 | |
| 198 | def check_health(cli: Client) -> None: |
| 199 | r = cli.get("/health") |
| 200 | check("health-200", r.status_code == 200, f"status={r.status_code}") |
| 201 | body = r.json() if r.status_code == 200 else {} |
| 202 | check("health-has-status", body.get("status") in {"ok", "degraded"}, |
| 203 | f"status={body.get('status')!r}") |
| 204 | check("health-service-prefix", |
| 205 | str(body.get("service", "")).startswith("oversight-registry"), |
| 206 | f"service={body.get('service')!r}") |
| 207 | check("health-tlog-size-int", isinstance(body.get("tlog_size"), int)) |
| 208 | |
| 209 | |
| 210 | def check_well_known(cli: Client) -> None: |
| 211 | r = cli.get("/.well-known/oversight-registry") |
| 212 | check("well-known-200", r.status_code == 200, f"status={r.status_code}") |
| 213 | body = r.json() if r.status_code == 200 else {} |
| 214 | pub = body.get("ed25519_pub") |
| 215 | check("well-known-ed25519-hex", |
| 216 | isinstance(pub, str) and len(pub) == 64 and all(c in "0123456789abcdef" for c in pub.lower()), |
| 217 | f"ed25519_pub={pub!r}") |
| 218 | check("well-known-has-version", isinstance(body.get("version"), str)) |
| 219 | |
| 220 | |
| 221 | def check_register_roundtrip(cli: Client, manifest: dict, beacons: list, watermarks: list) -> Optional[str]: |
| 222 | body = {"manifest": manifest, "beacons": beacons, "watermarks": watermarks} |
| 223 | r = cli.post("/register", json=body) |
| 224 | check("register-200", r.status_code == 200, f"status={r.status_code} body={r.text[:200]}") |
| 225 | if r.status_code != 200: |
| 226 | return None |
| 227 | out = r.json() |
| 228 | check("register-ok-true", out.get("ok") is True) |
| 229 | check("register-file-id-echo", out.get("file_id") == manifest["file_id"]) |
| 230 | check("register-count", out.get("registered_beacons") == len(beacons)) |
| 231 | check("register-tlog-index-int", isinstance(out.get("tlog_index"), int)) |
| 232 | return out.get("file_id") |
| 233 | |
| 234 | |
| 235 | def check_register_rejects_unsigned(cli: Client, manifest: dict, beacons: list, watermarks: list) -> None: |
| 236 | tampered = dict(manifest) |
| 237 | tampered["signature_ed25519"] = "00" * 64 |
| 238 | tampered["file_id"] = str(uuid.uuid4()) |
| 239 | r = cli.post("/register", json={"manifest": tampered, "beacons": beacons, "watermarks": watermarks}) |
| 240 | check("register-rejects-bad-sig", r.status_code == 400, f"status={r.status_code}") |
| 241 | check_error_envelope("register-bad-sig-error-envelope", r, 400, "signature_invalid") |
| 242 | |
| 243 | |
| 244 | def check_register_rejects_sidecar_mismatch(cli: Client, manifest: dict, beacons: list, watermarks: list) -> None: |
| 245 | bad = list(beacons) + [{"token_id": "sneaky", "kind": "dns"}] |
| 246 | r = cli.post("/register", json={"manifest": manifest, "beacons": bad, "watermarks": watermarks}) |
| 247 | check("register-rejects-sidecar-mismatch", r.status_code == 400, f"status={r.status_code}") |
| 248 | check_error_envelope("register-sidecar-error-envelope", r, 400, "sidecar_mismatch") |
| 249 | |
| 250 | |
| 251 | def check_attribute_by_token(cli: Client, beacons: list) -> None: |
| 252 | r = cli.post("/attribute", json={"token_id": beacons[0]["token_id"]}) |
| 253 | check("attribute-200", r.status_code == 200, f"status={r.status_code}") |
| 254 | body = r.json() if r.status_code == 200 else {} |
| 255 | check("attribute-found", body.get("found") is True) |
| 256 | |
| 257 | |
| 258 | def check_attribute_miss(cli: Client) -> None: |
| 259 | r = cli.post("/attribute", json={"token_id": "nonexistent-token-id"}) |
| 260 | check("attribute-miss-200", r.status_code == 200) |
| 261 | check("attribute-miss-found-false", r.json().get("found") is False) |
| 262 | |
| 263 | |
| 264 | def check_attribute_missing_field_error(cli: Client) -> None: |
| 265 | r = cli.post("/attribute", json={}) |
| 266 | check_error_envelope("attribute-missing-field-error-envelope", r, 400, "missing_field") |
| 267 | |
| 268 | |
| 269 | def check_evidence(cli: Client, file_id: str) -> None: |
| 270 | r = cli.get(f"/evidence/{file_id}") |
| 271 | check("evidence-200", r.status_code == 200, f"status={r.status_code}") |
| 272 | body = r.json() if r.status_code == 200 else {} |
| 273 | check("evidence-has-manifest", isinstance(body.get("manifest"), dict)) |
| 274 | check("evidence-has-events", isinstance(body.get("events"), list)) |
| 275 | check("evidence-has-beacons", isinstance(body.get("beacons"), list)) |
| 276 | check("evidence-has-watermarks", isinstance(body.get("watermarks"), list)) |
| 277 | check("evidence-has-registry-pub", isinstance(body.get("registry_pub"), str)) |
| 278 | check("evidence-has-tlog-head", |
| 279 | "tlog_head" in body, |
| 280 | f"keys={list(body)[:10]}") |
| 281 | check("evidence-has-tlog-proofs", |
| 282 | isinstance(body.get("tlog_proofs"), list)) |
| 283 | check("evidence-has-bundle-signature", |
| 284 | isinstance(body.get("bundle_signature_ed25519"), str)) |
| 285 | |
| 286 | |
| 287 | def check_evidence_missing_error(cli: Client) -> None: |
| 288 | r = cli.get("/evidence/missing-file-id") |
| 289 | check_error_envelope("evidence-missing-error-envelope", r, 404, "not_found") |
| 290 | |
| 291 | |
| 292 | def check_tlog_head(cli: Client) -> None: |
| 293 | r = cli.get("/tlog/head") |
| 294 | check("tlog-head-200", r.status_code == 200, f"status={r.status_code}") |
| 295 | |
| 296 | |
| 297 | def check_tlog_range(cli: Client) -> None: |
| 298 | r = cli.get("/tlog/range?start=0&limit=10") |
| 299 | body = r.json() if r.status_code == 200 else {} |
| 300 | entries = body.get("entries") |
| 301 | range_ok = ( |
| 302 | r.status_code == 200 |
| 303 | and isinstance(entries, list) |
| 304 | and body.get("count") == len(entries) |
| 305 | and all( |
| 306 | isinstance(entry, dict) |
| 307 | and isinstance(entry.get("index"), int) |
| 308 | and isinstance(entry.get("leaf_hash"), str) |
| 309 | and isinstance(entry.get("leaf_data"), str) |
| 310 | for entry in entries |
| 311 | ) |
| 312 | ) |
| 313 | check("tlog-range-200-shape", range_ok, f"status={r.status_code} body={body}") |
| 314 | |
| 315 | |
| 316 | def check_dns_event_requires_secret(cli: Client) -> None: |
| 317 | token = "t-" + uuid.uuid4().hex |
| 318 | r = cli.post( |
| 319 | "/dns_event", |
| 320 | json={"token_id": token, "client_ip": "198.51.100.8", "qtype": "A", "qname": "x.example"}, |
| 321 | headers={"X-Oversight-DNS-Secret": "wrong-secret"}, |
| 322 | ) |
| 323 | check( |
| 324 | "dns-event-auth-enforced", |
| 325 | r.status_code in (200, 401, 503), |
| 326 | f"status={r.status_code}", |
| 327 | ) |
| 328 | |
| 329 | |
| 330 | def check_cors_headers(cli: Client) -> None: |
| 331 | """A browser inspector hosted at an Oversight-approved origin must be able |
| 332 | to read /health and /.well-known; confirm the CORS middleware is present.""" |
| 333 | origin = "https://oversight-protocol.github.io" |
| 334 | try: |
| 335 | r = cli.get("/health", headers={"Origin": origin}) |
| 336 | except TypeError: |
| 337 | r = cli.get("/health") |
| 338 | acao = r.headers.get("access-control-allow-origin") if hasattr(r, "headers") else None |
| 339 | check( |
| 340 | "cors-allows-github-pages-origin", |
| 341 | acao in (origin, "*"), |
| 342 | f"Access-Control-Allow-Origin={acao!r}", |
| 343 | ) |
| 344 | |
| 345 | |
| 346 | def check_beacon_endpoints(cli: Client, beacons: list) -> None: |
| 347 | token = beacons[0]["token_id"] |
| 348 | r = cli.get(f"/p/{token}.png") |
| 349 | check("beacon-http-img-200", r.status_code == 200, f"status={r.status_code}") |
| 350 | r = cli.get(f"/r/{token}") |
| 351 | check("beacon-ocsp-200", r.status_code == 200, f"status={r.status_code}") |
| 352 | r = cli.get(f"/v/{token}") |
| 353 | check("beacon-license-200", r.status_code == 200, f"status={r.status_code}") |
| 354 | |
| 355 | |
| 356 | |
| 357 | |
| 358 | def run(cli: Client) -> None: |
| 359 | print("[*] Oversight registry v1 conformance harness") |
| 360 | |
| 361 | print("\n[*] Identity and liveness") |
| 362 | check_health(cli) |
| 363 | check_well_known(cli) |
| 364 | |
| 365 | print("\n[*] Registration") |
| 366 | manifest, beacons, watermarks, _ = build_signed_manifest() |
| 367 | file_id = check_register_roundtrip(cli, manifest, beacons, watermarks) |
| 368 | check_register_rejects_unsigned(cli, manifest, beacons, watermarks) |
| 369 | check_register_rejects_sidecar_mismatch(cli, manifest, beacons, watermarks) |
| 370 | |
| 371 | if file_id: |
| 372 | print("\n[*] Attribution and evidence") |
| 373 | check_attribute_by_token(cli, beacons) |
| 374 | check_attribute_miss(cli) |
| 375 | check_attribute_missing_field_error(cli) |
| 376 | check_evidence(cli, file_id) |
| 377 | check_evidence_missing_error(cli) |
| 378 | |
| 379 | print("\n[*] Transparency log") |
| 380 | check_tlog_head(cli) |
| 381 | check_tlog_range(cli) |
| 382 | |
| 383 | print("\n[*] CORS") |
| 384 | check_cors_headers(cli) |
| 385 | |
| 386 | print("\n[*] Beacons and DNS event") |
| 387 | check_beacon_endpoints(cli, beacons) |
| 388 | check_dns_event_requires_secret(cli) |
| 389 | |
| 390 | print() |
| 391 | print(f"[summary] passed={len(PASSED)} failed={len(FAILED)}") |
| 392 | if FAILED: |
| 393 | for name, detail in FAILED: |
| 394 | print(f" -> {name}: {detail}") |
| 395 | raise SystemExit(1) |
| 396 | print("[ok] conformance harness green") |
| 397 | |
| 398 | |
| 399 | def main() -> None: |
| 400 | url = os.environ.get("OVERSIGHT_REGISTRY_URL", "").strip() |
| 401 | tmp = None |
| 402 | try: |
| 403 | if url: |
| 404 | print(f"[*] target: live registry at {url}") |
| 405 | cli, tmp, _ = build_live_client(url) |
| 406 | else: |
| 407 | print("[*] target: in-process reference registry") |
| 408 | cli, tmp, _ = build_in_process_client() |
| 409 | run(cli) |
| 410 | finally: |
| 411 | if tmp and os.path.isdir(tmp): |
| 412 | shutil.rmtree(tmp, ignore_errors=True) |
| 413 | |
| 414 | |
| 415 | def test_registry_v1_conformance_harness() -> None: |
| 416 | """Pytest entry point for the registry v1 conformance harness. |
| 417 | |
| 418 | The harness is intentionally a single pytest case: the checks share state |
| 419 | (the registered file_id drives the subsequent attribution, evidence, tlog, |
| 420 | and beacon checks) and the question under test is one yes/no question, |
| 421 | "does this registry meet v1 conformance?" Per-check pass/fail is still |
| 422 | printed to stdout so a CI log is a compact conformance report. |
| 423 | """ |
| 424 | PASSED.clear() |
| 425 | FAILED.clear() |
| 426 | url = os.environ.get("OVERSIGHT_REGISTRY_URL", "").strip() |
| 427 | tmp = None |
| 428 | try: |
| 429 | if url: |
| 430 | cli, tmp, _ = build_live_client(url) |
| 431 | else: |
| 432 | cli, tmp, _ = build_in_process_client() |
| 433 | run(cli) |
| 434 | finally: |
| 435 | if tmp and os.path.isdir(tmp): |
| 436 | shutil.rmtree(tmp, ignore_errors=True) |
| 437 | assert not FAILED, f"{len(FAILED)} conformance check(s) failed: " + ", ".join( |
| 438 | name for name, _ in FAILED |
| 439 | ) |
| 440 | |
| 441 | |
| 442 | if __name__ == "__main__": |
| 443 | main() |