| 1 | """ |
| 2 | oversight_core.timestamp |
| 3 | ======================== |
| 4 | |
| 5 | RFC 3161 qualified timestamp client. Used by the registry to get |
| 6 | independently-auditable timestamps from a Time Stamp Authority, rather than |
| 7 | relying on the registry's own clock. |
| 8 | |
| 9 | Free, no-account TSA options (tested and working): |
| 10 | - https://freetsa.org/tsr - FreeTSA, P-384 EC, valid to 2040 |
| 11 | - http://timestamp.digicert.com - DigiCert, RFC 3161 compliant, widely used |
| 12 | |
| 13 | Every timestamp is: |
| 14 | - signed by the TSA's private key (independently-verifiable) |
| 15 | - contains gen_time from the TSA's clock |
| 16 | - contains a nonce to prevent replay |
| 17 | - commits to our chosen hash of the input |
| 18 | |
| 19 | We store the raw bytes of the TimeStampToken as BLOB in the registry's events |
| 20 | table. A court examiner can independently verify the timestamp offline using |
| 21 | `openssl ts -verify` + the TSA's public cert, without trusting us. |
| 22 | """ |
| 23 | |
| 24 | from __future__ import annotations |
| 25 | |
| 26 | import hashlib |
| 27 | import os |
| 28 | from dataclasses import dataclass |
| 29 | from typing import Optional |
| 30 | |
| 31 | import httpx |
| 32 | |
| 33 | try: |
| 34 | from rfc3161_client import TimestampRequestBuilder, decode_timestamp_response |
| 35 | RFC3161_AVAILABLE = True |
| 36 | except ImportError: |
| 37 | RFC3161_AVAILABLE = False |
| 38 | |
| 39 | |
# Default Time Stamp Authority endpoints. qualified_timestamp() walks this
# list in order and returns the first usable token, so earlier entries are
# preferred and later ones act as fallbacks.
DEFAULT_TSA_CHAIN = [
    "https://freetsa.org/tsr",
    "http://timestamp.digicert.com",
]
| 44 | |
| 45 | |
@dataclass
class QualifiedTimestamp:
    """A signed RFC 3161 timestamp plus the fields extracted from it.

    The raw token is kept alongside the parsed fields so that it can be
    handed to an external verifier unchanged.
    """

    tsa_url: str          # endpoint that issued the token
    token_bytes: bytes    # raw TimeStampToken bytes as returned by the TSA
    gen_time_iso: str     # TSA generation time, ISO 8601 string
    serial_number: int    # serial number assigned by the TSA
    nonce: int            # nonce echoed in the token
    policy_oid: str       # dotted policy OID, or "" when absent
    message_hash: bytes   # digest the TSA committed to

    def to_dict(self) -> dict:
        """Return a plain-dict form for the registry evidence bundle.

        Binary fields are hex-encoded; all other fields are passed through
        unchanged.
        """
        return dict(
            tsa_url=self.tsa_url,
            token_hex=self.token_bytes.hex(),
            gen_time=self.gen_time_iso,
            serial=self.serial_number,
            nonce=self.nonce,
            policy_oid=self.policy_oid,
            message_hash_hex=self.message_hash.hex(),
        )
| 68 | |
| 69 | |
def qualified_timestamp(
    data: bytes,
    tsa_chain: Optional[list[str]] = None,
    timeout: float = 15.0,
) -> Optional[QualifiedTimestamp]:
    """
    Request a qualified timestamp for `data`. Tries each TSA in the chain
    until one succeeds; returns None if all fail (offline / network down)
    or if the optional rfc3161_client dependency is not installed.

    This is a BEST-EFFORT operation: the caller should proceed even if
    qualification fails, and annotate the event as "self-timestamped" rather
    than "qualified-timestamped". The registry's signed tree head still provides
    tamper evidence for the sequence of events, just not clock independence.

    A response is only accepted when it is bound to the request that was
    sent: the nonce in the token must equal the request nonce and the message
    imprint in the token must equal the request's imprint. A response that
    fails either check is treated like any other TSA failure and the next
    TSA is tried. The TSA's signature and certificate chain are NOT verified
    here.

    Args:
        data: Bytes to be timestamped.
        tsa_chain: TSA URLs to try, in order. None (or an empty list) falls
            back to DEFAULT_TSA_CHAIN.
        timeout: Timeout passed to each HTTP POST.

    Returns:
        A QualifiedTimestamp from the first TSA that returns an acceptable
        response, or None when no TSA does.

    Example:
        ts = qualified_timestamp(event_canonical_bytes)
        if ts:
            event["qualified_timestamp"] = ts.to_dict()
        else:
            event["qualified_timestamp"] = None

    The returned QualifiedTimestamp contains the raw TimeStampToken (not the
    full TimeStampResp). An external auditor can verify it with
    `openssl ts -verify -token_in -in token.tst -data data` + the TSA's CA
    certificate (which both FreeTSA and DigiCert publish).
    """
    if not RFC3161_AVAILABLE:
        return None

    for tsa_url in (tsa_chain or DEFAULT_TSA_CHAIN):
        try:
            # Build a new request per attempt so every TSA sees a fresh nonce.
            req = TimestampRequestBuilder().data(data).nonce(nonce=True).build()
            resp = httpx.post(
                tsa_url,
                content=req.as_bytes(),
                headers={"Content-Type": "application/timestamp-query"},
                timeout=timeout,
            )
            if resp.status_code != 200:
                continue
            tsr = decode_timestamp_response(resp.content)
            if tsr.status != 0:
                continue

            tst_info = tsr.tst_info
            imprint = bytes(tst_info.message_imprint.message)

            # Replay protection: the nonce is only useful if it is compared.
            # A missing nonce (None) also fails this check.
            if tst_info.nonce != req.nonce:
                continue
            # The token must commit to the digest we actually sent; otherwise
            # it is a timestamp for some other data.
            if imprint != bytes(req.message_imprint.message):
                continue

            return QualifiedTimestamp(
                tsa_url=tsa_url,
                token_bytes=tsr.time_stamp_token(),
                gen_time_iso=tst_info.gen_time.isoformat(),
                serial_number=tst_info.serial_number,
                nonce=tst_info.nonce,
                policy_oid=tst_info.policy.dotted_string if tst_info.policy else "",
                message_hash=imprint,
            )
        except (httpx.HTTPError, ValueError, TimeoutError, OSError):
            # Best-effort: any transport or decode failure moves on to the
            # next TSA in the chain.
            continue

    return None
| 129 | |
| 130 | |
def verify_qualified_timestamp(
    ts: QualifiedTimestamp,
    original_data: bytes,
) -> tuple[bool, str]:
    """
    Light verification of a stored timestamp against the data it should cover.

    Recomputes SHA-512 over `original_data` and compares it with the digest
    recorded in `ts.message_hash`. The TSA's signature and certificate chain
    are NOT checked - that needs `openssl ts -verify` or equivalent with the
    TSA's root cert, which Oversight doesn't ship (users obtain it from the
    TSA).

    Returns (ok, reason): `ok` is True when the digests are equal; `reason`
    is a human-readable explanation in both cases.
    """
    expected_digest = hashlib.sha512(original_data).digest()

    if expected_digest == ts.message_hash:
        return True, "TSA message-hash matches data; signature verification requires TSA root cert"

    # Only the first 16 bytes of each digest are shown to keep the message short.
    committed_prefix = ts.message_hash[:16].hex()
    computed_prefix = expected_digest[:16].hex()
    reason = (
        f"message-hash mismatch: TSA committed to {committed_prefix}..., "
        f"computed {computed_prefix}..."
    )
    return False, reason