| 1 | """Cross-language ML-KEM-768 hybrid KEM conformance: Python <-> Rust. |
| 2 | |
| 3 | Proves the OSGT-HYBRID-v1 DEK-wrap construction is byte-identical across the |
| 4 | Python reference (`oversight_core.crypto.hybrid_wrap_dek`/`hybrid_unwrap_dek`) |
| 5 | and the Rust port (`oversight_crypto::hybrid_wrap_dek`/`hybrid_unwrap_dek`) in |
| 6 | both directions: |
| 7 | |
| 8 | [1] Rust recipient -> Python wraps -> Rust unwraps |
| 9 | [2] Python recipient -> Rust wraps -> Python unwraps |
| 10 | |
| 11 | Only ML-KEM *public* keys (1184 bytes) and the X25519 public key cross the |
| 12 | language boundary; each recipient holds its own private key in its native |
| 13 | form (Rust seed / Python liboqs expanded). Requires liboqs + liboqs-python; |
| 14 | SKIPS with a clear message otherwise (CI-safe). |
| 15 | """ |
| 16 | |
| 17 | import json |
| 18 | import os |
| 19 | import subprocess |
| 20 | import sys |
| 21 | |
# Repository root: two directory levels above the directory containing this
# file. It is placed first on sys.path so the `oversight_core` import that
# follows is looked up in this checkout before anywhere else.
REPO = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
sys.path.insert(0, REPO)
| 24 | |
| 25 | from oversight_core.crypto import PQ_AVAILABLE, hybrid_unwrap_dek, hybrid_wrap_dek |
| 26 | |
# Cargo manifest handed to `cargo run --manifest-path`.
CARGO = os.path.join(REPO, "oversight-rust", "Cargo.toml")
# Cargo build directory. An explicit CARGO_TARGET_DIR wins; otherwise use a
# cache directory under the invoking user's home. The previous default was
# hard-coded to /root/.cache, which is only writable when running as root
# (root still resolves to that same path via expanduser).
TARGET = os.environ.get(
    "CARGO_TARGET_DIR",
    os.path.join(os.path.expanduser("~"), ".cache", "oversight-rust-target"),
)
| 29 | |
| 30 | |
def rust(args):
    """Run the Rust ``hybrid_kem_cli`` example and return its stdout.

    The helper is launched through ``cargo run`` (release profile, quiet)
    against the manifest at ``CARGO``, with ``CARGO_TARGET_DIR`` forced to
    ``TARGET`` in the child environment.

    Args:
        args: List of command-line arguments forwarded to the example after
            the ``--`` separator.

    Returns:
        The helper's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: If the process exits with a non-zero status; the
            message carries the captured stderr.
    """
    cargo_prefix = [
        "cargo", "run", "--manifest-path", CARGO, "--release", "-q",
        "-p", "oversight-crypto", "--example", "hybrid_kem_cli", "--",
    ]
    completed = subprocess.run(
        cargo_prefix + args,
        capture_output=True,
        text=True,
        env={**os.environ, "CARGO_TARGET_DIR": TARGET},
    )
    if completed.returncode == 0:
        return completed.stdout.strip()
    raise RuntimeError(f"rust helper failed: {completed.stderr}")
| 41 | |
| 42 | |
def x25519_keypair():
    """Generate a fresh X25519 key pair in raw byte form.

    Returns:
        A ``(public, private)`` tuple of ``bytes``. Both halves use the
        ``Raw`` encoding/format, and the private half is exported without
        encryption.
    """
    # Imported lazily so the module can be loaded without `cryptography`.
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import x25519

    private_key = x25519.X25519PrivateKey.generate()
    raw_public = private_key.public_key().public_bytes(
        serialization.Encoding.Raw,
        serialization.PublicFormat.Raw,
    )
    raw_private = private_key.private_bytes(
        serialization.Encoding.Raw,
        serialization.PrivateFormat.Raw,
        serialization.NoEncryption(),
    )
    return raw_public, raw_private
| 53 | |
| 54 | |
def mlkem_keypair():
    """Generate a fresh ML-KEM-768 key pair using liboqs-python.

    Returns:
        A ``(public, private)`` tuple: the public key returned by
        ``generate_keypair()`` and the secret key exported from the same
        ``KeyEncapsulation`` instance.
    """
    # Imported lazily so the module can be loaded without liboqs-python.
    from oqs import KeyEncapsulation

    encapsulator = KeyEncapsulation("ML-KEM-768")
    public_key = encapsulator.generate_keypair()
    return public_key, encapsulator.export_secret_key()
| 62 | |
| 63 | |
def main():
    """Run both cross-language round trips and return a process exit code.

    Direction [1]: the Rust helper generates the recipient keys, Python wraps
    a random 32-byte DEK, and the Rust helper unwraps it.
    Direction [2]: Python generates the recipient keys, the Rust helper wraps
    the same DEK, and Python unwraps it.

    Returns:
        0 when both directions recover the original DEK, or when the check is
        skipped because ``PQ_AVAILABLE`` is false; 1 if either direction
        yields a different DEK.

    Raises:
        RuntimeError: Propagated from ``rust`` when the Rust helper exits
            with a non-zero status.
    """
    import tempfile

    if not PQ_AVAILABLE:
        print("SKIP cross-language hybrid KEM: liboqs-python not available")
        return 0

    dek = os.urandom(32)
    results = []

    # [1] Rust recipient -> Python wraps -> Rust unwraps.
    recv = json.loads(rust(["keygen"]))
    env = hybrid_wrap_dek(
        dek, bytes.fromhex(recv["x_pub"]), bytes.fromhex(recv["mlkem_pub"])
    )
    # Hand the envelope to the Rust helper through a uniquely named,
    # owner-only temp file instead of a fixed path in /tmp: no collisions
    # between concurrent runs, no clobbering of a pre-existing file or
    # symlink, and no hard dependency on /tmp existing.
    fd, env_path = tempfile.mkstemp(
        prefix="_oversight_hybrid_env1_", suffix=".json"
    )
    try:
        # Close the file before the helper runs so it can be reopened by the
        # child process on every platform.
        with os.fdopen(fd, "w") as f:
            json.dump(env, f)
        dek_rs = bytes.fromhex(
            rust(["unwrap", env_path, recv["x_priv"], recv["mlkem_seed"]])
        )
    finally:
        # Always remove the envelope, even when the helper fails.
        os.remove(env_path)
    ok1 = dek_rs == dek
    results.append(ok1)
    print(f"[1] PY wrap -> RS unwrap: {'PASS' if ok1 else 'FAIL'}")

    # [2] Python recipient -> Rust wraps -> Python unwraps.
    x_pub, x_priv = x25519_keypair()
    mlkem_pub, mlkem_priv = mlkem_keypair()
    env2 = json.loads(rust(["wrap", x_pub.hex(), mlkem_pub.hex(), dek.hex()]))
    dek_py = hybrid_unwrap_dek(env2, x_priv, mlkem_priv)
    ok2 = dek_py == dek
    results.append(ok2)
    print(f"[2] RS wrap -> PY unwrap: {'PASS' if ok2 else 'FAIL'}")

    if all(results):
        print("CROSS-LANGUAGE HYBRID KEM: ALL PASS")
        return 0
    print("CROSS-LANGUAGE HYBRID KEM: FAIL")
    return 1
| 97 | |
| 98 | |
| 99 | if __name__ == "__main__": |
| 100 | sys.exit(main()) |