Zion Boggan
repos/Oversight/oversight_core/timestamp.py
zionboggan.com ↗
150 lines · python
History for this file →
1
"""
2
oversight_core.timestamp
3
========================
4
 
5
RFC 3161 qualified timestamp client. Used by the registry to get
6
independently-auditable timestamps from a Time Stamp Authority, rather than
7
relying on the registry's own clock.
8
 
9
Free, no-account TSA options (tested and working):
10
    - https://freetsa.org/tsr  - FreeTSA, P-384 EC, valid to 2040
11
    - http://timestamp.digicert.com - DigiCert, RFC 3161 compliant, widely used
12
 
13
Every timestamp is:
14
    - signed by the TSA's private key (independently-verifiable)
15
    - contains gen_time from the TSA's clock
16
    - contains a nonce to prevent replay
17
    - commits to our chosen hash of the input
18
 
19
We store the raw bytes of the TimeStampToken as BLOB in the registry's events
20
table. A court examiner can independently verify the timestamp offline using
21
`openssl ts -verify` + the TSA's public cert, without trusting us.
22
"""
23
 
24
from __future__ import annotations
25
 
26
import hashlib
27
import os
28
from dataclasses import dataclass
29
from typing import Optional
30
 
31
import httpx
32
 
33
try:
34
    from rfc3161_client import TimestampRequestBuilder, decode_timestamp_response
35
    RFC3161_AVAILABLE = True
36
except ImportError:
37
    RFC3161_AVAILABLE = False
38
 
39
 
40
DEFAULT_TSA_CHAIN = [
41
    "https://freetsa.org/tsr",
42
    "http://timestamp.digicert.com",
43
]
44
 
45
 
46
@dataclass
47
class QualifiedTimestamp:
48
    """Represents a signed RFC 3161 timestamp that can be independently verified."""
49
    tsa_url: str
50
    token_bytes: bytes
51
    gen_time_iso: str
52
    serial_number: int
53
    nonce: int
54
    policy_oid: str
55
    message_hash: bytes
56
 
57
    def to_dict(self) -> dict:
58
        """Serialize for storage in the registry evidence bundle."""
59
        return {
60
            "tsa_url": self.tsa_url,
61
            "token_hex": self.token_bytes.hex(),
62
            "gen_time": self.gen_time_iso,
63
            "serial": self.serial_number,
64
            "nonce": self.nonce,
65
            "policy_oid": self.policy_oid,
66
            "message_hash_hex": self.message_hash.hex(),
67
        }
68
 
69
 
70
def qualified_timestamp(
71
    data: bytes,
72
    tsa_chain: Optional[list[str]] = None,
73
    timeout: float = 15.0,
74
) -> Optional[QualifiedTimestamp]:
75
    """
76
    Request a qualified timestamp for `data`. Tries each TSA in the chain
77
    until one succeeds; returns None if all fail (offline / network down).
78
 
79
    This is a BEST-EFFORT operation: the caller should proceed even if
80
    qualification fails, and annotate the event as "self-timestamped" rather
81
    than "qualified-timestamped". The registry's signed tree head still provides
82
    tamper evidence for the sequence of events, just not clock independence.
83
 
84
    Example:
85
        ts = qualified_timestamp(event_canonical_bytes)
86
        if ts:
87
            event["qualified_timestamp"] = ts.to_dict()
88
        else:
89
            event["qualified_timestamp"] = None
90
 
91
    The returned QualifiedTimestamp contains the raw TSA token. An external
92
    auditor can verify it with `openssl ts -verify -in token.tsr -data data`
93
    + the TSA's CA certificate (which both FreeTSA and DigiCert publish).
94
    """
95
    if not RFC3161_AVAILABLE:
96
        return None
97
 
98
    for tsa_url in (tsa_chain or DEFAULT_TSA_CHAIN):
99
        try:
100
            req = TimestampRequestBuilder().data(data).nonce(nonce=True).build()
101
            resp = httpx.post(
102
                tsa_url,
103
                content=req.as_bytes(),
104
                headers={"Content-Type": "application/timestamp-query"},
105
                timeout=timeout,
106
            )
107
            if resp.status_code != 200:
108
                continue
109
            tsr = decode_timestamp_response(resp.content)
110
            if tsr.status != 0:
111
                continue
112
 
113
            tst_info = tsr.tst_info
114
            mi = tst_info.message_imprint
115
 
116
            return QualifiedTimestamp(
117
                tsa_url=tsa_url,
118
                token_bytes=tsr.time_stamp_token(),
119
                gen_time_iso=tst_info.gen_time.isoformat(),
120
                serial_number=tst_info.serial_number,
121
                nonce=tst_info.nonce,
122
                policy_oid=tst_info.policy.dotted_string if tst_info.policy else "",
123
                message_hash=bytes(mi.message),
124
            )
125
        except (httpx.HTTPError, ValueError, TimeoutError, OSError):
126
            continue
127
 
128
    return None
129
 
130
 
131
def verify_qualified_timestamp(
132
    ts: QualifiedTimestamp,
133
    original_data: bytes,
134
) -> tuple[bool, str]:
135
    """
136
    Light verification: checks that the TSA's claimed message hash matches
137
    sha-512 of original_data. Does NOT verify the TSA's signature or cert
138
    chain - that needs `openssl ts -verify` or equivalent with the TSA's
139
    root cert, which Oversight doesn't ship (users obtain from the TSA).
140
 
141
    Returns (ok, reason).
142
    """
143
    computed = hashlib.sha512(original_data).digest()
144
    if computed != ts.message_hash:
145
        return False, (
146
            f"message-hash mismatch: TSA committed to "
147
            f"{ts.message_hash[:16].hex()}..., computed "
148
            f"{computed[:16].hex()}..."
149
        )
150
    return True, "TSA message-hash matches data; signature verification requires TSA root cert"