Zion Boggan
repos/Oversight/cli/oversight.py
zionboggan.com ↗
554 lines · python
History for this file →
1
"""
2
OVERSIGHT CLI.
3
 
4
Usage:
5
  oversight keygen --out identity.json
6
                       Generate a new classic identity (X25519 + Ed25519).
7
 
8
  oversight seal INPUT --recipient-pub PUB.json --issuer-id ID \\
9
      --issuer-key ISSUER.json --registry-url URL --out OUT.sealed [--watermark]
10
                       Produce a .sealed file for a recipient.
11
 
12
  oversight open INPUT.sealed --identity IDENT.json --out PLAINTEXT
13
                       Decrypt a .sealed file.
14
 
15
  oversight inspect INPUT.sealed
16
                       Dump the (signed) manifest without decrypting.
17
 
18
  oversight attribute --leak LEAK.txt --registry URL
19
                       Read watermark marks out of leaked text and query registry
20
                       to identify the source recipient.
21
"""
22
 
23
from __future__ import annotations
24
 
25
import argparse
26
import json
27
import sys
28
from pathlib import Path
29
 
30
import httpx
31
 
32
# Repository root: the directory two levels above this file. It is put at the
# front of sys.path so the ``oversight_core`` imports that follow can resolve
# when this CLI is executed directly as a script.
_here = Path(__file__).resolve()
ROOT = _here.parent.parent
sys.path[0:0] = [str(ROOT)]
34
 
35
from oversight_core import (
36
    ClassicIdentity,
37
    Manifest,
38
    Recipient,
39
    WatermarkRef,
40
    content_hash,
41
    seal,
42
    open_sealed,
43
    beacon,
44
    watermark,
45
    l3_policy,
46
)
47
from oversight_core.container import SealedFile
48
from oversight_core import semantic
49
from oversight_core.fingerprint import ContentFingerprint
50
from oversight_core.safe_io import (
51
    atomic_write_bytes,
52
    atomic_write_private_json,
53
    atomic_write_text,
54
    validate_output_path,
55
)
56
 
57
 
58
 
59
def cmd_keygen(args):
    """Create a new classic (X25519 + Ed25519) identity and save it to disk.

    ``--out`` receives the full identity, private keys included, through
    ``atomic_write_private_json``. A sibling file with the ``.pub.json``
    suffix receives only the id and the two public keys. Both destinations
    are validated before a key pair is generated.
    """
    private_path = Path(args.out)
    public_path = private_path.with_suffix(".pub.json")
    validate_output_path(private_path)
    # The private path is passed as an "input" so the two outputs are checked
    # against each other.
    validate_output_path(public_path, input_paths=[private_path])

    identity = ClassicIdentity.generate()
    name = args.id or "identity"
    x_pub = identity.x25519_pub.hex()
    ed_pub = identity.ed25519_pub.hex()

    # Key order here fixes the field order of the written JSON.
    atomic_write_private_json(private_path, {
        "id": name,
        "x25519_priv": identity.x25519_priv.hex(),
        "x25519_pub": x_pub,
        "ed25519_priv": identity.ed25519_priv.hex(),
        "ed25519_pub": ed_pub,
    })
    public_doc = {"id": name, "x25519_pub": x_pub, "ed25519_pub": ed_pub}
    atomic_write_text(public_path, json.dumps(public_doc, indent=2))

    print(f"[+] wrote private identity to {args.out}")
    print(f"[+] wrote public  identity to {public_path}")
80
 
81
 
82
 
83
def _load_key_file(path: Path, required: tuple, role: str) -> dict:
    """Read a JSON key file and check that it has the fields ``role`` needs.

    Args:
        path: JSON identity file (e.g. one produced by ``oversight keygen``).
        required: field names that must be present.
        role: human-readable label used in the error message.

    Returns:
        The parsed JSON object.

    Raises:
        SystemExit: if the document is not a JSON object or lacks a required
            field. Without this check, passing a ``.pub.json`` where a private
            key is needed surfaces later as an uncaught ``KeyError``.
        OSError / json.JSONDecodeError: propagated from reading and parsing.
    """
    doc = json.loads(path.read_text())
    if not isinstance(doc, dict):
        raise SystemExit(f"[!] {role} file {path} is not a JSON object")
    missing = [key for key in required if key not in doc]
    if missing:
        raise SystemExit(
            f"[!] {role} file {path} is missing field(s): {', '.join(missing)}"
        )
    return doc


def _embed_text_watermarks(args, plaintext: bytes):
    """Embed L1 (zero-width), L2 (whitespace) and, when policy allows, L3 marks.

    Returns:
        ``(plaintext, watermark_refs, l3_decision)``. For non-UTF-8 input the
        plaintext is returned unchanged, with no refs and ``l3_decision=None``.

    Raises:
        SystemExit: if L3 is enabled but neither ``--l3-ack`` was given nor the
            interactive confirmation succeeded.
    """
    refs: list[WatermarkRef] = []
    try:
        text = plaintext.decode("utf-8")
    except UnicodeDecodeError:
        print("[!] --watermark requires UTF-8 text input; skipping marks")
        return plaintext, refs, None

    # A single mark id is reused for every layer recorded below.
    mark_id = watermark.new_mark_id()

    l3_decision = l3_policy.decide_l3(
        filename=args.input,
        content_type=args.content_type,
        text=text,
        declared_class=args.document_class,
        requested_mode=args.l3_mode,
    )
    if l3_decision.enabled:
        # L3 alters visible wording, so it requires an explicit acknowledgement.
        if not args.l3_ack and not _confirm_l3(l3_decision):
            raise SystemExit(
                "L3 changes visible text. Re-run with --l3-mode off, "
                "--l3-mode boilerplate, or --l3-ack to acknowledge."
            )
        text = l3_policy.apply_l3_safe(text, mark_id, mode=l3_decision.mode)
        refs.append(WatermarkRef(
            layer=f"L3_semantic_{l3_decision.mode}", mark_id=mark_id.hex()
        ))

    # L3 (if any) is applied first; whitespace and zero-width marks go on top.
    text = watermark.embed_ws(text, mark_id)
    text = watermark.embed_zw(text, mark_id)

    refs.append(WatermarkRef(layer="L1_zero_width", mark_id=mark_id.hex()))
    refs.append(WatermarkRef(layer="L2_whitespace", mark_id=mark_id.hex()))
    print(f"[+] embedded L1 mark {mark_id.hex()}")
    print(f"[+] embedded L2 mark {mark_id.hex()}")
    if l3_decision and l3_decision.enabled:
        print(f"[+] embedded L3 mark {mark_id.hex()} ({l3_decision.mode})")
    elif l3_decision:
        print(f"[!] L3 skipped: {l3_decision.reason} ({'; '.join(l3_decision.warnings)})")
    return text.encode("utf-8"), refs, l3_decision


def cmd_seal(args):
    """Seal ``args.input`` for one recipient and write the container to ``args.out``.

    The steps are:
    1. Load and sanity-check the issuer key and the recipient public key.
    2. Optionally embed text watermarks (``--watermark``).
    3. Build the manifest: content hashes, L3 policy, beacons, watermark refs.
    4. Fingerprint UTF-8 content.
    5. Seal and write the container, plus a ``.fingerprint.json`` sidecar when
       a fingerprint exists.
    6. Optionally POST the result to the ``--register`` URL.

    Raises:
        SystemExit: on malformed key files or a declined L3 confirmation.
    """
    input_path = Path(args.input)
    issuer_path = Path(args.issuer_key)
    recipient_path = Path(args.recipient_pub)
    out_path = Path(args.out)

    # Check key files before doing any work, so a wrong file (e.g. a .pub.json
    # given as --issuer-key) fails with a clear message, not a KeyError traceback.
    issuer = _load_key_file(issuer_path, ("ed25519_priv", "ed25519_pub"), "issuer key")
    rec_pub = _load_key_file(recipient_path, ("id", "x25519_pub"), "recipient public key")

    validate_output_path(out_path, input_paths=[input_path, issuer_path, recipient_path])
    plaintext = input_path.read_bytes()

    # Kept so the manifest also records the hash of the pre-watermark content.
    canonical_plaintext = plaintext

    watermarks_for_manifest: list[WatermarkRef] = []
    l3_decision = None
    if args.watermark:
        plaintext, watermarks_for_manifest, l3_decision = _embed_text_watermarks(args, plaintext)

    recipient = Recipient(
        recipient_id=rec_pub["id"],
        x25519_pub=rec_pub["x25519_pub"],
        ed25519_pub=rec_pub.get("ed25519_pub"),
    )

    manifest = Manifest.new(
        original_filename=input_path.name,
        content_hash=content_hash(plaintext),
        size_bytes=len(plaintext),
        issuer_id=args.issuer_id,
        issuer_ed25519_pub_hex=issuer["ed25519_pub"],
        recipient=recipient,
        registry_url=args.registry_url,
        content_type=args.content_type,
    )
    manifest.canonical_content_hash = content_hash(canonical_plaintext)
    if l3_decision:
        manifest.l3_policy = l3_decision.to_dict()
    beacons = beacon.gen_beacons(
        registry_domain=args.registry_domain,
        file_id=manifest.file_id,
        recipient_id=rec_pub["id"],
    )
    manifest.watermarks = watermarks_for_manifest
    manifest.beacons = [b.to_dict() for b in beacons]

    # Fingerprint the (possibly watermarked) text; binary input gets none.
    fingerprint = None
    try:
        fingerprint_text = plaintext.decode("utf-8")
    except UnicodeDecodeError:
        pass
    else:
        fingerprint = ContentFingerprint.from_text(fingerprint_text)
        print(f"[+] content fingerprint: {len(fingerprint.winnowing_fp)} winnow hashes, "
              f"{len(fingerprint.sentence_fp)} sentence hashes")

    fp_path = None
    if fingerprint:
        fp_path = out_path.with_suffix(".fingerprint.json")
        # Validate the sidecar the way keygen validates its second output, and
        # do it before anything is written so a bad path leaves no partial output.
        validate_output_path(
            fp_path,
            input_paths=[input_path, issuer_path, recipient_path, out_path],
        )

    blob = seal(
        plaintext=plaintext,
        manifest=manifest,
        issuer_ed25519_priv=bytes.fromhex(issuer["ed25519_priv"]),
        recipient_x25519_pub=bytes.fromhex(rec_pub["x25519_pub"]),
    )

    atomic_write_bytes(out_path, blob)
    print(f"[+] wrote {args.out} ({len(blob)} bytes)")
    print(f"[+] file_id={manifest.file_id}")
    print(f"[+] recipient={recipient.recipient_id}")
    print(f"[+] beacons={len(beacons)}  watermarks={len(watermarks_for_manifest)}")

    if fingerprint:
        atomic_write_text(fp_path, json.dumps({
            "file_id": manifest.file_id,
            "recipient_id": rec_pub["id"],
            "mark_id": watermarks_for_manifest[0].mark_id if watermarks_for_manifest else None,
            "canonical_content_hash": manifest.canonical_content_hash,
            "l3_policy": manifest.l3_policy,
            "fingerprint": fingerprint.to_dict(),
        }, indent=2))
        print(f"[+] wrote fingerprint to {fp_path}")

    if args.register:
        reg_payload = {
            "manifest": manifest.to_dict(),
            "beacons": [b.to_dict() for b in beacons],
            "watermarks": [w.__dict__ for w in watermarks_for_manifest],
        }
        if fingerprint:
            reg_payload["fingerprint"] = fingerprint.to_dict()
        # Registration is best-effort: the sealed file is already on disk.
        try:
            resp = httpx.post(
                f"{args.register.rstrip('/')}/register",
                json=reg_payload,
                timeout=10,
            )
            resp.raise_for_status()
            print(f"[+] registered with {args.register}: {resp.json()}")
        except Exception as e:
            print(f"[!] registry registration failed: {e}")
222
 
223
 
224
 
225
def cmd_open(args):
    """Decrypt a ``.sealed`` file for its recipient and write the plaintext.

    The identity JSON is read first and must contain ``x25519_priv``. Then
    ``--out`` is validated against the inputs, the blob is decrypted with
    ``open_sealed``, the plaintext is written atomically, and a short manifest
    summary is printed.

    Raises:
        SystemExit: if ``--identity`` is not a JSON object with an
            ``x25519_priv`` field (e.g. the ``.pub.json`` half was passed).
            Without this check the mistake would surface as an uncaught
            ``KeyError`` traceback.
    """
    input_path = Path(args.input)
    identity_path = Path(args.identity)
    out_path = Path(args.out)

    ident = json.loads(identity_path.read_text())
    if not isinstance(ident, dict) or "x25519_priv" not in ident:
        raise SystemExit(
            f"[!] {identity_path} has no 'x25519_priv' field; "
            "pass the private identity file, not the .pub.json"
        )

    validate_output_path(out_path, input_paths=[input_path, identity_path])
    blob = input_path.read_bytes()
    plaintext, manifest = open_sealed(
        blob,
        recipient_x25519_priv=bytes.fromhex(ident["x25519_priv"]),
    )
    atomic_write_bytes(out_path, plaintext)

    recipient_id = manifest.recipient.recipient_id if manifest.recipient else '?'
    print(f"[+] decrypted to {args.out}")
    print(f"[+] file_id   = {manifest.file_id}")
    print(f"[+] issuer    = {manifest.issuer_id}")
    print(f"[+] recipient = {recipient_id}")
    print(f"[+] marks     = {len(manifest.watermarks)}")
    print(f"[+] beacons   = {len(manifest.beacons)}")
243
 
244
 
245
def _confirm_l3(decision) -> bool:
246
    print("[!] L3 semantic watermarking changes visible prose.")
247
    print(f"    document_class={decision.document_class} mode={decision.mode}")
248
    print(f"    reason={decision.reason}")
249
    if not sys.stdin.isatty():
250
        return False
251
    answer = input("    Type 'I ACKNOWLEDGE' to continue: ").strip()
252
    return answer == "I ACKNOWLEDGE"
253
 
254
 
255
 
256
def cmd_inspect(args):
    """Print a sealed file's manifest as JSON and whether its signature verifies.

    Nothing is decrypted; the container is only parsed with
    ``SealedFile.from_bytes``. Manifest values that are not JSON-native are
    rendered with ``str`` (``default=str``).
    """
    sealed = SealedFile.from_bytes(Path(args.input).read_bytes())
    manifest = sealed.manifest
    rendered = json.dumps(manifest.to_dict(), indent=2, default=str)
    print(rendered)
    print()
    print(f"[valid manifest signature] {manifest.verify()}")
262
 
263
 
264
 
265
def _attribute_direct(text: str) -> list[bytes]:
    """Phase 1: pull candidate mark ids directly from L1 and L2 carriers in ``text``.

    Returns the unique L1 marks in first-seen order. The L2 candidate is
    appended when its recovery confidence is at least 0.5 and it is not
    already present.
    """
    print("[*] Phase 1: Direct extraction (L1 + L2)")
    l1_marks = watermark.extract_zw(text)
    l2_candidate, l2_conf, l2_bits, l2_needed = watermark.extract_ws_partial(text)

    # dict.fromkeys de-duplicates while keeping first-seen order. A set's
    # iteration order for bytes depends on PYTHONHASHSEED, which would make
    # the printed list and downstream tie-breaking vary from run to run.
    l1_unique = list(dict.fromkeys(l1_marks))
    direct_candidates: list[bytes] = list(l1_unique)
    if l2_candidate and l2_conf >= 0.5 and l2_candidate not in direct_candidates:
        direct_candidates.append(l2_candidate)

    if l1_unique:
        print(f"    L1: {len(l1_marks)} frames, {len(l1_unique)} unique mark(s)")
        for m in l1_unique:
            print(f"        {m.hex()}")
    else:
        print("    L1: no zero-width frames found (stripped?)")

    if l2_conf >= 1.0:
        print(f"    L2: {l2_bits}/{l2_needed} bits recovered (100%): {l2_candidate.hex()}")
    elif l2_conf > 0:
        print(f"    L2: {l2_bits}/{l2_needed} bits recovered ({l2_conf:.0%}): {l2_candidate.hex()} (partial)")
    else:
        print("    L2: no trailing whitespace marks found (stripped?)")
    return direct_candidates


def _attribute_registry_candidates(registry: str, direct_candidates: list[bytes]) -> list[bytes]:
    """Phase 2: look up direct candidates, then fetch every known mark id.

    Each direct candidate is POSTed to ``/attribute`` and matches are printed.
    The unique mark ids listed by ``GET /marks`` are returned in registry
    order. Network or parse failures are reported and do not abort attribution.
    """
    base = registry.rstrip('/')
    print(f"\n[*] Phase 2: Registry query ({registry})")
    for m in direct_candidates:
        try:
            resp = httpx.post(
                f"{base}/attribute",
                json={"mark_id": m.hex(), "layer": "L1_zero_width"},
                timeout=10,
            )
            data = resp.json()
            if data.get("found"):
                print(f"    MATCH: {m.hex()} -> recipient={data['recipient_id']}, "
                      f"file={data['file_id']}")
        except Exception as e:
            print(f"    registry query failed: {e}")

    registry_candidates: list[bytes] = []
    seen: set[bytes] = set()  # O(1) membership; the registry may list many marks
    try:
        resp = httpx.get(f"{base}/marks", timeout=10)
        if resp.status_code == 200:
            for entry in resp.json().get("marks", []):
                mid_bytes = bytes.fromhex(entry["mark_id"])
                if mid_bytes not in seen:
                    seen.add(mid_bytes)
                    registry_candidates.append(mid_bytes)
            print(f"    fetched {len(registry_candidates)} candidate mark_id(s) from registry")
    except Exception as e:
        # Best-effort: report instead of silently dropping, then continue with
        # whatever candidates were collected.
        print(f"    [!] could not fetch candidate marks: {e}")
    return registry_candidates


def _attribute_l3(text: str, all_candidates: list[bytes]) -> None:
    """Phase 3: test candidate mark ids against L3 semantic evidence in ``text``."""
    print(f"\n[*] Phase 3: L3 semantic verification ({len(all_candidates)} candidate(s))")
    if not all_candidates:
        print("    L3: no candidates available (L1/L2 stripped, registry unreachable?)")
        return
    l3_hits = watermark.verify_l3(text, all_candidates)
    if not l3_hits:
        print("    L3: no candidates matched above threshold")
        return
    for mid, score, detail in l3_hits:
        print(f"    L3 MATCH: {mid.hex()} score={score:.2f} "
              f"(synonyms={detail['synonyms_score']:.2f}, "
              f"punct={detail['punctuation_hits']}, "
              f"dict={detail['dict_version']})")


def _attribute_fuse(text: str, registry: str, all_candidates: list[bytes]) -> None:
    """Phase 4: fuse all layers, report the best mark and resolve it via the registry."""
    print("\n[*] Phase 4: Multi-layer fusion")
    result = watermark.recover_marks_v2(text, all_candidates or None)
    if not result["candidates"]:
        print("    No marks recovered from any layer.")
        print("\n[*] Diagnostics:")
        for d in result["diagnostics"]:
            print(f"    {d}")
        return

    for mark_id, score, layers in result["candidates"]:
        print(f"    {mark_id.hex()}  score={score:.3f}  layers={layers}")
    # The first candidate returned by recover_marks_v2 is reported as the best one.
    best_id, best_score, best_layers = result["candidates"][0]
    print(f"\n[!!] BEST ATTRIBUTION: {best_id.hex()}")
    print(f"     confidence = {best_score:.1%}")
    print(f"     evidence   = {best_layers}")

    try:
        resp = httpx.post(
            f"{registry.rstrip('/')}/attribute",
            json={"mark_id": best_id.hex(), "layer": "fused"},
            timeout=10,
        )
        data = resp.json()
        if data.get("found"):
            print(f"     file_id    = {data['file_id']}")
            print(f"     recipient  = {data['recipient_id']}")
            print(f"     issuer     = {data['issuer_id']}")
    except Exception as e:
        # Best-effort enrichment; the attribution above stands on its own.
        print(f"     [!] registry lookup failed: {e}")


def _attribute_fingerprints(text: str, fingerprints: str) -> None:
    """Phase 5: compare ``text`` against stored ``*.fingerprint.json`` sidecars.

    ``fingerprints`` is either a single sidecar file or a directory of them.
    Files are processed in sorted order, so output and tie-breaking between
    equal scores are deterministic.
    """
    print("\n[*] Phase 5: Content fingerprint comparison")
    leak_fp = ContentFingerprint.from_text(text)
    print(f"    Leak fingerprint: {len(leak_fp.winnowing_fp)} winnow hashes, "
          f"{len(leak_fp.sentence_fp)} sentence hashes")

    best_fp_match = None
    best_fp_score = 0.0

    fp_dir = Path(fingerprints)
    if fp_dir.is_dir():
        fp_files = sorted(fp_dir.glob("*.fingerprint.json"))
    elif fp_dir.is_file():
        fp_files = [fp_dir]
    else:
        fp_files = []
        print(f"    [!] fingerprint path not found: {fingerprints}")

    for fp_file in fp_files:
        try:
            fp_data = json.loads(fp_file.read_text())
            stored_fp = ContentFingerprint.from_dict(fp_data["fingerprint"])
            sim = leak_fp.similarity(stored_fp)
            recipient_id = fp_data.get("recipient_id", "unknown")
            # seal writes "mark_id": null when no watermarks were embedded.
            mark_id = fp_data.get("mark_id") or "unknown"

            if sim["combined"] >= 0.1:
                print(f"    {fp_file.name}: recipient={recipient_id} "
                      f"winnow={sim['winnowing']:.2f} "
                      f"sentence={sim['sentence']:.2f} "
                      f"combined={sim['combined']:.2f} "
                      f"[{sim['verdict']}]")

            if sim["combined"] > best_fp_score:
                best_fp_score = sim["combined"]
                best_fp_match = {
                    "file": fp_file.name,
                    "recipient_id": recipient_id,
                    "mark_id": mark_id,
                    "similarity": sim,
                }
        except Exception as e:
            print(f"    [!] error reading {fp_file.name}: {e}")

    if best_fp_match and best_fp_score >= 0.3:
        verdict = best_fp_match["similarity"]["verdict"]
        print(f"\n[!!] FINGERPRINT ATTRIBUTION [{verdict}]:")
        print(f"     recipient  = {best_fp_match['recipient_id']}")
        print(f"     mark_id    = {best_fp_match['mark_id']}")
        print(f"     confidence = {best_fp_score:.1%}")
        print(f"     winnowing  = {best_fp_match['similarity']['winnowing']:.1%}")
        print(f"     sentence   = {best_fp_match['similarity']['sentence']:.1%}")
    elif fp_files:
        print("    No fingerprint match above threshold.")
    else:
        print("    No fingerprint files found to compare against.")


def cmd_attribute(args):
    """Attribute leaked text to a recipient using every available signal.

    The phases are:
    1. Extract L1/L2 marks directly from the text.
    2. Query the registry for those marks and fetch all known mark ids.
    3. Check all candidates against L3 semantic evidence.
    4. Fuse the layers and report the best attribution.
    5. Compare content fingerprints, when ``--fingerprints`` is given.

    Registry failures are reported but never abort the run.
    """
    text = Path(args.leak).read_text(encoding="utf-8", errors="replace")

    direct_candidates = _attribute_direct(text)
    registry_candidates = _attribute_registry_candidates(args.registry, direct_candidates)

    # Direct evidence first, then registry-only marks, without duplicates.
    direct_set = set(direct_candidates)
    all_candidates = direct_candidates + [
        m for m in registry_candidates if m not in direct_set
    ]

    _attribute_l3(text, all_candidates)
    _attribute_fuse(text, args.registry, all_candidates)

    if args.fingerprints:
        _attribute_fingerprints(text, args.fingerprints)
427
 
428
 
429
 
430
def cmd_siem(args):
431
    if args.siem_cmd != "export":
432
        raise SystemExit(f"[!] unknown siem command: {args.siem_cmd}")
433
 
434
    from oversight_core import siem
435
 
436
    events = siem.iter_registry_events(
437
        args.db,
438
        since_unix=args.since,
439
        limit=args.limit,
440
        registry_id=args.registry_id,
441
    )
442
 
443
    output = args.output
444
    if output == "-":
445
        sink: siem.Sink = siem.StdoutSink()
446
    elif output.startswith("http://") or output.startswith("https://"):
447
        headers = {}
448
        for h in args.header:
449
            if ":" not in h:
450
                raise SystemExit(f"[!] --header must be 'Key: Value', got: {h!r}")
451
            k, v = h.split(":", 1)
452
            headers[k.strip()] = v.strip()
453
        sink = siem.HTTPJSONSink(output, headers=headers)
454
    else:
455
        sink = siem.FileSink(output, mode="a")
456
 
457
    try:
458
        splunk_kwargs = {}
459
        if args.format == "splunk":
460
            splunk_kwargs = {
461
                "source": args.splunk_source,
462
                "sourcetype": args.splunk_sourcetype,
463
                "index": args.splunk_index,
464
            }
465
        count = siem.export_events(
466
            events=events,
467
            fmt=args.format,
468
            sink=sink,
469
            splunk_kwargs=splunk_kwargs,
470
        )
471
    finally:
472
        sink.close()
473
 
474
    sys.stderr.write(f"[ok] exported {count} event(s) as {args.format}\n")
475
 
476
 
477
def main(argv=None):
    """Build the ``oversight`` argument parser and dispatch to a ``cmd_*`` handler.

    Args:
        argv: arguments to parse. ``None`` (the default) means ``sys.argv[1:]``.
            Passing a list lets tests or embedding code drive the CLI directly.

    Raises:
        SystemExit: from argparse on bad usage. Also raised with an
            ``[!] <message>`` when the handler raises ValueError,
            FileExistsError, OSError or json.JSONDecodeError.
    """
    p = argparse.ArgumentParser(prog="oversight", description="OVERSIGHT CLI.")
    sub = p.add_subparsers(dest="cmd", required=True)

    k = sub.add_parser("keygen", help="generate a new classic identity (X25519 + Ed25519)")
    k.add_argument("--out", required=True,
                   help="private identity JSON; the public half goes to <stem>.pub.json")
    k.add_argument("--id", default=None,
                   help="identity id stored in both files (default: identity)")

    s = sub.add_parser("seal", help="produce a .sealed file for a recipient")
    s.add_argument("input", help="file to seal")
    s.add_argument("--recipient-pub", required=True, help="recipient public identity JSON")
    s.add_argument("--issuer-id", required=True, help="issuer identifier for the manifest")
    s.add_argument("--issuer-key", required=True, help="issuer private identity JSON")
    s.add_argument("--registry-url", required=True, help="registry URL for the manifest")
    s.add_argument("--registry-domain", default="oversightprotocol.dev",
                   help="domain used when generating beacons")
    s.add_argument("--out", required=True, help="output .sealed path")
    s.add_argument("--content-type", default="application/octet-stream",
                   help="content type recorded for the sealed file")
    s.add_argument("--watermark", action="store_true", help="embed text watermarks")
    s.add_argument("--l3-mode", choices=("auto", "off", "full", "boilerplate"), default="auto",
                   help="semantic L3 mode; auto disables L3 for wording-sensitive document classes")
    s.add_argument("--l3-ack", action="store_true",
                   help="acknowledge that enabled L3 makes recipient text non-identical")
    s.add_argument("--document-class",
                   choices=("auto", "prose", "legal", "regulatory", "technical_spec",
                            "source_code", "sql", "log", "structured_data"),
                   default="auto",
                   help="declare document class for L3 safety decisions")
    s.add_argument("--register", default=None, help="POST manifest to this registry URL")

    o = sub.add_parser("open", help="decrypt a .sealed file")
    o.add_argument("input", help=".sealed file to decrypt")
    o.add_argument("--identity", required=True, help="recipient private identity JSON")
    o.add_argument("--out", required=True, help="path for the decrypted plaintext")

    i = sub.add_parser("inspect", help="dump the (signed) manifest without decrypting")
    i.add_argument("input", help=".sealed file to inspect")

    a = sub.add_parser("attribute", help="identify the source recipient of leaked text")
    a.add_argument("--leak", required=True, help="leaked text file to analyse")
    a.add_argument("--registry", required=True, help="registry base URL to query")
    a.add_argument("--fingerprints", default=None,
                   help="path to fingerprint file or directory for VM-strip detection")

    x = sub.add_parser("siem", help="export registry events for Splunk, Sentinel, or Elastic")
    xsub = x.add_subparsers(dest="siem_cmd", required=True)
    xe = xsub.add_parser("export", help="emit events from the registry SQLite db")
    xe.add_argument("--db", required=True, help="path to registry.sqlite")
    xe.add_argument("--format", choices=("splunk", "ecs", "sentinel"), required=True)
    xe.add_argument("--registry-id", required=True,
                    help="registry identifier (typically its ed25519 public key hex)")
    xe.add_argument("--since", type=int, default=None,
                    help="only emit events with timestamp >= unix epoch seconds")
    xe.add_argument("--limit", type=int, default=None)
    xe.add_argument("--output", default="-",
                    help="'-' for stdout, a file path, or http(s):// URL for HTTP POST")
    xe.add_argument("--header", action="append", default=[],
                    help="HTTP header in 'Key: Value' form, repeatable")
    xe.add_argument("--splunk-index", default=None)
    xe.add_argument("--splunk-source", default="oversight:registry")
    xe.add_argument("--splunk-sourcetype", default="oversight:beacon")

    args = p.parse_args(argv)

    handlers = {
        "keygen": cmd_keygen,
        "seal": cmd_seal,
        "open": cmd_open,
        "inspect": cmd_inspect,
        "attribute": cmd_attribute,
        "siem": cmd_siem,
    }
    try:
        handlers[args.cmd](args)
    except (ValueError, FileExistsError, OSError, json.JSONDecodeError) as exc:
        # Expected user-facing failures become a one-line message, not a traceback.
        raise SystemExit(f"[!] {exc}") from exc
551
 
552
 
553
if __name__ == "__main__":
554
    main()