Zion Boggan
repos/Oversight/examples/live_demo_v2.py
zionboggan.com ↗
161 lines · python
History for this file →
1
"""OVERSIGHT v0.2 live demo - full registry integration including tlog and signed bundles."""
2
 
3
import sys
4
import time
5
import json
6
from pathlib import Path
7
 
8
# Directory two levels above this file. It is pushed to the front of the
# import search path before the project imports that follow (presumably so
# that oversight_core resolves from a source checkout - confirm).
_this_file = Path(__file__).resolve()
ROOT = _this_file.parent.parent
sys.path.insert(0, str(ROOT))
10
 
11
import httpx
12
from oversight_core import (
13
    ClassicIdentity, Manifest, Recipient, WatermarkRef,
14
    content_hash, seal, open_sealed, beacon, watermark,
15
)
16
from oversight_core import semantic
17
from oversight_core.jcs import jcs_dumps
18
 
19
# Base URL of the registry every request in this demo is sent to
# (loopback address, port 8765).
REG = "http://127.0.0.1:8765"
20
 
21
 
22
def banner(m):
    """Print *m* as a section heading framed by two 64-character '=' rules.

    The output starts with a blank line, and the heading text is indented
    by two spaces between the rules.
    """
    rule = "=" * 64
    print(f"\n{rule}\n  {m}\n{rule}")
23
 
24
 
25
def main():
    """Run the OVERSIGHT v0.2 walkthrough against the registry at ``REG``.

    The demo prints eleven numbered sections:

    1. fetch the registry's well-known document;
    2. build a 60-line text, apply three watermark layers, and seal it for
       a freshly generated recipient;
    3. register the manifest, beacons and watermarks;
    4. request every beacon that has an HTTP endpoint mapped here;
    5. read the transparency-log head;
    6. fetch the inclusion proof for the registration entry;
    7. decrypt the sealed blob, then strip zero-width characters and
       trailing whitespace from the text;
    8. check the semantic watermark on the stripped text and, on a match,
       ask the registry to attribute it;
    9. fetch the evidence bundle for the file;
    10. verify the bundle's Ed25519 signature locally;
    11. request one image beacon 50 times and count 200 vs 429 replies.

    Returns:
        None. All results are printed to stdout.

    Raises:
        KeyError: if a registry response lacks a field the demo prints.

    Errors raised by ``httpx`` (for example when no registry is listening
    at ``REG``) are not caught and propagate to the caller.
    """
    banner("1. Check registry is up, show well-known")
    r = httpx.get(f"{REG}/.well-known/oversight-registry")
    wk = r.json()
    print(f"  registry pub = {wk['ed25519_pub'][:32]}...")
    print(f"  version      = {wk['version']}")
    print(f"  tlog_size    = {wk['tlog_size']}")

    banner("2. Seal a multi-layer-watermarked document for Alice")
    issuer = ClassicIdentity.generate()
    alice = ClassicIdentity.generate()

    lines = [f"Acme Q3 forecast line {i}: we begin to show significant results and help our customers find answers." for i in range(60)]
    original = "\n".join(lines)
    mid_zw = watermark.new_mark_id()
    mid_ws = watermark.new_mark_id()
    mid_sem = watermark.new_mark_id()
    # Layers are applied in this order: semantic, then whitespace, then
    # zero-width.
    t = semantic.apply_semantic(original, mid_sem)
    t = watermark.embed_ws(t, mid_ws)
    t = watermark.embed_zw(t, mid_zw)
    plaintext = t.encode("utf-8")
    print(f"  plaintext {len(plaintext)} bytes, 3-layer watermarked")

    beacons = beacon.gen_beacons("oversight.local", "pending", "alice@acme")
    rec = Recipient(recipient_id="alice@acme", x25519_pub=alice.x25519_pub.hex(), ed25519_pub=alice.ed25519_pub.hex())
    m = Manifest.new("q3_forecast.txt", content_hash(plaintext), len(plaintext),
                     "acme", issuer.ed25519_pub.hex(), rec, REG, "text/plain")
    m.watermarks = [
        WatermarkRef(layer="L1_zero_width", mark_id=mid_zw.hex()),
        WatermarkRef(layer="L2_whitespace", mark_id=mid_ws.hex()),
        WatermarkRef(layer="L3_semantic",   mark_id=mid_sem.hex()),
    ]
    m.beacons = [b.to_dict() for b in beacons]
    # Keep the sealed blob: step 7 decrypts this exact artifact instead of
    # sealing the document a second time.
    sealed = seal(plaintext, m, issuer.ed25519_priv, alice.x25519_pub)
    print(f"  file_id = {m.file_id}")

    banner("3. Register with v0.2 registry (tlog-backed)")
    r = httpx.post(f"{REG}/register", json={
        "manifest": m.to_dict(),
        "beacons": [b.to_dict() for b in beacons],
        "watermarks": [{"mark_id": w.mark_id, "layer": w.layer} for w in m.watermarks],
    })
    reg_resp = r.json()
    print(f"  /register -> {r.status_code}")
    print(f"  file_id     = {reg_resp['file_id']}")
    print(f"  tlog_index  = {reg_resp['tlog_index']}")

    banner("4. Trigger beacons (HTTP image + OCSP + license)")
    # Beacon kind -> URL path template; built once rather than per beacon.
    beacon_paths = {
        "http_img": "/p/{}.png",
        "ocsp": "/r/{}",
        "license": "/v/{}",
    }
    for b in beacons:
        if b.kind == "dns":
            # No HTTP URL is mapped for DNS beacons, so they are not hit here.
            continue
        path = beacon_paths.get(b.kind)
        if path is None:
            # Unknown kinds used to abort the demo with a KeyError.
            print(f"  [{b.kind:<8}] -> skipped (no HTTP endpoint mapped)")
            continue
        r = httpx.get(REG + path.format(b.token_id), headers={"User-Agent": "OfficeDocViewer/2024"})
        print(f"  [{b.kind:<8}] -> {r.status_code}")

    banner("5. Query tlog head and get signed tree state")
    r = httpx.get(f"{REG}/tlog/head")
    head = r.json()
    print(f"  tlog size      = {head['size']}")
    print(f"  tlog root      = {head['root'][:32]}...")
    print(f"  signature      = {head['signature'][:32]}...")

    banner("6. Get inclusion proof for registration event")
    r = httpx.get(f"{REG}/tlog/proof/{reg_resp['tlog_index']}")
    proof = r.json()
    print(f"  proof for idx={proof['index']}:")
    print(f"    leaf hash  = {proof['leaf_hash'][:32]}...")
    print(f"    root       = {proof['root'][:32]}...")
    print(f"    siblings   = {len(proof['proof'])} hashes")

    banner("7. Simulate airgap-strip attack on a leaked copy")
    decrypted, _ = open_sealed(sealed, alice.x25519_priv)
    leaked = decrypted.decode()
    # Remove the three zero-width code points in a single pass, then drop
    # trailing whitespace on every line.
    leaked = leaked.translate(str.maketrans("", "", "\u200b\u200c\u200d"))
    leaked = "\n".join(line.rstrip() for line in leaked.splitlines())
    print(f"  post-strip leaked size: {len(leaked)} chars")

    banner("8. L3 semantic attribution against registry")
    result = semantic.verify_semantic(leaked, mid_sem)
    print(f"  synonyms score = {result['synonyms_score']:.3f} (match={result['synonyms_match']})")
    print(f"  overall match  = {result['overall_match']}")
    if result["overall_match"]:
        r = httpx.post(f"{REG}/attribute", json={"mark_id": mid_sem.hex(), "layer": "L3_semantic"})
        data = r.json()
        if data.get("found"):
            print("  [!!] LEAK ATTRIBUTED via L3 semantic watermark")
            print(f"       file_id   = {data['file_id']}")
            print(f"       recipient = {data['recipient_id']}  (leaked by)")
            print(f"       issuer    = {data['issuer_id']}")

    banner("9. Request SIGNED evidence bundle")
    r = httpx.get(f"{REG}/evidence/{m.file_id}")
    bundle = r.json()
    print(f"  file_id         = {bundle['file_id']}")
    print(f"  bundle ts       = {bundle['bundle_generated_at']}")
    print(f"  registry pub    = {bundle['registry_pub'][:32]}...")
    print(f"  signature       = {bundle['bundle_signature_ed25519'][:32]}...")
    print(f"  tlog head size  = {bundle['tlog_head']['size']}")
    print(f"  beacons         = {len(bundle['beacons'])}")
    print(f"  watermarks      = {len(bundle['watermarks'])}")
    print(f"  events logged   = {len(bundle['events'])}")

    banner("10. Verify the bundle signature (as an external auditor would)")
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
    pub = Ed25519PublicKey.from_public_bytes(bytes.fromhex(bundle["registry_pub"]))
    # The signature field is removed before re-serialising, so the signed
    # message is the bundle without its own signature.
    sig = bytes.fromhex(bundle.pop("bundle_signature_ed25519"))
    msg = jcs_dumps(bundle)
    try:
        pub.verify(sig, msg)
    except InvalidSignature as e:
        # Only a real signature mismatch is reported as a failure; any other
        # error is a bug in the demo and is allowed to surface.
        print(f"  [FAIL] signature verification failed: {e}")
    else:
        print("  [ok] bundle signature VERIFIED - this bundle came from this registry.")

    banner("11. Rate-limit test: hit beacon 50x rapidly")
    ok_count = 0
    throttled_count = 0
    # Pick the image beacon by kind instead of relying on its list position.
    img_beacon = next((b for b in beacons if b.kind == "http_img"), None)
    if img_beacon is None:
        print("  no http_img beacon generated - nothing to hit")
    else:
        img_url = f"{REG}/p/{img_beacon.token_id}.png"
        for _ in range(50):
            status = httpx.get(img_url).status_code
            if status == 200:
                ok_count += 1
            elif status == 429:
                throttled_count += 1
    print(f"  allowed  = {ok_count}")
    print(f"  throttled= {throttled_count}")
    if throttled_count > 0:
        print("  [ok] rate limiter is working")

    banner("DEMO COMPLETE - v0.2")
158
 
159
 
160
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()