| 1 | """ |
| 2 | oversight_core.crypto |
| 3 | ==================== |
| 4 | |
| 5 | Vetted primitives only. NO custom crypto. |
| 6 | |
| 7 | Classical (ships today): |
| 8 | - X25519 for key agreement |
| 9 | - Ed25519 for signatures |
| 10 | - XChaCha20-Poly1305 for AEAD |
| 11 | - BLAKE2b for hashing / MAC |
| 12 | - HKDF for key derivation |
| 13 | - Argon2id for password-based KDF (via libsodium) |
| 14 | |
| 15 | Post-quantum hooks (design-ready; enable via `use_pq=True` once liboqs is linked): |
| 16 | - ML-KEM-768 for key encapsulation (hybrid with X25519) |
| 17 | - ML-DSA-65 for signatures (hybrid with Ed25519) |
| 18 | |
| 19 | The container format is crypto-agile: the algorithm suite is declared in the header, |
| 20 | so we can roll forward to full PQ without breaking existing sealed files. |
| 21 | """ |
| 22 | |
| 23 | from __future__ import annotations |
| 24 | |
| 25 | import os |
| 26 | import secrets |
| 27 | from dataclasses import dataclass |
| 28 | from typing import Optional |
| 29 | |
| 30 | from cryptography.hazmat.primitives.asymmetric.ed25519 import ( |
| 31 | Ed25519PrivateKey, |
| 32 | Ed25519PublicKey, |
| 33 | ) |
| 34 | from cryptography.hazmat.primitives.asymmetric.x25519 import ( |
| 35 | X25519PrivateKey, |
| 36 | X25519PublicKey, |
| 37 | ) |
| 38 | from cryptography.hazmat.primitives.kdf.hkdf import HKDF |
| 39 | from cryptography.hazmat.primitives import hashes, serialization |
| 40 | from nacl.bindings import ( |
| 41 | crypto_aead_xchacha20poly1305_ietf_encrypt, |
| 42 | crypto_aead_xchacha20poly1305_ietf_decrypt, |
| 43 | crypto_aead_xchacha20poly1305_ietf_NPUBBYTES, |
| 44 | crypto_aead_xchacha20poly1305_ietf_KEYBYTES, |
| 45 | ) |
| 46 | |
| 47 | try: |
| 48 | import contextlib |
| 49 | import os as _os |
| 50 | |
| 51 | with open(_os.devnull, "w") as _devnull: |
| 52 | with contextlib.redirect_stdout(_devnull): |
| 53 | import oqs |
| 54 | |
| 55 | PQ_AVAILABLE = True |
| 56 | except Exception: |
| 57 | PQ_AVAILABLE = False |
| 58 | |
| 59 | |
| 60 | |
| 61 | SUITE_CLASSIC_V1 = "OSGT-CLASSIC-v1" |
| 62 | SUITE_HYBRID_V1 = "OSGT-HYBRID-v1" |
| 63 | SUITE_HW_P256_V1 = "OSGT-HW-P256-v1" |
| 64 | |
| 65 | P256_PUBLIC_KEY_LEN = 65 |
| 66 | |
| 67 | XCHACHA_NONCE_LEN = crypto_aead_xchacha20poly1305_ietf_NPUBBYTES |
| 68 | XCHACHA_KEY_LEN = crypto_aead_xchacha20poly1305_ietf_KEYBYTES |
| 69 | |
| 70 | |
| 71 | |
| 72 | @dataclass |
| 73 | class ClassicIdentity: |
| 74 | """Recipient / issuer identity: X25519 (encryption) + Ed25519 (signing).""" |
| 75 | x25519_priv: bytes |
| 76 | x25519_pub: bytes |
| 77 | ed25519_priv: bytes |
| 78 | ed25519_pub: bytes |
| 79 | |
| 80 | @classmethod |
| 81 | def generate(cls) -> "ClassicIdentity": |
| 82 | xsk = X25519PrivateKey.generate() |
| 83 | esk = Ed25519PrivateKey.generate() |
| 84 | return cls( |
| 85 | x25519_priv=xsk.private_bytes( |
| 86 | encoding=serialization.Encoding.Raw, |
| 87 | format=serialization.PrivateFormat.Raw, |
| 88 | encryption_algorithm=serialization.NoEncryption(), |
| 89 | ), |
| 90 | x25519_pub=xsk.public_key().public_bytes( |
| 91 | encoding=serialization.Encoding.Raw, |
| 92 | format=serialization.PublicFormat.Raw, |
| 93 | ), |
| 94 | ed25519_priv=esk.private_bytes( |
| 95 | encoding=serialization.Encoding.Raw, |
| 96 | format=serialization.PrivateFormat.Raw, |
| 97 | encryption_algorithm=serialization.NoEncryption(), |
| 98 | ), |
| 99 | ed25519_pub=esk.public_key().public_bytes( |
| 100 | encoding=serialization.Encoding.Raw, |
| 101 | format=serialization.PublicFormat.Raw, |
| 102 | ), |
| 103 | ) |
| 104 | |
| 105 | def public_bundle(self) -> dict: |
| 106 | return { |
| 107 | "x25519_pub": self.x25519_pub.hex(), |
| 108 | "ed25519_pub": self.ed25519_pub.hex(), |
| 109 | } |
| 110 | |
| 111 | |
| 112 | |
| 113 | def aead_encrypt(key: bytes, plaintext: bytes, aad: bytes = b"") -> tuple[bytes, bytes]: |
| 114 | """ |
| 115 | XChaCha20-Poly1305. Returns (nonce, ciphertext_with_tag). |
| 116 | 24-byte nonce = safe to random-generate without coordination. |
| 117 | """ |
| 118 | assert len(key) == XCHACHA_KEY_LEN, "XChaCha key must be 32 bytes" |
| 119 | nonce = secrets.token_bytes(XCHACHA_NONCE_LEN) |
| 120 | ct = crypto_aead_xchacha20poly1305_ietf_encrypt(plaintext, aad, nonce, key) |
| 121 | return nonce, ct |
| 122 | |
| 123 | |
| 124 | def aead_decrypt(key: bytes, nonce: bytes, ciphertext: bytes, aad: bytes = b"") -> bytes: |
| 125 | return crypto_aead_xchacha20poly1305_ietf_decrypt(ciphertext, aad, nonce, key) |
| 126 | |
| 127 | |
| 128 | |
| 129 | def wrap_dek_for_recipient( |
| 130 | dek: bytes, |
| 131 | recipient_x25519_pub: bytes, |
| 132 | ephemeral_priv: Optional[X25519PrivateKey] = None, |
| 133 | ) -> dict: |
| 134 | """ |
| 135 | Encrypt a Data Encryption Key (DEK) for a single recipient using ECIES-style |
| 136 | X25519 key agreement + HKDF-SHA256 + XChaCha20-Poly1305. |
| 137 | |
| 138 | Returns a dict with: ephemeral_pub, nonce, wrapped_dek (all hex). |
| 139 | """ |
| 140 | eph = ephemeral_priv or X25519PrivateKey.generate() |
| 141 | peer = X25519PublicKey.from_public_bytes(recipient_x25519_pub) |
| 142 | shared = eph.exchange(peer) |
| 143 | |
| 144 | kek = HKDF( |
| 145 | algorithm=hashes.SHA256(), |
| 146 | length=32, |
| 147 | salt=None, |
| 148 | info=b"oversight-v1-dek-wrap", |
| 149 | ).derive(shared) |
| 150 | |
| 151 | nonce, wrapped = aead_encrypt(kek, dek, aad=b"oversight-dek") |
| 152 | eph_pub = eph.public_key().public_bytes( |
| 153 | encoding=serialization.Encoding.Raw, |
| 154 | format=serialization.PublicFormat.Raw, |
| 155 | ) |
| 156 | return { |
| 157 | "ephemeral_pub": eph_pub.hex(), |
| 158 | "nonce": nonce.hex(), |
| 159 | "wrapped_dek": wrapped.hex(), |
| 160 | } |
| 161 | |
| 162 | |
| 163 | def unwrap_dek(wrapped: dict, recipient_x25519_priv: bytes) -> bytes: |
| 164 | """Recover the DEK using the recipient's X25519 private key.""" |
| 165 | sk = X25519PrivateKey.from_private_bytes(recipient_x25519_priv) |
| 166 | eph_pub = X25519PublicKey.from_public_bytes(bytes.fromhex(wrapped["ephemeral_pub"])) |
| 167 | shared = sk.exchange(eph_pub) |
| 168 | |
| 169 | kek = HKDF( |
| 170 | algorithm=hashes.SHA256(), |
| 171 | length=32, |
| 172 | salt=None, |
| 173 | info=b"oversight-v1-dek-wrap", |
| 174 | ).derive(shared) |
| 175 | |
| 176 | return aead_decrypt( |
| 177 | kek, |
| 178 | bytes.fromhex(wrapped["nonce"]), |
| 179 | bytes.fromhex(wrapped["wrapped_dek"]), |
| 180 | aad=b"oversight-dek", |
| 181 | ) |
| 182 | |
| 183 | |
| 184 | |
| 185 | def wrap_dek_for_recipient_p256( |
| 186 | dek: bytes, |
| 187 | recipient_p256_pub_sec1: bytes, |
| 188 | ) -> dict: |
| 189 | """ |
| 190 | Encrypt a DEK for a P-256 recipient (typically backed by a PIV-compatible |
| 191 | hardware token: YubiKey, Nitrokey, OnlyKey). |
| 192 | |
| 193 | `recipient_p256_pub_sec1` is the recipient's NIST P-256 public key in |
| 194 | SEC1 uncompressed encoding (65 bytes, ``0x04 || X || Y``). |
| 195 | |
| 196 | Mirrors `oversight-rust/oversight-crypto::wrap_dek_for_recipient_p256` |
| 197 | byte-for-byte: same HKDF info ``oversight-hw-p256-v1-dek-wrap``, same |
| 198 | AEAD AAD ``oversight-hw-p256-dek``. Output JSON shape matches |
| 199 | `WrappedDekP256::to_json_hex` so a sealed file produced by either |
| 200 | implementation opens with either implementation. |
| 201 | |
| 202 | Returns a dict with: suite, ephemeral_pub, nonce, wrapped_dek (all hex). |
| 203 | """ |
| 204 | if len(recipient_p256_pub_sec1) != P256_PUBLIC_KEY_LEN: |
| 205 | raise ValueError( |
| 206 | f"recipient_p256_pub_sec1 must be {P256_PUBLIC_KEY_LEN} bytes " |
| 207 | f"(SEC1 uncompressed), got {len(recipient_p256_pub_sec1)}" |
| 208 | ) |
| 209 | |
| 210 | from cryptography.hazmat.primitives.asymmetric import ec as _ec |
| 211 | |
| 212 | peer = _ec.EllipticCurvePublicKey.from_encoded_point( |
| 213 | _ec.SECP256R1(), recipient_p256_pub_sec1 |
| 214 | ) |
| 215 | |
| 216 | eph = _ec.generate_private_key(_ec.SECP256R1()) |
| 217 | shared = eph.exchange(_ec.ECDH(), peer) |
| 218 | |
| 219 | kek = HKDF( |
| 220 | algorithm=hashes.SHA256(), |
| 221 | length=32, |
| 222 | salt=None, |
| 223 | info=b"oversight-hw-p256-v1-dek-wrap", |
| 224 | ).derive(shared) |
| 225 | |
| 226 | nonce, wrapped = aead_encrypt(kek, dek, aad=b"oversight-hw-p256-dek") |
| 227 | |
| 228 | eph_pub_bytes = eph.public_key().public_bytes( |
| 229 | encoding=serialization.Encoding.X962, |
| 230 | format=serialization.PublicFormat.UncompressedPoint, |
| 231 | ) |
| 232 | if len(eph_pub_bytes) != P256_PUBLIC_KEY_LEN: |
| 233 | raise RuntimeError( |
| 234 | f"P-256 ephemeral pub must be {P256_PUBLIC_KEY_LEN} bytes, got {len(eph_pub_bytes)}" |
| 235 | ) |
| 236 | |
| 237 | return { |
| 238 | "suite": SUITE_HW_P256_V1, |
| 239 | "ephemeral_pub": eph_pub_bytes.hex(), |
| 240 | "nonce": nonce.hex(), |
| 241 | "wrapped_dek": wrapped.hex(), |
| 242 | } |
| 243 | |
| 244 | |
| 245 | def unwrap_dek_p256(wrapped: dict, recipient_p256_priv_pkcs8_or_int) -> bytes: |
| 246 | """ |
| 247 | Recover the DEK for an `OSGT-HW-P256-v1` envelope using the recipient's |
| 248 | P-256 private key. |
| 249 | |
| 250 | `recipient_p256_priv_pkcs8_or_int` accepts either: |
| 251 | - an `EllipticCurvePrivateKey` (e.g., loaded from PKCS |
| 252 | in-process for tests), or |
| 253 | - bytes containing a PKCS |
| 254 | - an integer in the range [1, n-1] (for raw scalar import). |
| 255 | |
| 256 | Mirrors `oversight-rust/oversight-crypto::unwrap_dek_with_provider_p256`. |
| 257 | """ |
| 258 | from cryptography.hazmat.primitives.asymmetric import ec as _ec |
| 259 | |
| 260 | for required in ("ephemeral_pub", "nonce", "wrapped_dek"): |
| 261 | if required not in wrapped: |
| 262 | raise ValueError(f"hw-p256 envelope missing field: {required}") |
| 263 | |
| 264 | eph_pub_bytes = bytes.fromhex(wrapped["ephemeral_pub"]) |
| 265 | if len(eph_pub_bytes) != P256_PUBLIC_KEY_LEN: |
| 266 | raise ValueError( |
| 267 | f"ephemeral_pub must be {P256_PUBLIC_KEY_LEN} bytes " |
| 268 | f"(SEC1 uncompressed), got {len(eph_pub_bytes)}" |
| 269 | ) |
| 270 | |
| 271 | if isinstance(recipient_p256_priv_pkcs8_or_int, _ec.EllipticCurvePrivateKey): |
| 272 | sk = recipient_p256_priv_pkcs8_or_int |
| 273 | elif isinstance(recipient_p256_priv_pkcs8_or_int, (bytes, bytearray)): |
| 274 | sk = serialization.load_der_private_key( |
| 275 | bytes(recipient_p256_priv_pkcs8_or_int), password=None |
| 276 | ) |
| 277 | if not isinstance(sk, _ec.EllipticCurvePrivateKey): |
| 278 | raise ValueError("PKCS#8 key is not an EllipticCurvePrivateKey") |
| 279 | elif isinstance(recipient_p256_priv_pkcs8_or_int, int): |
| 280 | sk = _ec.derive_private_key( |
| 281 | recipient_p256_priv_pkcs8_or_int, _ec.SECP256R1() |
| 282 | ) |
| 283 | else: |
| 284 | raise TypeError( |
| 285 | "recipient private key must be EllipticCurvePrivateKey, PKCS#8 bytes, or int scalar" |
| 286 | ) |
| 287 | |
| 288 | eph_pub = _ec.EllipticCurvePublicKey.from_encoded_point( |
| 289 | _ec.SECP256R1(), eph_pub_bytes |
| 290 | ) |
| 291 | shared = sk.exchange(_ec.ECDH(), eph_pub) |
| 292 | |
| 293 | kek = HKDF( |
| 294 | algorithm=hashes.SHA256(), |
| 295 | length=32, |
| 296 | salt=None, |
| 297 | info=b"oversight-hw-p256-v1-dek-wrap", |
| 298 | ).derive(shared) |
| 299 | |
| 300 | return aead_decrypt( |
| 301 | kek, |
| 302 | bytes.fromhex(wrapped["nonce"]), |
| 303 | bytes.fromhex(wrapped["wrapped_dek"]), |
| 304 | aad=b"oversight-hw-p256-dek", |
| 305 | ) |
| 306 | |
| 307 | |
| 308 | |
| 309 | def sign_manifest(manifest_bytes: bytes, ed25519_priv: bytes) -> bytes: |
| 310 | sk = Ed25519PrivateKey.from_private_bytes(ed25519_priv) |
| 311 | return sk.sign(manifest_bytes) |
| 312 | |
| 313 | |
| 314 | def verify_manifest(manifest_bytes: bytes, signature: bytes, ed25519_pub: bytes) -> bool: |
| 315 | try: |
| 316 | Ed25519PublicKey.from_public_bytes(ed25519_pub).verify(signature, manifest_bytes) |
| 317 | return True |
| 318 | except Exception: |
| 319 | return False |
| 320 | |
| 321 | |
| 322 | |
| 323 | def pq_kem_keypair() -> tuple[bytes, bytes]: |
| 324 | """Generate ML-KEM-768 keypair. Returns (priv, pub).""" |
| 325 | if not PQ_AVAILABLE: |
| 326 | raise RuntimeError("liboqs not available; install liboqs + liboqs-python") |
| 327 | with oqs.KeyEncapsulation("ML-KEM-768") as kem: |
| 328 | pub = kem.generate_keypair() |
| 329 | priv = kem.export_secret_key() |
| 330 | return priv, pub |
| 331 | |
| 332 | |
| 333 | def pq_kem_encap(peer_pub: bytes) -> tuple[bytes, bytes]: |
| 334 | """Encapsulate a shared secret to peer_pub. Returns (ciphertext, shared_secret).""" |
| 335 | if not PQ_AVAILABLE: |
| 336 | raise RuntimeError("liboqs not available") |
| 337 | with oqs.KeyEncapsulation("ML-KEM-768") as kem: |
| 338 | ct, ss = kem.encap_secret(peer_pub) |
| 339 | return ct, ss |
| 340 | |
| 341 | |
| 342 | def pq_kem_decap(priv: bytes, ct: bytes) -> bytes: |
| 343 | """Recover shared secret from ciphertext using private key.""" |
| 344 | if not PQ_AVAILABLE: |
| 345 | raise RuntimeError("liboqs not available") |
| 346 | with oqs.KeyEncapsulation("ML-KEM-768", secret_key=priv) as kem: |
| 347 | return kem.decap_secret(ct) |
| 348 | |
| 349 | |
| 350 | def pq_sig_keypair() -> tuple[bytes, bytes]: |
| 351 | """Generate ML-DSA-65 keypair. Returns (priv, pub).""" |
| 352 | if not PQ_AVAILABLE: |
| 353 | raise RuntimeError("liboqs not available") |
| 354 | with oqs.Signature("ML-DSA-65") as sig: |
| 355 | pub = sig.generate_keypair() |
| 356 | priv = sig.export_secret_key() |
| 357 | return priv, pub |
| 358 | |
| 359 | |
| 360 | def pq_sign(msg: bytes, priv: bytes) -> bytes: |
| 361 | if not PQ_AVAILABLE: |
| 362 | raise RuntimeError("liboqs not available") |
| 363 | with oqs.Signature("ML-DSA-65", secret_key=priv) as sig: |
| 364 | return sig.sign(msg) |
| 365 | |
| 366 | |
| 367 | def pq_verify(msg: bytes, signature: bytes, pub: bytes) -> bool: |
| 368 | """Narrowly catches signature-verification failures; propagates other errors.""" |
| 369 | if not PQ_AVAILABLE: |
| 370 | return False |
| 371 | try: |
| 372 | with oqs.Signature("ML-DSA-65") as ver: |
| 373 | return ver.verify(msg, signature, pub) |
| 374 | except (ValueError, RuntimeError): |
| 375 | return False |
| 376 | |
| 377 | |
| 378 | def hybrid_wrap_dek(dek: bytes, x25519_pub: bytes, mlkem_pub: bytes) -> dict: |
| 379 | """ |
| 380 | Hybrid DEK wrap: combines X25519 and ML-KEM-768 shared secrets via HKDF. |
| 381 | An attacker must break BOTH X25519 AND ML-KEM-768 to recover the KEK. |
| 382 | |
| 383 | KDF input (defense-in-depth; X-wing-style): the HKDF IKM includes both |
| 384 | shared secrets AND both ciphertexts/ephemeral pubs, binding the KEK to |
| 385 | this specific encapsulation. This prevents any future construction where |
| 386 | an attacker could substitute a valid-but-different ciphertext. |
| 387 | """ |
| 388 | if not PQ_AVAILABLE: |
| 389 | raise RuntimeError("liboqs not available - cannot wrap hybrid") |
| 390 | if len(x25519_pub) != 32: |
| 391 | raise ValueError(f"x25519_pub must be 32 bytes, got {len(x25519_pub)}") |
| 392 | |
| 393 | eph = X25519PrivateKey.generate() |
| 394 | peer_x = X25519PublicKey.from_public_bytes(x25519_pub) |
| 395 | ss_x = eph.exchange(peer_x) |
| 396 | mlkem_ct, ss_pq = pq_kem_encap(mlkem_pub) |
| 397 | |
| 398 | eph_pub = eph.public_key().public_bytes( |
| 399 | encoding=serialization.Encoding.Raw, |
| 400 | format=serialization.PublicFormat.Raw, |
| 401 | ) |
| 402 | |
| 403 | ikm = ss_x + ss_pq + eph_pub + mlkem_ct |
| 404 | kek = HKDF( |
| 405 | algorithm=hashes.SHA256(), length=32, salt=None, |
| 406 | info=b"oversight-hybrid-v1-dek-wrap", |
| 407 | ).derive(ikm) |
| 408 | |
| 409 | nonce, wrapped = aead_encrypt(kek, dek, aad=b"oversight-hybrid-dek") |
| 410 | return { |
| 411 | "suite": "OSGT-HYBRID-v1", |
| 412 | "x25519_ephemeral_pub": eph_pub.hex(), |
| 413 | "mlkem_ciphertext": mlkem_ct.hex(), |
| 414 | "nonce": nonce.hex(), |
| 415 | "wrapped_dek": wrapped.hex(), |
| 416 | } |
| 417 | |
| 418 | |
| 419 | def hybrid_unwrap_dek(wrapped: dict, x25519_priv: bytes, mlkem_priv: bytes) -> bytes: |
| 420 | """Recover DEK from a hybrid-wrapped envelope.""" |
| 421 | if not PQ_AVAILABLE: |
| 422 | raise RuntimeError("liboqs not available - cannot unwrap hybrid") |
| 423 | for required in ("x25519_ephemeral_pub", "mlkem_ciphertext", "nonce", "wrapped_dek"): |
| 424 | if required not in wrapped: |
| 425 | raise ValueError(f"hybrid envelope missing field: {required}") |
| 426 | |
| 427 | eph_pub_bytes = bytes.fromhex(wrapped["x25519_ephemeral_pub"]) |
| 428 | mlkem_ct = bytes.fromhex(wrapped["mlkem_ciphertext"]) |
| 429 | |
| 430 | sk_x = X25519PrivateKey.from_private_bytes(x25519_priv) |
| 431 | eph_pub = X25519PublicKey.from_public_bytes(eph_pub_bytes) |
| 432 | ss_x = sk_x.exchange(eph_pub) |
| 433 | ss_pq = pq_kem_decap(mlkem_priv, mlkem_ct) |
| 434 | |
| 435 | ikm = ss_x + ss_pq + eph_pub_bytes + mlkem_ct |
| 436 | kek = HKDF( |
| 437 | algorithm=hashes.SHA256(), length=32, salt=None, |
| 438 | info=b"oversight-hybrid-v1-dek-wrap", |
| 439 | ).derive(ikm) |
| 440 | |
| 441 | return aead_decrypt( |
| 442 | kek, |
| 443 | bytes.fromhex(wrapped["nonce"]), |
| 444 | bytes.fromhex(wrapped["wrapped_dek"]), |
| 445 | aad=b"oversight-hybrid-dek", |
| 446 | ) |
| 447 | |
| 448 | |
| 449 | |
| 450 | def random_dek() -> bytes: |
| 451 | return secrets.token_bytes(XCHACHA_KEY_LEN) |
| 452 | |
| 453 | |
| 454 | def content_hash(data: bytes) -> str: |
| 455 | digest = hashes.Hash(hashes.SHA256()) |
| 456 | digest.update(data) |
| 457 | return digest.finalize().hex() |