| 1 | """ |
| 2 | End-to-end test of the OVERSIGHT MVP. |
| 3 | |
| 4 | Exercises: |
| 5 | 1. Identity generation (issuer + two recipients) |
| 6 | 2. Sealing a text file for recipient Alice with watermarks + beacons |
| 7 | 3. Inspecting the sealed file (manifest visible, ciphertext opaque) |
| 8 | 4. Alice opens it successfully |
| 9 | 5. Bob (wrong key) fails to open it |
| 10 | 6. Tampering with the ciphertext is detected |
| 11 | 7. Tampering with the manifest is detected |
| 12 | 8. Watermark recovery from leaked plaintext identifies Alice |
| 13 | """ |
| 14 | |
| 15 | import os |
| 16 | import sys |
| 17 | import tempfile |
| 18 | from pathlib import Path |
| 19 | |
# ROOT is the directory two levels above this file. It is pushed to the front
# of sys.path before the project imports below run — presumably so that
# `oversight_core` resolves from the source tree (verify against repo layout).
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, os.fspath(ROOT))
| 22 | |
| 23 | from oversight_core import ( |
| 24 | ClassicIdentity, |
| 25 | Manifest, |
| 26 | Recipient, |
| 27 | WatermarkRef, |
| 28 | content_hash, |
| 29 | seal, |
| 30 | open_sealed, |
| 31 | beacon, |
| 32 | watermark, |
| 33 | ) |
| 34 | from oversight_core.container import SealedFile |
| 35 | |
| 36 | |
def banner(msg):
    """Print ``msg`` between two 60-character ``=`` rules, after a blank line."""
    rule = "=" * 60
    print(f"\n{rule}\n {msg}\n{rule}")
| 39 | |
| 40 | |
def _flip_low_bit(data, index):
    """Return a copy of ``data`` with the lowest bit of byte ``index`` inverted."""
    mutated = bytearray(data)
    mutated[index] ^= 0x01
    return bytes(mutated)


def _expect_rejected(failure_msg, blob, recipient_x25519_priv):
    """Require that ``open_sealed`` raises for ``blob`` with the given key.

    Args:
        failure_msg: Text printed after ``" FAIL - "`` and used as the
            ``AssertionError`` message when the open unexpectedly succeeds.
        blob: Sealed bytes to try to open.
        recipient_x25519_priv: Private key passed to ``open_sealed``.

    Raises:
        AssertionError: If ``open_sealed`` returns instead of raising.
    """
    try:
        open_sealed(blob, recipient_x25519_priv=recipient_x25519_priv)
    except Exception as e:
        # NOTE(review): any Exception counts as a rejection here, so an
        # unrelated bug (e.g. a TypeError) would also pass. Narrow this once
        # the exception types raised by open_sealed are confirmed.
        print(f" correctly rejected: {type(e).__name__}: {str(e)[:60]}")
    else:
        # Raised in the ``else`` clause so the broad handler above cannot
        # swallow the failure.
        print(f" FAIL - {failure_msg}")
        raise AssertionError(failure_msg)


def main():
    """Run the seal / open / tamper / watermark scenario end to end.

    Progress is printed under numbered banners. Every check whose result the
    scenario depends on is asserted, so a failure stops the run instead of
    only being printed.

    Raises:
        AssertionError: If any positive check fails, or if a wrong key or a
            tampered blob is accepted by ``open_sealed``.
    """
    banner("1. Generate identities")
    issuer = ClassicIdentity.generate()
    alice = ClassicIdentity.generate()
    bob = ClassicIdentity.generate()
    print(f" issuer ed25519_pub = {issuer.ed25519_pub.hex()[:32]}...")
    print(f" alice x25519_pub = {alice.x25519_pub.hex()[:32]}...")
    print(f" bob x25519_pub = {bob.x25519_pub.hex()[:32]}...")

    banner("2. Prepare & watermark plaintext")
    lines = [
        "CONFIDENTIAL - Q2 Revenue Memo",
        "Revenue for Q2 exceeded projections by 18%.",
        "Do not distribute externally.",
        "",
    ]
    # 80 filler lines, described by their own text as room for the watermarks.
    lines.extend(
        f"Supporting detail line {i}: filler content for watermark room."
        for i in range(80)
    )
    original_text = "\n".join(lines)
    mark_zw = watermark.new_mark_id()
    mark_ws = watermark.new_mark_id()
    # Two marks are layered: embed_zw first, then embed_ws on its output.
    wm_text = watermark.embed_zw(original_text, mark_zw)
    wm_text = watermark.embed_ws(wm_text, mark_ws)
    plaintext = wm_text.encode("utf-8")
    print(f" original bytes = {len(original_text.encode())}")
    print(f" watermarked = {len(plaintext)}")
    print(f" L1 mark (zw) = {mark_zw.hex()}")
    print(f" L2 mark (ws) = {mark_ws.hex()}")

    banner("3. Build manifest + beacons for Alice")
    # NOTE(review): the beacons are generated with the placeholder file_id
    # "will-be-assigned", before the manifest (and its file_id) exists.
    # Confirm whether they are meant to carry manifest.file_id instead.
    beacons = beacon.gen_beacons(
        registry_domain="oversight.test",
        file_id="will-be-assigned",
        recipient_id="alice@example.com",
    )
    recipient = Recipient(
        recipient_id="alice@example.com",
        x25519_pub=alice.x25519_pub.hex(),
        ed25519_pub=alice.ed25519_pub.hex(),
    )
    manifest = Manifest.new(
        original_filename="q2_memo.txt",
        content_hash=content_hash(plaintext),
        size_bytes=len(plaintext),
        issuer_id="acme.corp.legal",
        issuer_ed25519_pub_hex=issuer.ed25519_pub.hex(),
        recipient=recipient,
        registry_url="https://registry.oversight.test",
        content_type="text/plain",
    )
    manifest.watermarks = [
        WatermarkRef(layer="L1_zero_width", mark_id=mark_zw.hex()),
        WatermarkRef(layer="L2_whitespace", mark_id=mark_ws.hex()),
    ]
    manifest.beacons = [b.to_dict() for b in beacons]
    print(f" file_id = {manifest.file_id}")
    print(f" beacons = {len(beacons)}")
    print(f" marks = {len(manifest.watermarks)}")

    banner("4. Seal")
    blob = seal(
        plaintext=plaintext,
        manifest=manifest,
        issuer_ed25519_priv=issuer.ed25519_priv,
        recipient_x25519_pub=alice.x25519_pub,
    )
    print(f" sealed blob = {len(blob)} bytes")
    # The first six bytes are compared against "SNTL" followed by 0x01 0x00.
    magic_ok = blob[:6] == b"SNTL\x01\x00"
    print(f" magic OK = {magic_ok}")
    manifest_signed = manifest.verify()
    print(f" manifest signed = {manifest_signed}")
    assert magic_ok, "sealed blob does not start with the expected magic bytes"
    assert manifest_signed, "manifest did not verify after sealing"

    banner("5. Inspect (no key needed for metadata)")
    sf = SealedFile.from_bytes(blob)
    print(f" manifest.file_id = {sf.manifest.file_id}")
    print(f" manifest.recipient = {sf.manifest.recipient.recipient_id}")
    sig_valid = sf.manifest.verify()
    print(f" manifest sig valid = {sig_valid}")
    assert sig_valid, "manifest parsed from the sealed blob did not verify"

    banner("6. Alice opens (correct key)")
    # The second element of the returned pair is not used by this scenario.
    recovered, _opened_manifest = open_sealed(
        blob, recipient_x25519_priv=alice.x25519_priv
    )
    print(f" decrypted = {len(recovered)} bytes")
    round_trip_ok = recovered == plaintext
    print(f" exact match to original plaintext = {round_trip_ok}")
    assert round_trip_ok, "decrypted bytes differ from the sealed plaintext"

    banner("7. Bob (wrong key) attempts to open")
    _expect_rejected(
        "bob should not have been able to decrypt", blob, bob.x25519_priv
    )

    banner("8. Tamper with ciphertext")
    # Flip one bit in the final byte of the blob.
    _expect_rejected(
        "ciphertext tamper should have been caught",
        _flip_low_bit(blob, -1),
        alice.x25519_priv,
    )

    banner("9. Tamper with manifest (flip a byte inside the manifest region)")
    # NOTE(review): offset 30 is assumed to fall inside the manifest region
    # of the container; confirm against the container layout.
    _expect_rejected(
        "manifest tamper should have been caught",
        _flip_low_bit(blob, 30),
        alice.x25519_priv,
    )

    banner("10. Watermark recovery from leaked plaintext")
    leaked = recovered.decode("utf-8")
    marks = watermark.recover_marks(leaked)
    for layer, mlist in marks.items():
        uniq = sorted({mark.hex() for mark in mlist})
        print(f" {layer}: {len(mlist)} frame(s), unique IDs: {uniq}")
    found_zw = mark_zw in marks["L1_zero_width"]
    found_ws = mark_ws in marks["L2_whitespace"]
    print(f" L1 recovered = {found_zw}")
    print(f" L2 recovered = {found_ws}")
    assert found_zw, "L1 watermark recovery failed"
    assert found_ws, "L2 watermark recovery failed"

    banner("11. Watermark survives format stripping (paste into new doc)")
    # The text is rebuilt from its lines joined with "\n" and marks are
    # recovered again. Results are printed only, not asserted.
    # NOTE(review): confirm whether survival here should be a hard requirement.
    pasted = "\n".join(leaked.splitlines())
    marks2 = watermark.recover_marks(pasted)
    print(f" L1 (zw) survived copy-paste: {mark_zw in marks2['L1_zero_width']}")
    print(f" L2 (ws) survived copy-paste: "
          f"{mark_ws in marks2['L2_whitespace']}")

    banner("ALL TESTS PASSED")
| 171 | |
| 172 | |
def test_e2e_seal_open_watermark_round_trip():
    """Run the whole scenario as a single pytest test.

    The flow is one end-to-end sequence that carries its own assertions, so
    it is exposed to pytest for collection and CI integration, not split
    into per-step tests.
    """
    main()
| 178 | |
| 179 | |
# Script entry point: run the full scenario when this file is executed
# directly (not imported).
if __name__ == "__main__":
    main()