| 1 | """Generate a hybrid (OSGT-HYBRID-v1) .sealed sample + matching identity JSON. |
| 2 | |
| 3 | Self-contained: depends on `cryptography` and `oqs` (liboqs-python). Mirrors the |
| 4 | binary container format from oversight_core/container.py and the hybrid wrap |
| 5 | construction from oversight_core/crypto.py:hybrid_wrap_dek, so the produced |
| 6 | sample is byte-compatible with the production reference implementation. |
| 7 | |
| 8 | Usage (from any host where `oqs` is installed): |
| 9 | python3 gen_hybrid_sample.py --out-dir ./out |
| 10 | |
| 11 | Outputs: |
| 12 | out/tutorial-hybrid.sealed - viewer test fixture |
| 13 | out/tutorial-hybrid-identity.json - recipient X25519 + ML-KEM-768 priv/pub |
| 14 | |
| 15 | The identity is a public test key, NEVER use for real content. |
| 16 | """ |
| 17 | from __future__ import annotations |
| 18 | |
| 19 | import argparse |
| 20 | import hashlib |
| 21 | import json |
| 22 | import os |
| 23 | import struct |
| 24 | import sys |
| 25 | from pathlib import Path |
| 26 | |
| 27 | import oqs |
| 28 | from cryptography.hazmat.primitives import hashes, serialization |
| 29 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 30 | from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey |
| 31 | from cryptography.hazmat.primitives.kdf.hkdf import HKDF |
| 32 | from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 |
| 33 | |
# Magic prefix written at the very start of the container by main().
MAGIC = b"OSGT\x01\x00"
# Container format version; emitted as the first header byte after MAGIC.
FORMAT_VERSION = 1
# Numeric suite identifier; emitted as the second header byte after MAGIC.
SUITE_HYBRID_V1_ID = 2
# Suite name string recorded in the manifest and in the wrapped-DEK record.
SUITE_HYBRID_V1 = "OSGT-HYBRID-v1"
| 38 | |
| 39 | |
| 40 | |
| 41 | def _hchacha20(key: bytes, nonce16: bytes) -> bytes: |
| 42 | assert len(key) == 32 and len(nonce16) == 16 |
| 43 | state = bytearray(64) |
| 44 | state[0:4] = b"expa"; state[4:8] = b"nd 3"; state[8:12] = b"2-by"; state[12:16] = b"te k" |
| 45 | state[16:48] = key |
| 46 | state[48:64] = nonce16 |
| 47 | s = list(struct.unpack("<16I", bytes(state))) |
| 48 | |
| 49 | def rotl(v, n): return ((v << n) & 0xFFFFFFFF) | (v >> (32 - n)) |
| 50 | def qr(a, b, c, d): |
| 51 | s[a] = (s[a] + s[b]) & 0xFFFFFFFF; s[d] = rotl(s[d] ^ s[a], 16) |
| 52 | s[c] = (s[c] + s[d]) & 0xFFFFFFFF; s[b] = rotl(s[b] ^ s[c], 12) |
| 53 | s[a] = (s[a] + s[b]) & 0xFFFFFFFF; s[d] = rotl(s[d] ^ s[a], 8) |
| 54 | s[c] = (s[c] + s[d]) & 0xFFFFFFFF; s[b] = rotl(s[b] ^ s[c], 7) |
| 55 | |
| 56 | for _ in range(10): |
| 57 | qr(0, 4, 8, 12); qr(1, 5, 9, 13); qr(2, 6, 10, 14); qr(3, 7, 11, 15) |
| 58 | qr(0, 5, 10, 15); qr(1, 6, 11, 12); qr(2, 7, 8, 13); qr(3, 4, 9, 14) |
| 59 | return struct.pack("<8I", s[0], s[1], s[2], s[3], s[12], s[13], s[14], s[15]) |
| 60 | |
| 61 | |
def xchacha20poly1305_encrypt(key: bytes, nonce24: bytes, plaintext: bytes, aad: bytes) -> bytes:
    """Encrypt ``plaintext`` with XChaCha20-Poly1305.

    The first 16 nonce bytes are fed to ``_hchacha20`` together with ``key`` to
    derive a subkey; the remaining 8 nonce bytes, prefixed with four zero
    bytes, form the 12-byte nonce handed to ``ChaCha20Poly1305``.

    Args:
        key: 32-byte encryption key.
        nonce24: 24-byte extended nonce.
        plaintext: Data to encrypt.
        aad: Associated data to authenticate alongside the plaintext.

    Returns:
        Whatever ``ChaCha20Poly1305.encrypt`` returns for the derived subkey
        and nonce.

    Raises:
        ValueError: If ``key`` is not 32 bytes or ``nonce24`` is not 24 bytes.
    """
    if not (len(key) == 32 and len(nonce24) == 24):
        raise ValueError("xchacha20poly1305 requires 32-byte key and 24-byte nonce")

    hchacha_part, nonce_tail = nonce24[:16], nonce24[16:24]
    cipher = ChaCha20Poly1305(_hchacha20(key, hchacha_part))
    inner_nonce = bytes(4) + nonce_tail
    return cipher.encrypt(inner_nonce, plaintext, aad)
| 68 | |
| 69 | |
def canonical_bytes(obj: dict) -> bytes:
    """Serialize ``obj`` to deterministic, compact JSON encoded as UTF-8.

    Keys are sorted, separators carry no whitespace, and non-ASCII characters
    are emitted literally rather than as ``\\u`` escapes.
    """
    text = json.dumps(
        obj,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=True,
    )
    return text.encode("utf-8")
| 72 | |
| 73 | |
def strip_none(obj):
    """Return a copy of ``obj`` with every ``None`` entry removed, recursively.

    Dict entries whose value is ``None`` and list items that are ``None`` are
    dropped; nested dicts and lists are cleaned the same way. Any other value
    (including tuples) is returned unchanged.
    """
    if isinstance(obj, dict):
        cleaned = {}
        for name, value in obj.items():
            if value is not None:
                cleaned[name] = strip_none(value)
        return cleaned

    if isinstance(obj, list):
        kept = []
        for item in obj:
            if item is not None:
                kept.append(strip_none(item))
        return kept

    return obj
| 80 | |
| 81 | |
def hybrid_wrap_dek(dek: bytes, x25519_pub: bytes, mlkem_pub: bytes) -> tuple[dict, bytes, bytes]:
    """Wrap ``dek`` for a recipient using X25519 + ML-KEM-768.

    An ephemeral X25519 key is exchanged with ``x25519_pub`` and an ML-KEM-768
    secret is encapsulated to ``mlkem_pub``. Both shared secrets, followed by
    the ephemeral public key and the KEM ciphertext, feed HKDF-SHA256 to derive
    a 32-byte key-encryption key, which encrypts ``dek`` under a random 24-byte
    nonce via ``xchacha20poly1305_encrypt``.

    Args:
        dek: Data-encryption key to wrap.
        x25519_pub: Recipient's raw X25519 public key.
        mlkem_pub: Recipient's ML-KEM-768 public key.

    Returns:
        A tuple ``(record, mlkem_ciphertext, ephemeral_pub)`` where ``record``
        is a dict of hex-encoded fields describing the wrapped key.
    """
    ephemeral_key = X25519PrivateKey.generate()
    ephemeral_pub = ephemeral_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    recipient_key = X25519PublicKey.from_public_bytes(x25519_pub)
    classical_secret = ephemeral_key.exchange(recipient_key)

    with oqs.KeyEncapsulation("ML-KEM-768") as kem:
        kem_ciphertext, pq_secret = kem.encap_secret(mlkem_pub)

    # Key material order: both shared secrets, then both public transcripts.
    key_material = classical_secret + pq_secret + ephemeral_pub + kem_ciphertext
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b"oversight-hybrid-v1-dek-wrap",
    )
    kek = hkdf.derive(key_material)

    wrap_nonce = os.urandom(24)
    wrapped_key = xchacha20poly1305_encrypt(kek, wrap_nonce, dek, aad=b"oversight-hybrid-dek")

    record = {
        "suite": SUITE_HYBRID_V1,
        "x25519_ephemeral_pub": ephemeral_pub.hex(),
        "mlkem_ciphertext": kem_ciphertext.hex(),
        "nonce": wrap_nonce.hex(),
        "wrapped_dek": wrapped_key.hex(),
    }
    return (record, kem_ciphertext, ephemeral_pub)
| 108 | |
| 109 | |
def main() -> int:
    """Generate the hybrid ``.sealed`` sample and its identity JSON.

    Parses ``--out-dir`` (required) and ``--message``, creates fresh recipient
    (X25519 + ML-KEM-768) and issuer (Ed25519) keys, encrypts the message under
    a random DEK, wraps the DEK with ``hybrid_wrap_dek``, signs the manifest,
    and writes ``tutorial-hybrid.sealed`` and ``tutorial-hybrid-identity.json``
    into the output directory.

    Returns:
        ``0`` on completion.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--out-dir", required=True, type=Path)
    parser.add_argument("--message", default="hello hybrid post-quantum oversight\n")
    opts = parser.parse_args()
    opts.out_dir.mkdir(parents=True, exist_ok=True)

    raw_encoding = serialization.Encoding.Raw
    raw_public = serialization.PublicFormat.Raw

    # Recipient keys: classical X25519 plus post-quantum ML-KEM-768.
    recipient_key = X25519PrivateKey.generate()
    recipient_priv = recipient_key.private_bytes(
        encoding=raw_encoding,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    recipient_pub = recipient_key.public_key().public_bytes(
        encoding=raw_encoding, format=raw_public
    )
    with oqs.KeyEncapsulation("ML-KEM-768") as kem:
        mlkem_pub = kem.generate_keypair()
        mlkem_priv = kem.export_secret_key()

    # Issuer key used only to sign the manifest of this sample.
    issuer_key = Ed25519PrivateKey.generate()
    issuer_pub = issuer_key.public_key().public_bytes(
        encoding=raw_encoding, format=raw_public
    )

    # Encrypt the payload; the hex content hash is bound in as associated data.
    plaintext = opts.message.encode("utf-8")
    content_hash = hashlib.sha256(plaintext).hexdigest()
    canonical_content_hash = content_hash
    dek = os.urandom(32)
    aead_nonce = os.urandom(24)
    ciphertext = xchacha20poly1305_encrypt(
        dek, aead_nonce, plaintext, aad=content_hash.encode("ascii")
    )

    wrapped_dek = hybrid_wrap_dek(dek, recipient_pub, mlkem_pub)[0]

    manifest = {
        "suite": SUITE_HYBRID_V1,
        "format": "oversight/v1",
        "issuer_id": "tutorial-hybrid@oversightprotocol.dev",
        "issuer_ed25519_pub": issuer_pub.hex(),
        "issuer_ml_dsa_pub": "",
        "recipient": {
            "id": "tutorial@oversightprotocol.dev",
            "x25519_pub": recipient_pub.hex(),
            "mlkem_pub": mlkem_pub.hex(),
        },
        "content_type": "text/plain",
        "content_hash": content_hash,
        "canonical_content_hash": canonical_content_hash,
        "l3_policy": {"enabled": False, "mode": "off"},
        "filename": "hello-hybrid.txt",
        "signature_ed25519": "",
        "signature_ml_dsa": "",
    }

    # The signature covers the canonical manifest with both signature fields
    # blanked; only the Ed25519 signature is filled in afterwards.
    signing_view = {
        **strip_none(manifest),
        "signature_ed25519": "",
        "signature_ml_dsa": "",
    }
    signature = issuer_key.sign(canonical_bytes(signing_view))
    manifest["signature_ed25519"] = signature.hex()

    manifest_blob = canonical_bytes(strip_none(manifest))
    wrapped_blob = canonical_bytes(wrapped_dek)

    # Container layout: magic, version byte, suite byte, then the manifest and
    # wrapped DEK (each prefixed by a big-endian u32 length), the 24-byte AEAD
    # nonce, and the length-prefixed ciphertext.
    sections = (
        MAGIC,
        bytes([FORMAT_VERSION, SUITE_HYBRID_V1_ID]),
        struct.pack(">I", len(manifest_blob)),
        manifest_blob,
        struct.pack(">I", len(wrapped_blob)),
        wrapped_blob,
        aead_nonce,
        struct.pack(">I", len(ciphertext)),
        ciphertext,
    )

    sealed_path = opts.out_dir / "tutorial-hybrid.sealed"
    identity_path = opts.out_dir / "tutorial-hybrid-identity.json"

    sealed_path.write_bytes(b"".join(sections))

    identity = {
        "recipient_id": "tutorial@oversightprotocol.dev",
        "x25519_priv": recipient_priv.hex(),
        "x25519_pub": recipient_pub.hex(),
        "mlkem_priv": mlkem_priv.hex(),
        "mlkem_pub": mlkem_pub.hex(),
        "ed25519_priv": "public-tutorial-key-does-not-sign",
        "ed25519_pub": "public-tutorial-key-does-not-sign",
        "_note": "PUBLIC TUTORIAL KEY. Demo-only. Do not use for real content.",
    }
    identity_path.write_text(json.dumps(identity, indent=2))

    sealed_size = sealed_path.stat().st_size
    print(f"[+] wrote {sealed_path} ({sealed_size} bytes)")
    print(f"[+] wrote {identity_path}")
    print(f" plaintext SHA-256: {content_hash}")
    print(f" suite: {SUITE_HYBRID_V1}")
    return 0
| 208 | |
| 209 | |
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())