Zion Boggan
repos/Oversight/oversight_core/rekor.py
zionboggan.com ↗
425 lines · python
History for this file →
1
"""
2
oversight_core.rekor
3
====================
4
 
5
Sigstore Rekor v2 integration (v0.5).
6
 
7
Builds DSSE envelopes wrapping in-toto Statements that describe Oversight
8
mark registrations, uploads them to a Rekor v2 log, and verifies inclusion
9
proofs returned by the log.
10
 
11
Key facts (verified 2026-04-19 against current upstream):
12
  * Rekor v2 GA'd 2025-10-10 (tile-backed transparency log).
13
  * Only entry types accepted: ``hashedrekord`` and ``dsse``.
14
  * Single write endpoint: ``POST {log_url}/api/v2/log/entries``.
15
  * Inclusion proofs are returned in the write response. There is no online
16
    proof-by-index API; verifiers compute proofs from tiles when they need to
17
    re-derive one.
18
  * Public log URL pattern: ``https://logYEAR-N.rekor.sigstore.dev``. Shards
19
    rotate roughly every 6 months. Never hardcode beyond a default.
20
 
21
This module deliberately does NOT depend on ``sigstore-python`` so the issuer's
22
runtime dependency footprint stays small. Auditors verify with stock
23
``sigstore-python`` via :mod:`oversight_core.auditor_helper` (separate file).
24
"""
25
from __future__ import annotations
26
 
27
import base64
28
import json
29
import time
30
import urllib.error
31
import urllib.request
32
from dataclasses import dataclass, field
33
from typing import Any, Optional
34
 
35
from oversight_core.jcs import jcs_dumps
36
 
37
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
38
    Ed25519PrivateKey,
39
    Ed25519PublicKey,
40
)
41
from cryptography.exceptions import InvalidSignature
42
 
43
 
44
 
45
# DSSE ``payloadType`` used by sign_dsse() both in the PAE and in the
# resulting envelope.
DSSE_PAYLOAD_TYPE = "application/vnd.in-toto+json"
# ``_type`` value written into every Statement by build_statement().
STATEMENT_TYPE = "https://in-toto.io/Statement/v1"
# ``predicateType`` URI written into every Statement by build_statement().
PREDICATE_TYPE = (
    "https://github.com/oversight-protocol/oversight/blob/v0.5.0/"
    "docs/predicates/registration-v1.md"
)
# Emitted as ``predicate_version`` by OversightRegistrationPredicate.to_dict().
PREDICATE_VERSION = 1

# Default ``log_url`` for upload_dsse(). Log shards rotate (see the module
# docstring), so treat this as a default only and let callers override it.
DEFAULT_REKOR_URL = "https://log2025-1.rekor.sigstore.dev"
# ``tlog_kind`` stamped on every bundle by build_bundle().
TLOG_KIND = "rekor-v2-dsse"
# Not referenced in the visible part of this module; presumably the
# ``tlog_kind`` of pre-v0.5 bundles -- confirm against the legacy verifier.
LEGACY_TLOG_KIND = "oversight-self-merkle-v1"
# Integer ``bundle_schema`` stamped on every bundle by build_bundle().
BUNDLE_SCHEMA = 2

# Default ``timeout`` (seconds) for the HTTP request made by upload_dsse().
REKOR_WRITE_TIMEOUT_SEC = 25
59
 
60
 
61
 
62
 
63
@dataclass
class OversightRegistrationPredicate:
    """Predicate body for an Oversight mark registration.

    Privacy: the on-log predicate carries a SHA-256 hash of the recipient
    public key, never the raw key. The raw key stays in the local ``.sealed``
    bundle. This prevents anyone watching the public log from enumerating
    recipients by pubkey or correlating multiple marks to the same recipient
    across issuers. ``recipient_id`` is also expected to be an opaque hash
    or UUID, not an email; if a caller passes something that looks like an
    email address the predicate still accepts it but logs a warning at
    construction (see :meth:`__post_init__`).

    The three ``rfc3161_*`` fields are optional and are only emitted by
    :meth:`to_dict` when they hold a truthy value.
    """

    file_id: str
    issuer_pubkey_ed25519: str
    recipient_id: str
    recipient_pubkey_sha256: str
    suite: str
    registered_at: str
    rfc3161_tsa: Optional[str] = None
    rfc3161_token_b64: Optional[str] = None
    rfc3161_chain_b64: Optional[str] = None
    policy: dict = field(default_factory=dict)
    watermarks: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Warn (never fail) when ``recipient_id`` looks like raw PII.

        The check is a deliberately cheap heuristic: an ``@`` in the value is
        treated as an email address. The value itself is not written to the
        log record so the warning does not leak the PII a second time.
        """
        if isinstance(self.recipient_id, str) and "@" in self.recipient_id:
            # Local import keeps the module's import block untouched.
            import logging

            logging.getLogger(__name__).warning(
                "recipient_id looks like an email address and will be "
                "published to the transparency log; pass an opaque hash "
                "or UUID instead"
            )

    def to_dict(self) -> dict:
        """Return the predicate as a plain dict ready for canonicalisation.

        Mandatory fields are always present; each ``rfc3161_*`` field is
        added only when set. ``policy`` and ``watermarks`` are included by
        reference, not copied.
        """
        d = {
            "predicate_version": PREDICATE_VERSION,
            "file_id": self.file_id,
            "issuer_pubkey_ed25519": self.issuer_pubkey_ed25519,
            "recipient_id": self.recipient_id,
            "recipient_pubkey_sha256": self.recipient_pubkey_sha256,
            "suite": self.suite,
            "registered_at": self.registered_at,
            "policy": self.policy,
            "watermarks": self.watermarks,
        }
        if self.rfc3161_tsa:
            d["rfc3161_tsa"] = self.rfc3161_tsa
        if self.rfc3161_token_b64:
            d["rfc3161_token_b64"] = self.rfc3161_token_b64
        if self.rfc3161_chain_b64:
            d["rfc3161_chain_b64"] = self.rfc3161_chain_b64
        return d
107
 
108
 
109
def hash_recipient_pubkey(x25519_pub_hex: str) -> str:
    """Return the hex SHA-256 digest of a hex-encoded X25519 public key.

    This is the value meant for ``recipient_pubkey_sha256``: issuers should
    hash the key here instead of handing the raw pubkey to the predicate,
    so the raw key is never published to Rekor by accident.

    Raises ``ValueError`` (from ``bytes.fromhex``) if the input is not valid
    hexadecimal.
    """
    from hashlib import sha256

    key_bytes = bytes.fromhex(x25519_pub_hex)
    hasher = sha256()
    hasher.update(key_bytes)
    return hasher.hexdigest()
118
 
119
 
120
@dataclass
class DSSEEnvelope:
    """In-memory form of a DSSE envelope.

    ``payload_b64`` is the base64 payload, ``payload_type`` the DSSE
    ``payloadType`` string, and ``signatures`` a list of signature objects
    (dicts carrying ``sig`` and ``keyid`` when produced by this module).
    """

    payload_b64: str
    payload_type: str
    signatures: list[dict]

    def to_json(self) -> str:
        """Serialise to the DSSE wire field names via ``jcs_dumps``."""
        wire = {
            "payload": self.payload_b64,
            "payloadType": self.payload_type,
            "signatures": self.signatures,
        }
        # jcs_dumps hands back bytes; the public contract here is ``str``.
        encoded = jcs_dumps(wire)
        return encoded.decode("utf-8")

    @classmethod
    def from_json(cls, raw: str) -> "DSSEEnvelope":
        """Parse a JSON envelope; raises ``KeyError`` if a field is missing."""
        obj = json.loads(raw)
        payload = obj["payload"]
        kind = obj["payloadType"]
        sigs = obj["signatures"]
        return cls(payload_b64=payload, payload_type=kind, signatures=sigs)
143
 
144
 
145
 
146
 
147
def build_statement(
    mark_id_hex: str,
    content_hash_sha256_hex: str,
    predicate: OversightRegistrationPredicate,
) -> dict:
    """Build the in-toto v1 Statement describing one Oversight registration.

    The single subject is named ``mark:<mark_id_hex>`` and its ``digest``
    holds the plaintext SHA-256, so a registration can be looked up either
    by mark id or by hashing leaked text. The predicate body comes from
    ``predicate.to_dict()``.
    """
    subject_entry = {
        "name": f"mark:{mark_id_hex}",
        "digest": {"sha256": content_hash_sha256_hex},
    }
    statement: dict = {"_type": STATEMENT_TYPE}
    statement["subject"] = [subject_entry]
    statement["predicateType"] = PREDICATE_TYPE
    statement["predicate"] = predicate.to_dict()
    return statement
170
 
171
 
172
def _pae(payload_type: str, payload: bytes) -> bytes:
    """DSSE Pre-Authentication Encoding (PAEv1).

    PAE = "DSSEv1" SP <len(type)> SP <type> SP <len(payload)> SP <payload>

    Both lengths are ASCII decimal *byte* counts. The type is encoded as
    UTF-8 before measuring, so a non-ASCII ``payload_type`` produces a
    well-formed encoding instead of raising ``UnicodeEncodeError`` (and
    instead of reporting a character count that disagrees with the bytes
    actually emitted). For ASCII types the output is unchanged.

    Args:
        payload_type: DSSE ``payloadType`` string.
        payload: Raw (already base64-decoded) payload bytes.

    Returns:
        The byte string that is signed / verified.
    """
    type_bytes = payload_type.encode("utf-8")
    return b" ".join(
        (
            b"DSSEv1",
            str(len(type_bytes)).encode("ascii"),
            type_bytes,
            str(len(payload)).encode("ascii"),
            payload,
        )
    )
187
 
188
 
189
def sign_dsse(
    statement: dict,
    issuer_ed25519_priv: bytes,
    keyid: str = "",
) -> DSSEEnvelope:
    """Produce a single-signature DSSE envelope over ``statement``.

    The statement is serialised with ``jcs_dumps``; those bytes are both the
    envelope payload (base64-encoded) and the input to the PAE that gets
    signed with the raw Ed25519 private key ``issuer_ed25519_priv``.

    ``keyid`` is opaque per the DSSE spec; by convention it is the hex
    SHA-256 of the public key. The empty string is permitted (tests use it).
    """
    canonical = jcs_dumps(statement)
    encoded_payload = base64.b64encode(canonical).decode("ascii")
    to_sign = _pae(DSSE_PAYLOAD_TYPE, canonical)

    signing_key = Ed25519PrivateKey.from_private_bytes(issuer_ed25519_priv)
    raw_signature = signing_key.sign(to_sign)

    signature_entry = {
        "sig": base64.b64encode(raw_signature).decode("ascii"),
        "keyid": keyid,
    }
    return DSSEEnvelope(
        payload_b64=encoded_payload,
        payload_type=DSSE_PAYLOAD_TYPE,
        signatures=[signature_entry],
    )
209
 
210
 
211
def verify_dsse(envelope: DSSEEnvelope, issuer_ed25519_pub: bytes) -> bool:
    """Return True if any envelope signature verifies under the issuer key.

    DSSE supports multiple signatures; for Oversight v0.5 only the issuer
    signs, so the first signature that verifies is accepted and the rest
    are not inspected.

    The envelope is treated as untrusted input: a payload that is not valid
    base64, a ``payload_type`` that cannot be PAE-encoded, a ``signatures``
    value that is not a list, or a signature entry that is not a dict with
    a base64 string ``sig`` all yield ``False`` rather than an exception.

    Args:
        envelope: The DSSE envelope to check.
        issuer_ed25519_pub: Raw Ed25519 public key bytes.

    Raises:
        ValueError: If ``issuer_ed25519_pub`` is not a valid raw Ed25519
            public key. This is a caller error, not an envelope fault, so
            it is deliberately not folded into the boolean result.
    """
    try:
        payload = base64.b64decode(envelope.payload_b64)
    except (ValueError, TypeError):
        # binascii.Error is a ValueError; a non-str/bytes payload is a TypeError.
        return False
    try:
        pae = _pae(envelope.payload_type, payload)
    except (ValueError, TypeError, AttributeError):
        # e.g. a payload_type that is not a string or cannot be encoded.
        return False
    pk = Ed25519PublicKey.from_public_bytes(issuer_ed25519_pub)
    signatures = envelope.signatures
    if not isinstance(signatures, (list, tuple)):
        return False
    for sig_obj in signatures:
        try:
            sig = base64.b64decode(sig_obj["sig"])
            pk.verify(sig, pae)
        except (InvalidSignature, KeyError, ValueError, TypeError):
            # TypeError covers a non-dict entry or a non-string ``sig``.
            continue
        return True
    return False
231
 
232
 
233
def envelope_payload_statement(envelope: DSSEEnvelope) -> dict:
    """Base64-decode the envelope payload and parse it as JSON.

    No signature check happens here; call ``verify_dsse`` first when the
    envelope is untrusted. Decoding or JSON errors propagate to the caller.
    """
    decoded = base64.b64decode(envelope.payload_b64)
    statement = json.loads(decoded)
    return statement
235
 
236
 
237
 
238
 
239
@dataclass
class RekorUploadResult:
    """Outcome of one Rekor upload, kept alongside the raw log response.

    ``transparency_log_entry`` holds the parsed response body unchanged;
    the other fields are convenience copies (any of which may be ``None``).
    """

    log_url: str
    log_index: Optional[int]
    log_id: Optional[str]
    integrated_time: Optional[int]
    transparency_log_entry: dict
    log_pubkey_pem: Optional[str] = None
    checkpoint: Optional[str] = None

    def to_bundle_dict(self) -> dict:
        """Return the dict that Oversight bundles embed under the ``rekor`` key.

        The keys ``log_pubkey_pem``, ``checkpoint``, ``log_entry_schema`` and
        ``transparency_log_entry`` are always emitted (with ``None`` values
        where nothing was recorded) so that a far-future verifier can work
        from the bundle alone without consulting TUF.
        """
        copied_attrs = (
            "log_url",
            "log_index",
            "log_id",
            "integrated_time",
            "log_pubkey_pem",
            "checkpoint",
        )
        out: dict = {}
        for attr in copied_attrs:
            out[attr] = getattr(self, attr)
        out["log_entry_schema"] = "rekor/v1.TransparencyLogEntry"
        out["transparency_log_entry"] = self.transparency_log_entry
        return out
267
 
268
 
269
def build_bundle(
    manifest_dict: dict,
    manifest_sig_hex: str,
    upload: "RekorUploadResult",
    dsse_envelope: "DSSEEnvelope",
    rfc3161_token_b64: Optional[str] = None,
    rfc3161_chain_b64: Optional[str] = None,
) -> dict:
    """Put together the v0.5 evidence bundle.

    ``bundle_schema`` is an integer so that pre-v0.5 verifiers can reject
    the bundle outright ("unknown schema, upgrade") instead of mis-routing
    on a defaulted ``tlog_kind``. The RFC 3161 token and chain are added
    only when a truthy value is supplied.
    """
    rekor_section = upload.to_bundle_dict()
    # Round-trip through JSON so the bundle embeds plain dict/list data.
    envelope_section = json.loads(dsse_envelope.to_json())

    bundle = {
        "bundle_schema": BUNDLE_SCHEMA,
        "tlog_kind": TLOG_KIND,
        "manifest": manifest_dict,
        "manifest_sig": manifest_sig_hex,
        "rekor": rekor_section,
        "dsse_envelope": envelope_section,
    }

    optional_fields = (
        ("rfc3161_token", rfc3161_token_b64),
        ("rfc3161_chain", rfc3161_chain_b64),
    )
    for key, value in optional_fields:
        if value:
            bundle[key] = value
    return bundle
296
 
297
 
298
def upload_dsse(
    envelope: DSSEEnvelope,
    issuer_ed25519_pub_pem: str,
    log_url: str = DEFAULT_REKOR_URL,
    timeout: float = REKOR_WRITE_TIMEOUT_SEC,
) -> RekorUploadResult:
    """POST a DSSE envelope to Rekor v2.

    ``issuer_ed25519_pub_pem`` is the issuer's verification key in PEM. The
    upload payload converts it to the DER (SubjectPublicKeyInfo) bytes that
    the Rekor v2 ``Verifier.PublicKey.raw_bytes`` field actually requires.

    Wire shape per
    https://github.com/sigstore/rekor-tiles/blob/main/api/proto/rekor/v2/dsse.proto
    (verified 2026-04-19): ``verifiers`` is a repeated field; each verifier
    carries ``publicKey.rawBytes`` (DER) and a sibling ``keyDetails`` enum
    string (e.g. ``PKIX_ED25519``).

    The response is kept verbatim in ``transparency_log_entry``. In addition
    the convenience fields are filled from it: ``log_id`` falls back to the
    nested ``logId.keyId`` form and ``checkpoint`` is taken from
    ``inclusionProof.checkpoint.envelope`` when present.

    Network errors raise; callers decide whether to retry or fall back to
    the local tlog (only acceptable for development, not production).

    Raises:
        ValueError: If the PEM cannot be parsed, is not an Ed25519 key, or
            the response body is not valid JSON.
        RuntimeError: If the log answers with an HTTP error status, or with
            JSON that is not an object.
        urllib.error.URLError: On connection-level failures.
    """
    from cryptography.hazmat.primitives import serialization as _ser

    pub_obj = _ser.load_pem_public_key(issuer_ed25519_pub_pem.encode("utf-8"))
    # ``keyDetails`` below is hard-coded to Ed25519, so refuse any other key
    # type up front instead of publishing a mismatched verifier.
    if not isinstance(pub_obj, Ed25519PublicKey):
        raise ValueError("issuer_ed25519_pub_pem must contain an Ed25519 public key")
    pub_der = pub_obj.public_bytes(
        encoding=_ser.Encoding.DER,
        format=_ser.PublicFormat.SubjectPublicKeyInfo,
    )
    body = json.dumps(
        {
            "dsseRequestV002": {
                "envelope": json.loads(envelope.to_json()),
                "verifiers": [
                    {
                        "publicKey": {
                            "rawBytes": base64.b64encode(pub_der).decode("ascii"),
                        },
                        "keyDetails": "PKIX_ED25519",
                    }
                ],
            }
        }
    ).encode("utf-8")
    req = urllib.request.Request(
        url=log_url.rstrip("/") + "/api/v2/log/entries",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "oversight-protocol/0.5 (+https://github.com/oversight-protocol)",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        detail = ""
        try:
            # Best effort: the error body is only used to enrich the message.
            detail = e.read().decode("utf-8", errors="replace")[:500]
        except Exception:
            pass
        raise RuntimeError(f"rekor v2 upload failed: HTTP {e.code} {detail}") from e
    parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        # Fail here rather than hand back a result no verifier can use.
        raise RuntimeError("rekor v2 upload returned a non-object JSON response")

    log_id = _first_str(parsed, ["logID", "logId", "log_id"])
    if log_id is None:
        # TransparencyLogEntry JSON nests the id as {"logId": {"keyId": ...}}.
        log_id = _nested_str(parsed, ["logId", "keyId"])
    checkpoint = _nested_str(parsed, ["inclusionProof", "checkpoint", "envelope"])
    if checkpoint is None:
        checkpoint = _nested_str(parsed, ["inclusion_proof", "checkpoint", "envelope"])

    return RekorUploadResult(
        log_url=log_url,
        log_index=_first_int(parsed, ["logIndex", "logEntry", "log_index"]),
        log_id=log_id,
        integrated_time=_first_int(parsed, ["integratedTime", "integrated_time"]),
        transparency_log_entry=parsed,
        checkpoint=checkpoint,
    )


def _nested_str(d: Any, path: list[str]) -> Optional[str]:
    """Follow ``path`` through nested dicts; return the leaf only if it is a str."""
    node = d
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node if isinstance(node, str) else None
368
 
369
 
370
def _first_int(d: dict, keys: list[str]) -> Optional[int]:
    """Return ``int(d[k])`` for the first key in ``keys`` that converts.

    Keys that are absent, or whose value ``int()`` rejects with
    ``TypeError``/``ValueError``, are skipped. ``None`` means no key worked.
    """
    for candidate in keys:
        if candidate not in d:
            continue
        try:
            converted = int(d[candidate])
        except (TypeError, ValueError):
            continue
        return converted
    return None
378
 
379
 
380
def _first_str(d: dict, keys: list[str]) -> Optional[str]:
    """Return the value of the first key in ``keys`` that maps to a ``str``.

    Missing keys and non-string values are skipped; ``None`` if none match.
    """
    string_values = (
        d[name] for name in keys if name in d and isinstance(d[name], str)
    )
    return next(string_values, None)
385
 
386
 
387
 
388
 
389
def verify_inclusion_offline(
    bundle_rekor_field: dict,
    envelope: DSSEEnvelope,
    issuer_ed25519_pub: bytes,
    expected_content_hash_sha256_hex: str,
) -> tuple[bool, str]:
    """Verify a bundled Rekor entry without contacting the log.

    Checks (in order):
      1. The DSSE envelope verifies under ``issuer_ed25519_pub``.
      2. The envelope payload parses as JSON and its first subject's
         ``sha256`` digest equals ``expected_content_hash_sha256_hex``
         (hex compared case-insensitively).
      3. The bundled ``transparency_log_entry`` is a non-empty dict that
         contains at least one of ``inclusionProof``, ``inclusion_proof``
         or ``logEntry``.

    Step 3 is a structural presence check only; nothing about the proof is
    validated cryptographically here. A full inclusion-proof recomputation
    requires fetching tiles; that lives in
    :mod:`oversight_core.auditor_helper`, which uses ``sigstore-python``.

    Malformed bundle or payload data is reported through the return value
    rather than raised.

    Returns:
        ``(ok, reason)`` where ``reason`` is ``"ok"`` on success and a short
        explanation of the first failed check otherwise.
    """
    if not verify_dsse(envelope, issuer_ed25519_pub):
        return False, "dsse signature did not verify under issuer pubkey"
    try:
        statement = envelope_payload_statement(envelope)
    except (ValueError, TypeError):
        # A correctly signed payload can still fail to be JSON.
        return False, "dsse payload is not valid JSON"
    try:
        subject_digest = statement["subject"][0]["digest"]["sha256"]
    except (KeyError, IndexError, TypeError):
        return False, "dsse payload missing subject digest"
    # Hex digests denote the same bytes regardless of letter case.
    if (
        not isinstance(subject_digest, str)
        or not isinstance(expected_content_hash_sha256_hex, str)
        or subject_digest.lower() != expected_content_hash_sha256_hex.lower()
    ):
        return False, "dsse subject digest does not match expected content hash"
    if not isinstance(bundle_rekor_field, dict):
        return False, "bundle missing transparency_log_entry payload"
    tle = bundle_rekor_field.get("transparency_log_entry") or {}
    if not isinstance(tle, dict) or not tle:
        return False, "bundle missing transparency_log_entry payload"
    has_proof = any(
        k in tle for k in ("inclusionProof", "inclusion_proof", "logEntry")
    )
    if not has_proof:
        return False, "transparency_log_entry has no inclusion proof or logEntry shape"
    return True, "ok"