| 1 | #!/usr/bin/env python3 |
| 2 | """Headless executor for dispatch tasks. |
| 3 | |
| 4 | Reads a task JSON file, spawns the configured agent binary in headless mode, |
| 5 | captures output within caps, and writes: |
| 6 | done/<id>.result.json - structured result (summary, exit code, timing) |
| 7 | done/<id>.log - full stdout/stderr for audit |
| 8 | |
| 9 | Honors the killswitch - refuses to start if KILLSWITCH file exists. |
| 10 | |
| 11 | Usage: |
| 12 | dispatch-exec <path-to-task-json> |
| 13 | """ |
| 14 | from __future__ import annotations |
| 15 | import json |
| 16 | import os |
| 17 | import pathlib |
| 18 | import shutil |
| 19 | import signal |
| 20 | import subprocess |
| 21 | import sys |
| 22 | import time |
| 23 | from datetime import datetime, timezone |
| 24 | |
| 25 | sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent)) |
| 26 | import dispatch_lib as d |
| 27 | |
| 28 | |
| 29 | def _locate_agent_bin() -> str: |
| 30 | env = os.environ.get("DISPATCH_AGENT_BIN") |
| 31 | if env and os.access(env, os.X_OK): |
| 32 | return env |
| 33 | found = shutil.which("claude") |
| 34 | if found: |
| 35 | return found |
| 36 | return "claude" |
| 37 | |
| 38 | |
# Agent executable, resolved once at import time; used as argv[0] in run_task.
AGENT_BIN = _locate_agent_bin()

# Label for this executor: passed as the first argument to d.log_event and
# recorded as "answered_by" in result files. Overridable via DISPATCH_SIDE.
SIDE = os.getenv("DISPATCH_SIDE", "exec")

# Handed to the agent through --append-system-prompt on every task.
SYSTEM_PROMPT = (
    "You are running as a headless dispatch worker. "
    "A peer agent session sent you this task via signed dispatch. "
    "Execute it end-to-end and report results. "
    "Be terse. The task request follows."
)
| 46 | |
| 47 | |
def _other(node: str) -> str:
    """Return the peer of *node*.

    Gives ``d.NODE_B`` when *node* equals ``d.NODE_A``; any other value
    (including ``d.NODE_B``) maps to ``d.NODE_A``.
    """
    if node == d.NODE_A:
        return d.NODE_B
    return d.NODE_A
| 50 | |
| 51 | |
def killswitch_check(task_id: str) -> bool:
    """Report whether the killswitch is tripped, logging the refusal if so.

    Args:
        task_id: Id of the task about to run; attached to the log event.

    Returns:
        True when ``d.killswitch_tripped()`` says the switch is tripped (an
        ``exec_killswitch`` event is logged first), False otherwise.
    """
    is_tripped, why = d.killswitch_tripped()
    if not is_tripped:
        return False
    # Only the first 80 characters of the reason go into the event message.
    d.log_event(SIDE, "exec_killswitch", f"refused task: {why[:80]}", task_id)
    return True
| 58 | |
| 59 | |
def processing_path(task: dict) -> pathlib.Path:
    """Build the path of *task*'s file in the receiver's ``processing`` directory.

    The directory is ``<peer>-to-<receiver>/processing`` under ``d.ROOT``,
    where the receiver is ``task["to"]`` and the peer is the other node; the
    file is named ``<task id>.json``.
    """
    receiver = task["to"]
    root = d.ROOT
    channel = f"{_other(receiver)}-to-{receiver}"
    inbox = root / channel / "processing"
    return inbox / f"{task['id']}.json"
| 63 | |
| 64 | |
| 65 | def run_task(task: dict) -> int: |
| 66 | task_id = task["id"] |
| 67 | d.fill_defaults(task) |
| 68 | if killswitch_check(task_id): |
| 69 | return 99 |
| 70 | |
| 71 | timeout = int(task.get("timeout_s", 600)) |
| 72 | max_bytes = int(task.get("max_output_bytes", 2_000_000)) |
| 73 | dangerous = bool(task.get("require_dangerous", False)) |
| 74 | perm_mode = "bypassPermissions" if dangerous else "acceptEdits" |
| 75 | |
| 76 | sender = task["from"] |
| 77 | done = d.ROOT / f"{sender}-to-{_other(sender)}" / "done" |
| 78 | done.mkdir(parents=True, exist_ok=True) |
| 79 | log_path = done / f"{task_id}.log" |
| 80 | result_path = done / f"{task_id}.result.json" |
| 81 | |
| 82 | cmd = [ |
| 83 | AGENT_BIN, "-p", task["request"], |
| 84 | "--permission-mode", perm_mode, |
| 85 | "--output-format", "stream-json", |
| 86 | "--include-partial-messages", |
| 87 | "--verbose", |
| 88 | "--append-system-prompt", SYSTEM_PROMPT, |
| 89 | ] |
| 90 | if dangerous: |
| 91 | cmd.append("--dangerously-skip-permissions") |
| 92 | |
| 93 | d.log_event(SIDE, "task_exec_start", task["request"][:80], task_id, |
| 94 | perm_mode=perm_mode, timeout_s=timeout) |
| 95 | |
| 96 | started = time.time() |
| 97 | out_bytes = bytearray() |
| 98 | exit_code = 0 |
| 99 | try: |
| 100 | with log_path.open("wb") as logf: |
| 101 | logf.write(f"# task {task_id}\n# started {datetime.now(timezone.utc).isoformat()}\n".encode()) |
| 102 | logf.write(f"# perm_mode={perm_mode} timeout={timeout}s max_bytes={max_bytes}\n".encode()) |
| 103 | logf.write(b"# ---\n") |
| 104 | logf.flush() |
| 105 | proc = subprocess.Popen( |
| 106 | cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, |
| 107 | ) |
| 108 | try: |
| 109 | while True: |
| 110 | rc = proc.poll() |
| 111 | chunk = proc.stdout.read(8192) if proc.stdout else b"" |
| 112 | if chunk: |
| 113 | if len(out_bytes) + len(chunk) <= max_bytes: |
| 114 | out_bytes.extend(chunk) |
| 115 | logf.write(chunk) |
| 116 | logf.flush() |
| 117 | if rc is not None and not chunk: |
| 118 | break |
| 119 | if time.time() - started > timeout: |
| 120 | proc.send_signal(signal.SIGTERM) |
| 121 | try: |
| 122 | proc.wait(timeout=10) |
| 123 | except subprocess.TimeoutExpired: |
| 124 | proc.kill() |
| 125 | raise TimeoutError(f"exceeded {timeout}s") |
| 126 | exit_code = proc.returncode |
| 127 | except TimeoutError as e: |
| 128 | logf.write(f"\n# TIMEOUT: {e}\n".encode()) |
| 129 | exit_code = 124 |
| 130 | except Exception as e: |
| 131 | exit_code = 127 |
| 132 | err_msg = f"{type(e).__name__}: {e}" |
| 133 | d.log_event(SIDE, "task_exec_err", err_msg[:120], task_id) |
| 134 | try: |
| 135 | with log_path.open("ab") as lf: |
| 136 | lf.write(f"\n# EXEC ERROR: {err_msg}\n".encode()) |
| 137 | except Exception: |
| 138 | pass |
| 139 | |
| 140 | elapsed = round(time.time() - started, 2) |
| 141 | status = "ok" if exit_code == 0 else ("timeout" if exit_code == 124 else "error") |
| 142 | |
| 143 | final_text = "" |
| 144 | try: |
| 145 | for line in out_bytes.decode("utf-8", errors="replace").splitlines(): |
| 146 | if not line.strip(): |
| 147 | continue |
| 148 | try: |
| 149 | msg = json.loads(line) |
| 150 | except Exception: |
| 151 | continue |
| 152 | if msg.get("type") == "result" and msg.get("result"): |
| 153 | final_text = msg["result"] |
| 154 | except Exception: |
| 155 | pass |
| 156 | |
| 157 | result = { |
| 158 | "id": task_id, |
| 159 | "answered_by": SIDE, |
| 160 | "answered_at": datetime.now(timezone.utc).isoformat(), |
| 161 | "request": task["request"], |
| 162 | "status": status, |
| 163 | "exit_code": exit_code, |
| 164 | "elapsed_s": elapsed, |
| 165 | "log_path": str(log_path), |
| 166 | "final_text": final_text[:8000] if final_text else "", |
| 167 | "output_tail": out_bytes[-8000:].decode("utf-8", errors="replace") if out_bytes else "", |
| 168 | "output_bytes": len(out_bytes), |
| 169 | "perm_mode": perm_mode, |
| 170 | } |
| 171 | tmp = result_path.with_suffix(".json.tmp") |
| 172 | tmp.write_text(json.dumps(result, indent=2)) |
| 173 | os.replace(tmp, result_path) |
| 174 | |
| 175 | proc_path = processing_path(task) |
| 176 | if proc_path.exists(): |
| 177 | proc_path.unlink() |
| 178 | |
| 179 | d.log_event(SIDE, "task_exec_end", |
| 180 | f"{status} exit={exit_code} elapsed={elapsed}s", task_id, |
| 181 | exit_code=exit_code, status=status) |
| 182 | return exit_code |
| 183 | |
| 184 | |
def main() -> int:
    """CLI entry point: load the task file named on the command line and run it.

    Returns:
        2 for a usage error or an unusable task file (missing, unreadable,
        malformed JSON, or JSON that is not an object); otherwise whatever
        ``run_task`` returns.
    """
    if len(sys.argv) != 2:
        print("usage: dispatch-exec <task.json>", file=sys.stderr)
        return 2
    path = pathlib.Path(sys.argv[1])
    # Read first and handle failure, rather than exists()-then-read, so a file
    # that disappears or is unreadable gets a clean error, not a traceback.
    try:
        raw = path.read_text()
    except FileNotFoundError:
        print(f"ERR: {path} not found", file=sys.stderr)
        return 2
    except (OSError, UnicodeDecodeError) as exc:
        print(f"ERR: cannot read {path}: {exc}", file=sys.stderr)
        return 2
    try:
        task = json.loads(raw)
    except ValueError as exc:
        print(f"ERR: {path} is not valid JSON: {exc}", file=sys.stderr)
        return 2
    if not isinstance(task, dict):
        print(f"ERR: {path} must contain a JSON object", file=sys.stderr)
        return 2
    return run_task(task)


if __name__ == "__main__":
    raise SystemExit(main())