| 1 | """ |
| 2 | test_registry_unit |
| 3 | ================== |
| 4 | |
| 5 | Focused registry checks around Rekor attestation construction. |
| 6 | """ |
| 7 | from __future__ import annotations |
| 8 | |
| 9 | import base64 |
| 10 | import json |
| 11 | import os |
| 12 | import sys |
| 13 | from types import SimpleNamespace |
| 14 | |
| 15 | ROOT = os.path.join(os.path.dirname(__file__), "..") |
| 16 | sys.path.insert(0, ROOT) |
| 17 | |
| 18 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 19 | from cryptography.hazmat.primitives import serialization |
| 20 | |
| 21 | import registry.server as registry_server |
| 22 | from fastapi import HTTPException |
| 23 | from oversight_core.tlog import TransparencyLog |
| 24 | |
| 25 | |
| 26 | def _new_identity() -> dict: |
| 27 | sk = Ed25519PrivateKey.generate() |
| 28 | return { |
| 29 | "ed25519_priv": sk.private_bytes_raw().hex(), |
| 30 | "ed25519_pub": sk.public_key().public_bytes_raw().hex(), |
| 31 | } |
| 32 | |
| 33 | |
| 34 | def _fake_request(host: str, headers: dict[str, str] | None = None): |
| 35 | return SimpleNamespace( |
| 36 | client=SimpleNamespace(host=host), |
| 37 | headers=headers or {}, |
| 38 | ) |
| 39 | |
| 40 | |
| 41 | def test_rekor_attestation_uses_real_mark_id_and_digest(): |
| 42 | original_identity = registry_server.IDENTITY |
| 43 | original_enabled = registry_server.REKOR_ENABLED |
| 44 | original_upload = registry_server.rekor_mod.upload_dsse |
| 45 | registry_server.IDENTITY = _new_identity() |
| 46 | registry_server.REKOR_ENABLED = True |
| 47 | captured = {} |
| 48 | |
| 49 | def fake_upload(envelope, issuer_ed25519_pub_pem, log_url): |
| 50 | captured["statement"] = json.loads( |
| 51 | base64.b64decode(envelope.payload_b64).decode("utf-8") |
| 52 | ) |
| 53 | serialization.load_pem_public_key(issuer_ed25519_pub_pem.encode("ascii")) |
| 54 | return type( |
| 55 | "FakeResult", |
| 56 | (), |
| 57 | { |
| 58 | "log_url": log_url, |
| 59 | "log_index": 7, |
| 60 | "log_id": "rekor-log", |
| 61 | "integrated_time": 1776643200, |
| 62 | }, |
| 63 | )() |
| 64 | |
| 65 | registry_server.rekor_mod.upload_dsse = fake_upload |
| 66 | try: |
| 67 | result = registry_server._attest_to_rekor( |
| 68 | file_id="file-123", |
| 69 | issuer_pub_hex="aa" * 32, |
| 70 | recipient_id="recipient-1", |
| 71 | recipient_pubkey_hex="11" * 32, |
| 72 | suite="OSGT-CLASSIC-v1", |
| 73 | content_hash_sha256_hex="bb" * 32, |
| 74 | watermarks=[ |
| 75 | {"layer": "L1_zero_width", "mark_id": "10" * 16}, |
| 76 | {"layer": "L2_whitespace", "mark_id": "20" * 16}, |
| 77 | ], |
| 78 | mark_id_hex="10" * 16, |
| 79 | ) |
| 80 | finally: |
| 81 | registry_server.IDENTITY = original_identity |
| 82 | registry_server.REKOR_ENABLED = original_enabled |
| 83 | registry_server.rekor_mod.upload_dsse = original_upload |
| 84 | |
| 85 | statement = captured["statement"] |
| 86 | assert statement["subject"][0]["name"] == "mark:" + ("10" * 16) |
| 87 | assert statement["subject"][0]["digest"]["sha256"] == "bb" * 32 |
| 88 | assert statement["predicate"]["watermarks"] == { |
| 89 | "L1_zero_width": "10" * 16, |
| 90 | "L2_whitespace": "20" * 16, |
| 91 | } |
| 92 | assert result["log_index"] == 7 |
| 93 | |
| 94 | |
| 95 | def test_register_rejects_unsigned_sidecar_mismatch(): |
| 96 | manifest = { |
| 97 | "beacons": [ |
| 98 | {"token_id": "tok-1", "kind": "http_img", "url": "https://b.example/p/tok-1.png"}, |
| 99 | ], |
| 100 | "watermarks": [ |
| 101 | {"layer": "L1_zero_width", "mark_id": "10" * 16}, |
| 102 | ], |
| 103 | } |
| 104 | try: |
| 105 | registry_server._signed_registration_artifacts( |
| 106 | manifest, |
| 107 | req_beacons=[ |
| 108 | {"token_id": "tok-evil", "kind": "http_img", "url": "https://b.example/p/tok-evil.png"}, |
| 109 | ], |
| 110 | req_watermarks=manifest["watermarks"], |
| 111 | ) |
| 112 | except HTTPException as exc: |
| 113 | assert exc.status_code == 400 |
| 114 | assert "beacons do not match" in exc.detail |
| 115 | else: |
| 116 | raise AssertionError("unsigned request beacons should be rejected") |
| 117 | |
| 118 | try: |
| 119 | registry_server._signed_registration_artifacts( |
| 120 | manifest, |
| 121 | req_beacons=manifest["beacons"], |
| 122 | req_watermarks=[ |
| 123 | {"layer": "L2_whitespace", "mark_id": "20" * 16}, |
| 124 | ], |
| 125 | ) |
| 126 | except HTTPException as exc: |
| 127 | assert exc.status_code == 400 |
| 128 | assert "watermarks do not match" in exc.detail |
| 129 | else: |
| 130 | raise AssertionError("unsigned request watermarks should be rejected") |
| 131 | |
| 132 | |
| 133 | def test_dns_event_requires_secret_for_non_loopback(): |
| 134 | original_secret = registry_server.DNS_EVENT_SECRET |
| 135 | try: |
| 136 | registry_server.DNS_EVENT_SECRET = "" |
| 137 | registry_server._verify_dns_event_auth(_fake_request("127.0.0.1")) |
| 138 | try: |
| 139 | registry_server._verify_dns_event_auth(_fake_request("203.0.113.10")) |
| 140 | except HTTPException as exc: |
| 141 | assert exc.status_code == 503 |
| 142 | assert "OVERSIGHT_DNS_EVENT_SECRET" in exc.detail |
| 143 | else: |
| 144 | raise AssertionError("public DNS callbacks should fail closed without a secret") |
| 145 | |
| 146 | registry_server.DNS_EVENT_SECRET = "shared-secret" |
| 147 | registry_server._verify_dns_event_auth( |
| 148 | _fake_request("203.0.113.10", {"x-oversight-dns-secret": "shared-secret"}) |
| 149 | ) |
| 150 | try: |
| 151 | registry_server._verify_dns_event_auth( |
| 152 | _fake_request("203.0.113.10", {"x-oversight-dns-secret": "wrong"}) |
| 153 | ) |
| 154 | except HTTPException as exc: |
| 155 | assert exc.status_code == 401 |
| 156 | else: |
| 157 | raise AssertionError("wrong DNS callback secret should be rejected") |
| 158 | finally: |
| 159 | registry_server.DNS_EVENT_SECRET = original_secret |
| 160 | |
| 161 | |
| 162 | def test_evidence_bundle_can_attach_tlog_proofs(tmp_path): |
| 163 | original_tlog = registry_server.TLOG |
| 164 | try: |
| 165 | registry_server.TLOG = TransparencyLog(tmp_path) |
| 166 | first = registry_server.TLOG.append({"event": "register", "file_id": "f"}) |
| 167 | second = registry_server.TLOG.append({"event": "beacon", "file_id": "f"}) |
| 168 | proofs = registry_server._tlog_proofs_for_events([ |
| 169 | {"kind": "register", "tlog_index": first}, |
| 170 | {"kind": "beacon", "tlog_index": second}, |
| 171 | {"kind": "offline", "tlog_index": -1}, |
| 172 | ]) |
| 173 | finally: |
| 174 | registry_server.TLOG = original_tlog |
| 175 | |
| 176 | assert [p["event_row"] for p in proofs] == [0, 1] |
| 177 | assert [p["tlog_index"] for p in proofs] == [first, second] |
| 178 | assert all(p["proof"]["root"] for p in proofs) |
| 179 | |
| 180 | |
| 181 | def test_operator_token_gates_write_side_apis_when_configured(): |
| 182 | original_token = registry_server.OPERATOR_TOKEN |
| 183 | try: |
| 184 | registry_server.OPERATOR_TOKEN = "" |
| 185 | registry_server._require_operator_auth(_fake_request("203.0.113.10")) |
| 186 | |
| 187 | registry_server.OPERATOR_TOKEN = "operator-secret" |
| 188 | registry_server._require_operator_auth( |
| 189 | _fake_request("203.0.113.10", {"authorization": "Bearer operator-secret"}) |
| 190 | ) |
| 191 | registry_server._require_operator_auth( |
| 192 | _fake_request("203.0.113.10", {"x-oversight-operator-token": "operator-secret"}) |
| 193 | ) |
| 194 | try: |
| 195 | registry_server._require_operator_auth( |
| 196 | _fake_request("203.0.113.10", {"authorization": "Bearer wrong"}) |
| 197 | ) |
| 198 | except HTTPException as exc: |
| 199 | assert exc.status_code == 401 |
| 200 | else: |
| 201 | raise AssertionError("wrong operator token should be rejected") |
| 202 | finally: |
| 203 | registry_server.OPERATOR_TOKEN = original_token |
| 204 | |
| 205 | |
| 206 | def test_tlog_range_fails_closed_on_corrupt_leaf(tmp_path): |
| 207 | original_tlog = registry_server.TLOG |
| 208 | try: |
| 209 | registry_server.TLOG = TransparencyLog(tmp_path) |
| 210 | registry_server.TLOG.append({"event": "register", "file_id": "f"}) |
| 211 | out = registry_server.tlog_range(start=0, limit=1) |
| 212 | assert out["count"] == 1 |
| 213 | assert out["entries"][0]["index"] == 0 |
| 214 | |
| 215 | (tmp_path / "leaves.jsonl").write_text("{not-json}\n", encoding="utf-8") |
| 216 | try: |
| 217 | registry_server.tlog_range(start=0, limit=1) |
| 218 | except HTTPException as exc: |
| 219 | assert exc.status_code == 500 |
| 220 | assert "tlog range validation failed" in exc.detail |
| 221 | else: |
| 222 | raise AssertionError("corrupt tlog range should fail closed") |
| 223 | finally: |
| 224 | registry_server.TLOG = original_tlog |