Zion Boggan
repos/Oversight/tools/gen_hybrid_sample.py
zionboggan.com ↗
211 lines · python
History for this file →
1
"""Generate a hybrid (OSGT-HYBRID-v1) .sealed sample + matching identity JSON.
2
 
3
Self-contained: depends on `cryptography` and `oqs` (liboqs-python). Mirrors the
4
binary container format from oversight_core/container.py and the hybrid wrap
5
construction from oversight_core/crypto.py:hybrid_wrap_dek, so the produced
6
sample is byte-compatible with the production reference implementation.
7
 
8
Usage (from any host where `oqs` is installed):
9
    python3 gen_hybrid_sample.py --out-dir ./out
10
 
11
Outputs:
12
    out/tutorial-hybrid.sealed          - viewer test fixture
13
    out/tutorial-hybrid-identity.json   - recipient X25519 + ML-KEM-768 priv/pub
14
 
15
The identity is a public test key, NEVER use for real content.
16
"""
17
from __future__ import annotations
18
 
19
import argparse
20
import hashlib
21
import json
22
import os
23
import struct
24
import sys
25
from pathlib import Path
26
 
27
import oqs
28
from cryptography.hazmat.primitives import hashes, serialization
29
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
30
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
31
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
32
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
33
 
34
MAGIC = b"OSGT\x01\x00"
35
FORMAT_VERSION = 1
36
SUITE_HYBRID_V1_ID = 2
37
SUITE_HYBRID_V1 = "OSGT-HYBRID-v1"
38
 
39
 
40
 
41
def _hchacha20(key: bytes, nonce16: bytes) -> bytes:
42
    assert len(key) == 32 and len(nonce16) == 16
43
    state = bytearray(64)
44
    state[0:4]  = b"expa"; state[4:8]  = b"nd 3"; state[8:12]  = b"2-by"; state[12:16] = b"te k"
45
    state[16:48] = key
46
    state[48:64] = nonce16
47
    s = list(struct.unpack("<16I", bytes(state)))
48
 
49
    def rotl(v, n): return ((v << n) & 0xFFFFFFFF) | (v >> (32 - n))
50
    def qr(a, b, c, d):
51
        s[a] = (s[a] + s[b]) & 0xFFFFFFFF; s[d] = rotl(s[d] ^ s[a], 16)
52
        s[c] = (s[c] + s[d]) & 0xFFFFFFFF; s[b] = rotl(s[b] ^ s[c], 12)
53
        s[a] = (s[a] + s[b]) & 0xFFFFFFFF; s[d] = rotl(s[d] ^ s[a], 8)
54
        s[c] = (s[c] + s[d]) & 0xFFFFFFFF; s[b] = rotl(s[b] ^ s[c], 7)
55
 
56
    for _ in range(10):
57
        qr(0, 4,  8, 12); qr(1, 5,  9, 13); qr(2, 6, 10, 14); qr(3, 7, 11, 15)
58
        qr(0, 5, 10, 15); qr(1, 6, 11, 12); qr(2, 7,  8, 13); qr(3, 4,  9, 14)
59
    return struct.pack("<8I", s[0], s[1], s[2], s[3], s[12], s[13], s[14], s[15])
60
 
61
 
62
def xchacha20poly1305_encrypt(key: bytes, nonce24: bytes, plaintext: bytes, aad: bytes) -> bytes:
63
    if len(key) != 32 or len(nonce24) != 24:
64
        raise ValueError("xchacha20poly1305 requires 32-byte key and 24-byte nonce")
65
    subkey = _hchacha20(key, nonce24[:16])
66
    nonce12 = b"\x00\x00\x00\x00" + nonce24[16:24]
67
    return ChaCha20Poly1305(subkey).encrypt(nonce12, plaintext, aad)
68
 
69
 
70
def canonical_bytes(obj: dict) -> bytes:
71
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
72
 
73
 
74
def strip_none(obj):
75
    if isinstance(obj, dict):
76
        return {k: strip_none(v) for k, v in obj.items() if v is not None}
77
    if isinstance(obj, list):
78
        return [strip_none(v) for v in obj if v is not None]
79
    return obj
80
 
81
 
82
def hybrid_wrap_dek(dek: bytes, x25519_pub: bytes, mlkem_pub: bytes) -> tuple[dict, bytes, bytes]:
83
    eph = X25519PrivateKey.generate()
84
    eph_pub = eph.public_key().public_bytes(
85
        encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
86
    )
87
    peer_x = X25519PublicKey.from_public_bytes(x25519_pub)
88
    ss_x = eph.exchange(peer_x)
89
 
90
    with oqs.KeyEncapsulation("ML-KEM-768") as kem:
91
        mlkem_ct, ss_pq = kem.encap_secret(mlkem_pub)
92
 
93
    ikm = ss_x + ss_pq + eph_pub + mlkem_ct
94
    kek = HKDF(
95
        algorithm=hashes.SHA256(), length=32, salt=None,
96
        info=b"oversight-hybrid-v1-dek-wrap",
97
    ).derive(ikm)
98
 
99
    nonce = os.urandom(24)
100
    wrapped = xchacha20poly1305_encrypt(kek, nonce, dek, aad=b"oversight-hybrid-dek")
101
    return ({
102
        "suite": SUITE_HYBRID_V1,
103
        "x25519_ephemeral_pub": eph_pub.hex(),
104
        "mlkem_ciphertext": mlkem_ct.hex(),
105
        "nonce": nonce.hex(),
106
        "wrapped_dek": wrapped.hex(),
107
    }, mlkem_ct, eph_pub)
108
 
109
 
110
def main() -> int:
111
    p = argparse.ArgumentParser()
112
    p.add_argument("--out-dir", required=True, type=Path)
113
    p.add_argument("--message", default="hello hybrid post-quantum oversight\n")
114
    args = p.parse_args()
115
    args.out_dir.mkdir(parents=True, exist_ok=True)
116
 
117
    rx_priv = X25519PrivateKey.generate()
118
    rx_priv_bytes = rx_priv.private_bytes(
119
        encoding=serialization.Encoding.Raw,
120
        format=serialization.PrivateFormat.Raw,
121
        encryption_algorithm=serialization.NoEncryption(),
122
    )
123
    rx_pub_bytes = rx_priv.public_key().public_bytes(
124
        encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
125
    )
126
    with oqs.KeyEncapsulation("ML-KEM-768") as kem:
127
        mlkem_pub = kem.generate_keypair()
128
        mlkem_priv = kem.export_secret_key()
129
 
130
    issuer = Ed25519PrivateKey.generate()
131
    issuer_pub_bytes = issuer.public_key().public_bytes(
132
        encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
133
    )
134
 
135
    plaintext = args.message.encode("utf-8")
136
    content_hash = hashlib.sha256(plaintext).hexdigest()
137
    canonical_content_hash = content_hash
138
    dek = os.urandom(32)
139
 
140
    aead_nonce = os.urandom(24)
141
    ciphertext = xchacha20poly1305_encrypt(
142
        dek, aead_nonce, plaintext, aad=content_hash.encode("ascii")
143
    )
144
 
145
    wrapped_dek, _mlkem_ct, _eph_pub = hybrid_wrap_dek(dek, rx_pub_bytes, mlkem_pub)
146
 
147
    manifest = {
148
        "suite": SUITE_HYBRID_V1,
149
        "format": "oversight/v1",
150
        "issuer_id": "tutorial-hybrid@oversightprotocol.dev",
151
        "issuer_ed25519_pub": issuer_pub_bytes.hex(),
152
        "issuer_ml_dsa_pub": "",
153
        "recipient": {
154
            "id": "tutorial@oversightprotocol.dev",
155
            "x25519_pub": rx_pub_bytes.hex(),
156
            "mlkem_pub": mlkem_pub.hex(),
157
        },
158
        "content_type": "text/plain",
159
        "content_hash": content_hash,
160
        "canonical_content_hash": canonical_content_hash,
161
        "l3_policy": {"enabled": False, "mode": "off"},
162
        "filename": "hello-hybrid.txt",
163
        "signature_ed25519": "",
164
        "signature_ml_dsa": "",
165
    }
166
 
167
    manifest_for_sign = strip_none(manifest)
168
    manifest_for_sign["signature_ed25519"] = ""
169
    manifest_for_sign["signature_ml_dsa"] = ""
170
    sig_bytes = issuer.sign(canonical_bytes(manifest_for_sign))
171
    manifest["signature_ed25519"] = sig_bytes.hex()
172
 
173
    manifest_serialized = canonical_bytes(strip_none(manifest))
174
    wrapped_dek_serialized = canonical_bytes(wrapped_dek)
175
 
176
    container = bytearray()
177
    container.extend(MAGIC)
178
    container.extend(bytes([FORMAT_VERSION, SUITE_HYBRID_V1_ID]))
179
    container.extend(struct.pack(">I", len(manifest_serialized)))
180
    container.extend(manifest_serialized)
181
    container.extend(struct.pack(">I", len(wrapped_dek_serialized)))
182
    container.extend(wrapped_dek_serialized)
183
    container.extend(aead_nonce)
184
    container.extend(struct.pack(">I", len(ciphertext)))
185
    container.extend(ciphertext)
186
 
187
    sealed_path = args.out_dir / "tutorial-hybrid.sealed"
188
    identity_path = args.out_dir / "tutorial-hybrid-identity.json"
189
 
190
    sealed_path.write_bytes(bytes(container))
191
    identity = {
192
        "recipient_id": "tutorial@oversightprotocol.dev",
193
        "x25519_priv": rx_priv_bytes.hex(),
194
        "x25519_pub": rx_pub_bytes.hex(),
195
        "mlkem_priv": mlkem_priv.hex(),
196
        "mlkem_pub": mlkem_pub.hex(),
197
        "ed25519_priv": "public-tutorial-key-does-not-sign",
198
        "ed25519_pub": "public-tutorial-key-does-not-sign",
199
        "_note": "PUBLIC TUTORIAL KEY. Demo-only. Do not use for real content.",
200
    }
201
    identity_path.write_text(json.dumps(identity, indent=2))
202
 
203
    print(f"[+] wrote {sealed_path} ({sealed_path.stat().st_size} bytes)")
204
    print(f"[+] wrote {identity_path}")
205
    print(f"    plaintext SHA-256: {content_hash}")
206
    print(f"    suite: {SUITE_HYBRID_V1}")
207
    return 0
208
 
209
 
210
if __name__ == "__main__":
211
    sys.exit(main())