Zion Boggan
repos/Oversight/oversight_core/crypto.py
zionboggan.com ↗
457 lines · python
History for this file →
1
"""
2
oversight_core.crypto
3
====================
4
 
5
Vetted primitives only. NO custom crypto.
6
 
7
Classical (ships today):
8
  - X25519 for key agreement
9
  - Ed25519 for signatures
10
  - XChaCha20-Poly1305 for AEAD
11
  - BLAKE2b for hashing / MAC
12
  - HKDF for key derivation
13
  - Argon2id for password-based KDF (via libsodium)
14
 
15
Post-quantum hooks (design-ready; enable via `use_pq=True` once liboqs is linked):
16
  - ML-KEM-768 for key encapsulation (hybrid with X25519)
17
  - ML-DSA-65 for signatures (hybrid with Ed25519)
18
 
19
The container format is crypto-agile: the algorithm suite is declared in the header,
20
so we can roll forward to full PQ without breaking existing sealed files.
21
"""
22
 
23
from __future__ import annotations
24
 
25
import os
26
import secrets
27
from dataclasses import dataclass
28
from typing import Optional
29
 
30
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
31
    Ed25519PrivateKey,
32
    Ed25519PublicKey,
33
)
34
from cryptography.hazmat.primitives.asymmetric.x25519 import (
35
    X25519PrivateKey,
36
    X25519PublicKey,
37
)
38
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
39
from cryptography.hazmat.primitives import hashes, serialization
40
from nacl.bindings import (
41
    crypto_aead_xchacha20poly1305_ietf_encrypt,
42
    crypto_aead_xchacha20poly1305_ietf_decrypt,
43
    crypto_aead_xchacha20poly1305_ietf_NPUBBYTES,
44
    crypto_aead_xchacha20poly1305_ietf_KEYBYTES,
45
)
46
 
47
try:
48
    import contextlib
49
    import os as _os
50
 
51
    with open(_os.devnull, "w") as _devnull:
52
        with contextlib.redirect_stdout(_devnull):
53
            import oqs
54
 
55
    PQ_AVAILABLE = True
56
except Exception:
57
    PQ_AVAILABLE = False
58
 
59
 
60
 
61
SUITE_CLASSIC_V1 = "OSGT-CLASSIC-v1"
62
SUITE_HYBRID_V1 = "OSGT-HYBRID-v1"
63
SUITE_HW_P256_V1 = "OSGT-HW-P256-v1"
64
 
65
P256_PUBLIC_KEY_LEN = 65
66
 
67
XCHACHA_NONCE_LEN = crypto_aead_xchacha20poly1305_ietf_NPUBBYTES
68
XCHACHA_KEY_LEN = crypto_aead_xchacha20poly1305_ietf_KEYBYTES
69
 
70
 
71
 
72
@dataclass
73
class ClassicIdentity:
74
    """Recipient / issuer identity: X25519 (encryption) + Ed25519 (signing)."""
75
    x25519_priv: bytes
76
    x25519_pub: bytes
77
    ed25519_priv: bytes
78
    ed25519_pub: bytes
79
 
80
    @classmethod
81
    def generate(cls) -> "ClassicIdentity":
82
        xsk = X25519PrivateKey.generate()
83
        esk = Ed25519PrivateKey.generate()
84
        return cls(
85
            x25519_priv=xsk.private_bytes(
86
                encoding=serialization.Encoding.Raw,
87
                format=serialization.PrivateFormat.Raw,
88
                encryption_algorithm=serialization.NoEncryption(),
89
            ),
90
            x25519_pub=xsk.public_key().public_bytes(
91
                encoding=serialization.Encoding.Raw,
92
                format=serialization.PublicFormat.Raw,
93
            ),
94
            ed25519_priv=esk.private_bytes(
95
                encoding=serialization.Encoding.Raw,
96
                format=serialization.PrivateFormat.Raw,
97
                encryption_algorithm=serialization.NoEncryption(),
98
            ),
99
            ed25519_pub=esk.public_key().public_bytes(
100
                encoding=serialization.Encoding.Raw,
101
                format=serialization.PublicFormat.Raw,
102
            ),
103
        )
104
 
105
    def public_bundle(self) -> dict:
106
        return {
107
            "x25519_pub": self.x25519_pub.hex(),
108
            "ed25519_pub": self.ed25519_pub.hex(),
109
        }
110
 
111
 
112
 
113
def aead_encrypt(key: bytes, plaintext: bytes, aad: bytes = b"") -> tuple[bytes, bytes]:
114
    """
115
    XChaCha20-Poly1305. Returns (nonce, ciphertext_with_tag).
116
    24-byte nonce = safe to random-generate without coordination.
117
    """
118
    assert len(key) == XCHACHA_KEY_LEN, "XChaCha key must be 32 bytes"
119
    nonce = secrets.token_bytes(XCHACHA_NONCE_LEN)
120
    ct = crypto_aead_xchacha20poly1305_ietf_encrypt(plaintext, aad, nonce, key)
121
    return nonce, ct
122
 
123
 
124
def aead_decrypt(key: bytes, nonce: bytes, ciphertext: bytes, aad: bytes = b"") -> bytes:
125
    return crypto_aead_xchacha20poly1305_ietf_decrypt(ciphertext, aad, nonce, key)
126
 
127
 
128
 
129
def wrap_dek_for_recipient(
130
    dek: bytes,
131
    recipient_x25519_pub: bytes,
132
    ephemeral_priv: Optional[X25519PrivateKey] = None,
133
) -> dict:
134
    """
135
    Encrypt a Data Encryption Key (DEK) for a single recipient using ECIES-style
136
    X25519 key agreement + HKDF-SHA256 + XChaCha20-Poly1305.
137
 
138
    Returns a dict with: ephemeral_pub, nonce, wrapped_dek (all hex).
139
    """
140
    eph = ephemeral_priv or X25519PrivateKey.generate()
141
    peer = X25519PublicKey.from_public_bytes(recipient_x25519_pub)
142
    shared = eph.exchange(peer)
143
 
144
    kek = HKDF(
145
        algorithm=hashes.SHA256(),
146
        length=32,
147
        salt=None,
148
        info=b"oversight-v1-dek-wrap",
149
    ).derive(shared)
150
 
151
    nonce, wrapped = aead_encrypt(kek, dek, aad=b"oversight-dek")
152
    eph_pub = eph.public_key().public_bytes(
153
        encoding=serialization.Encoding.Raw,
154
        format=serialization.PublicFormat.Raw,
155
    )
156
    return {
157
        "ephemeral_pub": eph_pub.hex(),
158
        "nonce": nonce.hex(),
159
        "wrapped_dek": wrapped.hex(),
160
    }
161
 
162
 
163
def unwrap_dek(wrapped: dict, recipient_x25519_priv: bytes) -> bytes:
164
    """Recover the DEK using the recipient's X25519 private key."""
165
    sk = X25519PrivateKey.from_private_bytes(recipient_x25519_priv)
166
    eph_pub = X25519PublicKey.from_public_bytes(bytes.fromhex(wrapped["ephemeral_pub"]))
167
    shared = sk.exchange(eph_pub)
168
 
169
    kek = HKDF(
170
        algorithm=hashes.SHA256(),
171
        length=32,
172
        salt=None,
173
        info=b"oversight-v1-dek-wrap",
174
    ).derive(shared)
175
 
176
    return aead_decrypt(
177
        kek,
178
        bytes.fromhex(wrapped["nonce"]),
179
        bytes.fromhex(wrapped["wrapped_dek"]),
180
        aad=b"oversight-dek",
181
    )
182
 
183
 
184
 
185
def wrap_dek_for_recipient_p256(
186
    dek: bytes,
187
    recipient_p256_pub_sec1: bytes,
188
) -> dict:
189
    """
190
    Encrypt a DEK for a P-256 recipient (typically backed by a PIV-compatible
191
    hardware token: YubiKey, Nitrokey, OnlyKey).
192
 
193
    `recipient_p256_pub_sec1` is the recipient's NIST P-256 public key in
194
    SEC1 uncompressed encoding (65 bytes, ``0x04 || X || Y``).
195
 
196
    Mirrors `oversight-rust/oversight-crypto::wrap_dek_for_recipient_p256`
197
    byte-for-byte: same HKDF info ``oversight-hw-p256-v1-dek-wrap``, same
198
    AEAD AAD ``oversight-hw-p256-dek``. Output JSON shape matches
199
    `WrappedDekP256::to_json_hex` so a sealed file produced by either
200
    implementation opens with either implementation.
201
 
202
    Returns a dict with: suite, ephemeral_pub, nonce, wrapped_dek (all hex).
203
    """
204
    if len(recipient_p256_pub_sec1) != P256_PUBLIC_KEY_LEN:
205
        raise ValueError(
206
            f"recipient_p256_pub_sec1 must be {P256_PUBLIC_KEY_LEN} bytes "
207
            f"(SEC1 uncompressed), got {len(recipient_p256_pub_sec1)}"
208
        )
209
 
210
    from cryptography.hazmat.primitives.asymmetric import ec as _ec
211
 
212
    peer = _ec.EllipticCurvePublicKey.from_encoded_point(
213
        _ec.SECP256R1(), recipient_p256_pub_sec1
214
    )
215
 
216
    eph = _ec.generate_private_key(_ec.SECP256R1())
217
    shared = eph.exchange(_ec.ECDH(), peer)
218
 
219
    kek = HKDF(
220
        algorithm=hashes.SHA256(),
221
        length=32,
222
        salt=None,
223
        info=b"oversight-hw-p256-v1-dek-wrap",
224
    ).derive(shared)
225
 
226
    nonce, wrapped = aead_encrypt(kek, dek, aad=b"oversight-hw-p256-dek")
227
 
228
    eph_pub_bytes = eph.public_key().public_bytes(
229
        encoding=serialization.Encoding.X962,
230
        format=serialization.PublicFormat.UncompressedPoint,
231
    )
232
    if len(eph_pub_bytes) != P256_PUBLIC_KEY_LEN:
233
        raise RuntimeError(
234
            f"P-256 ephemeral pub must be {P256_PUBLIC_KEY_LEN} bytes, got {len(eph_pub_bytes)}"
235
        )
236
 
237
    return {
238
        "suite": SUITE_HW_P256_V1,
239
        "ephemeral_pub": eph_pub_bytes.hex(),
240
        "nonce": nonce.hex(),
241
        "wrapped_dek": wrapped.hex(),
242
    }
243
 
244
 
245
def unwrap_dek_p256(wrapped: dict, recipient_p256_priv_pkcs8_or_int) -> bytes:
246
    """
247
    Recover the DEK for an `OSGT-HW-P256-v1` envelope using the recipient's
248
    P-256 private key.
249
 
250
    `recipient_p256_priv_pkcs8_or_int` accepts either:
251
      - an `EllipticCurvePrivateKey` (e.g., loaded from PKCS
252
        in-process for tests), or
253
      - bytes containing a PKCS
254
      - an integer in the range [1, n-1] (for raw scalar import).
255
 
256
    Mirrors `oversight-rust/oversight-crypto::unwrap_dek_with_provider_p256`.
257
    """
258
    from cryptography.hazmat.primitives.asymmetric import ec as _ec
259
 
260
    for required in ("ephemeral_pub", "nonce", "wrapped_dek"):
261
        if required not in wrapped:
262
            raise ValueError(f"hw-p256 envelope missing field: {required}")
263
 
264
    eph_pub_bytes = bytes.fromhex(wrapped["ephemeral_pub"])
265
    if len(eph_pub_bytes) != P256_PUBLIC_KEY_LEN:
266
        raise ValueError(
267
            f"ephemeral_pub must be {P256_PUBLIC_KEY_LEN} bytes "
268
            f"(SEC1 uncompressed), got {len(eph_pub_bytes)}"
269
        )
270
 
271
    if isinstance(recipient_p256_priv_pkcs8_or_int, _ec.EllipticCurvePrivateKey):
272
        sk = recipient_p256_priv_pkcs8_or_int
273
    elif isinstance(recipient_p256_priv_pkcs8_or_int, (bytes, bytearray)):
274
        sk = serialization.load_der_private_key(
275
            bytes(recipient_p256_priv_pkcs8_or_int), password=None
276
        )
277
        if not isinstance(sk, _ec.EllipticCurvePrivateKey):
278
            raise ValueError("PKCS#8 key is not an EllipticCurvePrivateKey")
279
    elif isinstance(recipient_p256_priv_pkcs8_or_int, int):
280
        sk = _ec.derive_private_key(
281
            recipient_p256_priv_pkcs8_or_int, _ec.SECP256R1()
282
        )
283
    else:
284
        raise TypeError(
285
            "recipient private key must be EllipticCurvePrivateKey, PKCS#8 bytes, or int scalar"
286
        )
287
 
288
    eph_pub = _ec.EllipticCurvePublicKey.from_encoded_point(
289
        _ec.SECP256R1(), eph_pub_bytes
290
    )
291
    shared = sk.exchange(_ec.ECDH(), eph_pub)
292
 
293
    kek = HKDF(
294
        algorithm=hashes.SHA256(),
295
        length=32,
296
        salt=None,
297
        info=b"oversight-hw-p256-v1-dek-wrap",
298
    ).derive(shared)
299
 
300
    return aead_decrypt(
301
        kek,
302
        bytes.fromhex(wrapped["nonce"]),
303
        bytes.fromhex(wrapped["wrapped_dek"]),
304
        aad=b"oversight-hw-p256-dek",
305
    )
306
 
307
 
308
 
309
def sign_manifest(manifest_bytes: bytes, ed25519_priv: bytes) -> bytes:
310
    sk = Ed25519PrivateKey.from_private_bytes(ed25519_priv)
311
    return sk.sign(manifest_bytes)
312
 
313
 
314
def verify_manifest(manifest_bytes: bytes, signature: bytes, ed25519_pub: bytes) -> bool:
315
    try:
316
        Ed25519PublicKey.from_public_bytes(ed25519_pub).verify(signature, manifest_bytes)
317
        return True
318
    except Exception:
319
        return False
320
 
321
 
322
 
323
def pq_kem_keypair() -> tuple[bytes, bytes]:
324
    """Generate ML-KEM-768 keypair. Returns (priv, pub)."""
325
    if not PQ_AVAILABLE:
326
        raise RuntimeError("liboqs not available; install liboqs + liboqs-python")
327
    with oqs.KeyEncapsulation("ML-KEM-768") as kem:
328
        pub = kem.generate_keypair()
329
        priv = kem.export_secret_key()
330
        return priv, pub
331
 
332
 
333
def pq_kem_encap(peer_pub: bytes) -> tuple[bytes, bytes]:
334
    """Encapsulate a shared secret to peer_pub. Returns (ciphertext, shared_secret)."""
335
    if not PQ_AVAILABLE:
336
        raise RuntimeError("liboqs not available")
337
    with oqs.KeyEncapsulation("ML-KEM-768") as kem:
338
        ct, ss = kem.encap_secret(peer_pub)
339
        return ct, ss
340
 
341
 
342
def pq_kem_decap(priv: bytes, ct: bytes) -> bytes:
343
    """Recover shared secret from ciphertext using private key."""
344
    if not PQ_AVAILABLE:
345
        raise RuntimeError("liboqs not available")
346
    with oqs.KeyEncapsulation("ML-KEM-768", secret_key=priv) as kem:
347
        return kem.decap_secret(ct)
348
 
349
 
350
def pq_sig_keypair() -> tuple[bytes, bytes]:
351
    """Generate ML-DSA-65 keypair. Returns (priv, pub)."""
352
    if not PQ_AVAILABLE:
353
        raise RuntimeError("liboqs not available")
354
    with oqs.Signature("ML-DSA-65") as sig:
355
        pub = sig.generate_keypair()
356
        priv = sig.export_secret_key()
357
        return priv, pub
358
 
359
 
360
def pq_sign(msg: bytes, priv: bytes) -> bytes:
361
    if not PQ_AVAILABLE:
362
        raise RuntimeError("liboqs not available")
363
    with oqs.Signature("ML-DSA-65", secret_key=priv) as sig:
364
        return sig.sign(msg)
365
 
366
 
367
def pq_verify(msg: bytes, signature: bytes, pub: bytes) -> bool:
368
    """Narrowly catches signature-verification failures; propagates other errors."""
369
    if not PQ_AVAILABLE:
370
        return False
371
    try:
372
        with oqs.Signature("ML-DSA-65") as ver:
373
            return ver.verify(msg, signature, pub)
374
    except (ValueError, RuntimeError):
375
        return False
376
 
377
 
378
def hybrid_wrap_dek(dek: bytes, x25519_pub: bytes, mlkem_pub: bytes) -> dict:
379
    """
380
    Hybrid DEK wrap: combines X25519 and ML-KEM-768 shared secrets via HKDF.
381
    An attacker must break BOTH X25519 AND ML-KEM-768 to recover the KEK.
382
 
383
    KDF input (defense-in-depth; X-wing-style): the HKDF IKM includes both
384
    shared secrets AND both ciphertexts/ephemeral pubs, binding the KEK to
385
    this specific encapsulation. This prevents any future construction where
386
    an attacker could substitute a valid-but-different ciphertext.
387
    """
388
    if not PQ_AVAILABLE:
389
        raise RuntimeError("liboqs not available - cannot wrap hybrid")
390
    if len(x25519_pub) != 32:
391
        raise ValueError(f"x25519_pub must be 32 bytes, got {len(x25519_pub)}")
392
 
393
    eph = X25519PrivateKey.generate()
394
    peer_x = X25519PublicKey.from_public_bytes(x25519_pub)
395
    ss_x = eph.exchange(peer_x)
396
    mlkem_ct, ss_pq = pq_kem_encap(mlkem_pub)
397
 
398
    eph_pub = eph.public_key().public_bytes(
399
        encoding=serialization.Encoding.Raw,
400
        format=serialization.PublicFormat.Raw,
401
    )
402
 
403
    ikm = ss_x + ss_pq + eph_pub + mlkem_ct
404
    kek = HKDF(
405
        algorithm=hashes.SHA256(), length=32, salt=None,
406
        info=b"oversight-hybrid-v1-dek-wrap",
407
    ).derive(ikm)
408
 
409
    nonce, wrapped = aead_encrypt(kek, dek, aad=b"oversight-hybrid-dek")
410
    return {
411
        "suite": "OSGT-HYBRID-v1",
412
        "x25519_ephemeral_pub": eph_pub.hex(),
413
        "mlkem_ciphertext": mlkem_ct.hex(),
414
        "nonce": nonce.hex(),
415
        "wrapped_dek": wrapped.hex(),
416
    }
417
 
418
 
419
def hybrid_unwrap_dek(wrapped: dict, x25519_priv: bytes, mlkem_priv: bytes) -> bytes:
420
    """Recover DEK from a hybrid-wrapped envelope."""
421
    if not PQ_AVAILABLE:
422
        raise RuntimeError("liboqs not available - cannot unwrap hybrid")
423
    for required in ("x25519_ephemeral_pub", "mlkem_ciphertext", "nonce", "wrapped_dek"):
424
        if required not in wrapped:
425
            raise ValueError(f"hybrid envelope missing field: {required}")
426
 
427
    eph_pub_bytes = bytes.fromhex(wrapped["x25519_ephemeral_pub"])
428
    mlkem_ct = bytes.fromhex(wrapped["mlkem_ciphertext"])
429
 
430
    sk_x = X25519PrivateKey.from_private_bytes(x25519_priv)
431
    eph_pub = X25519PublicKey.from_public_bytes(eph_pub_bytes)
432
    ss_x = sk_x.exchange(eph_pub)
433
    ss_pq = pq_kem_decap(mlkem_priv, mlkem_ct)
434
 
435
    ikm = ss_x + ss_pq + eph_pub_bytes + mlkem_ct
436
    kek = HKDF(
437
        algorithm=hashes.SHA256(), length=32, salt=None,
438
        info=b"oversight-hybrid-v1-dek-wrap",
439
    ).derive(ikm)
440
 
441
    return aead_decrypt(
442
        kek,
443
        bytes.fromhex(wrapped["nonce"]),
444
        bytes.fromhex(wrapped["wrapped_dek"]),
445
        aad=b"oversight-hybrid-dek",
446
    )
447
 
448
 
449
 
450
def random_dek() -> bytes:
451
    return secrets.token_bytes(XCHACHA_KEY_LEN)
452
 
453
 
454
def content_hash(data: bytes) -> str:
455
    digest = hashes.Hash(hashes.SHA256())
456
    digest.update(data)
457
    return digest.finalize().hex()