Zion Boggan
repos/Oversight/integrations/flywheel_oversight_match.py
zionboggan.com ↗
232 lines · python
History for this file →
1
"""
2
oversight_match - Flywheel job module.
3
 
4
Registers a new Flywheel job kind `oversight_match` that takes scraped content
5
(text, attached images, attached PDFs/DOCX) and checks it against the
6
OVERSIGHT registry for leaked-file attribution.
7
 
8
How to register this with Flywheel:
9
    from oversight_integrations.flywheel_oversight_match import handle_scraped
10
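    flywheel.register_job("oversight_match", handle_scraped)

Note that handle_scraped(job_input, registry_url) also needs the registry URL;
if Flywheel invokes jobs with the input dict alone, bind it first, e.g.
functools.partial(handle_scraped, registry_url=...).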
11
 
12
Job inputs (dict):
13
    {
14
        "source_url": "https://breachforums.example/thread/12345",
15
        "scraped_at": 1715000000,
16
        "text": "<pasted leaked document text>",
17
        "attachments": [
18
            {"kind": "image", "bytes_hex": "...", "filename": "leaked.png"},
19
            {"kind": "pdf",   "bytes_hex": "...", "filename": "leaked.pdf"},
20
            {"kind": "docx",  "bytes_hex": "...", "filename": "leaked.docx"},
21
        ],
22
    }
23
 
24
Job output (dict):
25
    {
26
        "matches": [
27
            {"layer": "L1_zero_width",  "mark_id": "...", "file_id": "...",
28
             "recipient_id": "...", "issuer_id": "...", "score": 1.0},
29
            {"layer": "L3_semantic",    "mark_id": "...", "score": 0.89, ...},
30
            {"layer": "image_DCT",      "mark_id": "...", "score": 0.12, ...},
31
            {"layer": "perceptual_hash","hash": "...", "file_id": "...", ...},
32
        ],
33
        "scraped_at": 1715000000,
34
        "source_url": "...",
35
    }
36
 
37
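Intended behaviour on match: raise a priority-1 alert through the Flywheel
event bus so the `CanaryKeeper` Perseus agent can notify Zion via Discord.
This module does not publish that alert itself: handle_scraped only returns
the matches, so the alert must be raised by whatever consumes the job output.
Layers currently emitted: L1_zero_width, L2_whitespace, L3_semantic,
perceptual_hash, pdf_metadata, docx_metadata (image_DCT is not produced yet).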
39
"""
40
 
41
from __future__ import annotations

import logging
import sys
import time
from pathlib import Path
from typing import Any, Optional

import httpx

# Make `oversight_core` importable from the directory two levels above this
# file (presumably the repository root) when the package is not installed.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from oversight_core import watermark, semantic
from oversight_core.formats import image as img_fmt
from oversight_core.formats import pdf as pdf_fmt
from oversight_core.formats import docx as docx_fmt
56
 
57
 
58
 
59
class RegistryClient:
    """
    Thin synchronous HTTP client for the OVERSIGHT registry.

    Wraps an ``httpx.Client`` and exposes the two endpoints this job uses:
    ``POST /attribute`` and ``GET /candidates/semantic``. The semantic
    candidate list is cached on the instance for ``cache_ttl`` seconds.

    Can be used as a context manager; leaving the ``with`` block (or calling
    ``close()``) closes the underlying HTTP client.
    """

    def __init__(self, url: str, timeout: float = 10.0):
        # Strip trailing slashes so f"{self.url}/path" never produces "//path".
        self.url = url.rstrip("/")
        self.client = httpx.Client(timeout=timeout)
        self._cached_candidates: list[dict] = []
        # time.monotonic() of the last successful candidate fetch. None means
        # "never fetched", so an empty candidate list is cached like any other.
        self._candidates_fetched_at: Optional[float] = None

    def __enter__(self) -> RegistryClient:
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Returning None lets any in-flight exception propagate.
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()

    def attribute(self, **kwargs) -> dict:
        """
        POST /attribute with any of token_id, mark_id, layer, perceptual_hash.

        Returns the decoded JSON body; a non-2xx response raises via
        ``raise_for_status()``.
        """
        r = self.client.post(f"{self.url}/attribute", json=kwargs)
        r.raise_for_status()
        return r.json()

    def fetch_semantic_candidates(
        self, cache_ttl: float = 3600, limit: int = 5000
    ) -> list[dict]:
        """
        Fetch L3 semantic candidate mark_ids, cached for ``cache_ttl`` seconds.

        Args:
            cache_ttl: Seconds a previous fetch stays valid; 0 forces a refetch.
            limit: ``limit`` query parameter sent to the registry.

        Returns:
            The ``candidates`` list from the registry response (possibly empty).
        """
        now = time.monotonic()
        if (
            self._candidates_fetched_at is not None
            and now - self._candidates_fetched_at < cache_ttl
        ):
            return self._cached_candidates
        r = self.client.get(
            f"{self.url}/candidates/semantic", params={"limit": limit}
        )
        r.raise_for_status()
        self._cached_candidates = r.json()["candidates"]
        self._candidates_fetched_at = now
        return self._cached_candidates
86
 
87
 
88
 
89
def _check_text(text: str, registry: RegistryClient) -> list[dict]:
    """
    Run the L1 / L2 / L3 text extractors against leaked text.

    L1 (zero-width) and L2 (whitespace) extraction recover mark_ids directly,
    which are then looked up in the registry. L3 (semantic) works the other
    way round: each candidate mark_id from the registry is verified against
    the text, and only candidates that verify are looked up.

    Args:
        text: Leaked text to scan. Empty text returns [] without contacting
            the registry.
        registry: Client used for ``attribute`` lookups and for the L3
            candidate list.

    Returns:
        One dict per registry hit: ``layer`` and ``score`` (plus
        ``punct_score`` for L3) merged with the ``/attribute`` response.
    """
    if not text:
        # Empty input: skip the candidate fetch and per-candidate verification.
        # Whitespace-only text is still scanned, since the L2 extractor
        # (extract_ws) presumably reads marks from whitespace.
        return []

    log = logging.getLogger(__name__)
    matches: list[dict] = []

    # L1: look each distinct mark up once, even if extract_zw yields it
    # several times (a repeat would cost an extra request and add a duplicate).
    seen_l1: set[str] = set()
    for mark in watermark.extract_zw(text):
        mark_hex = mark.hex()
        if mark_hex in seen_l1:
            continue
        seen_l1.add(mark_hex)
        r = registry.attribute(mark_id=mark_hex, layer="L1_zero_width")
        if r.get("found"):
            matches.append({"layer": "L1_zero_width", "score": 1.0, **r})

    # L2: extract_ws returns a single mark (or a falsy value when none).
    l2 = watermark.extract_ws(text)
    if l2:
        r = registry.attribute(mark_id=l2.hex(), layer="L2_whitespace")
        if r.get("found"):
            matches.append({"layer": "L2_whitespace", "score": 1.0, **r})

    # L3: verify every registry candidate against the text.
    for cand in registry.fetch_semantic_candidates():
        try:
            mark_hex = cand["mark_id"]
            mark_bytes = bytes.fromhex(mark_hex)
        except (KeyError, TypeError, ValueError):
            # One malformed candidate must not abort the scan and discard
            # the L1/L2 hits collected above.
            log.warning("skipping malformed L3 candidate: %r", cand)
            continue
        result = semantic.verify_semantic(text, mark_bytes)
        if not result["overall_match"]:
            continue
        r = registry.attribute(mark_id=mark_hex, layer="L3_semantic")
        if r.get("found"):
            matches.append({
                "layer": "L3_semantic",
                "score": result["synonyms_score"],
                "punct_score": result["punctuation_score"],
                **r,
            })
    return matches
122
 
123
 
124
 
125
def _check_image(image_bytes: bytes, registry: RegistryClient) -> list[dict]:
    """
    Look up a leaked image in the registry by perceptual hash.

    Only the perceptual-hash lookup is done here; DCT watermark verification
    (the ``image_DCT`` layer) is not implemented by this function.

    Args:
        image_bytes: Raw image attachment bytes.
        registry: Client used for the ``attribute`` lookup.

    Returns:
        ``[]`` or a single ``perceptual_hash`` match dict merged with the
        registry response. If hashing fails, the failure is logged and ``[]``
        is returned. Registry errors propagate to the caller, as they do in
        the text checks.
    """
    matches: list[dict] = []
    try:
        phash = img_fmt.perceptual_hash(image_bytes)
    except Exception:
        # Scraped attachments may be truncated or not decodable as images;
        # treat that as "no match" rather than failing the whole job.
        logging.getLogger(__name__).warning(
            "could not compute perceptual hash for image attachment",
            exc_info=True,
        )
        return matches

    r = registry.attribute(perceptual_hash=phash)
    if r.get("found"):
        matches.append({"layer": "perceptual_hash", "hash": phash, "score": 1.0, **r})
    return matches
138
 
139
 
140
 
141
def _check_document(
    blob: bytes, registry: RegistryClient, fmt: Any, kind: str
) -> list[dict]:
    """
    Shared PDF/DOCX check: embedded metadata mark plus body-text watermarks.

    Args:
        blob: Raw document bytes.
        registry: Client used for ``attribute`` lookups.
        fmt: Format module (``pdf_fmt`` / ``docx_fmt``) exposing
            ``extract(blob)`` returning a mapping with an optional
            ``mark_id``, and ``extract_text_for_watermark_recovery(blob)``.
        kind: ``"pdf"`` or ``"docx"``; the metadata hit is reported as
            layer ``f"{kind}_metadata"``.

    Returns:
        The metadata match (if any) followed by all text-layer matches.

    Body-text extraction is best-effort: a failure is logged and only the
    metadata result is returned. Errors from ``fmt.extract`` and from the
    registry propagate to the caller.
    """
    matches: list[dict] = []
    ext = fmt.extract(blob)
    if ext.get("mark_id"):
        r = registry.attribute(mark_id=ext["mark_id"])
        if r.get("found"):
            matches.append({"layer": f"{kind}_metadata", "score": 1.0, **r})

    # Only the extraction step is guarded. The text check itself is not,
    # so registry failures are not silently reported as "no match".
    try:
        body_text = fmt.extract_text_for_watermark_recovery(blob)
    except Exception:
        logging.getLogger(__name__).warning(
            "body-text extraction failed for %s attachment", kind, exc_info=True
        )
        return matches
    if body_text:
        matches.extend(_check_text(body_text, registry))
    return matches


def _check_pdf(pdf_bytes: bytes, registry: RegistryClient) -> list[dict]:
    """Check a PDF attachment: metadata mark (``pdf_metadata``) plus body text."""
    return _check_document(pdf_bytes, registry, pdf_fmt, "pdf")


def _check_docx(docx_bytes: bytes, registry: RegistryClient) -> list[dict]:
    """Check a DOCX attachment: metadata mark (``docx_metadata``) plus body text."""
    return _check_document(docx_bytes, registry, docx_fmt, "docx")
169
 
170
 
171
 
172
def handle_scraped(job_input: dict, registry_url: str) -> dict:
    """
    Flywheel job entrypoint: check one scraped blob against the OVERSIGHT registry.

    Args:
        job_input: Scraped payload in the "Job inputs" shape from the module
            docstring. ``text`` and ``attachments`` may be absent or None.
            Attachments with an unsupported ``kind``, an empty ``bytes_hex``,
            or a ``bytes_hex`` that is not valid hex are skipped.
        registry_url: Base URL of the OVERSIGHT registry.

    Returns:
        ``{"matches": [...], "scraped_at": ..., "source_url": ...}``;
        ``matches`` is empty when nothing was attributed. Hits are
        de-duplicated on (layer, file_id, mark_id); the first occurrence wins.

    Errors raised by the registry client or the per-kind checkers propagate.
    The registry client is always closed.
    """
    registry = RegistryClient(registry_url)
    try:
        all_matches: list[dict] = []

        text = job_input.get("text", "") or ""
        if text:
            all_matches.extend(_check_text(text, registry))

        checkers = {
            "image": _check_image,
            "pdf": _check_pdf,
            "docx": _check_docx,
        }
        # `or []` makes an explicit "attachments": None mean "no attachments".
        for att in job_input.get("attachments") or []:
            checker = checkers.get(att.get("kind"))
            raw = att.get("bytes_hex")
            # Unsupported kinds are skipped before decoding, so their payload
            # is never parsed.
            if checker is None or not raw:
                continue
            try:
                blob = bytes.fromhex(raw)
            except (TypeError, ValueError):
                # One corrupt attachment should not discard the matches
                # already found for the text and the other attachments.
                logging.getLogger(__name__).warning(
                    "skipping attachment %r: bytes_hex is not valid hex",
                    att.get("filename"),
                )
                continue
            all_matches.extend(checker(blob, registry))

        return {
            "matches": _dedupe_matches(all_matches),
            "scraped_at": job_input.get("scraped_at"),
            "source_url": job_input.get("source_url"),
        }
    finally:
        registry.close()


def _dedupe_matches(matches: list[dict]) -> list[dict]:
    """
    Return *matches* without repeats, keeping the first hit per key.

    The key is (layer, file_id, mark_id). mark_id is part of the key so that
    two different marks found for the same file in the same layer (which may
    attribute to different recipients) are both reported. Matches lacking a
    mark_id dedupe on (layer, file_id).
    """
    seen: set[tuple] = set()
    unique: list[dict] = []
    for m in matches:
        key = (m.get("layer"), m.get("file_id"), m.get("mark_id"))
        if key in seen:
            continue
        seen.add(key)
        unique.append(m)
    return unique
213
 
214
 
215
 
216
if __name__ == "__main__":
    # Manual smoke test: run the job once against a live registry, using only
    # the text given on the command line (no attachments).
    import argparse
    import json

    parser = argparse.ArgumentParser()
    parser.add_argument("--registry", required=True)
    parser.add_argument("--text", default="")
    parser.add_argument("--url", default="(cli test)")
    cli = parser.parse_args()

    # Job dict in the "Job inputs" shape from the module docstring.
    cli_job = dict(
        source_url=cli.url,
        scraped_at=int(time.time()),
        text=cli.text,
        attachments=[],
    )
    result = handle_scraped(cli_job, cli.registry)
    print(json.dumps(result, indent=2))