| 1 | """Capture card ingest service. |
| 2 | |
| 3 | Smoke mode (--smoke): timed test (10s by default; set with --duration) that prints FPS + read latency and saves the first and last captured frames. |
| 4 | Default: subscribe-side-agnostic ZMQ PUB of multipart [JSON meta, JPEG] frames on capture.publish_endpoint. |
| 5 | """ |
| 6 | from __future__ import annotations |
| 7 | |
| 8 | import argparse |
| 9 | import json |
| 10 | import sys |
| 11 | import time |
| 12 | from pathlib import Path |
| 13 | |
| 14 | import cv2 |
| 15 | import numpy as np |
| 16 | import yaml |
| 17 | import zmq |
| 18 | from rich.console import Console |
| 19 | |
# Shared rich console used for all user-facing output in this module.
console = Console()

# Two directory levels above this file (parents[0] is the file's own directory).
ROOT = Path(__file__).resolve().parents[1]
# Runtime configuration file opened by load_config().
CONFIG_PATH = ROOT.joinpath("configs", "runtime.yaml")
# Directory the smoke test creates and writes its captured frames into.
LOG_DIR = ROOT.joinpath("logs")
| 25 | |
def load_config() -> dict:
    """Load the runtime configuration from ``CONFIG_PATH``.

    Returns:
        The parsed top-level YAML mapping. An empty (or comment-only) file
        yields an empty dict instead of ``None``.

    Raises:
        OSError: If the config file cannot be opened.
        yaml.YAMLError: If the file is not valid YAML.
        TypeError: If the top-level YAML value is not a mapping.
    """
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if data is None:
        # safe_load returns None for an empty document; normalise it so the
        # declared return type holds and callers fail with a KeyError on the
        # missing section rather than "NoneType is not subscriptable".
        return {}
    if not isinstance(data, dict):
        raise TypeError(
            f"{CONFIG_PATH}: expected a mapping at the top level, "
            f"got {type(data).__name__}"
        )
    return data
| 29 | |
def probe_devices(max_index: int = 5, backend: int = cv2.CAP_DSHOW) -> list[tuple[int, int, int]]:
    """Try to open each device index in ``0..max_index`` and list those that open.

    Args:
        max_index: Highest device index to try (inclusive).
        backend: OpenCV capture backend passed to ``cv2.VideoCapture``.

    Returns:
        One ``(index, width, height)`` tuple per device that opened; the size
        is what the device reports before any property has been set.
    """
    size_props = (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT)
    devices: list[tuple[int, int, int]] = []
    for index in range(max_index + 1):
        capture = cv2.VideoCapture(index, backend)
        if capture.isOpened():
            width, height = (int(capture.get(prop)) for prop in size_props)
            devices.append((index, width, height))
        # Release the handle whether or not the open succeeded.
        capture.release()
    return devices
| 40 | |
def _try_open(idx: int, backend: int, w: int, h: int, fps: int) -> cv2.VideoCapture | None:
    """Open one device/backend pair, request a format, and confirm a frame can be read.

    Args:
        idx: Device index to open.
        backend: OpenCV capture backend identifier.
        w: Requested frame width.
        h: Requested frame height.
        fps: Requested frame rate.

    Returns:
        The opened capture if a test frame was read successfully, otherwise
        ``None`` (the capture is released before returning ``None``).
    """
    capture = cv2.VideoCapture(idx, backend)
    if capture.isOpened():
        # Properties are requested in this order: codec, width, height, rate.
        requested = (
            (cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")),
            (cv2.CAP_PROP_FRAME_WIDTH, w),
            (cv2.CAP_PROP_FRAME_HEIGHT, h),
            (cv2.CAP_PROP_FPS, fps),
        )
        for prop, value in requested:
            capture.set(prop, value)
        # The test frame itself is discarded; only the success flag matters.
        grabbed, test_frame = capture.read()
        if grabbed and test_frame is not None:
            return capture
    capture.release()
    return None
| 55 | |
def open_capture(cfg: dict) -> cv2.VideoCapture | None:
    """Open the first working capture device, preferring the configured index.

    Reads ``device_index``, ``width``, ``height`` and ``fps`` from
    ``cfg["capture"]`` (defaults 0, 1920, 1080, 60). For each backend in the
    order DirectShow, Media Foundation, any, it tries the configured index
    followed by the remaining indices 0..5, and returns the first capture
    that ``_try_open`` accepts.

    Args:
        cfg: Parsed runtime configuration containing a ``capture`` mapping.

    Returns:
        An opened ``cv2.VideoCapture``, or ``None`` if every attempt failed.
    """
    settings = cfg["capture"]
    preferred = settings.get("device_index", 0)
    width = settings.get("width", 1920)
    height = settings.get("height", 1080)
    rate = settings.get("fps", 60)

    # Configured index first, then every other index in 0..5, ascending.
    indices = [preferred] + [i for i in range(6) if i != preferred]

    attempts = (
        (backend, index)
        for backend in (cv2.CAP_DSHOW, cv2.CAP_MSMF, cv2.CAP_ANY)
        for index in indices
    )
    for backend, index in attempts:
        capture = _try_open(index, backend, width, height, rate)
        if capture is None:
            continue
        actual_w = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        actual_h = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        console.print(
            f"[dim]open_capture: index={index} backend={backend} "
            f"size={actual_w}x{actual_h}[/dim]"
        )
        return capture
    return None
| 77 | |
def _save_smoke_frame(path: Path, frame: np.ndarray, label: str) -> bool:
    """Write *frame* to *path* and report the outcome on the console."""
    # cv2.imwrite signals failure through its return value, not an exception.
    if cv2.imwrite(str(path), frame):
        console.print(f"[green]Saved {label} frame -> {path}[/green]")
        return True
    console.print(f"[bold red]Failed to write {label} frame -> {path}[/bold red]")
    return False


def smoke_test(cfg: dict, duration_s: float = 10.0) -> int:
    """Run a timed capture test and report throughput and read latency.

    Probes DirectShow devices, opens the capture via ``open_capture``, reads
    frames for ``duration_s`` seconds, prints measured FPS and per-read
    latency statistics, and writes the first frame (and the last one, when
    more than one frame was read) as PNG files under ``LOG_DIR``.

    Args:
        cfg: Parsed runtime configuration; must contain a ``capture`` mapping.
        duration_s: How long to read frames for, in seconds.

    Returns:
        0 on success, 2 if no device could be probed or opened, 3 if the
        capture opened but delivered no frames within ``duration_s``.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    console.print("[bold cyan]Probing video devices 0..5 (DirectShow)...[/bold cyan]")
    devs = probe_devices()
    if not devs:
        console.print(
            "[bold red]No DirectShow video devices could be opened.[/bold red]\n"
            " Check: capture card USB cable seated, card has power (LED on),\n"
            " Xbox HDMI plugged into card HDMI IN (not OUT)."
        )
        return 2
    for i, w, h in devs:
        console.print(f" device {i}: default size {w}x{h}")

    # open_capture() treats a missing device_index as 0; use the same default
    # here so the status messages cannot raise KeyError.
    configured_idx = cfg["capture"].get("device_index", 0)
    cap = open_capture(cfg)
    if cap is None or not cap.isOpened():
        if cap is not None:
            cap.release()
        console.print(
            f"[bold red]Could not open configured device_index={configured_idx}.[/bold red]\n"
            " Try a different index from the probe results above and update configs/runtime.yaml."
        )
        return 2

    frame_count = 0
    first_frame: np.ndarray | None = None
    last_frame: np.ndarray | None = None
    latencies_ms: list[float] = []

    # Release the device even if the read loop is interrupted or raises.
    try:
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        reported_fps = cap.get(cv2.CAP_PROP_FPS)
        # open_capture() may have fallen back to another index, so only the
        # configured value is reported here; the index actually used is in
        # the "open_capture:" line printed above.
        console.print(
            f"[green]Opened capture (configured device_index={configured_idx}) "
            f"at {w}x{h}, reported fps={reported_fps:.1f}[/green]"
        )
        console.print(f"[bold]Running {duration_s:.1f}s smoke test...[/bold]")

        t_start = time.perf_counter()
        deadline = t_start + duration_s
        while time.perf_counter() < deadline:
            t0 = time.perf_counter()
            ret, frame = cap.read()
            t1 = time.perf_counter()
            if not ret or frame is None:
                continue
            # Latency is the wall time spent inside cap.read() only.
            latencies_ms.append((t1 - t0) * 1000.0)
            if first_frame is None:
                first_frame = frame.copy()
            last_frame = frame
            frame_count += 1
        elapsed = time.perf_counter() - t_start
    finally:
        cap.release()

    if frame_count == 0 or first_frame is None:
        console.print(f"[bold red]Zero frames captured in {duration_s:g} seconds.[/bold red]")
        return 3

    fps = frame_count / elapsed if elapsed > 0 else 0.0
    lat = np.asarray(latencies_ms)
    # Single-channel frames are 2-D arrays with no channel axis.
    channels = first_frame.shape[2] if first_frame.ndim >= 3 else 1

    console.print("[bold green]Smoke test results[/bold green]")
    console.print(f" frames: {frame_count}")
    console.print(f" elapsed: {elapsed:.2f}s")
    console.print(f" measured FPS: {fps:.2f}")
    console.print(f" frame size: {first_frame.shape[1]}x{first_frame.shape[0]} (channels={channels})")
    console.print(
        f" read latency: mean {lat.mean():.2f}ms "
        f"p50 {np.percentile(lat, 50):.2f}ms "
        f"p95 {np.percentile(lat, 95):.2f}ms "
        f"max {lat.max():.2f}ms"
    )

    _save_smoke_frame(LOG_DIR / "smoke_frame.png", first_frame, "first")
    if last_frame is not None and frame_count > 1:
        _save_smoke_frame(LOG_DIR / "smoke_frame_last.png", last_frame, "last")
    return 0
| 158 | |
def run_publisher(cfg: dict) -> int:
    """Publish captured frames as JPEG over a ZMQ PUB socket until interrupted.

    Each message is a two-part multipart: a UTF-8 JSON metadata object with
    keys ``seq``, ``ts_ns``, ``h`` and ``w``, followed by the JPEG bytes.
    A throughput line is printed roughly every five seconds.

    Args:
        cfg: Parsed runtime configuration. ``cfg["capture"]`` must provide
            ``publish_endpoint``; ``jpeg_quality`` is optional (default 80).

    Returns:
        2 if no capture device could be opened, 0 after Ctrl-C.

    Raises:
        KeyError: If ``publish_endpoint`` is missing from the configuration.
        zmq.ZMQError: If the socket cannot be bound to the endpoint.
    """
    cap = open_capture(cfg)
    if cap is None or not cap.isOpened():
        if cap is not None:
            cap.release()
        console.print("[red]Capture device not available; publisher exiting.[/red]")
        return 2

    ctx = None
    sock = None
    # Socket setup lives inside the try so that a bad endpoint or a failed
    # bind still releases the capture device on the way out.
    try:
        endpoint = cfg["capture"]["publish_endpoint"]
        jpeg_q = int(cfg["capture"].get("jpeg_quality", 80))
        ctx = zmq.Context.instance()
        sock = ctx.socket(zmq.PUB)
        # Small send queue: drop stale frames instead of buffering them.
        sock.setsockopt(zmq.SNDHWM, 2)
        sock.bind(endpoint)
        # The opening bracket is escaped so rich prints "[meta, jpeg]"
        # literally instead of consuming it as a markup tag.
        console.print(
            f"[green]Publishing multipart \\[meta, jpeg] on {endpoint}[/green] (Ctrl-C to stop)"
        )

        seq = 0
        t_last_report = time.perf_counter()
        frames_since_report = 0
        while True:
            ret, frame = cap.read()
            if not ret or frame is None:
                # Yield briefly so a device that keeps failing to deliver
                # frames does not spin this loop at full CPU.
                time.sleep(0.001)
                continue
            ok, buf = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, jpeg_q])
            if not ok:
                continue
            meta = {
                "seq": seq,
                "ts_ns": time.time_ns(),
                "h": int(frame.shape[0]),
                "w": int(frame.shape[1]),
            }
            sock.send_multipart([json.dumps(meta).encode("utf-8"), buf.tobytes()])
            seq += 1
            frames_since_report += 1
            now = time.perf_counter()
            if now - t_last_report >= 5.0:
                fps = frames_since_report / (now - t_last_report)
                console.print(f"[dim] publisher: {fps:.1f} fps, last seq={seq - 1}[/dim]")
                t_last_report = now
                frames_since_report = 0
    except KeyboardInterrupt:
        console.print("[yellow]Publisher interrupted.[/yellow]")
    finally:
        cap.release()
        if sock is not None:
            # linger=0: discard unsent frames rather than blocking on exit.
            sock.close(0)
        if ctx is not None:
            ctx.term()
    return 0
| 205 | |
def main() -> None:
    """Parse command-line arguments, then run the smoke test or the publisher.

    The process exits with the return code of whichever mode was run.
    """
    parser = argparse.ArgumentParser(description="pitch-tracker-cv capture card ingest.")
    parser.add_argument(
        "--smoke",
        action="store_true",
        help="10s smoke test: print FPS/latency, save one frame, exit.",
    )
    parser.add_argument(
        "--duration",
        type=float,
        default=10.0,
        help="Smoke test duration in seconds.",
    )
    options = parser.parse_args()
    config = load_config()

    if options.smoke:
        exit_code = smoke_test(config, duration_s=options.duration)
    else:
        exit_code = run_publisher(config)
    sys.exit(exit_code)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()