Zion Boggan
repos/Oversight/oversight_core/container.py
zionboggan.com ↗
271 lines · python
History for this file →
1
"""
2
oversight_core.container
3
========================
4
 
5
The `.sealed` container format. Binary layout:
6
 
7
    offset  length    field
8
    ------  --------  ---------------------------------------
9
    0       6         magic: b"OSGT\\x01\\x00"
10
    6       1         format_version (=1)
11
    7       1         suite_id (1=CLASSIC_V1, 2=HYBRID_V1, 3=HW_P256_V1)
12
    8       4         manifest_len (u32 big-endian)
13
    12      M         manifest (canonical JSON, signed)
14
    12+M    4         wrapped_dek_len (u32 BE)
15
    ...     W         wrapped_dek (JSON: ephemeral_pub, nonce, wrapped_dek)
16
    ...     24        aead_nonce
17
    ...     4         ciphertext_len (u32 BE)
18
    ...     C         ciphertext (XChaCha20-Poly1305(plaintext))
19
 
20
Invariants:
21
  * The manifest is signed BEFORE being inserted; signature is part of the manifest JSON.
22
  * The AEAD associated data (AAD) = content_hash from the manifest. This ties
23
    the ciphertext to the signed manifest: you can't swap ciphertexts between manifests.
24
  * The manifest content_hash = sha256(plaintext). So verifying the plaintext after
25
    decryption against the manifest closes the loop: you know the bytes you're reading
26
    are exactly what the issuer signed for this recipient.
27
"""
28
 
29
from __future__ import annotations
30
 
31
import io
32
import json
33
import struct
34
from dataclasses import dataclass
35
 
36
from .jcs import jcs_dumps
37
from typing import Optional
38
 
39
from . import crypto
40
from .manifest import Manifest
41
 
42
 
43
# Six-byte signature that every .sealed container starts with.
MAGIC = b"OSGT\x01\x00"

# Numeric suite identifiers as stored in the one-byte suite_id header field.
SUITE_CLASSIC_V1_ID = 1
SUITE_HYBRID_V1_ID = 2
SUITE_HW_P256_V1_ID = 3

# Header suite_id -> suite name; the parser compares this name to manifest.suite.
SUITE_ID_TO_NAME = dict(
    (
        (SUITE_CLASSIC_V1_ID, crypto.SUITE_CLASSIC_V1),
        (SUITE_HYBRID_V1_ID, crypto.SUITE_HYBRID_V1),
        (SUITE_HW_P256_V1_ID, crypto.SUITE_HW_P256_V1),
    )
)

# Upper bounds checked against the length prefixes while parsing a container.
MAX_MANIFEST_BYTES = 4 << 20  # 4 MiB
MAX_WRAPPED_DEK_BYTES = 1 << 20  # 1 MiB
# NOTE(review): 4 GiB is 2**32, one more than a u32 length prefix can express,
# so a parsed ciphertext_len can never exceed this bound.
MAX_CIPHERTEXT_BYTES = 4 << 30  # 4 GiB
57
 
58
 
59
def _read_exact(buf: io.BytesIO, n: int, field: str) -> bytes:
    """Return the next ``n`` bytes of ``buf``.

    ``field`` only labels the region being read in the error message.

    Raises:
        ValueError: if the stream yields a different number of bytes than
            requested (i.e. the input is truncated).
    """
    chunk = buf.read(n)
    got = len(chunk)
    if got == n:
        return chunk
    raise ValueError(f"truncated file: wanted {n} bytes for {field}, got {got}")
65
 
66
 
67
@dataclass
class SealedFile:
    """In-memory form of a `.sealed` container.

    Holds the parsed sections of the binary layout: the manifest, the
    wrapped-DEK JSON object, the AEAD nonce, the ciphertext, and the numeric
    suite id stored in the header. `to_bytes` and `from_bytes` convert
    between this object and the on-disk byte layout.
    """

    # Manifest object; written to the container via manifest.to_json().
    manifest: Manifest
    # JSON object describing the wrapped DEK; written via jcs_dumps().
    wrapped_dek: dict
    # Fixed-width field: exactly 24 bytes on the wire.
    aead_nonce: bytes
    ciphertext: bytes
    # One-byte header field; from_bytes requires it to map to manifest.suite.
    suite_id: int = SUITE_CLASSIC_V1_ID

    def to_bytes(self) -> bytes:
        """Serialize this object to the binary `.sealed` layout.

        The fields are validated first so that the output is never a
        container that `from_bytes` would misparse or reject on size.

        Returns:
            The complete container as bytes.

        Raises:
            ValueError: if `aead_nonce` is not 24 bytes, or if the manifest,
                wrapped DEK or ciphertext is larger than the parser's limit
                or than a u32 length prefix can encode.
        """
        # The nonce has no length prefix: from_bytes always consumes 24 bytes,
        # so any other length would shift every field that follows it.
        if len(self.aead_nonce) != 24:
            raise ValueError(
                f"aead_nonce must be 24 bytes, got {len(self.aead_nonce)}"
            )

        manifest_json = self.manifest.to_json()
        wrapped_json = jcs_dumps(self.wrapped_dek)

        sections = (
            ("manifest", manifest_json, MAX_MANIFEST_BYTES),
            ("wrapped_dek", wrapped_json, MAX_WRAPPED_DEK_BYTES),
            ("ciphertext", self.ciphertext, MAX_CIPHERTEXT_BYTES),
        )
        for field, payload, limit in sections:
            # Respect both the parser's limit and the u32 prefix range, so an
            # oversize section is a ValueError rather than a struct.error.
            cap = min(limit, 0xFFFFFFFF)
            if len(payload) > cap:
                raise ValueError(f"{field} too large: {len(payload)} > {cap}")

        buf = io.BytesIO()
        buf.write(MAGIC)
        # format_version (always 1) followed by the suite id byte.
        buf.write(bytes([1, self.suite_id]))

        buf.write(struct.pack(">I", len(manifest_json)))
        buf.write(manifest_json)

        buf.write(struct.pack(">I", len(wrapped_json)))
        buf.write(wrapped_json)

        buf.write(self.aead_nonce)
        buf.write(struct.pack(">I", len(self.ciphertext)))
        buf.write(self.ciphertext)

        return buf.getvalue()

    @classmethod
    def from_bytes(cls, data: bytes) -> "SealedFile":
        """Parse a `.sealed` blob into a SealedFile.

        Only the container structure is checked here; signature verification
        and decryption are left to the caller (see `open_sealed`).

        Raises:
            ValueError: on bad magic, unsupported format version or suite id,
                a suite id that disagrees with the manifest's suite, a
                section exceeding its size limit, truncated input, malformed
                wrapped-DEK JSON, or trailing bytes after the ciphertext.
            Whatever `Manifest.from_json` raises propagates unchanged.
        """
        buf = io.BytesIO(data)
        magic = _read_exact(buf, 6, "magic")
        if magic != MAGIC:
            raise ValueError(f"Not a .sealed file (bad magic: {magic!r})")

        hdr = _read_exact(buf, 2, "version/suite")
        fmt_ver, suite_id = hdr[0], hdr[1]
        if fmt_ver != 1:
            raise ValueError(f"Unsupported format version: {fmt_ver}")

        # Each length prefix is checked against its limit before the payload
        # is read.
        (mlen,) = struct.unpack(">I", _read_exact(buf, 4, "manifest_len"))
        if mlen > MAX_MANIFEST_BYTES:
            raise ValueError(f"manifest too large: {mlen} > {MAX_MANIFEST_BYTES}")
        manifest_json = _read_exact(buf, mlen, "manifest")
        manifest = Manifest.from_json(manifest_json)
        expected_suite = SUITE_ID_TO_NAME.get(suite_id)
        if expected_suite is None:
            raise ValueError(f"Unsupported suite id: {suite_id}")
        # The header byte must agree with the suite named inside the manifest.
        if manifest.suite != expected_suite:
            raise ValueError("Container suite id does not match signed manifest suite")

        (wlen,) = struct.unpack(">I", _read_exact(buf, 4, "wrapped_dek_len"))
        if wlen > MAX_WRAPPED_DEK_BYTES:
            raise ValueError(f"wrapped_dek too large: {wlen} > {MAX_WRAPPED_DEK_BYTES}")
        try:
            wrapped_dek = json.loads(_read_exact(buf, wlen, "wrapped_dek").decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            raise ValueError("Malformed wrapped DEK JSON") from exc
        if not isinstance(wrapped_dek, dict):
            raise ValueError("Malformed wrapped DEK: expected JSON object")

        aead_nonce = _read_exact(buf, 24, "aead_nonce")
        (clen,) = struct.unpack(">I", _read_exact(buf, 4, "ciphertext_len"))
        if clen > MAX_CIPHERTEXT_BYTES:
            raise ValueError(f"ciphertext too large: {clen} > {MAX_CIPHERTEXT_BYTES}")
        ciphertext = _read_exact(buf, clen, "ciphertext")
        # Anything left over means the blob is not exactly one container.
        if buf.tell() != len(data):
            raise ValueError("Trailing bytes after ciphertext")

        return cls(
            manifest=manifest,
            wrapped_dek=wrapped_dek,
            aead_nonce=aead_nonce,
            ciphertext=ciphertext,
            suite_id=suite_id,
        )
143
 
144
 
145
 
146
def seal(
    plaintext: bytes,
    manifest: Manifest,
    issuer_ed25519_priv: bytes,
    recipient_x25519_pub: bytes,
) -> bytes:
    """
    Produce a .sealed blob for `recipient_x25519_pub`.

    Preconditions:
      manifest.content_hash must already be set to sha256(plaintext).
      manifest.size_bytes must match len(plaintext).
      manifest.recipient.x25519_pub must match recipient_x25519_pub (hex).
      manifest.suite must be the CLASSIC_V1 suite, the only suite id this
      function writes into the container header.

    Side effects:
      Calls manifest.sign(issuer_ed25519_priv) on the caller's manifest object.

    Returns:
      The serialized container bytes.

    Raises:
      ValueError: if a precondition fails or a key is not 32 bytes long.
    """
    if manifest.content_hash != crypto.content_hash(plaintext):
        raise ValueError("manifest.content_hash does not match sha256(plaintext)")
    if manifest.size_bytes != len(plaintext):
        raise ValueError("manifest.size_bytes does not match len(plaintext)")
    if manifest.recipient is None:
        raise ValueError("manifest.recipient is required for single-recipient seal")
    if manifest.recipient.x25519_pub != recipient_x25519_pub.hex():
        raise ValueError("manifest.recipient.x25519_pub does not match the provided pubkey")
    if len(recipient_x25519_pub) != 32:
        raise ValueError(f"recipient pubkey must be 32 bytes, got {len(recipient_x25519_pub)}")
    if len(issuer_ed25519_priv) != 32:
        raise ValueError(f"issuer priv key must be 32 bytes, got {len(issuer_ed25519_priv)}")

    manifest.sign(issuer_ed25519_priv)

    # The container written below always carries the CLASSIC_V1 suite id, and
    # SealedFile.from_bytes rejects a container whose suite id disagrees with
    # manifest.suite. Refuse here instead of emitting a blob that can never be
    # opened. Checked after sign() so the comparison sees the same manifest
    # state that gets serialized.
    if manifest.suite != crypto.SUITE_CLASSIC_V1:
        raise ValueError(
            f"manifest.suite must be {crypto.SUITE_CLASSIC_V1!r} for seal(), "
            f"got {manifest.suite!r}"
        )

    dek = crypto.random_dek()
    wrapped = crypto.wrap_dek_for_recipient(dek, recipient_x25519_pub)
    # AAD = content hash, tying the ciphertext to this manifest.
    aad = manifest.content_hash.encode("ascii")
    nonce, ct = crypto.aead_encrypt(dek, plaintext, aad=aad)
    sf = SealedFile(
        manifest=manifest,
        wrapped_dek=wrapped,
        aead_nonce=nonce,
        ciphertext=ct,
        suite_id=SUITE_CLASSIC_V1_ID,
    )
    return sf.to_bytes()
182
 
183
 
184
def open_sealed(
    blob: bytes,
    recipient_x25519_priv: bytes,
    trusted_issuer_pubs: Optional[set[str]] = None,
    policy_ctx: Optional["PolicyContext"] = None,
) -> tuple[bytes, Manifest]:
    """
    Decrypt a .sealed blob. Returns (plaintext, manifest).

    Verification order (fail-fast):
      1. Parse container, reject malformed.
      2. Verify manifest signature (Ed25519).
      3. If trusted_issuer_pubs provided, verify issuer is in set.
      4. Policy check (not_after, not_before, jurisdiction).
      5. Unwrap DEK (multi-recipient: try each slot).
      6. AEAD decrypt with AAD = content_hash (binds ciphertext to manifest).
      7. Post-decrypt SHA-256 check.
      8. Atomically check-and-bump max_opens after successful decryption.

    Raises:
      ValueError: if the private key is not 32 bytes, the container is
        malformed, the signature is invalid, the issuer is not trusted, no
        slot can be unwrapped, or the plaintext hash does not match.
      Errors raised by check_policy, crypto.unwrap_dek (single-slot form),
      crypto.aead_decrypt and record_open propagate unchanged.
    """
    from .policy import check_policy, record_open

    if len(recipient_x25519_priv) != 32:
        raise ValueError(
            f"recipient priv key must be 32 bytes, got {len(recipient_x25519_priv)}"
        )

    sf = SealedFile.from_bytes(blob)

    if not sf.manifest.verify():
        raise ValueError("Manifest signature invalid")

    if trusted_issuer_pubs is not None:
        if sf.manifest.issuer_ed25519_pub not in trusted_issuer_pubs:
            raise ValueError(
                f"Issuer not trusted: {sf.manifest.issuer_ed25519_pub[:16]}..."
            )

    check_policy(sf.manifest, policy_ctx)

    dek = None
    if "slots" in sf.wrapped_dek:
        slots = sf.wrapped_dek["slots"]
        # The wrapped-DEK section is parsed straight from the blob, so check
        # its shape: a non-array value would otherwise surface as a TypeError
        # instead of the ValueError used for every other malformed container.
        if not isinstance(slots, list):
            raise ValueError("Malformed wrapped DEK: 'slots' must be a JSON array")

        last_exc: Optional[Exception] = None
        for slot in slots:
            try:
                dek = crypto.unwrap_dek(slot, recipient_x25519_priv)
                break
            except Exception as e:
                # Any failure is treated as "not our slot"; the most recent
                # error is kept only for the final message.
                last_exc = e
        if dek is None:
            raise ValueError(
                f"No decryptable slot found for this recipient "
                f"(tried {len(slots)} slots): {last_exc}"
            )
    else:
        dek = crypto.unwrap_dek(sf.wrapped_dek, recipient_x25519_priv)

    # Same AAD derivation as seal(): the manifest's content hash.
    aad = sf.manifest.content_hash.encode("ascii")
    plaintext = crypto.aead_decrypt(dek, sf.aead_nonce, sf.ciphertext, aad=aad)

    if crypto.content_hash(plaintext) != sf.manifest.content_hash:
        raise ValueError("Plaintext hash does not match manifest")

    # Only count the open once decryption and the hash check have succeeded.
    record_open(sf.manifest, policy_ctx)

    return plaintext, sf.manifest
250
 
251
 
252
def seal_multi(
    plaintext: bytes,
    manifest: Manifest,
    issuer_ed25519_priv: bytes,
    recipient_x25519_pubs: list[bytes],
) -> bytes:
    """
    Always refuses: multi-recipient sealing is deliberately turned off.

    A v1 manifest carries exactly one recipient identity and public key in
    its issuer-signed metadata. Sharing that manifest across several key
    slots would yield a container that several recipients can decrypt while
    the signed evidence still names only one of them, which is unsafe for
    attribution. Until the wire format gains an explicit multi-recipient
    manifest, callers must emit one sealed file per recipient.

    Raises:
      ValueError: unconditionally; the arguments are not inspected.
    """
    reason = (
        "seal_multi is disabled because the v1 manifest only supports a single "
        "recipient binding; seal one file per recipient instead"
    )
    raise ValueError(reason)