| 1 | """ |
| 2 | test_rekor_unit |
| 3 | =============== |
| 4 | |
| 5 | Offline unit tests for oversight_core.rekor. |
| 6 | |
| 7 | Covers (no network): |
| 8 | 1. DSSE PAE construction matches the spec byte-for-byte against a fixture. |
| 9 | 2. sign_dsse + verify_dsse round trip. |
| 10 | 3. verify_dsse rejects a tampered payload. |
| 11 | 4. verify_dsse rejects a wrong-key signature. |
| 12 | 5. build_statement produces the expected in-toto v1 shape. |
| 13 | 6. Envelope JSON serialization is canonical (JCS; no whitespace). |
| 14 | 7. verify_inclusion_offline returns False when transparency_log_entry is empty. |
| 15 | 8. verify_inclusion_offline rejects mismatched subject digests. |
| 16 | |
| 17 | Running this requires no external services; e2e Rekor tests live in |
| 18 | test_rekor_e2e.py (added in v0.5 Session B). |
| 19 | """ |
| 20 | from __future__ import annotations |
| 21 | |
| 22 | import base64 |
| 23 | import os |
| 24 | import sys |
| 25 | |
| 26 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) |
| 27 | |
| 28 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 29 | |
| 30 | from oversight_core import rekor as R |
| 31 | |
| 32 | |
| 33 | def _new_keypair() -> tuple[bytes, bytes]: |
| 34 | sk = Ed25519PrivateKey.generate() |
| 35 | return ( |
| 36 | sk.private_bytes_raw(), |
| 37 | sk.public_key().public_bytes_raw(), |
| 38 | ) |
| 39 | |
| 40 | |
| 41 | def test_pae_byte_exact(): |
| 42 | pae = R._pae("application/vnd.in-toto+json", b'{"a":1}') |
| 43 | expect = b"DSSEv1 28 application/vnd.in-toto+json 7 " + b'{"a":1}' |
| 44 | assert pae == expect, f"PAE mismatch:\n got {pae!r}\n expect {expect!r}" |
| 45 | |
| 46 | |
| 47 | def test_sign_verify_roundtrip(): |
| 48 | priv, pub = _new_keypair() |
| 49 | pred = R.OversightRegistrationPredicate( |
| 50 | file_id="00000000-0000-4000-8000-000000000001", |
| 51 | issuer_pubkey_ed25519=pub.hex(), |
| 52 | recipient_id="alice@test", |
| 53 | recipient_pubkey_sha256="00" * 32, |
| 54 | suite="OSGT-CLASSIC-v1", |
| 55 | registered_at="2026-04-19T07:00:00Z", |
| 56 | ) |
| 57 | stmt = R.build_statement("aa" * 16, "bb" * 32, pred) |
| 58 | env = R.sign_dsse(stmt, priv) |
| 59 | assert R.verify_dsse(env, pub), "valid envelope failed verification" |
| 60 | |
| 61 | |
| 62 | def test_tamper_payload_rejected(): |
| 63 | priv, pub = _new_keypair() |
| 64 | pred = R.OversightRegistrationPredicate( |
| 65 | file_id="x", |
| 66 | issuer_pubkey_ed25519=pub.hex(), |
| 67 | recipient_id="r", |
| 68 | recipient_pubkey_sha256="00" * 32, |
| 69 | suite="s", |
| 70 | registered_at="t", |
| 71 | ) |
| 72 | env = R.sign_dsse(R.build_statement("a", "b", pred), priv) |
| 73 | tampered = R.DSSEEnvelope( |
| 74 | payload_b64=base64.b64encode(b'{"evil":1}').decode(), |
| 75 | payload_type=env.payload_type, |
| 76 | signatures=env.signatures, |
| 77 | ) |
| 78 | assert not R.verify_dsse(tampered, pub), "tampered payload accepted!" |
| 79 | |
| 80 | |
| 81 | def test_wrong_key_rejected(): |
| 82 | priv, _ = _new_keypair() |
| 83 | _, other_pub = _new_keypair() |
| 84 | pred = R.OversightRegistrationPredicate( |
| 85 | file_id="x", |
| 86 | issuer_pubkey_ed25519="zz", |
| 87 | recipient_id="r", |
| 88 | recipient_pubkey_sha256="00" * 32, |
| 89 | suite="s", |
| 90 | registered_at="t", |
| 91 | ) |
| 92 | env = R.sign_dsse(R.build_statement("a", "b", pred), priv) |
| 93 | assert not R.verify_dsse(env, other_pub), "wrong-key sig verified!" |
| 94 | |
| 95 | |
| 96 | def test_statement_shape(): |
| 97 | pred = R.OversightRegistrationPredicate( |
| 98 | file_id="fid", |
| 99 | issuer_pubkey_ed25519="pp", |
| 100 | recipient_id="rid", |
| 101 | recipient_pubkey_sha256="rxhash", |
| 102 | suite="OSGT-CLASSIC-v1", |
| 103 | registered_at="2026-04-19T00:00:00Z", |
| 104 | ) |
| 105 | s = R.build_statement("mark1234", "deadbeef" * 8, pred) |
| 106 | assert s["_type"] == R.STATEMENT_TYPE |
| 107 | assert s["predicateType"] == R.PREDICATE_TYPE |
| 108 | assert s["subject"][0]["name"] == "mark:mark1234" |
| 109 | assert s["subject"][0]["digest"]["sha256"].startswith("deadbeef") |
| 110 | assert s["predicate"]["suite"] == "OSGT-CLASSIC-v1" |
| 111 | |
| 112 | |
| 113 | def test_canonical_envelope_json(): |
| 114 | priv, _ = _new_keypair() |
| 115 | pred = R.OversightRegistrationPredicate( |
| 116 | file_id="x", |
| 117 | issuer_pubkey_ed25519="pp", |
| 118 | recipient_id="r", |
| 119 | recipient_pubkey_sha256="00" * 32, |
| 120 | suite="s", |
| 121 | registered_at="t", |
| 122 | ) |
| 123 | env = R.sign_dsse(R.build_statement("a", "b" * 32, pred), priv) |
| 124 | raw = env.to_json() |
| 125 | again = R.DSSEEnvelope.from_json(raw).to_json() |
| 126 | assert raw == again, "envelope JSON not canonical (round-trip differs)" |
| 127 | assert " " not in raw and "\n" not in raw, "envelope JSON has whitespace" |
| 128 | |
| 129 | |
| 130 | def test_offline_verify_rejects_empty_tle(): |
| 131 | priv, pub = _new_keypair() |
| 132 | pred = R.OversightRegistrationPredicate( |
| 133 | file_id="x", |
| 134 | issuer_pubkey_ed25519="pp", |
| 135 | recipient_id="r", |
| 136 | recipient_pubkey_sha256="00" * 32, |
| 137 | suite="s", |
| 138 | registered_at="t", |
| 139 | ) |
| 140 | env = R.sign_dsse(R.build_statement("a", "b" * 32, pred), priv) |
| 141 | ok, reason = R.verify_inclusion_offline({}, env, pub, "b" * 32) |
| 142 | assert not ok and "transparency_log_entry" in reason, reason |
| 143 | |
| 144 | |
| 145 | def test_recipient_pubkey_never_appears_raw(): |
| 146 | """Privacy: raw X25519 recipient key must never end up in the on-log payload.""" |
| 147 | priv, _ = _new_keypair() |
| 148 | raw_pub_hex = "11" * 32 |
| 149 | pred = R.OversightRegistrationPredicate( |
| 150 | file_id="x", |
| 151 | issuer_pubkey_ed25519="pp", |
| 152 | recipient_id="r", |
| 153 | recipient_pubkey_sha256=R.hash_recipient_pubkey(raw_pub_hex), |
| 154 | suite="s", |
| 155 | registered_at="t", |
| 156 | ) |
| 157 | stmt = R.build_statement("a", "b" * 32, pred) |
| 158 | env = R.sign_dsse(stmt, priv) |
| 159 | raw_payload = base64.b64decode(env.payload_b64).decode() |
| 160 | assert raw_pub_hex not in raw_payload, "RAW recipient pubkey leaked into on-log payload" |
| 161 | assert pred.recipient_pubkey_sha256 in raw_payload |
| 162 | assert pred.recipient_pubkey_sha256 != raw_pub_hex |
| 163 | |
| 164 | |
| 165 | def test_predicate_carries_version_int(): |
| 166 | pred = R.OversightRegistrationPredicate( |
| 167 | file_id="x", |
| 168 | issuer_pubkey_ed25519="pp", |
| 169 | recipient_id="r", |
| 170 | recipient_pubkey_sha256="00" * 32, |
| 171 | suite="s", |
| 172 | registered_at="t", |
| 173 | ) |
| 174 | d = pred.to_dict() |
| 175 | assert d.get("predicate_version") == 1, d |
| 176 | |
| 177 | |
| 178 | def test_bundle_has_5year_replay_fields(): |
| 179 | """Bundle must carry log_pubkey, checkpoint, schema URI, schema int.""" |
| 180 | priv, _ = _new_keypair() |
| 181 | pred = R.OversightRegistrationPredicate( |
| 182 | file_id="x", |
| 183 | issuer_pubkey_ed25519="pp", |
| 184 | recipient_id="r", |
| 185 | recipient_pubkey_sha256="00" * 32, |
| 186 | suite="s", |
| 187 | registered_at="t", |
| 188 | ) |
| 189 | env = R.sign_dsse(R.build_statement("a", "b" * 32, pred), priv) |
| 190 | upload = R.RekorUploadResult( |
| 191 | log_url="https://log2025-1.rekor.sigstore.dev", |
| 192 | log_index=42, |
| 193 | log_id="abc", |
| 194 | integrated_time=1776600000, |
| 195 | transparency_log_entry={"logEntry": "..."}, |
| 196 | log_pubkey_pem="-----BEGIN PUBLIC KEY-----\nFAKE\n-----END PUBLIC KEY-----", |
| 197 | checkpoint="rekor.sigstore.dev\n42\nABC=\n-- rekor sig...", |
| 198 | ) |
| 199 | bundle = R.build_bundle( |
| 200 | manifest_dict={"file_id": "x"}, |
| 201 | manifest_sig_hex="aa" * 64, |
| 202 | upload=upload, |
| 203 | dsse_envelope=env, |
| 204 | rfc3161_token_b64="dummy", |
| 205 | rfc3161_chain_b64="chainpem", |
| 206 | ) |
| 207 | assert bundle["bundle_schema"] == 2 |
| 208 | assert bundle["tlog_kind"] == "rekor-v2-dsse" |
| 209 | rekor = bundle["rekor"] |
| 210 | assert rekor["log_pubkey_pem"], "log_pubkey missing" |
| 211 | assert rekor["checkpoint"], "checkpoint missing" |
| 212 | assert rekor["log_entry_schema"] == "rekor/v1.TransparencyLogEntry" |
| 213 | assert bundle["rfc3161_chain"] == "chainpem" |
| 214 | |
| 215 | |
| 216 | def test_offline_verify_rejects_digest_mismatch(): |
| 217 | priv, pub = _new_keypair() |
| 218 | pred = R.OversightRegistrationPredicate( |
| 219 | file_id="x", |
| 220 | issuer_pubkey_ed25519="pp", |
| 221 | recipient_id="r", |
| 222 | recipient_pubkey_sha256="00" * 32, |
| 223 | suite="s", |
| 224 | registered_at="t", |
| 225 | ) |
| 226 | env = R.sign_dsse(R.build_statement("a", "b" * 32, pred), priv) |
| 227 | ok, reason = R.verify_inclusion_offline( |
| 228 | {"transparency_log_entry": {"logEntry": {"kindVersion": {"kind": "dsse"}}}}, |
| 229 | env, |
| 230 | pub, |
| 231 | "c" * 32, |
| 232 | ) |
| 233 | assert not ok and "subject digest" in reason, reason |