| 1 | """ |
| 2 | oversight_core.beacon |
| 3 | ==================== |
| 4 | |
| 5 | Beacon / canary token generation. |
| 6 | |
| 7 | Per-file, per-recipient passive callbacks. When a sealed file is opened (or even |
| 8 | its metadata inspected), one or more beacons fire to the attribution registry. |
| 9 | |
| 10 | Design principles: |
| 11 | - PASSIVE ONLY. No code execution on the reader. No RAT. No "active" payloads. |
| 12 | Beacons are network callbacks that standard document readers make naturally |
| 13 | during rendering (image fetch, URL resolution, font load, license check). |
| 14 | - DIVERSITY. Multiple beacon types per file. Stripping one doesn't defeat the others. |
| 15 | - PER-RECIPIENT. Each recipient's copy has unique beacon URLs. |
| 16 | A callback identifies not just "the file leaked" but "whose copy leaked". |
| 17 | - LEGAL. Beacons only phone home to the registry operator's infrastructure; |
| 18 | they do not exfiltrate data from the reader's machine beyond what any |
| 19 | standard web request reveals (IP, UA, timestamp). |
| 20 | |
| 21 | Beacon types in this MVP: |
| 22 | - DNS beacon (subdomain resolution - fires before HTTP) |
| 23 | - HTTP beacon (image-fetch URL suitable for embedding in Office/PDF docs) |
| 24 | - OCSP-style beacon (cert revocation check - survives very restrictive environments) |
| 25 | - "License check" beacon (HEAD request to a policy endpoint) |
| 26 | |
| 27 | Each beacon is tagged with: |
| 28 | - token_id : unique, unguessable, ties callback -> (file_id, recipient_id) |
| 29 | - beacon_kind : type of callback |
| 30 | - first_seen : to be populated by the registry on receipt |
| 31 | """ |
| 32 | |
| 33 | from __future__ import annotations |
| 34 | |
| 35 | import secrets |
| 36 | from dataclasses import dataclass, asdict |
| 37 | from typing import Optional |
| 38 | |
| 39 | |
@dataclass
class Beacon:
    """One passive callback record: a token, its kind, and the address it targets."""

    # Lookup key for this beacon; gen_beacons() embeds it in the url / host name.
    token_id: str
    # Callback type; gen_beacons() emits "dns", "http_img", "ocsp" or "license".
    kind: str
    # Address the callback is made to.
    url: str
    # Host name to resolve; gen_beacons() sets it for "dns" beacons only.
    dns_name: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the beacon's fields as a plain dict keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
| 49 | |
| 50 | |
def _token() -> str:
    """Return a fresh random token: 16 bytes from ``secrets`` as 32 lowercase hex chars."""
    random_bytes = secrets.token_bytes(16)
    return random_bytes.hex()
| 53 | |
| 54 | |
def gen_beacons(
    registry_domain: str,
    file_id: str,
    recipient_id: str,
    include: Optional[list[str]] = None,
) -> list[Beacon]:
    """
    Generate a set of beacons for a specific (file, recipient) pair.

    The registry_domain must be under the control of the sealing operator.
    Every beacon gets its own fresh token from ``_token()``; the token_id is
    the lookup key the registry maps to (file_id, recipient_id).

    NOTE(review): ``file_id`` and ``recipient_id`` are accepted but not read
    here, and the returned beacons do not carry them - presumably the caller
    registers the token_id -> (file_id, recipient_id) mapping; confirm.

    Args:
        registry_domain: Base domain the beacon host names are built under.
        file_id: Identifier of the sealed file (not read by this function).
        recipient_id: Identifier of the recipient (not read by this function).
        include: Beacon kinds to generate, in order. A kind listed twice
            yields two beacons. ``None`` or an empty list selects all four
            kinds: "dns", "http_img", "ocsp", "license".

    Returns:
        One Beacon per requested kind, in request order.

    Raises:
        ValueError: If ``include`` names a kind that is not supported.
    """
    supported = ("dns", "http_img", "ocsp", "license")
    kinds = include or list(supported)

    # Reject unknown kinds before minting any token: silently skipping a
    # misspelled kind would leave the file with fewer beacons than requested.
    unknown = [k for k in kinds if k not in supported]
    if unknown:
        raise ValueError(
            f"unsupported beacon kind(s): {unknown!r}; expected one of {supported!r}"
        )

    out: list[Beacon] = []
    for kind in kinds:
        tid = _token()
        if kind == "dns":
            # The token is the left-most label, so resolving the name is the callback.
            host = f"{tid}.t.{registry_domain}"
            out.append(Beacon(
                token_id=tid,
                kind="dns",
                url=f"dns://{host}",
                dns_name=host,
            ))
        elif kind == "http_img":
            out.append(Beacon(
                token_id=tid,
                kind="http_img",
                url=f"https://b.{registry_domain}/p/{tid}.png",
            ))
        elif kind == "ocsp":
            out.append(Beacon(
                token_id=tid,
                kind="ocsp",
                url=f"https://ocsp.{registry_domain}/r/{tid}",
            ))
        elif kind == "license":
            out.append(Beacon(
                token_id=tid,
                kind="license",
                url=f"https://lic.{registry_domain}/v/{tid}",
            ))
    return out
| 99 | |
| 100 | |
def beacon_to_img_tag(b: Beacon) -> str:
    """HTML snippet that many office/PDF renderers will fetch on open.

    Builds a 1x1 ``<img>`` tag whose ``src`` is ``b.url``. The URL is
    HTML-escaped so that characters such as ``"`` or ``&`` cannot terminate
    the attribute or produce malformed markup; URLs made only of
    attribute-safe characters come out unchanged.

    Args:
        b: Beacon whose ``url`` attribute is used as the image source.

    Returns:
        The ``<img .../>`` tag as a string.
    """
    # Function-scope import: the module's import block does not bring in `html`.
    import html

    src = html.escape(b.url, quote=True)
    return f'<img src="{src}" width="1" height="1" alt=""/>'
| 104 | |
| 105 | |
def beacons_html_block(beacons: list[Beacon]) -> str:
    """Wrap the image tags of all ``http_img`` beacons in a hidden ``<div>``.

    Beacons of any other kind are skipped. Tags appear one per line in input
    order; when no ``http_img`` beacon is present the div holds one empty line.
    """
    img_tags = []
    for beacon in beacons:
        if beacon.kind != "http_img":
            continue
        img_tags.append(beacon_to_img_tag(beacon))
    joined = "\n".join(img_tags)
    return '<div style="display:none">\n' + joined + "\n</div>"