Zion Boggan
repos/Oversight/tests/test_registry_conformance.py
zionboggan.com ↗
443 lines · python
History for this file →
1
"""Registry v1 federation conformance harness.
2
 
3
Exercises every endpoint in ``docs/spec/registry-v1.md`` against a
4
running registry. Two modes:
5
 
6
- **In-process.** With no ``OVERSIGHT_REGISTRY_URL`` environment
7
  variable, the harness stands the reference Python registry up inside
8
  a FastAPI ``TestClient`` against a fresh SQLite database in a temp
9
  directory and runs every check there. This is the CI path.
10
 
11
- **Live operator URL.** When ``OVERSIGHT_REGISTRY_URL`` is set, the
12
  harness points an ``httpx.Client`` at that URL and runs the same
13
  checks. This is the acceptance gate an independent operator uses to
14
  claim v1 conformance.
15
 
16
The script fails loudly on any divergence from the spec. Each check
17
has a short name so a run log is a compact conformance report.
18
"""
19
 
20
from __future__ import annotations
21
 
22
import base64
23
import json
24
import os
25
import shutil
26
import sys
27
import tempfile
28
import time
29
import uuid
30
from dataclasses import asdict
31
from pathlib import Path
32
from typing import Any, Optional
33
 
34
ROOT = Path(__file__).resolve().parent.parent
35
sys.path.insert(0, str(ROOT))
36
 
37
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
38
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
39
from cryptography.hazmat.primitives import serialization
40
 
41
from oversight_core.manifest import Manifest, Recipient, WatermarkRef
42
 
43
 
44
# Markers prefixed to each per-check line printed by the harness.
PASS, FAIL = "[PASS]", "[FAIL]"

# Module-level accumulators filled in by ``check``: names of passing checks,
# and (name, detail) pairs for failing ones.
PASSED: list[str] = []
FAILED: list[tuple[str, str]] = []
48
 
49
 
50
def check(name: str, condition: bool, detail: str = "") -> None:
    """Record the outcome of one named check and print a one-line report.

    Args:
        name: Short identifier of the check, used in the run log and summary.
        condition: Truthy when the check passed.
        detail: Diagnostic text stored and printed only when the check fails.
    """
    if not condition:
        FAILED.append((name, detail))
        print(f"  {FAIL} {name}  ({detail})")
        return
    PASSED.append(name)
    print(f"  {PASS} {name}")
57
 
58
 
59
def check_error_envelope(name: str, response, expected_status: int, expected_code: str) -> None:
    """Check that ``response`` is an error with the expected envelope.

    The check passes only when the HTTP status equals ``expected_status`` and
    the JSON body is an object whose ``"error"`` member is an object with
    ``"code" == expected_code`` and a non-empty string ``"message"``.

    Args:
        name: Check name passed through to ``check``.
        response: Response object exposing ``status_code`` and ``json()``.
        expected_status: HTTP status the registry must return.
        expected_code: Value required in ``error.code``.
    """
    try:
        payload = response.json()
    except Exception:
        # A body that cannot be decoded is treated as an empty object so the
        # check fails normally instead of aborting the run.
        payload = {}

    envelope = payload.get("error") if isinstance(payload, dict) else None
    if isinstance(envelope, dict):
        message = envelope.get("message")
        conforms = (
            response.status_code == expected_status
            and envelope.get("code") == expected_code
            and isinstance(message, str)
            and bool(message)
        )
    else:
        conforms = False
    check(name, conforms, f"status={response.status_code} body={payload}")
73
 
74
 
75
 
76
 
77
class Client:
    """Uniform ``get``/``post`` facade over the underlying HTTP client.

    The wrapped ``impl`` is either a FastAPI ``TestClient`` or a live
    ``httpx.Client``; only its ``get`` and ``post`` methods are used. When a
    ``base_url`` is given it is prefixed to every path, and any
    ``default_headers`` are merged into each request.
    """

    def __init__(self, impl, base_url: str = "", default_headers: Optional[dict[str, str]] = None):
        self._impl = impl
        # Trailing slashes are dropped so that base + "/path" never doubles up.
        self._base = base_url.rstrip("/")
        self._headers = default_headers or {}

    def _merge_headers(self, kwargs: dict[str, Any]) -> dict[str, Any]:
        """Return request kwargs with the default headers folded in.

        Per-call ``headers`` win over the defaults. With no defaults
        configured, ``kwargs`` is returned untouched.
        """
        if self._headers:
            combined = {**self._headers}
            combined.update(kwargs.pop("headers", {}) or {})
            kwargs = {**kwargs, "headers": combined}
        return kwargs

    def get(self, path: str, **kwargs):
        """Issue a GET for ``path`` and return the implementation's response."""
        request_kwargs = self._merge_headers(kwargs)
        target = self._base + path if self._base else path
        return self._impl.get(target, **request_kwargs)

    def post(self, path: str, **kwargs):
        """Issue a POST for ``path`` and return the implementation's response."""
        request_kwargs = self._merge_headers(kwargs)
        target = self._base + path if self._base else path
        return self._impl.post(target, **request_kwargs)
100
 
101
 
102
def operator_headers() -> dict[str, str]:
    """Build the default request headers from ``OVERSIGHT_OPERATOR_TOKEN``.

    Returns:
        ``{"Authorization": "Bearer <token>"}`` when the environment variable
        holds a non-blank value (surrounding whitespace is stripped),
        otherwise an empty dict.
    """
    bearer = os.environ.get("OVERSIGHT_OPERATOR_TOKEN", "").strip()
    if not bearer:
        return {}
    return {"Authorization": f"Bearer {bearer}"}
105
 
106
 
107
def build_in_process_client():
    """Spin up the reference registry in a fresh temp data dir.

    Creates a temporary directory, points the registry's environment
    variables and module-level paths at it, initialises identity, database
    and transparency log, and wraps the app in a FastAPI ``TestClient``.

    Returns:
        A ``(client, tmp_dir, ed25519_pub)`` tuple: the ``Client`` wrapper,
        the path of the temporary data directory (the caller is responsible
        for removing it), and ``server.IDENTITY["ed25519_pub"]``.

    Raises:
        Whatever the registry import or initialisation raises. In that case
        the temporary directory is removed before the exception propagates,
        because the caller never learns its path.
    """
    from fastapi.testclient import TestClient

    tmp = tempfile.mkdtemp(prefix="oversight-conformance-")
    try:
        os.environ["OVERSIGHT_DATA_DIR"] = tmp
        # setdefault: an explicit choice made by the caller's environment wins.
        os.environ.setdefault("OVERSIGHT_REKOR_ENABLED", "0")
        os.environ.setdefault("OVERSIGHT_AUTH_DISABLED", "1")
        os.environ["OVERSIGHT_DNS_EVENT_SECRET"] = "test-dns-secret-123"

        # Drop already-imported ``registry.*`` submodules so the import below
        # re-executes them with the environment configured above.
        for mod in [m for m in list(sys.modules) if m.startswith("registry.")]:
            del sys.modules[mod]

        import registry.server as server
        server.DATA_DIR = Path(tmp)
        server.DB_PATH = Path(tmp) / "registry.sqlite"
        server.TLOG_DIR = Path(tmp) / "tlog"
        server.IDENTITY_PATH = Path(tmp) / "identity.json"
        server.DNS_EVENT_SECRET = "test-dns-secret-123"
        server.IDENTITY = server.load_or_create_identity()
        server.init_db()
        from oversight_core.tlog import TransparencyLog
        server.TLOG = TransparencyLog(server.TLOG_DIR, signing_key_hex=server.IDENTITY["ed25519_priv"])

        tc = TestClient(server.app)
        return Client(tc, default_headers=operator_headers()), tmp, server.IDENTITY["ed25519_pub"]
    except BaseException:
        # Setup failed part-way: nobody else holds ``tmp``, so clean it up
        # here rather than leaking a directory per failed run.
        shutil.rmtree(tmp, ignore_errors=True)
        raise
133
 
134
 
135
def build_live_client(url: str):
    """Build a ``Client`` that talks to a live registry at ``url``.

    Returns:
        A ``(client, None, None)`` tuple, matching the three-element shape of
        ``build_in_process_client``; the last two slots are always ``None``
        here.
    """
    import httpx

    transport = httpx.Client(timeout=15.0)
    wrapped = Client(transport, base_url=url, default_headers=operator_headers())
    return wrapped, None, None
138
 
139
 
140
 
141
 
142
def build_signed_manifest() -> tuple[dict, list[dict], list[dict], bytes]:
    """Return (manifest_dict, beacons, watermarks, issuer_priv_raw).

    A fresh Ed25519 issuer key and X25519 recipient key are generated on
    every call. The manifest carries two beacons (``dns`` and ``http``) and
    two watermark references, is signed with the issuer key, and is returned
    as the dict parsed from its JSON form. ``beacons`` and ``watermarks`` are
    the plain-dict sidecar lists for the same entries.
    """
    raw_encoding = serialization.Encoding.Raw
    raw_public = serialization.PublicFormat.Raw

    # Issuer signing key, exported as hex public key and raw private bytes.
    signing_key = Ed25519PrivateKey.generate()
    issuer_pub_hex = signing_key.public_key().public_bytes(
        encoding=raw_encoding,
        format=raw_public,
    ).hex()
    issuer_priv_raw = signing_key.private_bytes(
        encoding=raw_encoding,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )

    # Recipient key: only the hex public half is needed.
    recipient_pub = X25519PrivateKey.generate().public_key().public_bytes(
        encoding=raw_encoding,
        format=raw_public,
    )
    recipient = Recipient(
        recipient_id="conformance-recipient",
        x25519_pub=recipient_pub.hex(),
    )

    beacons = [
        {"token_id": uuid.uuid4().hex, "kind": kind}
        for kind in ("dns", "http")
    ]
    watermarks = [
        WatermarkRef(layer=layer, mark_id=octet * 16)
        for layer, octet in (("L1_zero_width", "10"), ("L2_whitespace", "20"))
    ]

    manifest = Manifest.new(
        original_filename="conformance.txt",
        content_hash="ab" * 32,
        size_bytes=4096,
        issuer_id="conformance-issuer",
        issuer_ed25519_pub_hex=issuer_pub_hex,
        recipient=recipient,
        registry_url="https://registry.example.org",
    )
    # Beacons and watermarks are attached before signing.
    manifest.beacons = list(beacons)
    manifest.watermarks = list(watermarks)
    manifest.sign(issuer_priv_raw)

    manifest_dict = json.loads(manifest.to_json().decode("utf-8"))
    return (
        manifest_dict,
        list(beacons),
        [asdict(mark) for mark in watermarks],
        issuer_priv_raw,
    )
194
 
195
 
196
 
197
 
198
def check_health(cli: Client) -> None:
    """Check ``GET /health``: status 200 and the expected body fields.

    The body must report ``status`` as ``"ok"`` or ``"degraded"``, a
    ``service`` string starting with ``"oversight-registry"``, and an integer
    ``tlog_size``. A non-200 response is evaluated as an empty body.
    """
    resp = cli.get("/health")
    check("health-200", resp.status_code == 200, f"status={resp.status_code}")

    payload = resp.json() if resp.status_code == 200 else {}
    state = payload.get("status")
    check("health-has-status", state in {"ok", "degraded"}, f"status={state!r}")

    service_name = str(payload.get("service", ""))
    check(
        "health-service-prefix",
        service_name.startswith("oversight-registry"),
        f"service={payload.get('service')!r}",
    )
    check("health-tlog-size-int", isinstance(payload.get("tlog_size"), int))
208
 
209
 
210
def check_well_known(cli: Client) -> None:
    """Check ``GET /.well-known/oversight-registry``.

    Requires status 200, an ``ed25519_pub`` that is a 64-character hex string
    (case-insensitive), and a string ``version``. A non-200 response is
    evaluated as an empty body.
    """
    resp = cli.get("/.well-known/oversight-registry")
    check("well-known-200", resp.status_code == 200, f"status={resp.status_code}")

    document = resp.json() if resp.status_code == 200 else {}
    pub = document.get("ed25519_pub")
    hex_digits = "0123456789abcdef"
    looks_like_key = (
        isinstance(pub, str)
        and len(pub) == 64
        and all(ch in hex_digits for ch in pub.lower())
    )
    check("well-known-ed25519-hex", looks_like_key, f"ed25519_pub={pub!r}")
    check("well-known-has-version", isinstance(document.get("version"), str))
219
 
220
 
221
def check_register_roundtrip(cli: Client, manifest: dict, beacons: list, watermarks: list) -> Optional[str]:
    """Register the manifest via ``POST /register`` and check the response.

    Args:
        cli: Client used to reach the registry.
        manifest: Signed manifest dict; its ``file_id`` must be echoed back.
        beacons: Sidecar beacon list; its length must match
            ``registered_beacons`` in the response.
        watermarks: Sidecar watermark list.

    Returns:
        The ``file_id`` from the response, or ``None`` when the registry did
        not answer 200 (the field checks are skipped in that case).
    """
    payload = {"manifest": manifest, "beacons": beacons, "watermarks": watermarks}
    resp = cli.post("/register", json=payload)
    accepted = resp.status_code == 200
    check("register-200", accepted, f"status={resp.status_code} body={resp.text[:200]}")
    if not accepted:
        return None

    receipt = resp.json()
    check("register-ok-true", receipt.get("ok") is True)
    check("register-file-id-echo", receipt.get("file_id") == manifest["file_id"])
    check("register-count", receipt.get("registered_beacons") == len(beacons))
    check("register-tlog-index-int", isinstance(receipt.get("tlog_index"), int))
    return receipt.get("file_id")
233
 
234
 
235
def check_register_rejects_unsigned(cli: Client, manifest: dict, beacons: list, watermarks: list) -> None:
    """Check that ``/register`` refuses a manifest whose signature is bogus.

    A copy of ``manifest`` gets an all-zero signature and a fresh random
    ``file_id``; the registry must answer 400 with error code
    ``signature_invalid``. The caller's ``manifest`` is not modified.
    """
    forged = {
        **manifest,
        "signature_ed25519": "00" * 64,
        "file_id": str(uuid.uuid4()),
    }
    payload = {"manifest": forged, "beacons": beacons, "watermarks": watermarks}
    resp = cli.post("/register", json=payload)
    check("register-rejects-bad-sig", resp.status_code == 400, f"status={resp.status_code}")
    check_error_envelope("register-bad-sig-error-envelope", resp, 400, "signature_invalid")
242
 
243
 
244
def check_register_rejects_sidecar_mismatch(cli: Client, manifest: dict, beacons: list, watermarks: list) -> None:
    """Check that ``/register`` refuses sidecar beacons that differ from the manifest.

    One extra beacon is appended to a copy of ``beacons``; the registry must
    answer 400 with error code ``sidecar_mismatch``.
    """
    padded_beacons = [*beacons, {"token_id": "sneaky", "kind": "dns"}]
    payload = {"manifest": manifest, "beacons": padded_beacons, "watermarks": watermarks}
    resp = cli.post("/register", json=payload)
    check("register-rejects-sidecar-mismatch", resp.status_code == 400, f"status={resp.status_code}")
    check_error_envelope("register-sidecar-error-envelope", resp, 400, "sidecar_mismatch")
249
 
250
 
251
def check_attribute_by_token(cli: Client, beacons: list) -> None:
    """Check that ``POST /attribute`` finds the first beacon's ``token_id``.

    Requires status 200 and ``found`` set to ``True``. A non-200 response is
    evaluated as an empty body.
    """
    first_token = beacons[0]["token_id"]
    resp = cli.post("/attribute", json={"token_id": first_token})
    check("attribute-200", resp.status_code == 200, f"status={resp.status_code}")
    answer = resp.json() if resp.status_code == 200 else {}
    check("attribute-found", answer.get("found") is True)
256
 
257
 
258
def check_attribute_miss(cli: Client) -> None:
    """Check that ``POST /attribute`` reports a miss for an unknown token.

    Requires status 200 and a JSON object body with ``found`` set to
    ``False``. A non-200 response, an undecodable body, or a non-object body
    is recorded as a failed check instead of raising, so the rest of the
    harness still runs and the summary stays complete.
    """
    r = cli.post("/attribute", json={"token_id": "nonexistent-token-id"})
    check("attribute-miss-200", r.status_code == 200, f"status={r.status_code}")

    body = {}
    if r.status_code == 200:
        try:
            body = r.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses; treat the body
            # as empty so the check below fails with a readable detail.
            body = {}
    check(
        "attribute-miss-found-false",
        isinstance(body, dict) and body.get("found") is False,
        f"body={body}",
    )
262
 
263
 
264
def check_attribute_missing_field_error(cli: Client) -> None:
    """Check that an empty ``POST /attribute`` body yields 400 ``missing_field``."""
    empty_request = {}
    resp = cli.post("/attribute", json=empty_request)
    check_error_envelope(
        "attribute-missing-field-error-envelope",
        resp,
        400,
        "missing_field",
    )
267
 
268
 
269
def check_evidence(cli: Client, file_id: str) -> None:
    """Check the shape of the bundle served by ``GET /evidence/{file_id}``.

    Requires status 200, typed members (``manifest`` dict; ``events``,
    ``beacons``, ``watermarks`` and ``tlog_proofs`` lists; ``registry_pub``
    and ``bundle_signature_ed25519`` strings) and the presence of a
    ``tlog_head`` key. A non-200 response is evaluated as an empty body.
    """
    resp = cli.get(f"/evidence/{file_id}")
    check("evidence-200", resp.status_code == 200, f"status={resp.status_code}")
    bundle = resp.json() if resp.status_code == 200 else {}

    def require_typed(expectations) -> None:
        # Each row is (check name, bundle key, required type).
        for label, key, kind in expectations:
            check(label, isinstance(bundle.get(key), kind))

    require_typed((
        ("evidence-has-manifest", "manifest", dict),
        ("evidence-has-events", "events", list),
        ("evidence-has-beacons", "beacons", list),
        ("evidence-has-watermarks", "watermarks", list),
        ("evidence-has-registry-pub", "registry_pub", str),
    ))
    # tlog_head is only required to be present; its type is not constrained.
    check("evidence-has-tlog-head", "tlog_head" in bundle, f"keys={list(bundle)[:10]}")
    require_typed((
        ("evidence-has-tlog-proofs", "tlog_proofs", list),
        ("evidence-has-bundle-signature", "bundle_signature_ed25519", str),
    ))
285
 
286
 
287
def check_evidence_missing_error(cli: Client) -> None:
    """Check that evidence for an unknown file id yields 404 ``not_found``."""
    resp = cli.get("/evidence/missing-file-id")
    check_error_envelope(
        "evidence-missing-error-envelope",
        resp,
        404,
        "not_found",
    )
290
 
291
 
292
def check_tlog_head(cli: Client) -> None:
    """Check that ``GET /tlog/head`` answers 200 (the body is not inspected)."""
    status = cli.get("/tlog/head").status_code
    check("tlog-head-200", status == 200, f"status={status}")
295
 
296
 
297
def check_tlog_range(cli: Client) -> None:
    """Check the status and body shape of ``GET /tlog/range?start=0&limit=10``.

    A single check passes when the status is 200, ``entries`` is a list,
    ``count`` equals its length, and every entry is an object with an integer
    ``index`` and string ``leaf_hash`` and ``leaf_data``.
    """

    def entry_well_formed(entry) -> bool:
        return (
            isinstance(entry, dict)
            and isinstance(entry.get("index"), int)
            and isinstance(entry.get("leaf_hash"), str)
            and isinstance(entry.get("leaf_data"), str)
        )

    resp = cli.get("/tlog/range?start=0&limit=10")
    payload = resp.json() if resp.status_code == 200 else {}
    entries = payload.get("entries")

    shape_ok = False
    if resp.status_code == 200 and isinstance(entries, list):
        shape_ok = payload.get("count") == len(entries) and all(
            entry_well_formed(item) for item in entries
        )
    check("tlog-range-200-shape", shape_ok, f"status={resp.status_code} body={payload}")
314
 
315
 
316
def check_dns_event_requires_secret(cli: Client) -> None:
    """Post a DNS event with a wrong shared secret and check the status code.

    The request carries ``X-Oversight-DNS-Secret: wrong-secret`` and a random
    token id. The check passes on status 200, 401 or 503.
    """
    # NOTE(review): 200 is accepted even though the secret is wrong, so this
    # check cannot detect a registry that ignores the header. Confirm against
    # the spec whether 200 is intentionally allowed here.
    acceptable_statuses = (200, 401, 503)
    event = {
        "token_id": "t-" + uuid.uuid4().hex,
        "client_ip": "198.51.100.8",
        "qtype": "A",
        "qname": "x.example",
    }
    resp = cli.post(
        "/dns_event",
        json=event,
        headers={"X-Oversight-DNS-Secret": "wrong-secret"},
    )
    check(
        "dns-event-auth-enforced",
        resp.status_code in acceptable_statuses,
        f"status={resp.status_code}",
    )
328
 
329
 
330
def check_cors_headers(cli: Client) -> None:
    """Check that ``/health`` is readable cross-origin from the GitHub Pages origin.

    Sends ``GET /health`` with ``Origin: https://oversight-protocol.github.io``
    and passes when ``Access-Control-Allow-Origin`` echoes that origin or is
    ``*``.
    """
    origin = "https://oversight-protocol.github.io"
    try:
        resp = cli.get("/health", headers={"Origin": origin})
    except TypeError:
        # Fallback for a client whose get() rejects the headers keyword.
        resp = cli.get("/health")

    if hasattr(resp, "headers"):
        allow_origin = resp.headers.get("access-control-allow-origin")
    else:
        allow_origin = None
    check(
        "cors-allows-github-pages-origin",
        allow_origin in (origin, "*"),
        f"Access-Control-Allow-Origin={allow_origin!r}",
    )
344
 
345
 
346
def check_beacon_endpoints(cli: Client, beacons: list) -> None:
    """Check that the three beacon URLs for the first token answer 200.

    The paths probed are ``/p/{token}.png``, ``/r/{token}`` and
    ``/v/{token}``, in that order.
    """
    token = beacons[0]["token_id"]
    probes = (
        ("beacon-http-img-200", f"/p/{token}.png"),
        ("beacon-ocsp-200", f"/r/{token}"),
        ("beacon-license-200", f"/v/{token}"),
    )
    for label, path in probes:
        resp = cli.get(path)
        check(label, resp.status_code == 200, f"status={resp.status_code}")
354
 
355
 
356
 
357
 
358
def run(cli: Client) -> None:
    """Run every conformance check against ``cli`` and print a summary.

    Liveness and registration checks always run. The attribution, evidence,
    transparency-log, CORS and beacon sections run only when registration
    returned a ``file_id``.

    Raises:
        SystemExit: with code 1 when at least one check failed.
    """
    print("[*] Oversight registry v1 conformance harness")

    print("\n[*] Identity and liveness")
    check_health(cli)
    check_well_known(cli)

    print("\n[*] Registration")
    manifest, beacons, watermarks, _issuer_key = build_signed_manifest()
    registered_id = check_register_roundtrip(cli, manifest, beacons, watermarks)
    check_register_rejects_unsigned(cli, manifest, beacons, watermarks)
    check_register_rejects_sidecar_mismatch(cli, manifest, beacons, watermarks)

    if registered_id:
        print("\n[*] Attribution and evidence")
        check_attribute_by_token(cli, beacons)
        check_attribute_miss(cli)
        check_attribute_missing_field_error(cli)
        check_evidence(cli, registered_id)
        check_evidence_missing_error(cli)

        print("\n[*] Transparency log")
        check_tlog_head(cli)
        check_tlog_range(cli)

        print("\n[*] CORS")
        check_cors_headers(cli)

        print("\n[*] Beacons and DNS event")
        check_beacon_endpoints(cli, beacons)
        check_dns_event_requires_secret(cli)

    print()
    print(f"[summary] passed={len(PASSED)} failed={len(FAILED)}")
    if not FAILED:
        print("[ok] conformance harness green")
        return

    for failed_name, failed_detail in FAILED:
        print(f"  -> {failed_name}: {failed_detail}")
    raise SystemExit(1)
397
 
398
 
399
def main() -> None:
    """Script entry point: pick the target registry, run the harness, clean up.

    A non-blank ``OVERSIGHT_REGISTRY_URL`` selects the live registry at that
    URL; otherwise the in-process reference registry is used. Any temporary
    data directory handed back by the client builder is removed afterwards,
    whether or not the run succeeded.
    """
    url = os.environ.get("OVERSIGHT_REGISTRY_URL", "").strip()
    tmp = None
    try:
        if not url:
            print("[*] target: in-process reference registry")
            cli, tmp, _ = build_in_process_client()
        else:
            print(f"[*] target: live registry at {url}")
            cli, tmp, _ = build_live_client(url)
        run(cli)
    finally:
        has_tmp_dir = bool(tmp) and os.path.isdir(tmp)
        if has_tmp_dir:
            shutil.rmtree(tmp, ignore_errors=True)
413
 
414
 
415
def test_registry_v1_conformance_harness() -> None:
    """Pytest entry point for the registry v1 conformance harness.

    The harness is deliberately one pytest case: the checks share state (the
    registered file_id drives the later attribution, evidence, tlog and
    beacon checks) and the run answers a single yes/no question, "does this
    registry meet v1 conformance?". Per-check results are still printed to
    stdout, so a CI log doubles as a compact conformance report.

    ``run`` signals failure by raising ``SystemExit(1)``. That exit is
    absorbed here when failures were recorded, so the test fails through the
    final assertion, whose message names the failing checks, instead of
    surfacing as a bare ``SystemExit``. A ``SystemExit`` raised with no
    recorded failure is re-raised unchanged.
    """
    PASSED.clear()
    FAILED.clear()
    url = os.environ.get("OVERSIGHT_REGISTRY_URL", "").strip()
    tmp = None
    try:
        if url:
            cli, tmp, _ = build_live_client(url)
        else:
            cli, tmp, _ = build_in_process_client()
        try:
            run(cli)
        except SystemExit:
            # Only swallow the exit that run() raises for recorded failures;
            # anything else is unexpected and must still propagate.
            if not FAILED:
                raise
    finally:
        if tmp and os.path.isdir(tmp):
            shutil.rmtree(tmp, ignore_errors=True)
    assert not FAILED, f"{len(FAILED)} conformance check(s) failed: " + ", ".join(
        name for name, _ in FAILED
    )
440
 
441
 
442
# Allow the harness to be run directly as a script, outside pytest.
if __name__ == "__main__":
    main()