| 1 | """ |
| 2 | OVERSIGHT v0.2 comprehensive end-to-end test. |
| 3 | |
| 4 | Covers: |
| 5 | 1. Identity + keygen |
| 6 | 2. Text watermarking (L1, L2, L3 semantic) |
| 7 | 3. Image DCT watermarking |
| 8 | 4. PDF metadata marks |
| 9 | 5. DOCX metadata marks |
| 10 | 6. Seal + open (single recipient) |
| 11 | 7. Multi-recipient seal fails closed |
| 12 | 8. Policy enforcement (not_after expired) |
| 13 | 9. Policy enforcement (max_opens counter) |
| 14 | 10. Semantic watermark verification (airgap-strip survivor) |
| 15 | 11. Tamper detection |
| 16 | 12. Merkle transparency log correctness |
| 17 | 13. Perceptual hash determinism (same bytes hash identically) |
| 18 | """ |
| 19 | |
| 20 | import io |
| 21 | import sys |
| 22 | import time |
| 23 | import tempfile |
| 24 | import hashlib |
| 25 | from pathlib import Path |
| 26 | |
| 27 | ROOT = Path(__file__).resolve().parent.parent |
| 28 | sys.path.insert(0, str(ROOT)) |
| 29 | |
| 30 | from oversight_core import ( |
| 31 | ClassicIdentity, Manifest, Recipient, WatermarkRef, |
| 32 | content_hash, seal, open_sealed, beacon, watermark, |
| 33 | ) |
| 34 | from oversight_core import semantic |
| 35 | from oversight_core.container import seal_multi |
| 36 | from oversight_core.policy import PolicyContext, PolicyViolation |
| 37 | from oversight_core.tlog import TransparencyLog |
| 38 | |
| 39 | |
def banner(m):
    """Print *m* as a section heading framed by two 64-character '=' rules."""
    rule = "=" * 64
    print(f"\n{rule}\n {m}\n{rule}")
def ok(msg):
    """Print a single ``[ok]`` line reporting that a check passed."""
    line = f" [ok] {msg}"
    print(line)
def fail(msg):
    """Print a ``[FAIL]`` line, then abort the run with exit status 1.

    Raises:
        SystemExit: always, with code 1. Because SystemExit derives from
            BaseException, ``except Exception`` handlers do not intercept it.
    """
    print(f" [FAIL] {msg}")
    raise SystemExit(1)
| 43 | |
| 44 | |
def _manifest_for(filename, plaintext, issuer, rec, *extra):
    """Build a Manifest describing *plaintext*, issued by "acme" to *rec*.

    Any *extra* positional arguments (the scenario passes "text/plain" for
    some files and nothing for others) are forwarded to ``Manifest.new``
    unchanged, so omitted arguments keep that constructor's own defaults.
    """
    return Manifest.new(
        filename, content_hash(plaintext), len(plaintext), "acme",
        issuer.ed25519_pub.hex(), rec, "http://localhost:8765", *extra,
    )


def _make_identities():
    """Section 1: generate the issuer plus three recipient identities.

    Returns:
        ``(issuer, alice, bob, carol)``, each from ``ClassicIdentity.generate()``.
    """
    banner("1. Identities")
    issuer = ClassicIdentity.generate()
    alice = ClassicIdentity.generate()
    bob = ClassicIdentity.generate()
    carol = ClassicIdentity.generate()
    ok("generated 4 identities")
    return issuer, alice, bob, carol


def _apply_text_marks():
    """Section 2: stack the three text watermark layers on a synthetic document.

    The semantic (L3) mark is applied first, then whitespace (L2), then
    zero-width (L1), each with its own freshly generated mark id.

    Returns:
        ``(marked_text, plaintext, mid_zw, mid_ws, mid_sem)`` where
        *plaintext* is *marked_text* encoded as UTF-8.
    """
    banner("2. Text watermarking - L1 + L2 + L3")
    original_text = "\n".join(
        f"Supporting paragraph {i}: we begin to show how this is significant "
        "and we must help users find answers."
        for i in range(60)
    )
    mid_zw = watermark.new_mark_id()
    mid_ws = watermark.new_mark_id()
    mid_sem = watermark.new_mark_id()

    marked = semantic.apply_semantic(original_text, mid_sem)
    marked = watermark.embed_ws(marked, mid_ws)
    marked = watermark.embed_zw(marked, mid_zw)
    plaintext = marked.encode("utf-8")
    ok(f"applied L3/L2/L1 marks; bytes={len(plaintext)}")
    return marked, plaintext, mid_zw, mid_ws, mid_sem


def _check_semantic_survives_airgap(marked_text, mid_ws, mid_sem):
    """Section 3: strip L1/L2 carriers and confirm only the L3 mark remains.

    The "airgap strip" removes the three zero-width code points and all
    trailing whitespace on every line. L1 and L2 must NOT be recoverable
    afterwards (otherwise the strip itself is wrong), L3 must still verify
    for *mid_sem*, and a random mark id must not be accepted.
    """
    banner("3. Semantic recovery survives airgap-strip")
    stripped = marked_text
    for zw in ("\u200b", "\u200c", "\u200d"):
        stripped = stripped.replace(zw, "")
    stripped = "\n".join(line.rstrip() for line in stripped.splitlines())

    l1_survived = len(watermark.extract_zw(stripped)) > 0
    l2_result = watermark.extract_ws(stripped)
    l2_survived = l2_result is not None and l2_result == mid_ws
    print(f" L1 survived airgap-strip: {l1_survived} (expected False)")
    print(f" L2 survived airgap-strip: {l2_survived} (expected False)")
    if l1_survived or l2_survived:
        fail("L1 or L2 unexpectedly survived airgap-strip - test setup bug")

    result = semantic.verify_semantic(stripped, mid_sem)
    print(f" L3 synonym score: {result['synonyms_score']:.3f} (match={result['synonyms_match']})")
    print(f" L3 punctuation hits: {result['punctuation_hits']}")
    print(f" L3 overall match: {result['overall_match']}")
    if not result["overall_match"]:
        fail("L3 semantic watermark failed to survive airgap-strip")
    ok("L3 semantic watermark SURVIVED airgap-strip - attribution possible")

    # Negative control: a fresh random id is only reported as a false positive
    # when it both matches overall AND scores above 0.65 on synonyms.
    wrong_result = semantic.verify_semantic(stripped, watermark.new_mark_id())
    if wrong_result["overall_match"] and wrong_result["synonyms_score"] > 0.65:
        fail(f"random mark_id matched (score={wrong_result['synonyms_score']}) - false positive")
    ok(f"L3 rejects wrong mark_id (score={wrong_result['synonyms_score']:.3f})")


def _check_image_marks():
    """Section 4: embed, verify and stress the image DCT watermark.

    Uses a 256x256 RGB image of seeded pseudo-random pixels saved as PNG.
    JPEG (quality 75) survival is reported but is not a hard failure.

    Returns:
        The watermarked image bytes, reused later by the perceptual-hash check.
    """
    banner("4. Image DCT watermarking")
    try:
        # Imported lazily so a missing optional dependency is reported through
        # fail() below rather than as an import error at module load.
        from PIL import Image
        import numpy as np
        from oversight_core.formats import image as img_fmt

        arr = np.random.RandomState(42).randint(64, 200, (256, 256, 3), dtype=np.uint8)
        png_buf = io.BytesIO()
        Image.fromarray(arr).save(png_buf, format="PNG")
        orig_bytes = png_buf.getvalue()

        img_mark = watermark.new_mark_id()
        marked_bytes = img_fmt.embed(orig_bytes, img_mark, alpha=0.10)
        ok(f"embedded into image: {len(marked_bytes)} bytes")

        matched, score = img_fmt.verify(marked_bytes, img_mark)
        print(f" correct mark score: {score:+.4f} (match={matched})")
        if not matched:
            fail("DCT watermark verify FAILED for correct mark_id")

        wrong_mark = watermark.new_mark_id()
        wrong_match, wrong_score = img_fmt.verify(marked_bytes, wrong_mark)
        print(f" wrong mark score: {wrong_score:+.4f} (match={wrong_match})")
        if wrong_match:
            fail("DCT watermark verify matched WRONG mark_id (false positive)")
        ok("image DCT watermark verifies correctly and rejects wrong marks")

        jpeg_buf = io.BytesIO()
        Image.open(io.BytesIO(marked_bytes)).save(jpeg_buf, format="JPEG", quality=75)
        match_after_jpeg, score_after_jpeg = img_fmt.verify(jpeg_buf.getvalue(), img_mark)
        print(f" post-JPEG-q75 score: {score_after_jpeg:+.4f} (match={match_after_jpeg})")
        if match_after_jpeg:
            ok("image watermark SURVIVED JPEG recompression (q=75)")
        else:
            print(" [note] image watermark weakened by JPEG recompression (score below threshold)")

        phash = img_fmt.perceptual_hash(marked_bytes)
        ok(f"perceptual hash: {phash}")
    except Exception as e:
        # fail() raises SystemExit, which this handler does not catch, so the
        # specific failures above are never re-reported as "image test error".
        fail(f"image test error: {e}")
    return marked_bytes


def _check_pdf_marks():
    """Section 5: embed a mark into a one-page PDF and read it back.

    The sample PDF is drawn with reportlab when it is importable; otherwise a
    blank page is produced with pypdf.
    """
    banner("5. PDF marks")
    try:
        from pypdf import PdfWriter as _PW
        from oversight_core.formats import pdf as pdf_fmt
        try:
            from reportlab.pdfgen import canvas
            buf = io.BytesIO()
            c = canvas.Canvas(buf)
            c.drawString(100, 750, "Confidential - test document")
            c.save()
            pdf_bytes = buf.getvalue()
        except ImportError:
            w = _PW()
            w.add_blank_page(width=612, height=792)
            buf = io.BytesIO()
            w.write(buf)
            pdf_bytes = buf.getvalue()

        pdf_mark = watermark.new_mark_id()
        marked_pdf = pdf_fmt.embed(pdf_bytes, pdf_mark, issuer_id="acme", file_id="pdf-test-1")
        ok(f"embedded into PDF: {len(marked_pdf)} bytes")

        extracted = pdf_fmt.extract(marked_pdf)
        if extracted["mark_id"] != pdf_mark.hex():
            fail(f"PDF mark mismatch: got {extracted['mark_id']}, expected {pdf_mark.hex()}")
        if extracted["issuer_id"] != "acme":
            fail("PDF issuer mismatch")
        ok(f"PDF mark recovered: {extracted['mark_id']}")
    except Exception as e:
        fail(f"PDF test error: {e}")


def _check_docx_marks():
    """Section 6: embed a mark into a two-paragraph DOCX and read it back."""
    banner("6. DOCX marks")
    try:
        from docx import Document
        from oversight_core.formats import docx as docx_fmt

        doc = Document()
        doc.add_paragraph("Confidential test document")
        doc.add_paragraph("Second paragraph of content")
        buf = io.BytesIO()
        doc.save(buf)
        docx_bytes = buf.getvalue()

        docx_mark = watermark.new_mark_id()
        marked_docx = docx_fmt.embed(docx_bytes, docx_mark, issuer_id="acme", file_id="docx-test-1")
        ok(f"embedded into DOCX: {len(marked_docx)} bytes")

        ext = docx_fmt.extract(marked_docx)
        if ext["mark_id"] != docx_mark.hex():
            fail(f"DOCX mark mismatch: got {ext['mark_id']}, expected {docx_mark.hex()}")
        ok(f"DOCX mark recovered: {ext['mark_id']}")
    except Exception as e:
        fail(f"DOCX test error: {e}")


def _check_single_recipient_seal(plaintext, issuer, alice, mark_ids):
    """Section 7: seal *plaintext* for alice and open it again.

    Args:
        mark_ids: ``(mid_zw, mid_ws, mid_sem)`` recorded in the manifest as
            the L1, L2 and L3 watermark references respectively.

    Returns:
        ``(rec, blob)``: the Recipient built for alice and the sealed bytes,
        both reused by later sections.
    """
    banner("7. Single-recipient seal + open (regression)")
    mid_zw, mid_ws, mid_sem = mark_ids
    rec = Recipient(
        recipient_id="alice@corp",
        x25519_pub=alice.x25519_pub.hex(),
        ed25519_pub=alice.ed25519_pub.hex(),
    )
    m = _manifest_for("test.txt", plaintext, issuer, rec, "text/plain")
    m.watermarks = [
        WatermarkRef(layer="L1_zero_width", mark_id=mid_zw.hex()),
        WatermarkRef(layer="L2_whitespace", mark_id=mid_ws.hex()),
        WatermarkRef(layer="L3_semantic", mark_id=mid_sem.hex()),
    ]
    blob = seal(plaintext, m, issuer.ed25519_priv, alice.x25519_pub)
    recovered, _ = open_sealed(blob, alice.x25519_priv)
    if recovered != plaintext:
        fail("recovered plaintext mismatch")
    ok(f"seal/open round-trip OK ({len(blob)} bytes)")
    return rec, blob


def _check_multi_recipient_rejected(plaintext, issuer, rec, recipients):
    """Section 8: ``seal_multi`` must raise rather than produce a container.

    Any ``Exception`` subclass counts as a rejection; the concrete type is
    only printed, not asserted.
    """
    banner("8. Multi-recipient seal fails closed")
    m2 = _manifest_for("multi.txt", plaintext, issuer, rec, "text/plain")
    try:
        seal_multi(
            plaintext, m2, issuer.ed25519_priv,
            [r.x25519_pub for r in recipients],
        )
    except Exception as e:
        ok(f"multi-recipient seal correctly rejected: {type(e).__name__}")
    else:
        # Kept out of the try body so the outcome does not rely on fail()
        # raising something that "except Exception" happens not to catch.
        fail("seal_multi should be disabled until the manifest can bind multiple recipients")


def _check_policy_expired(plaintext, issuer, rec, alice):
    """Section 9: a manifest whose ``not_after`` is 60 s in the past must not open."""
    banner("9. Policy: not_after (expired)")
    expired_m = _manifest_for("exp.txt", plaintext, issuer, rec)
    expired_m.policy["not_after"] = int(time.time()) - 60
    expired_blob = seal(plaintext, expired_m, issuer.ed25519_priv, alice.x25519_pub)
    try:
        open_sealed(expired_blob, alice.x25519_priv)
    except PolicyViolation as e:
        ok(f"expired file correctly rejected: {e}")
    else:
        fail("expired file should NOT open")


def _check_policy_max_opens(plaintext, issuer, rec, alice):
    """Section 10: with ``max_opens == 2`` the third open must be refused.

    All three opens share one PolicyContext whose state directory is a
    temporary directory removed when the section ends.
    """
    banner("10. Policy: max_opens counter")
    with tempfile.TemporaryDirectory() as td:
        ctx = PolicyContext(state_dir=Path(td), mode="LOCAL_ONLY", jurisdiction="GLOBAL")
        capped_m = _manifest_for("capped.txt", plaintext, issuer, rec)
        capped_m.policy["max_opens"] = 2
        capped_blob = seal(plaintext, capped_m, issuer.ed25519_priv, alice.x25519_pub)

        for attempt in (1, 2):
            pt, _ = open_sealed(capped_blob, alice.x25519_priv, policy_ctx=ctx)
            if pt != plaintext:
                fail(f"open {attempt} recovered wrong plaintext")
        ok("first 2 opens succeeded")

        try:
            open_sealed(capped_blob, alice.x25519_priv, policy_ctx=ctx)
        except PolicyViolation as e:
            ok(f"3rd open correctly rejected: {e}")
        else:
            fail("3rd open should have been rejected")


def _check_tamper_detection(blob, alice):
    """Section 11: flipping one bit of the sealed blob must make opening fail.

    Two positions are tried: the last byte (reported as ciphertext) and byte
    30 (reported as manifest).
    """
    banner("11. Tamper detection (ciphertext + manifest)")
    # NOTE(review): offset 30 is assumed to land inside the manifest region of
    # the container - confirm against the container layout.
    for offset, region in ((-1, "ciphertext"), (30, "manifest")):
        tampered = bytearray(blob)
        tampered[offset] ^= 0x01
        try:
            open_sealed(bytes(tampered), alice.x25519_priv)
        except Exception as e:
            ok(f"{region} tamper rejected: {type(e).__name__}")
        else:
            fail(f"{region} tamper should have been caught")


def _check_transparency_log():
    """Section 12: append entries to a fresh TransparencyLog and sanity-check it.

    Checks the size after four appends, that a signed head can be produced,
    that an inclusion proof for the third entry reports the right index, and
    that the root changes after a fifth append.
    """
    banner("12. Merkle transparency log")
    with tempfile.TemporaryDirectory() as td:
        reg_key = ClassicIdentity.generate()
        tl = TransparencyLog(td, signing_key_hex=reg_key.ed25519_priv.hex())
        indices = [tl.append({"event": "test", "i": i}) for i in range(4)]
        idx2 = indices[2]
        if tl.size() != 4:
            fail(f"tlog size {tl.size()} != 4")
        ok(f"appended 4 entries, size={tl.size()}")

        head = tl.signed_head()
        ok(f"signed head: size={head['size']} root={head['root'][:16]}...")

        proof = tl.inclusion_proof(idx2)
        if proof is None:
            fail("inclusion proof for valid index returned None")
        if proof["index"] != idx2:
            fail("proof index mismatch")
        ok(f"inclusion proof for idx={idx2}: {len(proof['proof'])} sibling hashes")

        root_before = tl.root()
        tl.append({"event": "test", "i": 4})
        if tl.root() == root_before:
            fail("root did not change after append")
        ok("root changes on append (append-only integrity)")


def _check_phash_deterministic(marked_bytes):
    """Section 13: hashing the same image bytes twice must give the same value."""
    banner("13. Perceptual hash deterministic")
    try:
        from oversight_core.formats import image as img_fmt
        ph1 = img_fmt.perceptual_hash(marked_bytes)
        ph2 = img_fmt.perceptual_hash(marked_bytes)
        if ph1 != ph2:
            fail("perceptual hash not deterministic")
        ok(f"phash deterministic: {ph1}")
    except Exception as e:
        fail(f"phash test error: {e}")


def main():
    """Run the full end-to-end scenario, section by section.

    Each section prints a banner and ``[ok]`` lines as it goes. The first
    failed check calls ``fail()``, which prints the reason and exits the
    process with status 1, so later sections only run when every earlier one
    passed. Sections share state (identities, the watermarked plaintext, the
    sealed blob, the watermarked image) through the helpers' return values.
    """
    issuer, alice, bob, carol = _make_identities()
    marked_text, plaintext, mid_zw, mid_ws, mid_sem = _apply_text_marks()
    _check_semantic_survives_airgap(marked_text, mid_ws, mid_sem)
    marked_image = _check_image_marks()
    _check_pdf_marks()
    _check_docx_marks()
    rec, blob = _check_single_recipient_seal(
        plaintext, issuer, alice, (mid_zw, mid_ws, mid_sem)
    )
    _check_multi_recipient_rejected(plaintext, issuer, rec, (alice, bob, carol))
    _check_policy_expired(plaintext, issuer, rec, alice)
    _check_policy_max_opens(plaintext, issuer, rec, alice)
    _check_tamper_detection(blob, alice)
    _check_transparency_log()
    _check_phash_deterministic(marked_image)

    banner("ALL TESTS PASSED")
| 296 | |
| 297 | |
def test_e2e_v2_full_round_trip():
    """Run the whole end-to-end scenario as a single pytest test case.

    Every check lives inside ``main()``; this wrapper only exists so that
    pytest can collect the scenario and CI can run it. Per-step granularity
    is deliberately traded away for that.
    """
    main()
| 303 | |
| 304 | |
# Allow running the scenario directly as a script, outside pytest.
if __name__ == "__main__":
    main()