Zion Boggan
repos/Oversight/tests/test_e2e.py
zionboggan.com ↗
181 lines · python
History for this file →
1
"""
End-to-end test of the OVERSIGHT MVP.

Exercises:
  1. Identity generation (issuer + two recipients)
  2. Sealing a text file for recipient Alice with watermarks + beacons
  3. Inspecting the sealed file (manifest visible, ciphertext opaque)
  4. Alice opens it successfully
  5. Bob (wrong key) fails to open it
  6. Tampering with the ciphertext is detected
  7. Tampering with the manifest is detected
  8. Watermark recovery from leaked plaintext identifies Alice
"""
14
 
15
import os
16
import sys
17
import tempfile
18
from pathlib import Path
19
 
20
# Put the parent of this file's directory at the front of sys.path before the
# project imports below run (presumably the repo root that holds
# oversight_core - confirm against the repository layout).
_this_file = Path(__file__).resolve()
ROOT = _this_file.parent.parent
sys.path.insert(0, str(ROOT))
22
 
23
from oversight_core import (
24
    ClassicIdentity,
25
    Manifest,
26
    Recipient,
27
    WatermarkRef,
28
    content_hash,
29
    seal,
30
    open_sealed,
31
    beacon,
32
    watermark,
33
)
34
from oversight_core.container import SealedFile
35
 
36
 
37
def banner(msg):
    """Print ``msg`` framed above and below by a 60-character ``=`` rule.

    A blank line is emitted first and the message is indented by two spaces.
    """
    rule = "=" * 60
    print(f"\n{rule}\n  {msg}\n{rule}")
39
 
40
 
41
def _expect_rejected(blob, recipient_x25519_priv, failure_msg):
    """Require that ``open_sealed`` refuses ``blob`` for the given private key.

    On rejection the exception type and the first 60 characters of its
    message are printed. If ``open_sealed`` returns normally, ``failure_msg``
    is printed and the process exits with status 1.
    """
    try:
        open_sealed(blob, recipient_x25519_priv=recipient_x25519_priv)
    except Exception as e:
        # Any exception counts as a rejection; the concrete error types raised
        # by open_sealed are not visible from this file.
        print(f"  correctly rejected: {type(e).__name__}: {str(e)[:60]}")
    else:
        # sys.exit (SystemExit) rather than an assert, so the failure cannot
        # be mistaken for a rejection by an ``except Exception`` handler.
        print(f"  FAIL - {failure_msg}")
        sys.exit(1)


def main():
    """Run the full seal / open / tamper / watermark-recovery scenario.

    Each numbered step prints a banner and its results. The positive checks
    (container magic, manifest signature, round-trip equality, watermark
    recovery) are enforced with ``assert``; the negative checks (wrong key,
    tampered ciphertext, tampered manifest) exit with status 1 if the sealed
    blob is accepted when it should have been refused.
    """
    banner("1. Generate identities")
    issuer = ClassicIdentity.generate()
    alice = ClassicIdentity.generate()
    bob = ClassicIdentity.generate()
    print(f"  issuer  ed25519_pub = {issuer.ed25519_pub.hex()[:32]}...")
    print(f"  alice   x25519_pub  = {alice.x25519_pub.hex()[:32]}...")
    print(f"  bob     x25519_pub  = {bob.x25519_pub.hex()[:32]}...")

    banner("2. Prepare & watermark plaintext")
    lines = [
        "CONFIDENTIAL - Q2 Revenue Memo",
        "Revenue for Q2 exceeded projections by 18%.",
        "Do not distribute externally.",
        "",
    ]
    # Pad the memo with filler lines so the watermark layers have text to
    # embed into.
    lines.extend(
        f"Supporting detail line {i}: filler content for watermark room."
        for i in range(80)
    )
    original_text = "\n".join(lines)
    mark_zw = watermark.new_mark_id()
    mark_ws = watermark.new_mark_id()
    # Apply both layers in sequence: zero-width first, then whitespace.
    wm_text = watermark.embed_zw(original_text, mark_zw)
    wm_text = watermark.embed_ws(wm_text, mark_ws)
    plaintext = wm_text.encode("utf-8")
    print(f"  original bytes  = {len(original_text.encode())}")
    print(f"  watermarked     = {len(plaintext)}")
    print(f"  L1 mark (zw)    = {mark_zw.hex()}")
    print(f"  L2 mark (ws)    = {mark_ws.hex()}")

    banner("3. Build manifest + beacons for Alice")
    # NOTE(review): the beacons are generated with a placeholder file_id
    # before the manifest (and its real file_id) exists - confirm that
    # gen_beacons does not need the final id.
    beacons = beacon.gen_beacons(
        registry_domain="oversight.test",
        file_id="will-be-assigned",
        recipient_id="alice@example.com",
    )
    recipient = Recipient(
        recipient_id="alice@example.com",
        x25519_pub=alice.x25519_pub.hex(),
        ed25519_pub=alice.ed25519_pub.hex(),
    )
    manifest = Manifest.new(
        original_filename="q2_memo.txt",
        content_hash=content_hash(plaintext),
        size_bytes=len(plaintext),
        issuer_id="acme.corp.legal",
        issuer_ed25519_pub_hex=issuer.ed25519_pub.hex(),
        recipient=recipient,
        registry_url="https://registry.oversight.test",
        content_type="text/plain",
    )
    manifest.watermarks = [
        WatermarkRef(layer="L1_zero_width", mark_id=mark_zw.hex()),
        WatermarkRef(layer="L2_whitespace", mark_id=mark_ws.hex()),
    ]
    manifest.beacons = [b.to_dict() for b in beacons]
    print(f"  file_id = {manifest.file_id}")
    print(f"  beacons = {len(beacons)}")
    print(f"  marks   = {len(manifest.watermarks)}")

    banner("4. Seal")
    blob = seal(
        plaintext=plaintext,
        manifest=manifest,
        issuer_ed25519_priv=issuer.ed25519_priv,
        recipient_x25519_pub=alice.x25519_pub,
    )
    # Expected 6-byte container header: b"SNTL" followed by bytes 1 and 0.
    magic_ok = blob[:6] == b"SNTL\x01\x00"
    manifest_signed = manifest.verify()
    print(f"  sealed blob     = {len(blob)} bytes")
    print(f"  magic OK        = {magic_ok}")
    print(f"  manifest signed = {manifest_signed}")
    assert magic_ok, "sealed blob does not start with the expected magic bytes"
    assert manifest_signed, "manifest signature did not verify after sealing"

    banner("5. Inspect (no key needed for metadata)")
    sf = SealedFile.from_bytes(blob)
    parsed_sig_valid = sf.manifest.verify()
    print(f"  manifest.file_id    = {sf.manifest.file_id}")
    print(f"  manifest.recipient  = {sf.manifest.recipient.recipient_id}")
    print(f"  manifest sig valid  = {parsed_sig_valid}")
    assert parsed_sig_valid, "parsed manifest signature did not verify"

    banner("6. Alice opens (correct key)")
    # The second return value is not used by this scenario.
    recovered, _opened_manifest = open_sealed(
        blob, recipient_x25519_priv=alice.x25519_priv
    )
    round_trip_ok = recovered == plaintext
    print(f"  decrypted = {len(recovered)} bytes")
    print(f"  exact match to original plaintext = {round_trip_ok}")
    assert round_trip_ok, "decrypted bytes differ from the sealed plaintext"

    banner("7. Bob (wrong key) attempts to open")
    _expect_rejected(
        blob,
        bob.x25519_priv,
        "bob should not have been able to decrypt",
    )

    banner("8. Tamper with ciphertext")
    bad = bytearray(blob)
    bad[-1] ^= 0x01  # flip one bit in the final byte of the blob
    _expect_rejected(
        bytes(bad),
        alice.x25519_priv,
        "ciphertext tamper should have been caught",
    )

    banner("9. Tamper with manifest (flip a byte inside the manifest region)")
    bad2 = bytearray(blob)
    # Offset 30 is assumed to fall inside the manifest region of the container
    # - TODO confirm against the container layout.
    bad2[30] ^= 0x01
    _expect_rejected(
        bytes(bad2),
        alice.x25519_priv,
        "manifest tamper should have been caught",
    )

    banner("10. Watermark recovery from leaked plaintext")
    leaked = recovered.decode("utf-8")
    marks = watermark.recover_marks(leaked)
    for layer, found in marks.items():
        uniq = sorted({mark.hex() for mark in found})
        print(f"  {layer}: {len(found)} frame(s), unique IDs: {uniq}")
    found_zw = mark_zw in marks["L1_zero_width"]
    found_ws = mark_ws in marks["L2_whitespace"]
    print(f"  L1 recovered = {found_zw}")
    print(f"  L2 recovered = {found_ws}")
    assert found_zw, "L1 watermark recovery failed"
    assert found_ws, "L2 watermark recovery failed"

    banner("11. Watermark survives format stripping (paste into new doc)")
    # Re-joining the split lines normalises line endings and drops a trailing
    # newline. The results of this step are reported but not asserted.
    pasted = "\n".join(leaked.splitlines())
    marks2 = watermark.recover_marks(pasted)
    print(f"  L1 (zw) survived copy-paste: {mark_zw in marks2['L1_zero_width']}")
    print(f"  L2 (ws) survived copy-paste: "
          f"{mark_ws in marks2['L2_whitespace']}")

    banner("ALL TESTS PASSED")
171
 
172
 
173
def test_e2e_seal_open_watermark_round_trip():
    """Run the whole scenario as a single pytest test case.

    The flow is deliberately kept as one end-to-end run with its checks
    inside ``main``; wrapping it here is only for test collection and CI
    integration, not to get a separate result per step.
    """
    main()
178
 
179
 
180
if __name__ == "__main__":
    # Direct execution as a script runs the same scenario without pytest.
    main()