Zion Boggan
repos/Claude Dispatch/bin/dispatch_lib.py
zionboggan.com ↗
178 lines · python
History for this file →
1
"""Shared helpers for the two-host dispatch channel.
2
 
3
Path resolution: set DISPATCH_ROOT to the shared filesystem path that both
4
nodes can read and write. On Linux this is typically a mounted NFS or SMB
5
share; on Windows it is the mapped drive letter for the same share.
6
 
7
Task envelope v2:
8
  { "id": "<uuid>", "from": "<node-id>", "to": "<node-id>",
9
    "created": "<ISO-8601 UTC>", "priority": "low|normal|high",
10
    "request": "<free text>", "callback": "<string; new_task() defaults to 'file'>",
11
    "require_ack": bool,
12
    "require_dangerous": bool,
13
    "timeout_s": int,
14
    "max_output_bytes": int,
15
    "schema": 2,
16
    "hmac": "<hex sha256 hmac>" }
17
"""
18
from __future__ import annotations
19
import hashlib
20
import hmac
21
import json
22
import os
23
import pathlib
24
import sys
25
import time
26
import uuid
27
from datetime import datetime, timezone
28
 
29
 
30
# Identifiers of the two hosts on the channel. Each one can be overridden
# through its environment variable and otherwise falls back to "a" / "b".
NODE_A = os.getenv("DISPATCH_NODE_A", "a")
NODE_B = os.getenv("DISPATCH_NODE_B", "b")
32
 
33
 
34
def _resolve_root() -> pathlib.Path:
    """Return the dispatch root directory named by ``DISPATCH_ROOT``.

    Raises:
        RuntimeError: if the variable is unset or set to an empty string.

    The path is returned as given; it is not checked for existence here.
    """
    configured = os.environ.get("DISPATCH_ROOT")
    if not configured:
        raise RuntimeError(
            "DISPATCH_ROOT environment variable is required. "
            "Point it at a filesystem path that both nodes can read and write."
        )
    return pathlib.Path(configured)
42
 
43
 
44
# Everything the channel persists lives beneath this one shared directory.
# Resolving it at import time means a missing DISPATCH_ROOT fails the import.
ROOT = _resolve_root()
KEY_PATH = ROOT.joinpath("keys", "hmac.key")    # read by _key() for signing
LOG_PATH = ROOT.joinpath("session-log.jsonl")   # appended to by log_event()
KILLSWITCH = ROOT.joinpath("KILLSWITCH")        # probed by killswitch_tripped()
48
 
49
# Value written into the "schema" field of every envelope built by new_task().
SCHEMA_VERSION = 2

# Values fill_defaults() inserts for optional envelope keys that are absent.
DEFAULTS = dict(
    require_ack=False,
    require_dangerous=False,
    timeout_s=600,
    max_output_bytes=2_000_000,
)
56
 
57
 
58
def _key() -> bytes:
    """Return the shared HMAC key read from ``KEY_PATH``.

    The file is decoded as UTF-8, surrounding whitespace (including a
    trailing newline) is stripped, and the result is re-encoded as UTF-8.

    The decode step is pinned to UTF-8 on purpose: without an explicit
    encoding ``read_text`` uses the platform's locale codec, so a key file
    containing non-ASCII bytes could yield different key material on the
    two hosts and make every signature check fail.

    Raises:
        OSError: if the key file cannot be read.
        UnicodeDecodeError: if the key file is not valid UTF-8.
    """
    return KEY_PATH.read_text(encoding="utf-8").strip().encode("utf-8")
60
 
61
 
62
def _digest(task: dict) -> str:
    """Return the hex HMAC-SHA256 of the signed envelope fields.

    The signed message is the ``id``, ``from``, ``to``, ``created``,
    ``priority`` and ``request`` values joined with ``|`` and UTF-8 encoded,
    keyed with ``_key()``.

    Raises:
        KeyError: if any of the six fields is missing from ``task``.
        TypeError: if any of the six values is not a string.

    NOTE(review): no other envelope key is part of the signed message, so a
    change to e.g. ``require_dangerous`` or ``timeout_s`` is not detected by
    ``verify``. The ``|`` separator is also not escaped inside field values.
    Changing either would alter the wire format, so it is left as is.
    """
    signed_fields = ("id", "from", "to", "created", "priority", "request")
    message = "|".join([task[name] for name in signed_fields]).encode()
    mac = hmac.new(_key(), message, hashlib.sha256)
    return mac.hexdigest()
68
 
69
 
70
def sign(task: dict) -> dict:
    """Attach the envelope's signature under ``"hmac"`` and return it.

    ``task`` is modified in place; the returned object is the same dict.
    Any existing ``"hmac"`` value is overwritten.
    """
    signature = _digest(task)
    task["hmac"] = signature
    return task
73
 
74
 
75
def verify(task: dict) -> bool:
    """Return True only if ``task["hmac"]`` matches the recomputed digest.

    The check fails closed: a missing, empty or non-string ``hmac``, a
    missing signed field, or a signed field of the wrong type all yield
    ``False`` rather than an exception, so a malformed envelope cannot
    crash the caller.

    Errors reading the key file are not swallowed and still propagate.
    """
    provided = task.get("hmac", "")
    # A real signature is always a non-empty hex string; anything else can
    # never match, and compare_digest() would raise TypeError on it.
    if not isinstance(provided, str) or not provided:
        return False
    try:
        expected = _digest(task)
        # Constant-time comparison. It raises TypeError when `provided`
        # contains non-ASCII characters, which is treated as a mismatch.
        return hmac.compare_digest(provided, expected)
    except (KeyError, TypeError):
        # KeyError: a signed field is absent.
        # TypeError: a signed field is not a string, or `provided` is not
        # ASCII.
        return False
83
 
84
 
85
def new_task(sender: str, recipient: str, request: str,
             priority: str = "normal", callback: str = "file",
             require_ack: bool = False,
             require_dangerous: bool = False,
             timeout_s: int = 600,
             max_output_bytes: int = 2_000_000) -> dict:
    """Build a signed task envelope.

    A fresh UUID4 string becomes ``id`` and the current UTC time (ISO-8601)
    becomes ``created``. ``sender`` and ``recipient`` are stored under
    ``from`` and ``to``; every other argument is stored under its own name,
    and ``schema`` is set to ``SCHEMA_VERSION``. No argument is validated
    here.

    Returns:
        The envelope dict with its ``hmac`` field filled in by ``sign``.
    """
    task_id = str(uuid.uuid4())
    created_at = datetime.now(timezone.utc).isoformat()

    # Key order is kept stable because it is the order json.dumps() emits.
    envelope = {
        "id": task_id,
        "from": sender,
        "to": recipient,
        "created": created_at,
        "priority": priority,
        "request": request,
        "callback": callback,
    }
    envelope.update(
        require_ack=require_ack,
        require_dangerous=require_dangerous,
        timeout_s=timeout_s,
        max_output_bytes=max_output_bytes,
        schema=SCHEMA_VERSION,
    )
    return sign(envelope)
106
 
107
 
108
def fill_defaults(task: dict) -> dict:
    """Insert every ``DEFAULTS`` entry whose key is absent from ``task``.

    Keys already present keep their value, whatever it is. ``task`` is
    modified in place and also returned.
    """
    for name in DEFAULTS:
        if name not in task:
            task[name] = DEFAULTS[name]
    return task
112
 
113
 
114
def killswitch_tripped() -> tuple[bool, str]:
    """Report whether the ``KILLSWITCH`` path exists.

    Returns:
        ``(False, "")`` when the path is absent. Otherwise ``(True, text)``
        where ``text`` is the stripped file content, or ``"unreadable"``
        if reading it fails for any reason. An existing but unreadable
        killswitch still counts as tripped.
    """
    if not KILLSWITCH.exists():
        return False, ""
    try:
        reason = KILLSWITCH.read_text().strip()
    except Exception:
        # Deliberately broad: any read failure must still report "tripped".
        return True, "unreadable"
    return True, reason
121
 
122
 
123
def _lane_dir_name(recipient: str) -> str:
    """Return the ``<sender>-to-<receiver>`` directory name for a recipient.

    A recipient equal to ``NODE_A`` maps to the B-to-A lane. Any other value,
    including one that matches neither node, maps to the A-to-B lane.
    """
    if recipient == NODE_A:
        origin, target = NODE_B, NODE_A
    else:
        origin, target = NODE_A, NODE_B
    return f"{origin}-to-{target}"
127
 
128
 
129
def inbox_for(recipient: str) -> pathlib.Path:
    """Return the ``inbox`` directory path of the lane leading to ``recipient``.

    Only the path is computed; the directory is not created here.
    """
    lane_name = _lane_dir_name(recipient)
    return ROOT.joinpath(lane_name, "inbox")
131
 
132
 
133
def lane(recipient: str, stage: str) -> pathlib.Path:
    """Return the ``stage`` directory of the lane leading to ``recipient``.

    The directory and any missing parents are created if they do not exist
    yet.
    """
    stage_dir = ROOT.joinpath(_lane_dir_name(recipient), stage)
    stage_dir.mkdir(parents=True, exist_ok=True)
    return stage_dir
137
 
138
 
139
def enqueue(task: dict) -> pathlib.Path:
    """Write ``task`` as ``<id>.json`` into its recipient's inbox.

    The inbox directory is created if needed. The JSON is first written to
    ``<id>.json.tmp`` and then moved onto the final name with
    ``os.replace``, so the final name only ever appears with complete
    content.

    Returns:
        The path of the final ``.json`` file.
    """
    inbox = inbox_for(task["to"])
    inbox.mkdir(parents=True, exist_ok=True)

    final_path = inbox / f"{task['id']}.json"
    staging_path = final_path.with_suffix(".json.tmp")

    serialized = json.dumps(task, indent=2)
    staging_path.write_text(serialized)
    os.replace(staging_path, final_path)
    return final_path
147
 
148
 
149
def log_event(session: str, event: str, summary: str,
150
              task_id: str | None = None, **extra) -> None:
151
    entry = {
152
        "ts": datetime.now(timezone.utc).isoformat(),
153
        "session": session,
154
        "event": event,
155
        "summary": summary[:140],
156
        "task_id": task_id,
157
    }
158
    if extra:
159
        entry.update(extra)
160
    with LOG_PATH.open("a") as f:
161
        f.write(json.dumps(entry) + "\n")
162
 
163
 
164
def heartbeat(session: str, extra: dict | None = None) -> pathlib.Path:
    """Write ``heartbeats/<session>.json`` under ``ROOT`` and return its path.

    The file holds ``host`` (the session name), ``ts`` (current UTC time,
    ISO-8601), ``unix`` (whole seconds since the epoch) and ``pid`` (this
    process id). Entries of ``extra`` are merged in last and may replace
    those keys. The JSON is written to a ``.json.tmp`` sibling first and
    then moved into place with ``os.replace``.
    """
    target = ROOT.joinpath("heartbeats", f"{session}.json")
    target.parent.mkdir(parents=True, exist_ok=True)

    beat = {
        "host": session,
        "ts": datetime.now(timezone.utc).isoformat(),
        "unix": int(time.time()),
        "pid": os.getpid(),
    }
    beat.update(extra or {})

    staging = target.with_suffix(".json.tmp")
    staging.write_text(json.dumps(beat, indent=2))
    os.replace(staging, target)
    return target