| 1 | """ |
| 2 | oversight_core.container |
| 3 | ======================= |
| 4 | |
| 5 | The `.sealed` container format. Binary layout: |
| 6 | |
    offset  length    field
    ------  --------  ---------------------------------------
    0       6         magic: b"OSGT\\x01\\x00"
    6       1         format_version (=1)
    7       1         suite_id (1=CLASSIC_V1, 2=HYBRID_V1, 3=HW_P256_V1)
    8       4         manifest_len (u32 big-endian)
    12      M         manifest (canonical JSON, signed)
    12+M    4         wrapped_dek_len (u32 BE)
    ...     W         wrapped_dek (JSON: ephemeral_pub, nonce, wrapped_dek)
    ...     24        aead_nonce
    ...     4         ciphertext_len (u32 BE)
    ...     C         ciphertext (XChaCha20-Poly1305(plaintext))
| 19 | |
| 20 | Invariants: |
| 21 | * The manifest is signed BEFORE being inserted; signature is part of the manifest JSON. |
| 22 | * The AEAD associated data (AAD) = content_hash from the manifest. This ties |
| 23 | the ciphertext to the signed manifest: you can't swap ciphertexts between manifests. |
| 24 | * The manifest content_hash = sha256(plaintext). So verifying the plaintext after |
| 25 | decryption against the manifest closes the loop: you know the bytes you're reading |
| 26 | are exactly what the issuer signed for this recipient. |
| 27 | """ |
| 28 | |
| 29 | from __future__ import annotations |
| 30 | |
| 31 | import io |
| 32 | import json |
| 33 | import struct |
| 34 | from dataclasses import dataclass |
| 35 | |
| 36 | from .jcs import jcs_dumps |
| 37 | from typing import Optional |
| 38 | |
| 39 | from . import crypto |
| 40 | from .manifest import Manifest |
| 41 | |
| 42 | |
# Six-byte signature that must open every `.sealed` container.
MAGIC = b"OSGT\x01\x00"

# One-byte suite identifiers as stored in the container header.
SUITE_CLASSIC_V1_ID, SUITE_HYBRID_V1_ID, SUITE_HW_P256_V1_ID = 1, 2, 3

# Header suite id -> suite name. The parser compares the looked-up name
# against the suite recorded in the manifest.
SUITE_ID_TO_NAME = dict(
    (
        (SUITE_CLASSIC_V1_ID, crypto.SUITE_CLASSIC_V1),
        (SUITE_HYBRID_V1_ID, crypto.SUITE_HYBRID_V1),
        (SUITE_HW_P256_V1_ID, crypto.SUITE_HW_P256_V1),
    )
)
| 52 | |
| 53 | |
# Upper bounds applied to the length-prefixed sections while parsing, so a
# hostile length field is rejected before the section is read.
MAX_MANIFEST_BYTES = 4 << 20  # 4 MiB
MAX_WRAPPED_DEK_BYTES = 1 << 20  # 1 MiB
# 4 GiB == 2**32: one more than the largest value a u32 length prefix can
# encode, so this bound cannot be exceeded by a parsed length.
MAX_CIPHERTEXT_BYTES = 4 << 30
| 57 | |
| 58 | |
| 59 | def _read_exact(buf: io.BytesIO, n: int, field: str) -> bytes: |
| 60 | """Read exactly n bytes or raise ValueError.""" |
| 61 | data = buf.read(n) |
| 62 | if len(data) != n: |
| 63 | raise ValueError(f"truncated file: wanted {n} bytes for {field}, got {len(data)}") |
| 64 | return data |
| 65 | |
| 66 | |
@dataclass
class SealedFile:
    """In-memory form of one `.sealed` container.

    Serialized layout (all length prefixes are unsigned 32-bit big-endian):
    magic, format version byte, suite id byte, length-prefixed manifest JSON,
    length-prefixed wrapped-DEK JSON, 24-byte AEAD nonce, length-prefixed
    ciphertext.
    """

    # Manifest object; serialized via manifest.to_json().
    manifest: Manifest
    # JSON object describing the wrapped data-encryption key.
    wrapped_dek: dict
    # Fixed-width field: the parser always consumes exactly 24 bytes here.
    aead_nonce: bytes
    # AEAD output, stored verbatim after its length prefix.
    ciphertext: bytes
    # One-byte suite identifier written right after the format version.
    suite_id: int = SUITE_CLASSIC_V1_ID

    def to_bytes(self) -> bytes:
        """Serialize this container to its binary wire form.

        The writer enforces the same constraints the parser does, so a blob
        produced here can always be read back by :meth:`from_bytes`.

        Raises:
            ValueError: if ``aead_nonce`` is not exactly 24 bytes, if a
                variable-length section exceeds the limit the parser applies
                (or cannot be described by a u32 length prefix), or if
                ``suite_id`` does not fit in a single byte.
        """
        # The nonce has no length prefix; any other size would silently shift
        # every field that follows it.
        nonce_len = len(self.aead_nonce)
        if nonce_len != 24:
            raise ValueError(f"aead_nonce must be 24 bytes, got {nonce_len}")

        manifest_json = self.manifest.to_json()
        wrapped_json = jcs_dumps(self.wrapped_dek)

        sections = (
            ("manifest", manifest_json, MAX_MANIFEST_BYTES),
            ("wrapped_dek", wrapped_json, MAX_WRAPPED_DEK_BYTES),
            # Also cap at the largest value a u32 prefix can carry.
            ("ciphertext", self.ciphertext, min(MAX_CIPHERTEXT_BYTES, 0xFFFFFFFF)),
        )
        for label, payload, limit in sections:
            if len(payload) > limit:
                raise ValueError(f"{label} too large: {len(payload)} > {limit}")

        buf = io.BytesIO()
        buf.write(MAGIC)
        # Format version (always 1) followed by the suite id; bytes() raises
        # ValueError when suite_id is outside 0..255.
        buf.write(bytes([1, self.suite_id]))

        buf.write(struct.pack(">I", len(manifest_json)))
        buf.write(manifest_json)

        buf.write(struct.pack(">I", len(wrapped_json)))
        buf.write(wrapped_json)

        buf.write(self.aead_nonce)
        buf.write(struct.pack(">I", len(self.ciphertext)))
        buf.write(self.ciphertext)

        return buf.getvalue()

    @classmethod
    def from_bytes(cls, data: bytes) -> "SealedFile":
        """Parse a binary `.sealed` blob into a :class:`SealedFile`.

        Only structural validation happens here (magic, version, suite id,
        section sizes, JSON shape, no trailing bytes); no signature check or
        decryption is performed.

        Raises:
            ValueError: on bad magic, unsupported version or suite id, a suite
                id that disagrees with ``manifest.suite``, an oversized or
                truncated section, malformed wrapped-DEK JSON, or trailing
                bytes after the ciphertext.
        """
        buf = io.BytesIO(data)
        magic = _read_exact(buf, 6, "magic")
        if magic != MAGIC:
            raise ValueError(f"Not a .sealed file (bad magic: {magic!r})")

        hdr = _read_exact(buf, 2, "version/suite")
        fmt_ver, suite_id = hdr[0], hdr[1]
        if fmt_ver != 1:
            raise ValueError(f"Unsupported format version: {fmt_ver}")

        # Check the declared length against the cap before reading the body.
        (mlen,) = struct.unpack(">I", _read_exact(buf, 4, "manifest_len"))
        if mlen > MAX_MANIFEST_BYTES:
            raise ValueError(f"manifest too large: {mlen} > {MAX_MANIFEST_BYTES}")
        manifest_json = _read_exact(buf, mlen, "manifest")
        manifest = Manifest.from_json(manifest_json)

        # The header suite byte must agree with the suite named in the manifest.
        expected_suite = SUITE_ID_TO_NAME.get(suite_id)
        if expected_suite is None:
            raise ValueError(f"Unsupported suite id: {suite_id}")
        if manifest.suite != expected_suite:
            raise ValueError("Container suite id does not match signed manifest suite")

        (wlen,) = struct.unpack(">I", _read_exact(buf, 4, "wrapped_dek_len"))
        if wlen > MAX_WRAPPED_DEK_BYTES:
            raise ValueError(f"wrapped_dek too large: {wlen} > {MAX_WRAPPED_DEK_BYTES}")
        try:
            wrapped_dek = json.loads(_read_exact(buf, wlen, "wrapped_dek").decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            raise ValueError("Malformed wrapped DEK JSON") from exc
        if not isinstance(wrapped_dek, dict):
            raise ValueError("Malformed wrapped DEK: expected JSON object")

        aead_nonce = _read_exact(buf, 24, "aead_nonce")
        (clen,) = struct.unpack(">I", _read_exact(buf, 4, "ciphertext_len"))
        if clen > MAX_CIPHERTEXT_BYTES:
            raise ValueError(f"ciphertext too large: {clen} > {MAX_CIPHERTEXT_BYTES}")
        ciphertext = _read_exact(buf, clen, "ciphertext")
        # Anything left after the ciphertext is rejected rather than ignored.
        if buf.tell() != len(data):
            raise ValueError("Trailing bytes after ciphertext")

        return cls(
            manifest=manifest,
            wrapped_dek=wrapped_dek,
            aead_nonce=aead_nonce,
            ciphertext=ciphertext,
            suite_id=suite_id,
        )
| 143 | |
| 144 | |
| 145 | |
def seal(
    plaintext: bytes,
    manifest: Manifest,
    issuer_ed25519_priv: bytes,
    recipient_x25519_pub: bytes,
) -> bytes:
    """
    Produce a .sealed blob for `recipient_x25519_pub`.

    All preconditions are checked before the manifest is signed, so a
    rejected call leaves `manifest` unsigned. On success `manifest` is
    signed in place via `manifest.sign(...)`.

    Preconditions:
        manifest.content_hash must already be set to sha256(plaintext).
        manifest.size_bytes must match len(plaintext).
        manifest.recipient.x25519_pub must match recipient_x25519_pub (hex).
        manifest.suite must be the classic suite, because the container is
        written with the classic suite id and the parser rejects a header
        suite id that disagrees with manifest.suite.

    Returns:
        The serialized container bytes.

    Raises:
        ValueError: if any precondition fails or a key has the wrong length.
    """
    if manifest.content_hash != crypto.content_hash(plaintext):
        raise ValueError("manifest.content_hash does not match sha256(plaintext)")
    if manifest.size_bytes != len(plaintext):
        raise ValueError("manifest.size_bytes does not match len(plaintext)")
    if manifest.recipient is None:
        raise ValueError("manifest.recipient is required for single-recipient seal")
    if manifest.recipient.x25519_pub != recipient_x25519_pub.hex():
        raise ValueError("manifest.recipient.x25519_pub does not match the provided pubkey")
    if len(recipient_x25519_pub) != 32:
        raise ValueError(f"recipient pubkey must be 32 bytes, got {len(recipient_x25519_pub)}")
    if len(issuer_ed25519_priv) != 32:
        raise ValueError(f"issuer priv key must be 32 bytes, got {len(issuer_ed25519_priv)}")
    # Fail fast: a manifest naming another suite would be signed and written
    # under the classic suite id, yielding a blob the parser always rejects.
    if manifest.suite != crypto.SUITE_CLASSIC_V1:
        raise ValueError(
            f"manifest.suite must be {crypto.SUITE_CLASSIC_V1!r} for seal(), "
            f"got {manifest.suite!r}"
        )

    manifest.sign(issuer_ed25519_priv)
    dek = crypto.random_dek()
    wrapped = crypto.wrap_dek_for_recipient(dek, recipient_x25519_pub)
    # The content hash is the AEAD associated data, tying the ciphertext to
    # this manifest.
    aad = manifest.content_hash.encode("ascii")
    nonce, ct = crypto.aead_encrypt(dek, plaintext, aad=aad)
    sf = SealedFile(
        manifest=manifest,
        wrapped_dek=wrapped,
        aead_nonce=nonce,
        ciphertext=ct,
        suite_id=SUITE_CLASSIC_V1_ID,
    )
    return sf.to_bytes()
| 182 | |
| 183 | |
def open_sealed(
    blob: bytes,
    recipient_x25519_priv: bytes,
    trusted_issuer_pubs: Optional[set[str]] = None,
    policy_ctx: Optional["PolicyContext"] = None,
) -> tuple[bytes, Manifest]:
    """
    Decrypt a .sealed blob. Returns (plaintext, manifest).

    Verification order (fail-fast):
      1. Parse container, reject malformed.
      2. Verify manifest signature (manifest.verify()).
      3. If trusted_issuer_pubs provided, verify issuer is in set.
      4. Policy check via check_policy(manifest, policy_ctx).
      5. Unwrap DEK (if the wrapped-DEK object has a "slots" list, try each
         slot in order and use the first that unwraps).
      6. AEAD decrypt with AAD = content_hash (binds ciphertext to manifest).
      7. Post-decrypt content-hash check against the manifest.
      8. record_open(manifest, policy_ctx) after successful decryption.

    Raises:
        ValueError: wrong private-key length, malformed container, invalid
            signature, untrusted issuer, malformed or undecryptable slot
            list, or plaintext hash mismatch. Errors raised by the policy
            and crypto helpers propagate unchanged.
    """
    # NOTE(review): imported lazily; presumably to avoid an import cycle with
    # the policy module. Confirm before hoisting to module scope.
    from .policy import check_policy, record_open

    if len(recipient_x25519_priv) != 32:
        raise ValueError(
            f"recipient priv key must be 32 bytes, got {len(recipient_x25519_priv)}"
        )

    sf = SealedFile.from_bytes(blob)

    if not sf.manifest.verify():
        raise ValueError("Manifest signature invalid")

    if trusted_issuer_pubs is not None:
        if sf.manifest.issuer_ed25519_pub not in trusted_issuer_pubs:
            raise ValueError(
                f"Issuer not trusted: {sf.manifest.issuer_ed25519_pub[:16]}..."
            )

    check_policy(sf.manifest, policy_ctx)

    if "slots" in sf.wrapped_dek:
        slots = sf.wrapped_dek["slots"]
        # The wrapped-DEK JSON is untrusted input: reject a non-list here so
        # malformed containers surface as ValueError instead of a TypeError
        # (or a character-by-character walk over a string).
        if not isinstance(slots, list):
            raise ValueError("Malformed wrapped DEK: 'slots' must be a list")

        dek = None
        last_exc: Optional[Exception] = None
        for slot in slots:
            try:
                dek = crypto.unwrap_dek(slot, recipient_x25519_priv)
                break
            except Exception as e:
                # Deliberately broad: a slot that does not unwrap for this key
                # is expected; remember the error and try the next slot.
                last_exc = e
        if dek is None:
            raise ValueError(
                f"No decryptable slot found for this recipient "
                f"(tried {len(slots)} slots): {last_exc}"
            ) from last_exc
    else:
        dek = crypto.unwrap_dek(sf.wrapped_dek, recipient_x25519_priv)

    aad = sf.manifest.content_hash.encode("ascii")
    plaintext = crypto.aead_decrypt(dek, sf.aead_nonce, sf.ciphertext, aad=aad)

    if crypto.content_hash(plaintext) != sf.manifest.content_hash:
        raise ValueError("Plaintext hash does not match manifest")

    # Only recorded once decryption and the hash check have both succeeded.
    record_open(sf.manifest, policy_ctx)

    return plaintext, sf.manifest
| 250 | |
| 251 | |
def seal_multi(
    plaintext: bytes,
    manifest: Manifest,
    issuer_ed25519_priv: bytes,
    recipient_x25519_pubs: list[bytes],
) -> bytes:
    """
    Always refuse: multi-recipient sealing is deliberately switched off.

    A v1 manifest carries exactly one recipient identity and public key in
    its issuer-signed metadata. Sharing that manifest across several key
    slots would yield a container that several recipients can decrypt while
    the signed evidence still names only one of them, which is unsafe for
    attribution. Until the wire format gains an explicit multi-recipient
    manifest, callers must emit one sealed file per recipient.

    Raises:
        ValueError: unconditionally; the arguments are never inspected.
    """
    reason = (
        "seal_multi is disabled because the v1 manifest only supports a single "
        "recipient binding; seal one file per recipient instead"
    )
    raise ValueError(reason)