Zion Boggan
repos/Claude Dispatch/bin/dispatch-ack
zionboggan.com ↗
49 lines · text
History for this file →
1
#!/usr/bin/env python3
2
"""Approve a pending-ack task so the watcher promotes it to processing/.
3
 
4
Usage:
5
  dispatch-ack <task-id-or-prefix> [--side <node-id>]
6
"""
7
from __future__ import annotations
8
import argparse
9
import os
10
import pathlib
11
import sys
12
 
13
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
14
import dispatch_lib as d
15
 
16
 
17
def main() -> int:
18
    ap = argparse.ArgumentParser()
19
    ap.add_argument("task_id", help="task id or unique prefix")
20
    ap.add_argument("--side", default=os.environ.get("DISPATCH_SIDE"),
21
                    help="this node's id (or set DISPATCH_SIDE)")
22
    args = ap.parse_args()
23
 
24
    if not args.side:
25
        print("ERR: --side required (or set DISPATCH_SIDE)", file=sys.stderr)
26
        return 2
27
 
28
    other = d.NODE_B if args.side == d.NODE_A else d.NODE_A
29
    pending = d.ROOT / f"{other}-to-{args.side}" / "pending-ack"
30
    pending.mkdir(parents=True, exist_ok=True)
31
 
32
    matches = list(pending.glob(f"{args.task_id}*.json"))
33
    if not matches:
34
        print(f"ERR: no pending task matching '{args.task_id}'", file=sys.stderr)
35
        return 1
36
    if len(matches) > 1:
37
        print(f"ERR: prefix '{args.task_id}' matches {len(matches)} tasks; be more specific",
38
              file=sys.stderr)
39
        return 1
40
 
41
    task_path = matches[0]
42
    ack_path = task_path.with_suffix(".ack")
43
    ack_path.write_text("ack\n")
44
    print(f"acked: {task_path.name}")
45
    return 0
46
 
47
 
48
if __name__ == "__main__":
49
    raise SystemExit(main())