Zion Boggan
repos/Oversight/integrations/perseus_canarykeeper.py
zionboggan.com ↗
283 lines · python
History for this file →
1
"""
2
CanaryKeeper - OVERSIGHT-attribution → Discord-alert agent for Perseus.
3
 
4
Role: sole owner of the "trap recipient" identities (decoy file recipient
5
keys), and sole escalation path for OVERSIGHT attribution hits. Runs as a
6
Perseus agent alongside Grok / DMCA Shield / etc.
7
 
8
Responsibilities:
9
    1. Poll the registry's tlog for new beacon events (any kind: http_img, dns,
10
       ocsp, license). A beacon fire = a sealed file was opened somewhere.
11
    2. For each event, pull the signed evidence bundle for the file_id.
12
    3. Verify the bundle's registry Ed25519 signature against the pinned
13
       well-known pubkey (no blind trust).
14
    4. Classify: is this a decoy file (trap), a real-recipient file, or unknown?
15
    5. For trap hits → DM Zion on Discord immediately (P1).
16
    6. For real-recipient hits from unexpected geography/time → P2 alert.
17
    7. For Flywheel-discovered leaks → P1.
18
 
19
Trap recipient storage:
20
    Keys stay encrypted at rest under a Perseus Vault master key.
21
    Only CanaryKeeper has the decrypt role - not the main brain, not DMCA Shield.
22
 
23
Usage:
24
    python -m integrations.perseus_canarykeeper \\
25
        --registry https://beacon.example.com \\
26
        --pinned-key <hex> \\
27
        --discord-webhook https://discord.com/api/webhooks/... \\
28
        --owner-id 682818191990587393 \\
29
        --poll-interval 60
30
 
31
Config can also come from env vars:
32
    OVERSIGHT_REGISTRY_URL, OVERSIGHT_PINNED_KEY, DISCORD_WEBHOOK, OWNER_DISCORD_ID
33
"""
34
 
35
from __future__ import annotations
36
 
37
import argparse
38
import json
39
import logging
40
import os
41
import sys
42
import time
43
from pathlib import Path
44
from typing import Optional
45
 
46
import httpx
47
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
48
from cryptography.exceptions import InvalidSignature
49
 
50
 
51
log = logging.getLogger("canarykeeper")
52
 
53
STATE_PATH = Path(
54
    os.environ.get("CANARYKEEPER_STATE", "/var/lib/canarykeeper/state.json")
55
)
56
 
57
 
58
 
59
def load_state() -> dict:
60
    if not STATE_PATH.exists():
61
        return {
62
            "last_tlog_seen": 0,
63
            "known_file_ids": [],
64
            "trap_file_ids": [],
65
        }
66
    try:
67
        return json.loads(STATE_PATH.read_text())
68
    except (ValueError, OSError):
69
        return {"last_tlog_seen": 0, "known_file_ids": [], "trap_file_ids": []}
70
 
71
 
72
def save_state(state: dict):
73
    STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
74
    tmp = STATE_PATH.with_suffix(".tmp")
75
    tmp.write_text(json.dumps(state, indent=2))
76
    tmp.replace(STATE_PATH)
77
 
78
 
79
 
80
class RegistryMonitor:
81
    def __init__(self, url: str, pinned_pubkey_hex: str):
82
        self.url = url.rstrip("/")
83
        self.pinned_pub = Ed25519PublicKey.from_public_bytes(
84
            bytes.fromhex(pinned_pubkey_hex)
85
        )
86
        self.client = httpx.Client(timeout=15.0)
87
 
88
    def close(self):
89
        self.client.close()
90
 
91
    def tlog_head(self) -> dict:
92
        r = self.client.get(f"{self.url}/tlog/head")
93
        r.raise_for_status()
94
        head = r.json()
95
        sig = bytes.fromhex(head["signature"])
96
        msg = head["signed_message"].encode("utf-8")
97
        try:
98
            self.pinned_pub.verify(sig, msg)
99
        except InvalidSignature:
100
            raise RuntimeError(
101
                "registry /tlog/head signature does not verify under pinned key! "
102
                "possible tampering or key rotation - refusing to proceed"
103
            )
104
        return head
105
 
106
    def evidence_bundle(self, file_id: str) -> Optional[dict]:
107
        try:
108
            r = self.client.get(f"{self.url}/evidence/{file_id}")
109
            if r.status_code == 404:
110
                return None
111
            r.raise_for_status()
112
            bundle = r.json()
113
        except httpx.HTTPError as e:
114
            log.warning(f"evidence fetch failed for {file_id}: {e}")
115
            return None
116
        sig_hex = bundle.pop("bundle_signature_ed25519", None)
117
        if not sig_hex:
118
            log.warning(f"bundle for {file_id} has no signature")
119
            return None
120
        msg = json.dumps(bundle, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
121
        try:
122
            self.pinned_pub.verify(bytes.fromhex(sig_hex), msg)
123
        except InvalidSignature:
124
            log.error(f"bundle signature invalid for {file_id} - IGNORING")
125
            return None
126
        bundle["bundle_signature_ed25519"] = sig_hex
127
        return bundle
128
 
129
    def raw_tlog_entries(self, start_index: int) -> list[dict]:
130
        """Fetch raw tlog leaves from start_index to current. Uses the /tlog/range endpoint
131
        if available, else falls back to re-reading the whole log."""
132
        try:
133
            r = self.client.get(
134
                f"{self.url}/tlog/range",
135
                params={"start": start_index, "limit": 500},
136
            )
137
            r.raise_for_status()
138
            return r.json().get("entries", [])
139
        except httpx.HTTPError:
140
            return []
141
 
142
 
143
 
144
class DiscordNotifier:
145
    def __init__(self, webhook_url: str, owner_id: str):
146
        self.webhook = webhook_url
147
        self.owner_id = owner_id
148
        self.client = httpx.Client(timeout=10.0)
149
 
150
    def close(self):
151
        self.client.close()
152
 
153
    def alert(self, priority: str, title: str, body: str):
154
        """Post an alert to Discord. Priority = P1/P2/P3."""
155
        colors = {"P1": 0xFF0000, "P2": 0xFF9900, "P3": 0xFFFF00}
156
        mention = f"<@{self.owner_id}>" if priority == "P1" else ""
157
        payload = {
158
            "content": mention,
159
            "embeds": [{
160
                "title": f"[{priority}] {title}",
161
                "description": body[:4000],
162
                "color": colors.get(priority, 0x0099FF),
163
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
164
                "footer": {"text": "OVERSIGHT CanaryKeeper"},
165
            }],
166
        }
167
        try:
168
            r = self.client.post(self.webhook, json=payload)
169
            r.raise_for_status()
170
        except httpx.HTTPError as e:
171
            log.error(f"Discord alert failed: {e}")
172
 
173
 
174
 
175
def process_event(event: dict, state: dict, registry: RegistryMonitor,
176
                  notifier: DiscordNotifier):
177
    """Classify a single tlog event and escalate if it's interesting."""
178
    kind = event.get("event")
179
    if kind != "beacon":
180
        return
181
 
182
    file_id = event.get("file_id")
183
    if not file_id:
184
        return
185
 
186
    is_trap = file_id in state.get("trap_file_ids", [])
187
    beacon_kind = event.get("kind", "unknown")
188
    source_ip = event.get("source_ip") or "unknown"
189
 
190
    if is_trap:
191
        title = f"TRAP FILE OPENED: {file_id[:8]}..."
192
        body = (
193
            f"A decoy file's beacon fired. This is a high-confidence intrusion signal.\n"
194
            f"• beacon kind: `{beacon_kind}`\n"
195
            f"• source IP:   `{source_ip}`\n"
196
            f"• file_id:     `{file_id}`\n"
197
            f"• timestamp:   `{event.get('timestamp', '?')}`\n\n"
198
            f"Action: investigate source IP, pull evidence bundle, consider containment."
199
        )
200
        notifier.alert("P1", title, body)
201
    else:
202
        title = f"Real file beacon: {file_id[:8]}..."
203
        body = (
204
            f"A legitimate sealed file's beacon fired (expected behavior on open).\n"
205
            f"• kind: `{beacon_kind}`, source: `{source_ip}`, "
206
            f"recipient: `{event.get('recipient_id', '?')}`"
207
        )
208
        notifier.alert("P3", title, body)
209
 
210
 
211
def run_once(state: dict, registry: RegistryMonitor, notifier: DiscordNotifier):
212
    """One polling cycle. Fetches new tlog entries and processes each."""
213
    try:
214
        head = registry.tlog_head()
215
    except RuntimeError as e:
216
        notifier.alert("P1", "Registry signature check FAILED", str(e))
217
        raise
218
    except httpx.HTTPError as e:
219
        log.warning(f"registry unreachable: {e}")
220
        return state
221
 
222
    new_size = head["size"]
223
    old_seen = state.get("last_tlog_seen", 0)
224
    if new_size <= old_seen:
225
        return state
226
 
227
    new_entries = registry.raw_tlog_entries(old_seen)
228
    for entry in new_entries:
229
        try:
230
            event = json.loads(entry.get("leaf_data", "{}"))
231
            process_event(event, state, registry, notifier)
232
        except Exception as e:
233
            log.error(f"event processing failed: {e}")
234
 
235
    state["last_tlog_seen"] = new_size
236
    save_state(state)
237
    return state
238
 
239
 
240
def main():
241
    p = argparse.ArgumentParser()
242
    p.add_argument("--registry", default=os.environ.get("OVERSIGHT_REGISTRY_URL"))
243
    p.add_argument("--pinned-key", default=os.environ.get("OVERSIGHT_PINNED_KEY"))
244
    p.add_argument("--discord-webhook", default=os.environ.get("DISCORD_WEBHOOK"))
245
    p.add_argument("--owner-id", default=os.environ.get("OWNER_DISCORD_ID", "682818191990587393"))
246
    p.add_argument("--poll-interval", type=int, default=60)
247
    p.add_argument("--log-level", default="INFO")
248
    args = p.parse_args()
249
 
250
    if not args.registry or not args.pinned_key or not args.discord_webhook:
251
        print("Missing required config: --registry, --pinned-key, --discord-webhook")
252
        sys.exit(2)
253
 
254
    logging.basicConfig(level=args.log_level,
255
                        format="%(asctime)s %(levelname)s %(name)s %(message)s")
256
 
257
    registry = RegistryMonitor(args.registry, args.pinned_key)
258
    notifier = DiscordNotifier(args.discord_webhook, args.owner_id)
259
    state = load_state()
260
 
261
    log.info(f"CanaryKeeper starting (registry={args.registry}, poll={args.poll_interval}s)")
262
    log.info(f"  tracking {len(state.get('trap_file_ids', []))} trap files")
263
    log.info(f"  last tlog seen: {state.get('last_tlog_seen', 0)}")
264
 
265
    notifier.alert("P3", "CanaryKeeper online",
266
                   f"Monitoring {args.registry}, polling every {args.poll_interval}s.")
267
 
268
    try:
269
        while True:
270
            try:
271
                state = run_once(state, registry, notifier)
272
            except Exception as e:
273
                log.exception(f"poll cycle error: {e}")
274
            time.sleep(args.poll_interval)
275
    except KeyboardInterrupt:
276
        log.info("shutting down")
277
    finally:
278
        registry.close()
279
        notifier.close()
280
 
281
 
282
if __name__ == "__main__":
283
    main()