Zion Boggan
repos/Claude Dispatch/bin/dispatch-exec
zionboggan.com ↗
198 lines · text
History for this file →
1
#!/usr/bin/env python3
2
"""Headless executor for dispatch tasks.
3
 
4
Reads a task JSON file, spawns the configured agent binary in headless mode,
5
captures output within caps, and writes:
6
  done/<id>.result.json  - structured result (summary, exit code, timing)
7
  done/<id>.log          - full stdout/stderr for audit
8
 
9
Honors the killswitch - refuses to start if KILLSWITCH file exists.
10
 
11
Usage:
12
  dispatch-exec <path-to-task-json>
13
"""
14
from __future__ import annotations
15
import json
16
import os
17
import pathlib
18
import shutil
19
import signal
20
import subprocess
21
import sys
22
import time
23
from datetime import datetime, timezone
24
 
25
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
26
import dispatch_lib as d
27
 
28
 
29
def _locate_agent_bin() -> str:
    """Return the agent executable that tasks should be launched with.

    Resolution order:
      1. ``DISPATCH_AGENT_BIN``, when it names an executable *file* - given
         either as a path or as a command name found on ``PATH``.
      2. ``claude`` looked up on ``PATH``.
      3. The bare name ``"claude"``, leaving lookup (and any failure) to the
         process launch itself.

    Returns:
        The path or command name to use as ``argv[0]``.
    """
    override = os.environ.get("DISPATCH_AGENT_BIN")
    if override:
        # shutil.which (unlike a bare os.access X_OK test) rejects directories
        # and also resolves plain command names through PATH.
        resolved = shutil.which(override)
        if resolved:
            return resolved
    return shutil.which("claude") or "claude"
37
 
38
 
39
# Executable used to run tasks; resolved once at import time.
AGENT_BIN = _locate_agent_bin()

# Label recorded as the acting side in log events and results.
SIDE = os.getenv("DISPATCH_SIDE", "exec")

# Text handed to the agent via --append-system-prompt for every task.
SYSTEM_PROMPT = " ".join((
    "You are running as a headless dispatch worker.",
    "A peer agent session sent you this task via signed dispatch.",
    "Execute it end-to-end and report results.",
    "Be terse.",
    "The task request follows.",
))
46
 
47
 
48
def _other(node: str) -> str:
    """Return the peer of *node*: ``NODE_B`` for ``NODE_A``, else ``NODE_A``."""
    if node == d.NODE_A:
        return d.NODE_B
    return d.NODE_A
50
 
51
 
52
def killswitch_check(task_id: str) -> bool:
    """Report whether the killswitch is tripped, logging a refusal if so.

    Args:
        task_id: Id of the task about to run; attached to the log event.

    Returns:
        True when the task must not start, False otherwise.
    """
    is_tripped, why = d.killswitch_tripped()
    if not is_tripped:
        return False
    # Only the first 80 characters of the reason go into the event message.
    d.log_event(SIDE, "exec_killswitch", f"refused task: {why[:80]}", task_id)
    return True
58
 
59
 
60
def processing_path(task: dict) -> pathlib.Path:
    """Build the path of the task's file in the receiver's ``processing`` dir.

    The queue directory is named ``<peer>-to-<receiver>``, where the receiver
    is ``task["to"]`` and the peer is the other node.
    """
    recipient = task["to"]
    queue_name = f"{_other(recipient)}-to-{recipient}"
    file_name = f"{task['id']}.json"
    return d.ROOT / queue_name / "processing" / file_name
63
 
64
 
65
_READ_CHUNK = 8192       # bytes requested per read from the child's pipe
_TERM_GRACE_S = 10       # seconds between SIGTERM and SIGKILL on timeout
_DRAIN_GRACE_S = 5.0     # seconds to wait for the pipe to reach EOF after exit


def _pump_output(stream, logf, captured: bytearray, max_bytes: int,
                 errors: list) -> None:
    """Copy *stream* to *logf* until EOF, keeping a capped prefix in memory.

    Everything read is written to the log; only the first *max_bytes* bytes
    are kept in *captured*, as one contiguous prefix so that line-oriented
    parsing of the capture is never fed a stream with holes in it.

    Runs on a worker thread, so failures are appended to *errors* for the
    caller to inspect instead of being raised.
    """
    try:
        while True:
            chunk = stream.read(_READ_CHUNK)
            if not chunk:
                return
            room = max_bytes - len(captured)
            if room > 0:
                captured.extend(chunk[:room])
            logf.write(chunk)
            logf.flush()
    except Exception as exc:  # handed back to the launching thread
        errors.append(exc)


def _stop_process(proc) -> None:
    """Ask *proc* to exit with SIGTERM, escalate to SIGKILL, and reap it."""
    proc.send_signal(signal.SIGTERM)
    try:
        proc.wait(timeout=_TERM_GRACE_S)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()  # reap, so no zombie is left behind


def _extract_final_text(raw: bytes) -> str:
    """Return the ``result`` of the last stream-json ``type == "result"`` line.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. Returns ``""`` when no result line is found.
    """
    final = ""
    for line in raw.decode("utf-8", errors="replace").splitlines():
        if not line.strip():
            continue
        try:
            msg = json.loads(line)
        except (ValueError, RecursionError):
            continue
        if not isinstance(msg, dict):
            continue
        if msg.get("type") == "result" and msg.get("result"):
            value = msg["result"]
            final = value if isinstance(value, str) else json.dumps(value)
    return final


def run_task(task: dict) -> int:
    """Run one dispatch task through the agent binary and record the outcome.

    Writes ``<id>.log`` (header lines plus the child's combined
    stdout/stderr) and ``<id>.result.json`` into the sender's ``done``
    directory, then removes the task's file from ``processing``.

    Args:
        task: Parsed task JSON. ``id``, ``from``, ``to`` and ``request`` are
            read directly; ``timeout_s`` (default 600), ``max_output_bytes``
            (default 2_000_000) and ``require_dangerous`` (default False)
            are optional.

    Returns:
        99 if the killswitch refused the task, 124 on timeout, 127 if the
        agent could not be launched or output capture failed, otherwise the
        child's return code.
    """
    import threading  # local import keeps this block self-contained

    task_id = task["id"]
    d.fill_defaults(task)
    if killswitch_check(task_id):
        return 99

    timeout = int(task.get("timeout_s", 600))
    max_bytes = int(task.get("max_output_bytes", 2_000_000))
    dangerous = bool(task.get("require_dangerous", False))
    perm_mode = "bypassPermissions" if dangerous else "acceptEdits"

    sender = task["from"]
    done = d.ROOT / f"{sender}-to-{_other(sender)}" / "done"
    done.mkdir(parents=True, exist_ok=True)
    log_path = done / f"{task_id}.log"
    result_path = done / f"{task_id}.result.json"

    cmd = [
        AGENT_BIN, "-p", task["request"],
        "--permission-mode", perm_mode,
        "--output-format", "stream-json",
        "--include-partial-messages",
        "--verbose",
        "--append-system-prompt", SYSTEM_PROMPT,
    ]
    if dangerous:
        cmd.append("--dangerously-skip-permissions")

    d.log_event(SIDE, "task_exec_start", task["request"][:80], task_id,
                perm_mode=perm_mode, timeout_s=timeout)

    started = time.time()
    out_bytes = bytearray()
    exit_code = 0
    try:
        with log_path.open("wb") as logf:
            logf.write(f"# task {task_id}\n# started {datetime.now(timezone.utc).isoformat()}\n".encode())
            logf.write(f"# perm_mode={perm_mode} timeout={timeout}s max_bytes={max_bytes}\n".encode())
            logf.write(b"# ---\n")
            logf.flush()
            proc = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            )
            # Output is drained on a worker thread so that the timeout below
            # still fires when the child hangs without printing anything
            # (a blocking read in this thread would never get to check it).
            pump_errors: list = []
            pump = threading.Thread(
                target=_pump_output,
                args=(proc.stdout, logf, out_bytes, max_bytes, pump_errors),
                daemon=True,
            )
            pump.start()
            timed_out = False
            try:
                try:
                    proc.wait(timeout=timeout)
                except subprocess.TimeoutExpired:
                    timed_out = True
                    _stop_process(proc)
            finally:
                # Never leave the child running or unreaped, whatever happened.
                if proc.poll() is None:
                    proc.kill()
                    proc.wait()
                # Bounded wait: a grandchild may keep the pipe open forever.
                pump.join(timeout=_DRAIN_GRACE_S)
                if not pump.is_alive() and proc.stdout:
                    proc.stdout.close()
            if timed_out:
                logf.write(f"\n# TIMEOUT: exceeded {timeout}s\n".encode())
                exit_code = 124
            elif pump_errors:
                raise pump_errors[0]
            else:
                exit_code = proc.returncode
    except Exception as e:
        exit_code = 127
        err_msg = f"{type(e).__name__}: {e}"
        d.log_event(SIDE, "task_exec_err", err_msg[:120], task_id)
        try:
            with log_path.open("ab") as lf:
                lf.write(f"\n# EXEC ERROR: {err_msg}\n".encode())
        except OSError:
            # Best effort: the failure is already in the event log above.
            pass

    elapsed = round(time.time() - started, 2)
    status = "ok" if exit_code == 0 else ("timeout" if exit_code == 124 else "error")

    # Snapshot, in case a still-draining pump thread is appending.
    captured = bytes(out_bytes)
    final_text = _extract_final_text(captured)

    result = {
        "id": task_id,
        "answered_by": SIDE,
        "answered_at": datetime.now(timezone.utc).isoformat(),
        "request": task["request"],
        "status": status,
        "exit_code": exit_code,
        "elapsed_s": elapsed,
        "log_path": str(log_path),
        "final_text": final_text[:8000],
        "output_tail": captured[-8000:].decode("utf-8", errors="replace") if captured else "",
        "output_bytes": len(captured),
        "perm_mode": perm_mode,
    }
    # Write-then-rename so readers never observe a half-written result file.
    tmp = result_path.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(result, indent=2))
    os.replace(tmp, result_path)

    try:
        processing_path(task).unlink()
    except FileNotFoundError:
        # Already gone; nothing to clean up.
        pass

    d.log_event(SIDE, "task_exec_end",
                f"{status} exit={exit_code} elapsed={elapsed}s", task_id,
                exit_code=exit_code, status=status)
    return exit_code
183
 
184
 
185
def main() -> int:
    """Command-line entry point: load the task file named in argv and run it.

    Returns:
        2 for a usage error or a task file that is missing, unreadable, not
        valid JSON, not a JSON object, or lacking ``id``; otherwise whatever
        ``run_task`` returns.
    """
    if len(sys.argv) != 2:
        print("usage: dispatch-exec <task.json>", file=sys.stderr)
        return 2
    path = pathlib.Path(sys.argv[1])
    try:
        task = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        print(f"ERR: {path} not found", file=sys.stderr)
        return 2
    except (OSError, ValueError) as exc:
        # OSError: directory / permission denied; ValueError: bad UTF-8 or JSON.
        print(f"ERR: cannot load {path}: {exc}", file=sys.stderr)
        return 2
    if not isinstance(task, dict) or "id" not in task:
        print(f"ERR: {path} is not a task object with an 'id'", file=sys.stderr)
        return 2
    return run_task(task)
195
 
196
 
197
# Script entry: exit the interpreter with main()'s return value as status.
if __name__ == "__main__":
    sys.exit(main())