initial commit

fc45fff   Zion Boggan committed on Apr 16, 2026
LICENSE +21 -0
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2026
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
README.md +201 -0
@@ -0,0 +1,201 @@
+# claude-dispatch
+
+HMAC-signed, file-system-mediated job dispatch between two agent sessions
+on different hosts. Built around the use case of two Claude Code sessions
+running on separate machines and needing to hand work to each other without
+either one having to drive the other interactively.
+
+The transport is a shared filesystem (NFS, SMB, anything mounted on both
+sides). The integrity story is HMAC-SHA256 signing of the task envelope.
+The execution story is a per-side watcher that polls the inbox, verifies
+the signature, optionally waits for a human ack, then spawns a headless
+agent worker. Results land back as JSON files on the originating side.
+
+There are no public-facing ports, no broker process, and no network code
+beyond an optional outbound Discord webhook for human notifications.
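
Because the watcher polls the inbox, a sender has to avoid exposing a half-written envelope. One way to do that, and the approach `enqueue` in `bin/dispatch_lib.py` takes, is to write under a temporary name and rename into place. A minimal sketch follows; `publish_atomically` is an illustrative name, not part of the project, and rename atomicity is assumed to hold on the shared filesystem in use.

```python
import json
import os
import pathlib
import tempfile


def publish_atomically(inbox: pathlib.Path, task: dict) -> pathlib.Path:
    """Write the envelope under a temp name, then rename it into place.

    A poller that only globs *.json never sees the partially written file.
    """
    inbox.mkdir(parents=True, exist_ok=True)
    final = inbox / f"{task['id']}.json"
    tmp = final.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(task, indent=2))
    # os.replace is a single rename when both names are on the same filesystem.
    os.replace(tmp, final)
    return final


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        inbox = pathlib.Path(root) / "a-to-b" / "inbox"
        path = publish_atomically(inbox, {"id": "demo", "request": "hello"})
        print(path.name)
```

The temporary file ends in `.tmp`, so a `*.json` glob skips it until the rename completes.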
+
+## Why this exists
+
+If you run agent sessions on more than one machine, you eventually want
+them to be able to delegate work to each other. A Claude Code session on
+your dev box can hand a "go pull this repo and run the test suite on the
+GPU box" job to a peer session running on the GPU box, get back the result
+as a structured file, and continue. The two sessions never need each
+other's terminals open or attached.
+
+The pieces this needs:
+
+- A shared place to drop signed task envelopes
+- A way to verify the sender is who they say
+- A bounded executor on the receiving side that can't run away
+- A way to halt the whole thing instantly when something is wrong
+- An audit log
+
+That's the whole project.
+
+## Architecture
+
+```
+  node A                                              node B
++---------+                                         +---------+
+| agent   |                                         | agent   |
+| session |                                         | session |
++----+----+                                         +----+----+
+     |                                                   |
+     |  dispatch-send --to b --request "..."             |
+     v                                                   v
++---------+   shared filesystem (NFS, SMB, etc.)    +---------+
+| inbox   |<--------------------------------------->| inbox   |
++---------+            a-to-b/ , b-to-a/            +---------+
+     ^                                                   ^
+     |                                                   |
++---------+                                         +---------+
+| watcher |  verify HMAC, gate on ack, spawn exec   | watcher |
++----+----+                                         +----+----+
+     |                                                   |
+     v                                                   v
++---------+                                         +---------+
+| exec    |   spawn headless agent, capture, log    | exec    |
++----+----+                                         +----+----+
+     |                                                   |
+     +--> done/<id>.result.json + done/<id>.log <--------+
+```
+
+## Components
+
+```
+bin/
+  dispatch_lib.py       shared helpers: HMAC sign/verify, task envelope,
+                        lane resolution, killswitch check, append-only log
+  dispatch_watcher.py   per-side polling loop: verifies inbox, promotes
+                        to pending-ack or processing, spawns exec with a
+                        concurrency cap, cleans up markers
+  dispatch-send         CLI: enqueue a signed task to the other side's inbox
+  dispatch-exec         headless executor: reads a task, spawns the
+                        configured agent binary, captures, writes done/
+  dispatch-ack          CLI: approve a pending-ack task
+  dispatch-watch-a      wrapper that runs the watcher with --side a
+  dispatch-watch-b      wrapper that runs the watcher with --side b
+```
+
+## Task envelope (v2)
+
+```json
+{
+  "id": "<uuid>",
+  "from": "<node-id>",
+  "to": "<node-id>",
+  "created": "<ISO-8601 UTC>",
+  "priority": "low | normal | high",
+  "request": "<free text>",
+  "callback": "file | discord | none",
+  "require_ack": false,
+  "require_dangerous": false,
+  "timeout_s": 600,
+  "max_output_bytes": 2000000,
+  "schema": 2,
+  "hmac": "<hex sha256 hmac>"
+}
+```
+
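+The HMAC covers `id | from | to | created | priority | request`.
+Tampering with any of those invalidates the signature; the watcher moves
+bad-HMAC tasks to `rejected/` and logs the rejection. The remaining fields
+(`require_ack`, `require_dangerous`, `timeout_s`, `max_output_bytes`) are
+not part of the signed payload.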
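
The signing scheme can be sketched with the standard library alone. This mirrors the field order listed above; the key and the sample envelope are made up for illustration, and `digest` and `verify` here are standalone stand-ins, not the project's own functions.

```python
import hashlib
import hmac

# Fields covered by the signature, in signing order.
SIGNED_FIELDS = ("id", "from", "to", "created", "priority", "request")


def digest(task: dict, key: bytes) -> str:
    """Hex HMAC-SHA256 over the signed fields joined with '|'."""
    payload = "|".join(task[f] for f in SIGNED_FIELDS).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()


def verify(task: dict, key: bytes) -> bool:
    """Constant-time comparison; malformed envelopes verify as False."""
    provided = task.get("hmac")
    if not isinstance(provided, str):
        return False
    try:
        return hmac.compare_digest(provided, digest(task, key))
    except (KeyError, TypeError):
        return False


if __name__ == "__main__":
    key = b"demo-key-for-illustration-only"  # hypothetical key
    task = {
        "id": "1234", "from": "devbox", "to": "gpubox",
        "created": "2026-04-16T00:00:00+00:00",
        "priority": "normal", "request": "run the smoke tests",
        "require_dangerous": False,
    }
    task["hmac"] = digest(task, key)
    print(verify(task, key))                                # signed as sent
    print(verify(dict(task, request="something else"), key))  # signed field changed
    print(verify(dict(task, require_dangerous=True), key))    # unsigned field changed
```

The last call is the case to keep in mind: changing a field outside the signed list leaves the signature valid.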
+
+`require_ack=true` parks the task in `pending-ack/` until a human or an
+automation drops an `<id>.ack` file next to it. `require_dangerous=true`
+runs the executor in `bypassPermissions` mode; default is `acceptEdits`.
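
For automation that approves tasks without going through the `dispatch-ack` CLI, dropping the ack file is a one-liner. A small sketch, assuming the task file is named `<id>.json`; `drop_ack` is an illustrative helper name.

```python
import pathlib
import tempfile


def drop_ack(pending_dir: pathlib.Path, task_id: str) -> pathlib.Path:
    """Create <id>.ack next to <id>.json so the watcher promotes the task."""
    task_path = pending_dir / f"{task_id}.json"
    if not task_path.exists():
        raise FileNotFoundError(f"no pending task at {task_path}")
    ack_path = task_path.with_suffix(".ack")
    ack_path.write_text("ack\n")
    return ack_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        pending = pathlib.Path(root)
        (pending / "demo.json").write_text("{}")
        print(drop_ack(pending, "demo").name)
```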
+
+## Directory layout under `$DISPATCH_ROOT`
+
+```
+$DISPATCH_ROOT/
+  keys/hmac.key            shared secret (chmod 600)
+  KILLSWITCH               if this file exists, no new exec spawns
+  session-log.jsonl        append-only event log
+  heartbeats/<node>.json   last-write liveness for each side
+  a-to-b/
+    inbox/  pending-ack/  processing/  done/  rejected/
+  b-to-a/
+    inbox/  pending-ack/  processing/  done/  rejected/
+```
+
+## Setup
+
+```bash
+export DISPATCH_ROOT=/mnt/shared/dispatch
+export DISPATCH_NODE_A=devbox
+export DISPATCH_NODE_B=gpubox
+
+mkdir -p "$DISPATCH_ROOT/keys"
+openssl rand -hex 32 > "$DISPATCH_ROOT/keys/hmac.key"
+chmod 600 "$DISPATCH_ROOT/keys/hmac.key"
+
+mkdir -p "$DISPATCH_ROOT/${DISPATCH_NODE_A}-to-${DISPATCH_NODE_B}"/{inbox,pending-ack,processing,done,rejected}
+mkdir -p "$DISPATCH_ROOT/${DISPATCH_NODE_B}-to-${DISPATCH_NODE_A}"/{inbox,pending-ack,processing,done,rejected}
+mkdir -p "$DISPATCH_ROOT/heartbeats"
+```
+
+On node A (with `DISPATCH_ROOT`, `DISPATCH_NODE_A`, and `DISPATCH_NODE_B` exported as above):
+
+```bash
+DISPATCH_SIDE=devbox bin/dispatch-watch-a
+```
+
+On node B (with the same three variables exported):
+
+```bash
+DISPATCH_SIDE=gpubox bin/dispatch-watch-b
+```
+
+Both watchers should run under systemd (or your supervisor of choice) for
+liveness.
+
+## Usage
+
+Send a task from A to B:
+
+```bash
+DISPATCH_FROM=devbox bin/dispatch-send \
+ --to gpubox \
+ --request "Run the smoke test suite on this branch and report numbers."
+```
+
+`dispatch-send` prints the task id and the inbox path. The watcher on B
+verifies the HMAC, checks the envelope's `require_ack` flag, and either
+parks the task in `pending-ack/` or (if auto-exec) spawns the executor.
+
+The result lands at:
+
+```
+$DISPATCH_ROOT/devbox-to-gpubox/done/<id>.result.json
+$DISPATCH_ROOT/devbox-to-gpubox/done/<id>.log
+```
+
+## Killswitch
+
+```bash
+echo "stopping for reason X" > "$DISPATCH_ROOT/KILLSWITCH"
+```
+
+Both watchers refuse to spawn new exec processes while the file exists.
+Running exec processes are not interrupted; they finish or hit their
+timeout. Remove the file to resume.
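
The check itself is just a file-existence test. A standalone sketch of the same idea as `killswitch_tripped` in `bin/dispatch_lib.py`, taking the root as a parameter so it can be tried against a scratch directory:

```python
from __future__ import annotations

import pathlib
import tempfile


def killswitch_tripped(root: pathlib.Path) -> tuple[bool, str]:
    """Return (tripped, reason); the reason is the file's contents."""
    ks = root / "KILLSWITCH"
    if not ks.exists():
        return False, ""
    try:
        return True, ks.read_text().strip()
    except OSError:
        # An unreadable killswitch still counts as tripped.
        return True, "unreadable"


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = pathlib.Path(tmp)
        print(killswitch_tripped(root))
        (root / "KILLSWITCH").write_text("stopping for reason X\n")
        print(killswitch_tripped(root))
```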
+
+## Knobs
+
+Environment variables read by the watcher and exec:
+
+| Var                     | Default                            | Meaning                                           |
+| ----------------------- | ---------------------------------- | ------------------------------------------------- |
+| `DISPATCH_ROOT`         | -                                  | required, shared filesystem path                  |
+| `DISPATCH_NODE_A`       | `a`                                | node A identifier                                 |
+| `DISPATCH_NODE_B`       | `b`                                | node B identifier                                 |
+| `DISPATCH_POLL_SEC`     | `3`                                | watcher poll interval, in seconds                 |
+| `DISPATCH_MAX_PARALLEL` | `2`                                | concurrent exec processes per side                |
+| `DISPATCH_AGENT_BIN`    | `claude`                           | executor command                                  |
+| `DISPATCH_EXEC_BIN`     | `dispatch-exec` beside the watcher | executor script the watcher spawns                |
+| `DISPATCH_STATE_FILE`   | `/tmp/dispatch-watcher-state.json` | watcher start-log state file                      |
+| `DISPATCH_FROM`         | -                                  | default sender id for `dispatch-send`             |
+| `DISPATCH_SIDE`         | -                                  | this node's id (for `dispatch-ack` and exec logs) |
+
+## License
+
+MIT. See [LICENSE](LICENSE).
bin/dispatch-ack +49 -0
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+"""Approve a pending-ack task so the watcher promotes it to processing/.
+
+Usage:
+ dispatch-ack <task-id-or-prefix> [--side <node-id>]
+"""
+from __future__ import annotations
+import argparse
+import os
+import pathlib
+import sys
+
+sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
+import dispatch_lib as d
+
+
+def main() -> int:
+    ap = argparse.ArgumentParser()
+    ap.add_argument("task_id", help="task id or unique prefix")
+    ap.add_argument("--side", default=os.environ.get("DISPATCH_SIDE"),
+                    help="this node's id (or set DISPATCH_SIDE)")
+    args = ap.parse_args()
+
+    if not args.side:
+        print("ERR: --side required (or set DISPATCH_SIDE)", file=sys.stderr)
+        return 2
+
+    other = d.NODE_B if args.side == d.NODE_A else d.NODE_A
+    pending = d.ROOT / f"{other}-to-{args.side}" / "pending-ack"
+    pending.mkdir(parents=True, exist_ok=True)
+
+    matches = list(pending.glob(f"{args.task_id}*.json"))
+    if not matches:
+        print(f"ERR: no pending task matching '{args.task_id}'", file=sys.stderr)
+        return 1
+    if len(matches) > 1:
+        print(f"ERR: prefix '{args.task_id}' matches {len(matches)} tasks; be more specific",
+              file=sys.stderr)
+        return 1
+
+    task_path = matches[0]
+    ack_path = task_path.with_suffix(".ack")
+    ack_path.write_text("ack\n")
+    print(f"acked: {task_path.name}")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
bin/dispatch-exec +198 -0
@@ -0,0 +1,198 @@
+#!/usr/bin/env python3
+"""Headless executor for dispatch tasks.
+
+Reads a task JSON file, spawns the configured agent binary in headless mode,
+captures output within caps, and writes:
+ done/<id>.result.json - structured result (summary, exit code, timing)
+ done/<id>.log - full stdout/stderr for audit
+
+Honors the killswitch - refuses to start if KILLSWITCH file exists.
+
+Usage:
+ dispatch-exec <path-to-task-json>
+"""
+from __future__ import annotations
+import json
+import os
+import pathlib
+import shutil
+import signal
+import subprocess
+import sys
+import time
+from datetime import datetime, timezone
+
+sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
+import dispatch_lib as d
+
+
+def _locate_agent_bin() -> str:
+    # DISPATCH_AGENT_BIN may be a path or a bare command name; shutil.which
+    # resolves both and checks that the target is executable.
+    env = os.environ.get("DISPATCH_AGENT_BIN")
+    found = (shutil.which(env) if env else None) or shutil.which("claude")
+    return found or "claude"
+
+
+AGENT_BIN = _locate_agent_bin()
+SIDE = os.environ.get("DISPATCH_SIDE", "exec")
+SYSTEM_PROMPT = (
+    "You are running as a headless dispatch worker. A peer agent session "
+    "sent you this task via signed dispatch. Execute it end-to-end and "
+    "report results. Be terse. The task request follows."
+)
+
+
+def _other(node: str) -> str:
+    return d.NODE_B if node == d.NODE_A else d.NODE_A
+
+
+def killswitch_check(task_id: str) -> bool:
+    tripped, reason = d.killswitch_tripped()
+    if tripped:
+        d.log_event(SIDE, "exec_killswitch", f"refused task: {reason[:80]}", task_id)
+        return True
+    return False
+
+
+def processing_path(task: dict) -> pathlib.Path:
+    receiver = task["to"]
+    return d.ROOT / f"{_other(receiver)}-to-{receiver}" / "processing" / f"{task['id']}.json"
+
+
+def run_task(task: dict) -> int:
+    task_id = task["id"]
+    d.fill_defaults(task)
+    if killswitch_check(task_id):
+        return 99
+
+    timeout = int(task.get("timeout_s", 600))
+    max_bytes = int(task.get("max_output_bytes", 2_000_000))
+    dangerous = bool(task.get("require_dangerous", False))
+    perm_mode = "bypassPermissions" if dangerous else "acceptEdits"
+
+    sender = task["from"]
+    done = d.ROOT / f"{sender}-to-{_other(sender)}" / "done"
+    done.mkdir(parents=True, exist_ok=True)
+    log_path = done / f"{task_id}.log"
+    result_path = done / f"{task_id}.result.json"
+
+    cmd = [
+        AGENT_BIN, "-p", task["request"],
+        "--permission-mode", perm_mode,
+        "--output-format", "stream-json",
+        "--include-partial-messages",
+        "--verbose",
+        "--append-system-prompt", SYSTEM_PROMPT,
+    ]
+    if dangerous:
+        cmd.append("--dangerously-skip-permissions")
+
+    d.log_event(SIDE, "task_exec_start", task["request"][:80], task_id,
+                perm_mode=perm_mode, timeout_s=timeout)
+
+    started = time.time()
+    out_bytes = b""
+    exit_code = 0
+    timed_out = False
+    header = (
+        f"# task {task_id}\n# started {datetime.now(timezone.utc).isoformat()}\n"
+        f"# perm_mode={perm_mode} timeout={timeout}s max_bytes={max_bytes}\n"
+        "# ---\n"
+    ).encode()
+    try:
+        with log_path.open("wb") as logf:
+            logf.write(header)
+            logf.flush()
+            # The child writes straight into the log file. A blocking
+            # read(8192) on a pipe would stall until data arrived, so a
+            # silent child could never reach the timeout check.
+            proc = subprocess.Popen(cmd, stdout=logf, stderr=subprocess.STDOUT)
+            try:
+                exit_code = proc.wait(timeout=timeout)
+            except subprocess.TimeoutExpired:
+                timed_out = True
+                proc.send_signal(signal.SIGTERM)
+                try:
+                    proc.wait(timeout=10)
+                except subprocess.TimeoutExpired:
+                    proc.kill()
+                    proc.wait()
+                exit_code = 124
+        # Keep at most max_bytes of the child's output (header excluded) in memory.
+        with log_path.open("rb") as lf:
+            lf.seek(len(header))
+            out_bytes = lf.read(max_bytes)
+        if timed_out:
+            with log_path.open("ab") as lf:
+                lf.write(f"\n# TIMEOUT: exceeded {timeout}s\n".encode())
+    except Exception as e:
+        exit_code = 127
+        err_msg = f"{type(e).__name__}: {e}"
+        d.log_event(SIDE, "task_exec_err", err_msg[:120], task_id)
+        try:
+            with log_path.open("ab") as lf:
+                lf.write(f"\n# EXEC ERROR: {err_msg}\n".encode())
+        except Exception:
+            pass
+
+    elapsed = round(time.time() - started, 2)
+    status = "ok" if exit_code == 0 else ("timeout" if exit_code == 124 else "error")
+
+    # The last stream-json message of type "result" carries the final answer.
+    final_text = ""
+    try:
+        for line in out_bytes.decode("utf-8", errors="replace").splitlines():
+            if not line.strip():
+                continue
+            try:
+                msg = json.loads(line)
+            except Exception:
+                continue
+            if isinstance(msg, dict) and msg.get("type") == "result" and msg.get("result"):
+                final_text = msg["result"]
+    except Exception:
+        pass
+
+    result = {
+        "id": task_id,
+        "answered_by": SIDE,
+        "answered_at": datetime.now(timezone.utc).isoformat(),
+        "request": task["request"],
+        "status": status,
+        "exit_code": exit_code,
+        "elapsed_s": elapsed,
+        "log_path": str(log_path),
+        "final_text": final_text[:8000] if final_text else "",
+        "output_tail": out_bytes[-8000:].decode("utf-8", errors="replace") if out_bytes else "",
+        "output_bytes": len(out_bytes),
+        "perm_mode": perm_mode,
+    }
+    # Write to a temp name and rename so readers never see a partial result.
+    tmp = result_path.with_suffix(".json.tmp")
+    tmp.write_text(json.dumps(result, indent=2))
+    os.replace(tmp, result_path)
+
+    proc_path = processing_path(task)
+    if proc_path.exists():
+        proc_path.unlink()
+
+    d.log_event(SIDE, "task_exec_end",
+                f"{status} exit={exit_code} elapsed={elapsed}s", task_id,
+                exit_code=exit_code, status=status)
+    return exit_code
+
+
+def main() -> int:
+    if len(sys.argv) != 2:
+        print("usage: dispatch-exec <task.json>", file=sys.stderr)
+        return 2
+    path = pathlib.Path(sys.argv[1])
+    if not path.exists():
+        print(f"ERR: {path} not found", file=sys.stderr)
+        return 2
+    task = json.loads(path.read_text())
+    return run_task(task)
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
bin/dispatch-send +42 -0
@@ -0,0 +1,42 @@
+#!/usr/bin/env python3
+"""Enqueue a signed task to the other node's inbox.
+
+Usage:
+ dispatch-send --from <node-id> --to <node-id> --request "..." [--priority normal]
+"""
+from __future__ import annotations
+import argparse
+import os
+import pathlib
+import sys
+
+sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
+import dispatch_lib as d
+
+
+def main() -> int:
+    ap = argparse.ArgumentParser()
+    ap.add_argument("--to", required=True, help="recipient node id")
+    ap.add_argument("--from", dest="sender", default=os.environ.get("DISPATCH_FROM"),
+                    help="sender node id (or set DISPATCH_FROM)")
+    ap.add_argument("--request", required=True)
+    ap.add_argument("--priority", default="normal", choices=["low", "normal", "high"])
+    ap.add_argument("--callback", default="file", choices=["file", "discord", "none"])
+    args = ap.parse_args()
+
+    if not args.sender:
+        print("ERR: --from required (or set DISPATCH_FROM)", file=sys.stderr)
+        return 2
+    if args.sender == args.to:
+        print("ERR: --from and --to must differ", file=sys.stderr)
+        return 2
+
+    task = d.new_task(args.sender, args.to, args.request, args.priority, args.callback)
+    path = d.enqueue(task)
+    d.log_event(args.sender, "task_send", f"-> {args.to}: {args.request[:80]}", task["id"])
+    print(f"{task['id']}\t{path}")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
bin/dispatch-watch-a +2 -0
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+exec python3 "$(dirname "$0")/dispatch_watcher.py" --side "${DISPATCH_NODE_A:-a}" "$@"
bin/dispatch-watch-b +2 -0
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+exec python3 "$(dirname "$0")/dispatch_watcher.py" --side "${DISPATCH_NODE_B:-b}" "$@"
bin/dispatch_lib.py +178 -0
@@ -0,0 +1,178 @@
+"""Shared helpers for the two-host dispatch channel.
+
+Path resolution: set DISPATCH_ROOT to the shared filesystem path that both
+nodes can read and write. On Linux this is typically a mounted NFS or SMB
+share; on Windows it is the mapped drive letter for the same share.
+
+Task envelope v2:
+ { "id": "<uuid>", "from": "<node-id>", "to": "<node-id>",
+ "created": "<ISO-8601 UTC>", "priority": "low|normal|high",
+ "request": "<free text>",
+ "require_ack": bool,
+ "require_dangerous": bool,
+ "timeout_s": int,
+ "max_output_bytes": int,
+ "schema": 2,
+ "hmac": "<hex sha256 hmac>" }
+"""
+from __future__ import annotations
+import hashlib
+import hmac
+import json
+import os
+import pathlib
+import sys
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+NODE_A = os.environ.get("DISPATCH_NODE_A", "a")
+NODE_B = os.environ.get("DISPATCH_NODE_B", "b")
+
+
+def _resolve_root() -> pathlib.Path:
+    env = os.environ.get("DISPATCH_ROOT")
+    if env:
+        return pathlib.Path(env)
+    raise RuntimeError(
+        "DISPATCH_ROOT environment variable is required. "
+        "Point it at a filesystem path that both nodes can read and write."
+    )
+
+
+ROOT = _resolve_root()
+KEY_PATH = ROOT / "keys" / "hmac.key"
+LOG_PATH = ROOT / "session-log.jsonl"
+KILLSWITCH = ROOT / "KILLSWITCH"
+
+SCHEMA_VERSION = 2
+DEFAULTS = {
+    "require_ack": False,
+    "require_dangerous": False,
+    "timeout_s": 600,
+    "max_output_bytes": 2_000_000,
+}
+
+
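+def _key() -> bytes:
+    return KEY_PATH.read_text().strip().encode()
+
+
+def _digest(task: dict) -> str:
+    # Signed payload: the six identifying fields joined with "|".
+    payload = "|".join([
+        task["id"], task["from"], task["to"], task["created"],
+        task["priority"], task["request"],
+    ]).encode()
+    return hmac.new(_key(), payload, hashlib.sha256).hexdigest()
+
+
+def sign(task: dict) -> dict:
+    task["hmac"] = _digest(task)
+    return task
+
+
+def verify(task: dict) -> bool:
+    # Envelopes are untrusted JSON: reject wrong types instead of raising, so
+    # one malformed file cannot stall the watcher loop on every poll.
+    if not isinstance(task, dict):
+        return False
+    provided = task.get("hmac", "")
+    if not provided or not isinstance(provided, str):
+        return False
+    try:
+        return hmac.compare_digest(provided, _digest(task))
+    except (KeyError, TypeError):
+        return False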
+
+
+def new_task(sender: str, recipient: str, request: str,
+             priority: str = "normal", callback: str = "file",
+             require_ack: bool = False,
+             require_dangerous: bool = False,
+             timeout_s: int = 600,
+             max_output_bytes: int = 2_000_000) -> dict:
+    task = {
+        "id": str(uuid.uuid4()),
+        "from": sender,
+        "to": recipient,
+        "created": datetime.now(timezone.utc).isoformat(),
+        "priority": priority,
+        "request": request,
+        "callback": callback,
+        "require_ack": require_ack,
+        "require_dangerous": require_dangerous,
+        "timeout_s": timeout_s,
+        "max_output_bytes": max_output_bytes,
+        "schema": SCHEMA_VERSION,
+    }
+    return sign(task)
+
+
+def fill_defaults(task: dict) -> dict:
+    for k, v in DEFAULTS.items():
+        task.setdefault(k, v)
+    return task
+
+
+def killswitch_tripped() -> tuple[bool, str]:
+    if KILLSWITCH.exists():
+        try:
+            return True, KILLSWITCH.read_text().strip()
+        except Exception:
+            return True, "unreadable"
+    return False, ""
+
+
+def _lane_dir_name(recipient: str) -> str:
+    if recipient == NODE_A:
+        return f"{NODE_B}-to-{NODE_A}"
+    return f"{NODE_A}-to-{NODE_B}"
+
+
+def inbox_for(recipient: str) -> pathlib.Path:
+    return ROOT / _lane_dir_name(recipient) / "inbox"
+
+
+def lane(recipient: str, stage: str) -> pathlib.Path:
+    lane_dir = ROOT / _lane_dir_name(recipient) / stage
+    lane_dir.mkdir(parents=True, exist_ok=True)
+    return lane_dir
+
+
+def enqueue(task: dict) -> pathlib.Path:
+    lane_dir = inbox_for(task["to"])
+    lane_dir.mkdir(parents=True, exist_ok=True)
+    path = lane_dir / f"{task['id']}.json"
+    tmp = path.with_suffix(".json.tmp")
+    tmp.write_text(json.dumps(task, indent=2))
+    os.replace(tmp, path)
+    return path
+
+
+def log_event(session: str, event: str, summary: str,
+              task_id: str | None = None, **extra) -> None:
+    entry = {
+        "ts": datetime.now(timezone.utc).isoformat(),
+        "session": session,
+        "event": event,
+        "summary": summary[:140],
+        "task_id": task_id,
+    }
+    if extra:
+        entry.update(extra)
+    with LOG_PATH.open("a") as f:
+        f.write(json.dumps(entry) + "\n")
+
+
+def heartbeat(session: str, extra: dict | None = None) -> pathlib.Path:
+    hb = ROOT / "heartbeats" / f"{session}.json"
+    hb.parent.mkdir(parents=True, exist_ok=True)
+    payload = {
+        "host": session,
+        "ts": datetime.now(timezone.utc).isoformat(),
+        "unix": int(time.time()),
+        "pid": os.getpid(),
+    }
+    if extra:
+        payload.update(extra)
+    tmp = hb.with_suffix(".json.tmp")
+    tmp.write_text(json.dumps(payload, indent=2))
+    os.replace(tmp, hb)
+    return hb
bin/dispatch_watcher.py +197 -0
@@ -0,0 +1,197 @@
+"""Shared watcher for both sides.
+
+Each side runs this with --side <node-id>. Watches the inbox addressed to
+that side, verifies HMAC, optionally gates on human ack, then spawns
+dispatch-exec with a per-side concurrency cap.
+
+Honors the killswitch file. Concurrency cap = DISPATCH_MAX_PARALLEL (default 2).
+"""
+from __future__ import annotations
+import argparse
+import json
+import os
+import pathlib
+import subprocess
+import sys
+import time
+
+sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
+import dispatch_lib as d
+
+POLL = int(os.environ.get("DISPATCH_POLL_SEC", "3"))
+MAX_PARALLEL = int(os.environ.get("DISPATCH_MAX_PARALLEL", "2"))
+EXEC_BIN = os.environ.get(
+ "DISPATCH_EXEC_BIN",
+ str(pathlib.Path(__file__).resolve().parent / "dispatch-exec"),
+)
+STATE_FILE = pathlib.Path(os.environ.get("DISPATCH_STATE_FILE", "/tmp/dispatch-watcher-state.json"))
+
+
+def lanes(side: str) -> dict[str, pathlib.Path]:
+    other = d.NODE_B if side == d.NODE_A else d.NODE_A
+    tree = d.ROOT / f"{other}-to-{side}"
+    return {
+        "inbox": tree / "inbox",
+        "pending_ack": tree / "pending-ack",
+        "processing": tree / "processing",
+        "done": tree / "done",
+        "rejected": tree / "rejected",
+        "pending_json": tree / "pending.json",
+    }
+
+
+def active_exec_count() -> int:
+    n = 0
+    proc = pathlib.Path("/proc")
+    if not proc.exists():
+        return 0
+    for p in proc.iterdir():
+        if not p.name.isdigit():
+            continue
+        try:
+            cmdline = (p / "cmdline").read_bytes().replace(b"\x00", b" ").decode(errors="replace")
+        except Exception:
+            continue
+        if "dispatch-exec" in cmdline:
+            n += 1
+    return n
+
+
+def process_inbox(side: str, L: dict) -> None:
+    for path in sorted(L["inbox"].glob("*.json")):
+        try:
+            task = json.loads(path.read_text())
+        except Exception as e:
+            try:
+                path.rename(L["rejected"] / path.name)
+            except Exception:
+                pass
+            d.log_event(side, "task_reject", f"parse err: {e}")
+            continue
+
+        # A valid envelope is a JSON object; anything else is rejected outright.
+        if not isinstance(task, dict) or not d.verify(task):
+            path.rename(L["rejected"] / path.name)
+            bad_id = str(task.get("id", "?")) if isinstance(task, dict) else "?"
+            d.log_event(side, "task_reject", f"bad HMAC id={bad_id[:8]}")
+            continue
+
+        d.fill_defaults(task)
+        path.write_text(json.dumps(task, indent=2))
+
+        if task.get("require_ack"):
+            target = L["pending_ack"] / path.name
+            path.rename(target)
+            d.log_event(side, "task_pending_ack", task["request"][:80], task["id"])
+            write_pending_snapshot(L)
+        else:
+            target = L["processing"] / path.name
+            path.rename(target)
+            d.log_event(side, "task_auto", task["request"][:80], task["id"])
+
+
+def process_acked(side: str, L: dict) -> None:
+    for ack in sorted(L["pending_ack"].glob("*.ack")):
+        stem = ack.name[:-len(".ack")]
+        candidates = [L["pending_ack"] / f"{stem}.json", L["pending_ack"] / stem]
+        task_path = next((p for p in candidates if p.exists()), None)
+        if task_path is None:
+            ack.unlink()
+            continue
+        target = L["processing"] / task_path.name
+        task_path.rename(target)
+        ack.unlink()
+        write_pending_snapshot(L)
+
+
+def dispatch_processing(side: str, L: dict) -> None:
+    tripped, _ = d.killswitch_tripped()
+    if tripped:
+        return
+
+    queued = []
+    for p in sorted(L["processing"].glob("*.json")):
+        marker = p.with_suffix(".running")
+        if marker.exists():
+            continue
+        queued.append(p)
+
+    if not queued:
+        return
+
+    slots = MAX_PARALLEL - active_exec_count()
+    if slots <= 0:
+        return
+
+    for p in queued[:slots]:
+        marker = p.with_suffix(".running")
+        marker.write_text(str(int(time.time())))
+        try:
+            subprocess.Popen(
+                [EXEC_BIN, str(p)],
+                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
+                start_new_session=True,
+            )
+        except Exception as e:
+            marker.unlink(missing_ok=True)
+            d.log_event(side, "exec_spawn_err", str(e)[:120])
+
+
+def cleanup_markers(L: dict) -> None:
+    for m in L["processing"].glob("*.running"):
+        task_json = m.with_suffix(".json")
+        if not task_json.exists():
+            m.unlink(missing_ok=True)
+
+
+def write_pending_snapshot(L: dict) -> None:
+    items = []
+    for p in sorted(L["pending_ack"].glob("*.json")):
+        try:
+            t = json.loads(p.read_text())
+            items.append({
+                "id": t["id"], "from": t.get("from"), "priority": t.get("priority"),
+                "request": t.get("request", "")[:200], "created": t.get("created"),
+            })
+        except Exception:
+            pass
+    L["pending_json"].write_text(json.dumps({"count": len(items), "items": items}, indent=2))
+
+
+def boot_once_log(side: str) -> None:
+    try:
+        prev = json.loads(STATE_FILE.read_text()) if STATE_FILE.exists() else {}
+    except Exception:
+        prev = {}
+    now = int(time.time())
+    last = prev.get("last_start", 0)
+    if now - last > 120:
+        d.log_event(side, "watcher_start", f"poll={POLL}s max_parallel={MAX_PARALLEL}")
+    prev["last_start"] = now
+    STATE_FILE.write_text(json.dumps(prev))
+
+
+def main() -> int:
+    ap = argparse.ArgumentParser()
+    ap.add_argument("--side", required=True, help="this node's identifier")
+    args = ap.parse_args()
+    side = args.side
+
+    L = lanes(side)
+    for k, p in L.items():
+        if k == "pending_json":
+            continue
+        p.mkdir(parents=True, exist_ok=True)
+
+    boot_once_log(side)
+
+    while True:
+        try:
+            process_inbox(side, L)
+            process_acked(side, L)
+            dispatch_processing(side, L)
+            cleanup_markers(L)
+        except Exception as e:
+            d.log_event(side, "watcher_err", str(e)[:120])
+        time.sleep(POLL)
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())