Zion Boggan
repos/Oversight/examples/live_demo.py
zionboggan.com ↗
159 lines · python
History for this file →
1
"""
Live demo: integration with the OVERSIGHT registry.

Flow:
  1. Seal a document for Alice and register it with the registry
  2. Simulate the document being opened (triggering image/OCSP/license beacons)
  3. Query the registry for attribution via the beacon token_id
  4. Simulate the plaintext leaking; recover watermarks and attribute via the registry
  5. Pull a full evidence bundle for the file
"""
11
 
12
import json
13
import sys
14
import time
15
from pathlib import Path
16
 
17
# Put the directory two levels above this file at the front of the import
# path. NOTE(review): presumably this is the repository root, added so that
# `oversight_core` can be imported when the script is run directly -- confirm.
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
19
 
20
import httpx
21
 
22
from oversight_core import (
23
    ClassicIdentity, Manifest, Recipient, WatermarkRef,
24
    content_hash, seal, open_sealed, beacon, watermark,
25
)
26
 
27
# Base URL of the registry that every HTTP request in this demo is sent to.
REG = "http://127.0.0.1:8765"
28
 
29
 
30
def banner(m):
    """Print *m* as a section heading framed by two 64-character '=' rules.

    The heading is preceded by a blank line and indented by two spaces.
    """
    rule = "=" * 64
    heading = "\n".join(["", rule, f"  {m}", rule])
    print(heading)
31
 
32
 
33
def main():
    """Run the end-to-end OVERSIGHT demo against the registry at ``REG``.

    Each step is announced on stdout with a banner:

      1. generate issuer and recipient identities
      2. embed two watermark layers into a sample plaintext
      3. build the manifest and beacons, then seal the document
      4. register manifest, beacons and watermarks with the registry
      5. request the registry URL of every beacon that has one
      6. attribute the first fired beacon token via the registry
      7. decrypt the sealed blob and recover watermarks from the "leak"
      8. attribute every distinct recovered mark via the registry
      9. fetch and summarise the evidence bundle for the file

    A registry must be reachable at ``REG``. Exceptions raised by ``httpx``
    (for example when the registry is not running) are not caught here.
    """
    banner("1. Generate identities")
    issuer = ClassicIdentity.generate()
    alice = ClassicIdentity.generate()

    banner("2. Prepare watermarked plaintext")
    lines = [f"Acme Q3 forecast - line {i}: confidential projections." for i in range(80)]
    original = "\n".join(lines)
    mark_zw = watermark.new_mark_id()
    mark_ws = watermark.new_mark_id()
    # Apply embed_zw first, then embed_ws on top of its output.
    wm_text = watermark.embed_ws(watermark.embed_zw(original, mark_zw), mark_ws)
    plaintext = wm_text.encode("utf-8")
    print(f"  L1 mark = {mark_zw.hex()}")
    print(f"  L2 mark = {mark_ws.hex()}")

    banner("3. Build manifest + beacons, then seal")
    # NOTE(review): beacons are generated with the literal "pending" before
    # the manifest (and its file_id) exists -- confirm this is intended.
    beacons = beacon.gen_beacons("oversight.local", "pending", "alice@acme.corp")
    recipient = Recipient(
        recipient_id="alice@acme.corp",
        x25519_pub=alice.x25519_pub.hex(),
        ed25519_pub=alice.ed25519_pub.hex(),
    )
    m = Manifest.new(
        original_filename="q3_forecast.txt",
        content_hash=content_hash(plaintext),
        size_bytes=len(plaintext),
        issuer_id="acme.corp.legal",
        issuer_ed25519_pub_hex=issuer.ed25519_pub.hex(),
        recipient=recipient,
        registry_url=REG,
        content_type="text/plain",
    )
    m.watermarks = [
        WatermarkRef(layer="L1_zero_width", mark_id=mark_zw.hex()),
        WatermarkRef(layer="L2_whitespace", mark_id=mark_ws.hex()),
    ]
    m.beacons = [b.to_dict() for b in beacons]

    blob = seal(plaintext, m, issuer.ed25519_priv, alice.x25519_pub)
    print(f"  sealed = {len(blob)} bytes")
    print(f"  file_id = {m.file_id}")

    banner("4. Register with registry")
    register_resp = httpx.post(f"{REG}/register", json={
        "manifest": m.to_dict(),
        "beacons": [b.to_dict() for b in beacons],
        "watermarks": [{"mark_id": w.mark_id, "layer": w.layer} for w in m.watermarks],
    })
    print(f"  POST /register -> {register_resp.status_code} {register_resp.json()}")

    banner("5. Simulate reader opening the document (triggers HTTP beacons)")
    # Registry path template for each beacon kind that can be fired over
    # HTTP. A kind with no entry here has no URL to request and is skipped
    # instead of handing `None` to httpx.
    local_paths = {
        "http_img": "/p/{}.png",
        "ocsp": "/r/{}",
        "license": "/v/{}",
    }

    triggered = []
    for b in beacons:
        if b.kind == "dns":
            print(f"  [dns ]    would resolve {b.dns_name} (needs DNS server, skipped)")
            continue
        template = local_paths.get(b.kind)
        if template is None:
            print(f"  [{b.kind:<8}] no local registry endpoint for this kind, skipped")
            continue
        url = REG + template.format(b.token_id)
        beacon_resp = httpx.get(url, follow_redirects=True,
                                headers={"User-Agent": "Mozilla/5.0 OfficeDocViewer/2024"})
        triggered.append(b.token_id)
        print(f"  [{b.kind:<8}] GET {url} -> {beacon_resp.status_code}")
    # NOTE(review): short pause before querying -- presumably gives the
    # registry time to record the beacon events; confirm.
    time.sleep(0.3)

    banner("6. Query registry for attribution via beacon token_id")
    if not triggered:
        # Nothing was fired, so there is no token to look up.
        print("  no beacon was triggered; skipping beacon attribution")
    else:
        tid = triggered[0]
        attr_resp = httpx.post(f"{REG}/attribute", json={"token_id": tid})
        data = attr_resp.json()
        found = data.get("found")
        print(f"  found      = {found}")
        # As in step 8, only read the attribution fields once the registry
        # reports a match.
        if found:
            print(f"  file_id    = {data['file_id']}")
            print(f"  recipient  = {data['recipient_id']}")
            print(f"  issuer     = {data['issuer_id']}")
            print("  events:")
            for e in data["recent_events"][:5]:
                print(f"    {e['qualified_timestamp']}  {e['kind']:<10}  ip={e['source_ip']}  ua={e['user_agent'][:40]}")

    banner("7. Simulate leak: attacker posts plaintext to breach forum")
    decrypted, _ = open_sealed(blob, recipient_x25519_priv=alice.x25519_priv)
    leaked_text = decrypted.decode("utf-8")
    print(f"  leaked plaintext size: {len(leaked_text)} chars")

    recovered = watermark.recover_marks(leaked_text)
    for layer, mlist in recovered.items():
        uniq = sorted({mm.hex() for mm in mlist})
        if uniq:
            print(f"  {layer}: recovered unique IDs = {uniq}")

    banner("8. Attribute leaked copy to recipient")
    attributed = False
    for layer, mlist in recovered.items():
        # Query each distinct mark once per layer, in recovery order.
        seen = set()
        for mm in mlist:
            h = mm.hex()
            if h in seen:
                continue
            seen.add(h)
            mark_resp = httpx.post(f"{REG}/attribute", json={"mark_id": h, "layer": layer})
            d = mark_resp.json()
            if d.get("found"):
                attributed = True
                print(f"  [!!] LEAK ATTRIBUTED via {layer} mark {h}")
                print(f"       file_id   = {d['file_id']}")
                print(f"       recipient = {d['recipient_id']}  <-- source of leak")
                print(f"       issuer    = {d['issuer_id']}")
    if not attributed:
        # Make the negative outcome visible instead of printing nothing.
        print("  no recovered mark could be attributed by the registry")

    banner("9. Pull full evidence bundle")
    evidence_resp = httpx.get(f"{REG}/evidence/{m.file_id}")
    bundle = evidence_resp.json()
    print(f"  file_id         = {bundle['file_id']}")
    print(f"  bundle ts       = {bundle['bundle_generated_at']}")
    print(f"  manifest issuer = {bundle['manifest']['issuer_id']}")
    print(f"  beacons         = {len(bundle['beacons'])}")
    print(f"  watermarks      = {len(bundle['watermarks'])}")
    print(f"  events logged   = {len(bundle['events'])}")
    print(f"  disclaimer      = {bundle['disclaimer'][:80]}...")

    banner("DEMO COMPLETE")
156
 
157
 
158
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()