Zion Boggan
repos/Oversight/tests/test_registry_unit.py
zionboggan.com ↗
224 lines · python
History for this file →
1
"""
2
test_registry_unit
3
==================
4
 
5
Focused registry checks around Rekor attestation construction.
6
"""
7
from __future__ import annotations
8
 
9
import base64
10
import json
11
import os
12
import sys
13
from types import SimpleNamespace
14
 
15
ROOT = os.path.join(os.path.dirname(__file__), "..")
16
sys.path.insert(0, ROOT)
17
 
18
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
19
from cryptography.hazmat.primitives import serialization
20
 
21
import registry.server as registry_server
22
from fastapi import HTTPException
23
from oversight_core.tlog import TransparencyLog
24
 
25
 
26
def _new_identity() -> dict:
27
    sk = Ed25519PrivateKey.generate()
28
    return {
29
        "ed25519_priv": sk.private_bytes_raw().hex(),
30
        "ed25519_pub": sk.public_key().public_bytes_raw().hex(),
31
    }
32
 
33
 
34
def _fake_request(host: str, headers: dict[str, str] | None = None):
35
    return SimpleNamespace(
36
        client=SimpleNamespace(host=host),
37
        headers=headers or {},
38
    )
39
 
40
 
41
def test_rekor_attestation_uses_real_mark_id_and_digest():
42
    original_identity = registry_server.IDENTITY
43
    original_enabled = registry_server.REKOR_ENABLED
44
    original_upload = registry_server.rekor_mod.upload_dsse
45
    registry_server.IDENTITY = _new_identity()
46
    registry_server.REKOR_ENABLED = True
47
    captured = {}
48
 
49
    def fake_upload(envelope, issuer_ed25519_pub_pem, log_url):
50
        captured["statement"] = json.loads(
51
            base64.b64decode(envelope.payload_b64).decode("utf-8")
52
        )
53
        serialization.load_pem_public_key(issuer_ed25519_pub_pem.encode("ascii"))
54
        return type(
55
            "FakeResult",
56
            (),
57
            {
58
                "log_url": log_url,
59
                "log_index": 7,
60
                "log_id": "rekor-log",
61
                "integrated_time": 1776643200,
62
            },
63
        )()
64
 
65
    registry_server.rekor_mod.upload_dsse = fake_upload
66
    try:
67
        result = registry_server._attest_to_rekor(
68
            file_id="file-123",
69
            issuer_pub_hex="aa" * 32,
70
            recipient_id="recipient-1",
71
            recipient_pubkey_hex="11" * 32,
72
            suite="OSGT-CLASSIC-v1",
73
            content_hash_sha256_hex="bb" * 32,
74
            watermarks=[
75
                {"layer": "L1_zero_width", "mark_id": "10" * 16},
76
                {"layer": "L2_whitespace", "mark_id": "20" * 16},
77
            ],
78
            mark_id_hex="10" * 16,
79
        )
80
    finally:
81
        registry_server.IDENTITY = original_identity
82
        registry_server.REKOR_ENABLED = original_enabled
83
        registry_server.rekor_mod.upload_dsse = original_upload
84
 
85
    statement = captured["statement"]
86
    assert statement["subject"][0]["name"] == "mark:" + ("10" * 16)
87
    assert statement["subject"][0]["digest"]["sha256"] == "bb" * 32
88
    assert statement["predicate"]["watermarks"] == {
89
        "L1_zero_width": "10" * 16,
90
        "L2_whitespace": "20" * 16,
91
    }
92
    assert result["log_index"] == 7
93
 
94
 
95
def test_register_rejects_unsigned_sidecar_mismatch():
96
    manifest = {
97
        "beacons": [
98
            {"token_id": "tok-1", "kind": "http_img", "url": "https://b.example/p/tok-1.png"},
99
        ],
100
        "watermarks": [
101
            {"layer": "L1_zero_width", "mark_id": "10" * 16},
102
        ],
103
    }
104
    try:
105
        registry_server._signed_registration_artifacts(
106
            manifest,
107
            req_beacons=[
108
                {"token_id": "tok-evil", "kind": "http_img", "url": "https://b.example/p/tok-evil.png"},
109
            ],
110
            req_watermarks=manifest["watermarks"],
111
        )
112
    except HTTPException as exc:
113
        assert exc.status_code == 400
114
        assert "beacons do not match" in exc.detail
115
    else:
116
        raise AssertionError("unsigned request beacons should be rejected")
117
 
118
    try:
119
        registry_server._signed_registration_artifacts(
120
            manifest,
121
            req_beacons=manifest["beacons"],
122
            req_watermarks=[
123
                {"layer": "L2_whitespace", "mark_id": "20" * 16},
124
            ],
125
        )
126
    except HTTPException as exc:
127
        assert exc.status_code == 400
128
        assert "watermarks do not match" in exc.detail
129
    else:
130
        raise AssertionError("unsigned request watermarks should be rejected")
131
 
132
 
133
def test_dns_event_requires_secret_for_non_loopback():
134
    original_secret = registry_server.DNS_EVENT_SECRET
135
    try:
136
        registry_server.DNS_EVENT_SECRET = ""
137
        registry_server._verify_dns_event_auth(_fake_request("127.0.0.1"))
138
        try:
139
            registry_server._verify_dns_event_auth(_fake_request("203.0.113.10"))
140
        except HTTPException as exc:
141
            assert exc.status_code == 503
142
            assert "OVERSIGHT_DNS_EVENT_SECRET" in exc.detail
143
        else:
144
            raise AssertionError("public DNS callbacks should fail closed without a secret")
145
 
146
        registry_server.DNS_EVENT_SECRET = "shared-secret"
147
        registry_server._verify_dns_event_auth(
148
            _fake_request("203.0.113.10", {"x-oversight-dns-secret": "shared-secret"})
149
        )
150
        try:
151
            registry_server._verify_dns_event_auth(
152
                _fake_request("203.0.113.10", {"x-oversight-dns-secret": "wrong"})
153
            )
154
        except HTTPException as exc:
155
            assert exc.status_code == 401
156
        else:
157
            raise AssertionError("wrong DNS callback secret should be rejected")
158
    finally:
159
        registry_server.DNS_EVENT_SECRET = original_secret
160
 
161
 
162
def test_evidence_bundle_can_attach_tlog_proofs(tmp_path):
163
    original_tlog = registry_server.TLOG
164
    try:
165
        registry_server.TLOG = TransparencyLog(tmp_path)
166
        first = registry_server.TLOG.append({"event": "register", "file_id": "f"})
167
        second = registry_server.TLOG.append({"event": "beacon", "file_id": "f"})
168
        proofs = registry_server._tlog_proofs_for_events([
169
            {"kind": "register", "tlog_index": first},
170
            {"kind": "beacon", "tlog_index": second},
171
            {"kind": "offline", "tlog_index": -1},
172
        ])
173
    finally:
174
        registry_server.TLOG = original_tlog
175
 
176
    assert [p["event_row"] for p in proofs] == [0, 1]
177
    assert [p["tlog_index"] for p in proofs] == [first, second]
178
    assert all(p["proof"]["root"] for p in proofs)
179
 
180
 
181
def test_operator_token_gates_write_side_apis_when_configured():
182
    original_token = registry_server.OPERATOR_TOKEN
183
    try:
184
        registry_server.OPERATOR_TOKEN = ""
185
        registry_server._require_operator_auth(_fake_request("203.0.113.10"))
186
 
187
        registry_server.OPERATOR_TOKEN = "operator-secret"
188
        registry_server._require_operator_auth(
189
            _fake_request("203.0.113.10", {"authorization": "Bearer operator-secret"})
190
        )
191
        registry_server._require_operator_auth(
192
            _fake_request("203.0.113.10", {"x-oversight-operator-token": "operator-secret"})
193
        )
194
        try:
195
            registry_server._require_operator_auth(
196
                _fake_request("203.0.113.10", {"authorization": "Bearer wrong"})
197
            )
198
        except HTTPException as exc:
199
            assert exc.status_code == 401
200
        else:
201
            raise AssertionError("wrong operator token should be rejected")
202
    finally:
203
        registry_server.OPERATOR_TOKEN = original_token
204
 
205
 
206
def test_tlog_range_fails_closed_on_corrupt_leaf(tmp_path):
207
    original_tlog = registry_server.TLOG
208
    try:
209
        registry_server.TLOG = TransparencyLog(tmp_path)
210
        registry_server.TLOG.append({"event": "register", "file_id": "f"})
211
        out = registry_server.tlog_range(start=0, limit=1)
212
        assert out["count"] == 1
213
        assert out["entries"][0]["index"] == 0
214
 
215
        (tmp_path / "leaves.jsonl").write_text("{not-json}\n", encoding="utf-8")
216
        try:
217
            registry_server.tlog_range(start=0, limit=1)
218
        except HTTPException as exc:
219
            assert exc.status_code == 500
220
            assert "tlog range validation failed" in exc.detail
221
        else:
222
            raise AssertionError("corrupt tlog range should fail closed")
223
    finally:
224
        registry_server.TLOG = original_tlog