| 1 | """ |
| 2 | oversight_match - Flywheel job module. |
| 3 | |
| 4 | Registers a new Flywheel job kind `oversight_match` that takes scraped content |
| 5 | (text, attached images, attached PDFs/DOCX) and checks it against the |
| 6 | OVERSIGHT registry for leaked-file attribution. |
| 7 | |
| 8 | How to register this with Flywheel: |
| 9 | from oversight_integrations.flywheel_oversight_match import handle_scraped |
| 10 | flywheel.register_job("oversight_match", handle_scraped) |
| 11 | |
| 12 | Job inputs (dict): |
| 13 | { |
| 14 | "source_url": "https://breachforums.example/thread/12345", |
| 15 | "scraped_at": 1715000000, |
| 16 | "text": "<pasted leaked document text>", |
| 17 | "attachments": [ |
| 18 | {"kind": "image", "bytes_hex": "...", "filename": "leaked.png"}, |
| 19 | {"kind": "pdf", "bytes_hex": "...", "filename": "leaked.pdf"}, |
| 20 | {"kind": "docx", "bytes_hex": "...", "filename": "leaked.docx"}, |
| 21 | ], |
| 22 | } |
| 23 | |
| 24 | Job output (dict): |
| 25 | { |
| 26 | "matches": [ |
| 27 | {"layer": "L1_zero_width", "mark_id": "...", "file_id": "...", |
| 28 | "recipient_id": "...", "issuer_id": "...", "score": 1.0}, |
| 29 | {"layer": "L3_semantic", "mark_id": "...", "score": 0.89, ...}, |
| 30 | {"layer": "image_DCT", "mark_id": "...", "score": 0.12, ...}, |
| 31 | {"layer": "perceptual_hash","hash": "...", "file_id": "...", ...}, |
| 32 | ], |
| 33 | "scraped_at": 1715000000, |
| 34 | "source_url": "...", |
| 35 | } |
| 36 | |
| 37 | On match: raise a priority-1 alert through the Flywheel event bus so the |
| 38 | `CanaryKeeper` Perseus agent can notify Zion via Discord. |
| 39 | """ |
| 40 | |
| 41 | from __future__ import annotations |
| 42 | |
import logging
import sys
import time
from pathlib import Path
from typing import Any, Optional

import httpx

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from oversight_core import watermark, semantic
from oversight_core.formats import image as img_fmt
from oversight_core.formats import pdf as pdf_fmt
from oversight_core.formats import docx as docx_fmt
| 56 | |
| 57 | |
| 58 | |
class RegistryClient:
    """Small HTTP client for the OVERSIGHT registry.

    Wraps an ``httpx.Client`` bound to a base URL and keeps an in-memory
    copy of the L3 semantic candidate list between calls. Usable as a
    context manager so the underlying HTTP client is always closed.
    """

    def __init__(self, url: str, timeout: float = 10.0):
        """Create a client for the registry at *url*.

        Trailing slashes are stripped from *url*; *timeout* is passed to
        ``httpx.Client``.
        """
        self.url = url.rstrip("/")
        self.client = httpx.Client(timeout=timeout)
        self._cached_candidates: list[dict] = []
        # Epoch seconds of the last successful candidate fetch; 0 = never fetched.
        self._candidates_fetched_at: int = 0

    def __enter__(self) -> "RegistryClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()

    def attribute(self, **kwargs) -> dict:
        """POST /attribute with any of token_id, mark_id, layer, perceptual_hash.

        Returns the decoded JSON response body. Raises whatever
        ``raise_for_status()`` raises for a non-success HTTP status.
        """
        r = self.client.post(f"{self.url}/attribute", json=kwargs)
        r.raise_for_status()
        return r.json()

    def fetch_semantic_candidates(self, cache_ttl: int = 3600) -> list[dict]:
        """Fetch L3 semantic candidate mark_ids (cached for cache_ttl seconds).

        Returns the ``"candidates"`` list from GET /candidates/semantic.
        Raises ``KeyError`` if the response has no ``"candidates"`` key, and
        whatever ``raise_for_status()`` raises for a non-success HTTP status.
        """
        now = int(time.time())
        fetched_at = self._candidates_fetched_at
        # Cache validity is decided by the fetch timestamp, not by whether the
        # cached list is non-empty: an empty candidate list is a legitimate
        # answer and must not trigger a new request on every call.
        if fetched_at and now - fetched_at < cache_ttl:
            return self._cached_candidates
        r = self.client.get(f"{self.url}/candidates/semantic", params={"limit": 5000})
        r.raise_for_status()
        data = r.json()
        self._cached_candidates = data["candidates"]
        self._candidates_fetched_at = now
        return self._cached_candidates
| 86 | |
| 87 | |
| 88 | |
def _check_text(text: str, registry: RegistryClient) -> list[dict]:
    """
    Scan leaked text with the L1, L2 and L3 watermark extractors.

    L1 (zero-width) and L2 (whitespace) extraction yields mark ids that are
    looked up in the registry directly. L3 (semantic) has no extractable id,
    so each registry candidate is verified against the text and only the
    ones that verify are looked up.
    """

    def lookup(mark_hex: str, layer: str) -> Optional[dict]:
        # Registry response for this mark, or None when it reports no match.
        reply = registry.attribute(mark_id=mark_hex, layer=layer)
        return reply if reply.get("found") else None

    found: list[dict] = []

    # L1: every zero-width mark recovered from the text.
    for zw_mark in watermark.extract_zw(text):
        hit = lookup(zw_mark.hex(), "L1_zero_width")
        if hit is not None:
            found.append({"layer": "L1_zero_width", "score": 1.0, **hit})

    # L2: at most one whitespace mark.
    ws_mark = watermark.extract_ws(text)
    if ws_mark:
        hit = lookup(ws_mark.hex(), "L2_whitespace")
        if hit is not None:
            found.append({"layer": "L2_whitespace", "score": 1.0, **hit})

    # L3: verify each candidate mark against the text.
    for candidate in registry.fetch_semantic_candidates():
        mark_hex = candidate["mark_id"]
        verdict = semantic.verify_semantic(text, bytes.fromhex(mark_hex))
        if not verdict["overall_match"]:
            continue
        hit = lookup(mark_hex, "L3_semantic")
        if hit is None:
            continue
        found.append({
            "layer": "L3_semantic",
            "score": verdict["synonyms_score"],
            "punct_score": verdict["punctuation_score"],
            **hit,
        })

    return found
| 122 | |
| 123 | |
| 124 | |
def _check_image(image_bytes: bytes, registry: RegistryClient) -> list[dict]:
    """Look up an image in the registry by its perceptual hash.

    Computes ``img_fmt.perceptual_hash(image_bytes)`` and asks the registry
    to attribute it. Returns a one-element list on a hit, otherwise an empty
    list.

    NOTE: only the perceptual-hash lookup is implemented here; no DCT
    watermark verification is performed by this function.

    The check is best-effort: any failure while hashing or querying the
    registry is logged and yields an empty result instead of propagating.
    """
    matches: list[dict] = []

    try:
        phash = img_fmt.perceptual_hash(image_bytes)
        r = registry.attribute(perceptual_hash=phash)
        if r.get("found"):
            matches.append({"layer": "perceptual_hash", "hash": phash, "score": 1.0, **r})
    except Exception:
        # Still best-effort, but leave a trace: a silently swallowed error
        # here is indistinguishable from "image not in the registry".
        logging.getLogger(__name__).warning(
            "image perceptual-hash check failed", exc_info=True
        )

    return matches
| 138 | |
| 139 | |
| 140 | |
def _check_pdf(pdf_bytes: bytes, registry: RegistryClient) -> list[dict]:
    """Attribute a PDF via its embedded mark and via its body text.

    First looks up the ``mark_id`` returned by ``pdf_fmt.extract`` (if any),
    then runs the text watermark checks on the text recovered by
    ``pdf_fmt.extract_text_for_watermark_recovery``.

    The body-text pass is best-effort: a failure there is logged and any
    metadata match already found is still returned. Errors from
    ``pdf_fmt.extract`` or the metadata lookup propagate to the caller.
    """
    matches: list[dict] = []

    ext = pdf_fmt.extract(pdf_bytes)
    if ext.get("mark_id"):
        r = registry.attribute(mark_id=ext["mark_id"])
        if r.get("found"):
            matches.append({"layer": "pdf_metadata", "score": 1.0, **r})

    try:
        body_text = pdf_fmt.extract_text_for_watermark_recovery(pdf_bytes)
        matches.extend(_check_text(body_text, registry))
    except Exception:
        # Keep the metadata result, but record why the text pass produced
        # nothing so parser bugs and registry outages are not invisible.
        logging.getLogger(__name__).warning(
            "pdf body-text watermark check failed", exc_info=True
        )
    return matches
| 154 | |
| 155 | |
def _check_docx(docx_bytes: bytes, registry: RegistryClient) -> list[dict]:
    """Attribute a DOCX via its embedded mark and via its body text.

    First looks up the ``mark_id`` returned by ``docx_fmt.extract`` (if any),
    then runs the text watermark checks on the text recovered by
    ``docx_fmt.extract_text_for_watermark_recovery``.

    The body-text pass is best-effort: a failure there is logged and any
    metadata match already found is still returned. Errors from
    ``docx_fmt.extract`` or the metadata lookup propagate to the caller.
    """
    matches: list[dict] = []

    ext = docx_fmt.extract(docx_bytes)
    if ext.get("mark_id"):
        r = registry.attribute(mark_id=ext["mark_id"])
        if r.get("found"):
            matches.append({"layer": "docx_metadata", "score": 1.0, **r})

    try:
        body_text = docx_fmt.extract_text_for_watermark_recovery(docx_bytes)
        matches.extend(_check_text(body_text, registry))
    except Exception:
        # Keep the metadata result, but record why the text pass produced
        # nothing so parser bugs and registry outages are not invisible.
        logging.getLogger(__name__).warning(
            "docx body-text watermark check failed", exc_info=True
        )
    return matches
| 169 | |
| 170 | |
| 171 | |
def handle_scraped(job_input: dict, registry_url: str) -> dict:
    """
    Flywheel job entrypoint. Processes one scraped blob and returns
    a dict of OVERSIGHT attribution matches.

    Args:
        job_input: Job payload. Reads the optional keys ``"text"``,
            ``"attachments"`` (list of dicts with ``"kind"`` and
            ``"bytes_hex"``), ``"scraped_at"`` and ``"source_url"``.
        registry_url: Base URL of the OVERSIGHT registry.

    Returns:
        ``{"matches": [...], "scraped_at": ..., "source_url": ...}`` where
        ``matches`` is de-duplicated and empty if nothing matched.

    Attachments with an unknown ``kind``, no ``bytes_hex``, or a
    ``bytes_hex`` that is not valid hex are skipped. The registry client is
    always closed, even on error.
    """
    log = logging.getLogger(__name__)
    registry = RegistryClient(registry_url)
    try:
        all_matches: list[dict] = []

        text = job_input.get("text", "") or ""
        if text:
            all_matches.extend(_check_text(text, registry))

        # `or []` also covers an explicit `"attachments": None`, mirroring the
        # `or ""` treatment of the text field above.
        for att in job_input.get("attachments") or []:
            kind = att.get("kind")
            raw = att.get("bytes_hex")
            if not raw:
                continue
            try:
                blob = bytes.fromhex(raw)
            except (TypeError, ValueError):
                # Scraped input is untrusted: one malformed attachment must not
                # discard the matches from the text and the other attachments.
                log.warning(
                    "skipping attachment %r: bytes_hex is not valid hex",
                    att.get("filename"),
                )
                continue
            if kind == "image":
                all_matches.extend(_check_image(blob, registry))
            elif kind == "pdf":
                all_matches.extend(_check_pdf(blob, registry))
            elif kind == "docx":
                all_matches.extend(_check_docx(blob, registry))

        # Collapse repeats of the same finding (e.g. the same mark recovered
        # from both the post text and an attachment). mark_id is part of the
        # key so that different marks on the same file in the same layer are
        # all kept rather than only the first.
        seen = set()
        unique: list[dict] = []
        for m in all_matches:
            key = (m.get("layer"), m.get("file_id"), m.get("mark_id"))
            if key not in seen:
                seen.add(key)
                unique.append(m)

        return {
            "matches": unique,
            "scraped_at": job_input.get("scraped_at"),
            "source_url": job_input.get("source_url"),
        }
    finally:
        registry.close()
| 213 | |
| 214 | |
| 215 | |
if __name__ == "__main__":
    import argparse
    import json

    # Command-line harness: run a single text-only job against a registry
    # and print the result as indented JSON.
    parser = argparse.ArgumentParser()
    for flag, options in (
        ("--registry", {"required": True}),
        ("--text", {"default": ""}),
        ("--url", {"default": "(cli test)"}),
    ):
        parser.add_argument(flag, **options)
    cli = parser.parse_args()

    result = handle_scraped(
        {
            "source_url": cli.url,
            "scraped_at": int(time.time()),
            "text": cli.text,
            "attachments": [],
        },
        cli.registry,
    )
    print(json.dumps(result, indent=2))