Zion Boggan
repos/Oversight/tests/test_e2e_v2.py
zionboggan.com ↗
306 lines · python
History for this file →
1
"""
2
OVERSIGHT v0.2 comprehensive end-to-end test.
3
 
4
Covers:
5
   1. Identity + keygen
6
   2. Text watermarking (L1, L2, L3 semantic)
7
   3. Image DCT watermarking
8
   4. PDF metadata marks
9
   5. DOCX metadata marks
10
   6. Seal + open (single recipient)
11
  7. Multi-recipient seal fails closed
12
   8. Policy enforcement (not_after expired)
13
   9. Policy enforcement (max_opens counter)
14
  10. Semantic watermark verification (airgap-strip survivor)
15
  11. Tamper detection
16
  12. Merkle transparency log correctness
17
  13. Perceptual hash lookup (fuzzy match)
18
"""
19
 
20
import io
21
import sys
22
import time
23
import tempfile
24
import hashlib
25
from pathlib import Path
26
 
27
ROOT = Path(__file__).resolve().parent.parent
28
sys.path.insert(0, str(ROOT))
29
 
30
from oversight_core import (
31
    ClassicIdentity, Manifest, Recipient, WatermarkRef,
32
    content_hash, seal, open_sealed, beacon, watermark,
33
)
34
from oversight_core import semantic
35
from oversight_core.container import seal_multi
36
from oversight_core.policy import PolicyContext, PolicyViolation
37
from oversight_core.tlog import TransparencyLog
38
 
39
 
40
def banner(m): print(f"\n{'=' * 64}\n  {m}\n{'=' * 64}")
41
def ok(msg): print(f"  [ok] {msg}")
42
def fail(msg): print(f"  [FAIL] {msg}"); sys.exit(1)
43
 
44
 
45
def main():
46
    banner("1. Identities")
47
    issuer = ClassicIdentity.generate()
48
    alice = ClassicIdentity.generate()
49
    bob = ClassicIdentity.generate()
50
    carol = ClassicIdentity.generate()
51
    ok(f"generated 4 identities")
52
 
53
    banner("2. Text watermarking - L1 + L2 + L3")
54
    text_lines = [f"Supporting paragraph {i}: we begin to show how this is significant and we must help users find answers." for i in range(60)]
55
    original_text = "\n".join(text_lines)
56
    mid_zw = watermark.new_mark_id()
57
    mid_ws = watermark.new_mark_id()
58
    mid_sem = watermark.new_mark_id()
59
 
60
    t = semantic.apply_semantic(original_text, mid_sem)
61
    t = watermark.embed_ws(t, mid_ws)
62
    t = watermark.embed_zw(t, mid_zw)
63
    plaintext = t.encode("utf-8")
64
    ok(f"applied L3/L2/L1 marks; bytes={len(plaintext)}")
65
 
66
    banner("3. Semantic recovery survives airgap-strip")
67
    airgap_stripped = t
68
    for zw in ("\u200b", "\u200c", "\u200d"):
69
        airgap_stripped = airgap_stripped.replace(zw, "")
70
    airgap_stripped = "\n".join(line.rstrip() for line in airgap_stripped.splitlines())
71
 
72
    l1_survived = len(watermark.extract_zw(airgap_stripped)) > 0
73
    l2_result = watermark.extract_ws(airgap_stripped)
74
    l2_survived = l2_result is not None and l2_result == mid_ws
75
    print(f"  L1 survived airgap-strip: {l1_survived} (expected False)")
76
    print(f"  L2 survived airgap-strip: {l2_survived} (expected False)")
77
    if l1_survived or l2_survived:
78
        fail("L1 or L2 unexpectedly survived airgap-strip - test setup bug")
79
 
80
    result = semantic.verify_semantic(airgap_stripped, mid_sem)
81
    print(f"  L3 synonym score:      {result['synonyms_score']:.3f} (match={result['synonyms_match']})")
82
    print(f"  L3 punctuation hits:   {result['punctuation_hits']}")
83
    print(f"  L3 overall match:      {result['overall_match']}")
84
    if not result["overall_match"]:
85
        fail("L3 semantic watermark failed to survive airgap-strip")
86
    ok("L3 semantic watermark SURVIVED airgap-strip - attribution possible")
87
 
88
    wrong_result = semantic.verify_semantic(airgap_stripped, watermark.new_mark_id())
89
    if wrong_result["overall_match"] and wrong_result["synonyms_score"] > 0.65:
90
        fail(f"random mark_id matched (score={wrong_result['synonyms_score']}) - false positive")
91
    ok(f"L3 rejects wrong mark_id (score={wrong_result['synonyms_score']:.3f})")
92
 
93
    banner("4. Image DCT watermarking")
94
    try:
95
        from PIL import Image
96
        import numpy as np
97
        from oversight_core.formats import image as img_fmt
98
        arr = np.random.RandomState(42).randint(64, 200, (256, 256, 3), dtype=np.uint8)
99
        pil = Image.fromarray(arr)
100
        buf = io.BytesIO(); pil.save(buf, format="PNG")
101
        orig_bytes = buf.getvalue()
102
 
103
        img_mark = watermark.new_mark_id()
104
        marked_bytes = img_fmt.embed(orig_bytes, img_mark, alpha=0.10)
105
        ok(f"embedded into image: {len(marked_bytes)} bytes")
106
 
107
        match, score = img_fmt.verify(marked_bytes, img_mark)
108
        print(f"  correct mark score: {score:+.4f} (match={match})")
109
        if not match:
110
            fail(f"DCT watermark verify FAILED for correct mark_id")
111
 
112
        wrong_mark = watermark.new_mark_id()
113
        wrong_match, wrong_score = img_fmt.verify(marked_bytes, wrong_mark)
114
        print(f"  wrong mark score:   {wrong_score:+.4f} (match={wrong_match})")
115
        if wrong_match:
116
            fail("DCT watermark verify matched WRONG mark_id (false positive)")
117
        ok("image DCT watermark verifies correctly and rejects wrong marks")
118
 
119
        from PIL import Image as _I
120
        pil2 = _I.open(io.BytesIO(marked_bytes))
121
        jpeg_buf = io.BytesIO(); pil2.save(jpeg_buf, format="JPEG", quality=75)
122
        match_after_jpeg, score_after_jpeg = img_fmt.verify(jpeg_buf.getvalue(), img_mark)
123
        print(f"  post-JPEG-q75 score: {score_after_jpeg:+.4f} (match={match_after_jpeg})")
124
        if match_after_jpeg:
125
            ok("image watermark SURVIVED JPEG recompression (q=75)")
126
        else:
127
            print(f"  [note] image watermark weakened by JPEG recompression (score below threshold)")
128
 
129
        phash = img_fmt.perceptual_hash(marked_bytes)
130
        ok(f"perceptual hash: {phash}")
131
    except Exception as e:
132
        fail(f"image test error: {e}")
133
 
134
    banner("5. PDF marks")
135
    try:
136
        from pypdf import PdfWriter as _PW
137
        from oversight_core.formats import pdf as pdf_fmt
138
        try:
139
            from reportlab.pdfgen import canvas
140
            buf = io.BytesIO()
141
            c = canvas.Canvas(buf)
142
            c.drawString(100, 750, "Confidential - test document")
143
            c.save()
144
            pdf_bytes = buf.getvalue()
145
        except ImportError:
146
            w = _PW()
147
            w.add_blank_page(width=612, height=792)
148
            buf = io.BytesIO(); w.write(buf)
149
            pdf_bytes = buf.getvalue()
150
 
151
        pdf_mark = watermark.new_mark_id()
152
        marked_pdf = pdf_fmt.embed(pdf_bytes, pdf_mark, issuer_id="acme", file_id="pdf-test-1")
153
        ok(f"embedded into PDF: {len(marked_pdf)} bytes")
154
 
155
        extracted = pdf_fmt.extract(marked_pdf)
156
        if extracted["mark_id"] != pdf_mark.hex():
157
            fail(f"PDF mark mismatch: got {extracted['mark_id']}, expected {pdf_mark.hex()}")
158
        if extracted["issuer_id"] != "acme":
159
            fail(f"PDF issuer mismatch")
160
        ok(f"PDF mark recovered: {extracted['mark_id']}")
161
    except Exception as e:
162
        fail(f"PDF test error: {e}")
163
 
164
    banner("6. DOCX marks")
165
    try:
166
        from docx import Document
167
        from oversight_core.formats import docx as docx_fmt
168
        doc = Document()
169
        doc.add_paragraph("Confidential test document")
170
        doc.add_paragraph("Second paragraph of content")
171
        buf = io.BytesIO(); doc.save(buf)
172
        docx_bytes = buf.getvalue()
173
 
174
        docx_mark = watermark.new_mark_id()
175
        marked_docx = docx_fmt.embed(docx_bytes, docx_mark, issuer_id="acme", file_id="docx-test-1")
176
        ok(f"embedded into DOCX: {len(marked_docx)} bytes")
177
 
178
        ext = docx_fmt.extract(marked_docx)
179
        if ext["mark_id"] != docx_mark.hex():
180
            fail(f"DOCX mark mismatch: got {ext['mark_id']}, expected {docx_mark.hex()}")
181
        ok(f"DOCX mark recovered: {ext['mark_id']}")
182
    except Exception as e:
183
        fail(f"DOCX test error: {e}")
184
 
185
    banner("7. Single-recipient seal + open (regression)")
186
    rec = Recipient(recipient_id="alice@corp", x25519_pub=alice.x25519_pub.hex(), ed25519_pub=alice.ed25519_pub.hex())
187
    m = Manifest.new("test.txt", content_hash(plaintext), len(plaintext), "acme", issuer.ed25519_pub.hex(), rec, "http://localhost:8765", "text/plain")
188
    m.watermarks = [
189
        WatermarkRef(layer="L1_zero_width", mark_id=mid_zw.hex()),
190
        WatermarkRef(layer="L2_whitespace", mark_id=mid_ws.hex()),
191
        WatermarkRef(layer="L3_semantic", mark_id=mid_sem.hex()),
192
    ]
193
    blob = seal(plaintext, m, issuer.ed25519_priv, alice.x25519_pub)
194
    recovered, mm = open_sealed(blob, alice.x25519_priv)
195
    if recovered != plaintext:
196
        fail("recovered plaintext mismatch")
197
    ok(f"seal/open round-trip OK ({len(blob)} bytes)")
198
 
199
    banner("8. Multi-recipient seal fails closed")
200
    m2 = Manifest.new("multi.txt", content_hash(plaintext), len(plaintext), "acme", issuer.ed25519_pub.hex(), rec, "http://localhost:8765", "text/plain")
201
    try:
202
        seal_multi(
203
            plaintext, m2, issuer.ed25519_priv,
204
            [alice.x25519_pub, bob.x25519_pub, carol.x25519_pub],
205
        )
206
        fail("seal_multi should be disabled until the manifest can bind multiple recipients")
207
    except Exception as e:
208
        ok(f"multi-recipient seal correctly rejected: {type(e).__name__}")
209
 
210
    banner("9. Policy: not_after (expired)")
211
    expired_m = Manifest.new("exp.txt", content_hash(plaintext), len(plaintext), "acme", issuer.ed25519_pub.hex(), rec, "http://localhost:8765")
212
    expired_m.policy["not_after"] = int(time.time()) - 60
213
    expired_blob = seal(plaintext, expired_m, issuer.ed25519_priv, alice.x25519_pub)
214
    try:
215
        open_sealed(expired_blob, alice.x25519_priv)
216
        fail("expired file should NOT open")
217
    except PolicyViolation as e:
218
        ok(f"expired file correctly rejected: {e}")
219
 
220
    banner("10. Policy: max_opens counter")
221
    with tempfile.TemporaryDirectory() as td:
222
        ctx = PolicyContext(state_dir=Path(td), mode="LOCAL_ONLY", jurisdiction="GLOBAL")
223
        capped_m = Manifest.new("capped.txt", content_hash(plaintext), len(plaintext), "acme", issuer.ed25519_pub.hex(), rec, "http://localhost:8765")
224
        capped_m.policy["max_opens"] = 2
225
        capped_blob = seal(plaintext, capped_m, issuer.ed25519_priv, alice.x25519_pub)
226
 
227
        for i in range(2):
228
            pt, _ = open_sealed(capped_blob, alice.x25519_priv, policy_ctx=ctx)
229
            if pt != plaintext:
230
                fail(f"open {i+1} recovered wrong plaintext")
231
        ok("first 2 opens succeeded")
232
 
233
        try:
234
            open_sealed(capped_blob, alice.x25519_priv, policy_ctx=ctx)
235
            fail("3rd open should have been rejected")
236
        except PolicyViolation as e:
237
            ok(f"3rd open correctly rejected: {e}")
238
 
239
    banner("11. Tamper detection (ciphertext + manifest)")
240
    bad = bytearray(blob)
241
    bad[-1] ^= 0x01
242
    try:
243
        open_sealed(bytes(bad), alice.x25519_priv)
244
        fail("ciphertext tamper should have been caught")
245
    except Exception as e:
246
        ok(f"ciphertext tamper rejected: {type(e).__name__}")
247
 
248
    bad2 = bytearray(blob)
249
    bad2[30] ^= 0x01
250
    try:
251
        open_sealed(bytes(bad2), alice.x25519_priv)
252
        fail("manifest tamper should have been caught")
253
    except Exception as e:
254
        ok(f"manifest tamper rejected: {type(e).__name__}")
255
 
256
    banner("12. Merkle transparency log")
257
    with tempfile.TemporaryDirectory() as td:
258
        reg_key = ClassicIdentity.generate()
259
        tl = TransparencyLog(td, signing_key_hex=reg_key.ed25519_priv.hex())
260
        idx0 = tl.append({"event": "test", "i": 0})
261
        idx1 = tl.append({"event": "test", "i": 1})
262
        idx2 = tl.append({"event": "test", "i": 2})
263
        idx3 = tl.append({"event": "test", "i": 3})
264
        if tl.size() != 4:
265
            fail(f"tlog size {tl.size()} != 4")
266
        ok(f"appended 4 entries, size={tl.size()}")
267
 
268
        head = tl.signed_head()
269
        ok(f"signed head: size={head['size']} root={head['root'][:16]}...")
270
 
271
        proof = tl.inclusion_proof(idx2)
272
        if proof is None:
273
            fail("inclusion proof for valid index returned None")
274
        if proof["index"] != idx2:
275
            fail(f"proof index mismatch")
276
        ok(f"inclusion proof for idx={idx2}: {len(proof['proof'])} sibling hashes")
277
 
278
        root_before = tl.root()
279
        tl.append({"event": "test", "i": 4})
280
        if tl.root() == root_before:
281
            fail("root did not change after append")
282
        ok("root changes on append (append-only integrity)")
283
 
284
    banner("13. Perceptual hash deterministic")
285
    try:
286
        from oversight_core.formats import image as img_fmt
287
        ph1 = img_fmt.perceptual_hash(marked_bytes)
288
        ph2 = img_fmt.perceptual_hash(marked_bytes)
289
        if ph1 != ph2:
290
            fail("perceptual hash not deterministic")
291
        ok(f"phash deterministic: {ph1}")
292
    except Exception as e:
293
        fail(f"phash test error: {e}")
294
 
295
    banner("ALL TESTS PASSED")
296
 
297
 
298
def test_e2e_v2_full_round_trip():
299
    """Pytest entry point. The scenario is one end-to-end flow with internal
300
    assertions; pytest's value here is collection + CI integration, not
301
    per-step granularity."""
302
    main()
303
 
304
 
305
if __name__ == "__main__":
306
    main()