Zion Boggan
repos/Pitch Tracker CV/tools/viewer.py
zionboggan.com ↗
175 lines · python
History for this file →
"""Visual overlay for live debugging.

Subscribes to:
  - capture frames         (capture.publish_endpoint)
  - ball/pitch events      (cv.ball_events_endpoint)
  - PCI events             (cv.pci_events_endpoint)

Draws the most recent detections on each frame. Press q or ESC to quit.
Run capture/ingest.py, cv/ball_tracker.py, cv/pci_tracker.py first.
"""
11
from __future__ import annotations
12
 
13
import argparse
14
import json
15
import sys
16
import time
17
from pathlib import Path
18
 
19
import cv2
20
import numpy as np
21
import zmq
22
from rich.console import Console
23
 
24
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
25
from cv._common import (
26
    event_subscriber,
27
    iter_latest_frames,
28
    load_config,
29
    make_frame_subscriber,
30
)
31
 
32
# Shared Rich console used for the viewer's status and error messages.
console = Console()

# Overlay colours passed straight to the OpenCV drawing calls.
# NOTE(review): presumably BGR channel order (OpenCV's usual convention) --
# confirm against the format of the frames published by capture.
COLOR_BALL: tuple[int, int, int] = (0, 255, 255)       # tracked ball circle + centre dot
COLOR_BALL_PRED: tuple[int, int, int] = (0, 128, 255)  # predicted plate marker + ETA label
COLOR_PCI: tuple[int, int, int] = (0, 255, 0)          # PCI circle + centre dot
COLOR_PLATE: tuple[int, int, int] = (255, 255, 255)    # horizontal plate line
COLOR_BG: tuple[int, int, int] = (0, 0, 0)             # not referenced in the visible code
39
 
40
def drain_latest_json(sock: zmq.Socket) -> list[dict]:
    """Pull every pending JSON event from ``sock`` without blocking.

    Messages are read with ``zmq.NOBLOCK`` until the socket raises
    ``zmq.Again`` (nothing left to read). Each payload is decoded as UTF-8
    and parsed as JSON.

    Args:
        sock: Socket to drain. Each message is expected to carry one
            UTF-8 encoded JSON object.

    Returns:
        The decoded events, in arrival order. Payloads that are not valid
        UTF-8 JSON, or that decode to something other than a JSON object,
        are skipped so a single malformed message cannot break the caller,
        which treats every returned item as a ``dict``.
    """
    events: list[dict] = []
    while True:
        try:
            raw = sock.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            # Receive queue is empty: everything pending has been consumed.
            break
        try:
            event = json.loads(raw.decode("utf-8"))
        except (ValueError, RecursionError):
            # UnicodeDecodeError and json.JSONDecodeError are both ValueError
            # subclasses; RecursionError covers pathologically nested input.
            # Dropping the message keeps this a best-effort drain.
            continue
        if isinstance(event, dict):
            events.append(event)
    return events
53
 
54
def draw_hud(frame: np.ndarray, lines: list[str]) -> None:
    """Draw ``lines`` as a text HUD near the top-left corner of ``frame``.

    The text is rendered in place on ``frame``. Every line is drawn twice at
    the same position: a thick black pass first, then a thin white pass on
    top, which leaves a dark outline around the white glyphs.

    Args:
        frame: Image to draw on; modified in place.
        lines: Text rows, drawn top to bottom 22 px apart starting at y=28.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    for row, text in enumerate(lines):
        origin = (12, 28 + 22 * row)
        # Black outline pass, then the white foreground pass over it.
        cv2.putText(frame, text, origin, font, 0.6, (0, 0, 0), 3, cv2.LINE_AA)
        cv2.putText(frame, text, origin, font, 0.6, (255, 255, 255), 1, cv2.LINE_AA)
60
 
61
# A track is dropped once a miss event is stamped more than this long after it.
# 300_000_000 ns = 0.3 s, presuming ``ts_ns`` is in nanoseconds as the key
# name suggests.
_STALE_AFTER_NS = 300_000_000


def _drop_if_stale(track: dict | None, miss: dict) -> dict | None:
    """Return ``track``, or ``None`` when ``miss`` shows it has gone stale.

    A track is stale when the miss event's ``ts_ns`` is more than
    ``_STALE_AFTER_NS`` later than the track's own ``ts_ns`` (a track without
    a timestamp is treated as stamped 0).
    """
    if not track:
        return track
    miss_ts = miss.get("ts_ns")
    if miss_ts is None:
        # A miss without a timestamp cannot be aged against the track;
        # keep the track rather than crash the viewer on a KeyError.
        return track
    if miss_ts - track.get("ts_ns", 0) > _STALE_AFTER_NS:
        return None
    return track


def _draw_detections(
    overlay: np.ndarray,
    plate_y_frac: float,
    ball: dict | None,
    pred: dict | None,
    pci: dict | None,
) -> None:
    """Draw the plate line and the latest ball / prediction / PCI marks in place."""
    h, w = overlay.shape[:2]

    # Horizontal plate line at the configured fraction of the frame height.
    plate_y = int(h * plate_y_frac)
    cv2.line(overlay, (0, plate_y), (w, plate_y), COLOR_PLATE, 1, cv2.LINE_AA)

    if ball is not None:
        bx, by, br = int(ball["x"]), int(ball["y"]), int(ball["r"])
        # Outline radius is clamped to at least 4 px so tiny detections stay visible.
        cv2.circle(overlay, (bx, by), max(br, 4), COLOR_BALL, 2)
        cv2.circle(overlay, (bx, by), 2, COLOR_BALL, -1)

    if pred is not None:
        px = int(pred["plate_x"])
        py = int(pred["plate_y"])
        cv2.drawMarker(overlay, (px, py), COLOR_BALL_PRED, cv2.MARKER_CROSS, 28, 2)
        cv2.putText(
            overlay,
            f"eta {pred['eta_ms']:.0f}ms",
            (px + 14, py - 8),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            COLOR_BALL_PRED,
            2,
            cv2.LINE_AA,
        )

    if pci is not None:
        pcx, pcy, pcr = int(pci["x"]), int(pci["y"]), int(pci["r"])
        cv2.circle(overlay, (pcx, pcy), pcr, COLOR_PCI, 2)
        cv2.circle(overlay, (pcx, pcy), 3, COLOR_PCI, -1)


def main() -> int:
    """Run the live viewer until the user quits or frames stop arriving.

    Subscribes to the capture, ball and PCI endpoints named in the loaded
    config, keeps the most recent event of each kind, and either draws them
    over every frame (default) or prints a status line every 60 frames
    (``--no-gui``).

    Returns:
        0 when the loop ends normally, on q / ESC, or on Ctrl-C;
        2 when a ``TimeoutError`` is raised (presumably by
        ``iter_latest_frames`` when no frame arrives within ``timeout_ms``).
    """
    ap = argparse.ArgumentParser(description="pitch-tracker-cv live viewer.")
    ap.add_argument("--scale", type=float, default=0.75, help="Display scale factor (0.5 = half-size).")
    ap.add_argument("--no-gui", action="store_true", help="Pull events and print stats without opening a window.")
    args = ap.parse_args()
    # Reject scales cv2.resize cannot handle (zero, negative, NaN, infinity)
    # up front instead of failing mid-stream. Only relevant when a window is shown.
    if not args.no_gui and not 0 < args.scale < float("inf"):
        ap.error("--scale must be a finite number greater than 0")

    cfg = load_config()
    cap_ep = cfg["capture"]["publish_endpoint"]
    ball_ep = cfg["cv"]["ball_events_endpoint"]
    pci_ep = cfg["cv"]["pci_events_endpoint"]
    plate_y_frac = float(cfg["cv"].get("plate_y_frac", 0.72))

    ctx = zmq.Context.instance()
    # Sockets are registered as they are created so the finally block closes
    # whatever exists, even if a later subscriber fails to set up.
    sockets: list[zmq.Socket] = []

    last_ball: dict | None = None
    last_pred: dict | None = None
    last_pci: dict | None = None
    frames = 0
    win = "pitch-tracker-cv viewer"

    try:
        frame_sub = make_frame_subscriber(cap_ep, ctx=ctx)
        sockets.append(frame_sub)
        ball_sub = event_subscriber(ball_ep, ctx=ctx)
        sockets.append(ball_sub)
        pci_sub = event_subscriber(pci_ep, ctx=ctx)
        sockets.append(pci_sub)
        console.print(f"[green]viewer[/green] cap={cap_ep} ball={ball_ep} pci={pci_ep}")

        t_start = time.perf_counter()
        if not args.no_gui:
            cv2.namedWindow(win, cv2.WINDOW_NORMAL)

        for meta, frame in iter_latest_frames(frame_sub, timeout_ms=3000):
            frames += 1

            # Fold everything queued since the previous frame into the
            # "latest" slots; only the newest event of each kind is kept.
            for ev in drain_latest_json(ball_sub):
                kind = ev.get("type")
                if kind == "ball_track":
                    last_ball = ev
                elif kind == "pitch_pred":
                    last_pred = ev
                elif kind == "ball_miss":
                    last_ball = _drop_if_stale(last_ball, ev)
            for ev in drain_latest_json(pci_sub):
                kind = ev.get("type")
                if kind == "pci_track":
                    last_pci = ev
                elif kind == "pci_miss":
                    last_pci = _drop_if_stale(last_pci, ev)

            if args.no_gui:
                if frames % 60 == 0:
                    console.print(
                        f"[dim]  viewer: {frames} frames  "
                        f"ball={'y' if last_ball else 'n'}  "
                        f"pred={'y' if last_pred else 'n'}  "
                        f"pci={'y' if last_pci else 'n'}[/dim]"
                    )
                continue

            # Draw on a copy so the received frame itself is left untouched.
            overlay = frame.copy()
            _draw_detections(overlay, plate_y_frac, last_ball, last_pred, last_pci)

            # Average display rate since the subscribers were set up.
            dt = time.perf_counter() - t_start
            fps = frames / dt if dt > 0 else 0.0
            draw_hud(overlay, [
                f"frames {frames}  display {fps:.1f} fps  seq {meta['seq']}",
                f"ball {'Y' if last_ball else '-'}   pred {'Y' if last_pred else '-'}   pci {'Y' if last_pci else '-'}",
            ])

            if args.scale != 1.0:
                overlay = cv2.resize(overlay, None, fx=args.scale, fy=args.scale, interpolation=cv2.INTER_AREA)
            cv2.imshow(win, overlay)
            key = cv2.waitKey(1) & 0xFF
            if key in (ord("q"), 27):  # 27 == ESC
                break
    except TimeoutError as e:
        console.print(f"[red]viewer: {e}. Is capture/ingest.py running?[/red]")
        return 2
    except KeyboardInterrupt:
        console.print("[yellow]viewer interrupted.[/yellow]")
    finally:
        for sock in sockets:
            sock.close(0)
        if not args.no_gui:
            cv2.destroyAllWindows()
    return 0
173
 
174
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    raise SystemExit(main())