Zion Boggan
repos/Oversight/tests/test_rekor_e2e.py
zionboggan.com ↗
123 lines · python
History for this file →
1
"""
2
test_rekor_e2e
3
==============
4
 
5
Live end-to-end test against a public Sigstore Rekor v2 log.
6
 
7
This test makes real network calls and writes a real (immutable) entry to
8
the public log shard. It is therefore gated behind the OVERSIGHT_REKOR_E2E=1
9
environment variable so routine test runs do not append to the public log.
10
 
11
Run with:
12
    OVERSIGHT_REKOR_E2E=1 pytest tests/test_rekor_e2e.py
13
 
14
What is verified:
15
  1. A DSSE-wrapped Oversight registration predicate uploads successfully.
16
  2. The log returns a JSON response carrying a logIndex (or equivalent
17
     under the v2 field naming).
18
  3. The DSSE envelope verifies under the issuer pubkey AFTER the upload -
19
     i.e., the round-trip did not mutate signature-bearing bytes.
20
  4. The on-log predicate carries recipient_pubkey_sha256, never the raw
21
     X25519 public key (privacy invariant).
22
 
23
Skipped automatically when OVERSIGHT_REKOR_E2E is unset or 0.
24
"""
25
from __future__ import annotations
26
 
27
import base64
28
import os
29
import sys
30
 
31
import pytest
32
 
33
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
34
 
35
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
36
from cryptography.hazmat.primitives import serialization
37
 
38
from oversight_core import rekor as R
39
 
40
 
41
GATE = os.environ.get("OVERSIGHT_REKOR_E2E", "0") == "1"
42
LOG_URL = os.environ.get("OVERSIGHT_REKOR_URL", R.DEFAULT_REKOR_URL)
43
 
44
pytestmark = pytest.mark.skipif(
45
    not GATE,
46
    reason="Set OVERSIGHT_REKOR_E2E=1 to run; this writes to the public Sigstore log",
47
)
48
 
49
 
50
def _new_keypair() -> tuple[bytes, bytes, str]:
51
    sk = Ed25519PrivateKey.generate()
52
    priv_raw = sk.private_bytes_raw()
53
    pub_raw = sk.public_key().public_bytes_raw()
54
    pub_pem = sk.public_key().public_bytes(
55
        encoding=serialization.Encoding.PEM,
56
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
57
    ).decode("ascii")
58
    return priv_raw, pub_raw, pub_pem
59
 
60
 
61
def test_live_upload_round_trip():
62
    priv_raw, pub_raw, pub_pem = _new_keypair()
63
 
64
    fake_x25519 = b"\x42" * 32
65
    recipient_hash = R.hash_recipient_pubkey(fake_x25519.hex())
66
 
67
    predicate = R.OversightRegistrationPredicate(
68
        file_id="e2e-test-" + base64.b16encode(os.urandom(8)).decode().lower(),
69
        issuer_pubkey_ed25519=pub_raw.hex(),
70
        recipient_id="opaque-recipient-id-1",
71
        recipient_pubkey_sha256=recipient_hash,
72
        suite="classic",
73
        registered_at="2026-04-19T00:00:00Z",
74
    )
75
    statement = R.build_statement(
76
        mark_id_hex=predicate.file_id,
77
        content_hash_sha256_hex="ab" * 32,
78
        predicate=predicate,
79
    )
80
    envelope = R.sign_dsse(statement=statement, issuer_ed25519_priv=priv_raw)
81
 
82
    assert R.verify_dsse(envelope, pub_raw), "local DSSE verify failed before upload"
83
 
84
    result = R.upload_dsse(envelope=envelope, issuer_ed25519_pub_pem=pub_pem, log_url=LOG_URL)
85
 
86
    assert result.transparency_log_entry, "rekor returned empty body"
87
 
88
    assert R.verify_dsse(envelope, pub_raw), "DSSE verify failed AFTER upload (envelope mutated?)"
89
 
90
    on_log_payload = base64.b64decode(envelope.payload_b64)
91
    assert fake_x25519.hex() not in on_log_payload.decode("utf-8", errors="ignore"), (
92
        "raw recipient X25519 pubkey leaked into on-log payload"
93
    )
94
 
95
 
96
def test_response_carries_inclusion_data():
97
    """The bundled response must give a verifier enough to verify offline.
98
 
99
    Per the v0.5 plan: the write response is the only place we get an
100
    inclusion proof; there is no online proof-by-index API.
101
    """
102
    priv_raw, pub_raw, pub_pem = _new_keypair()
103
    predicate = R.OversightRegistrationPredicate(
104
        file_id="e2e-incl-" + base64.b16encode(os.urandom(8)).decode().lower(),
105
        issuer_pubkey_ed25519=pub_raw.hex(),
106
        recipient_id="opaque-recipient-id-2",
107
        recipient_pubkey_sha256="0" * 64,
108
        suite="classic",
109
        registered_at="2026-04-19T00:00:00Z",
110
    )
111
    statement = R.build_statement(
112
        mark_id_hex=predicate.file_id,
113
        content_hash_sha256_hex="cd" * 32,
114
        predicate=predicate,
115
    )
116
    envelope = R.sign_dsse(statement=statement, issuer_ed25519_priv=priv_raw)
117
    result = R.upload_dsse(envelope=envelope, issuer_ed25519_pub_pem=pub_pem, log_url=LOG_URL)
118
 
119
    body = result.transparency_log_entry
120
    assert isinstance(body, dict) and body, "rekor body not a non-empty dict"
121
    has_idx = result.log_index is not None
122
    has_proof = any(k in body for k in ("inclusionProof", "inclusion_proof", "logEntry"))
123
    assert has_idx or has_proof, f"response missing index AND proof shape: keys={list(body.keys())}"