| 1 | """Replay recorded ball_track events through try_fit, report why predictions fire or fail.""" |
| 2 | from __future__ import annotations |
| 3 | |
| 4 | import json |
| 5 | import sys |
| 6 | from collections import defaultdict |
| 7 | from pathlib import Path |
| 8 | |
| 9 | import numpy as np |
| 10 | from rich.console import Console |
| 11 | |
| 12 | sys.path.insert(0, str(Path(__file__).resolve().parents[1])) |
| 13 | from cv._common import load_config |
| 14 | |
| 15 | console = Console() |
| 16 | |
| 17 | def fit_and_explain(pts: list[dict], plate_y_px: float) -> str: |
| 18 | if len(pts) < 4: |
| 19 | return f"SKIP n={len(pts)}<4" |
| 20 | ys = np.array([p["y"] for p in pts], dtype=np.float64) |
| 21 | xs = np.array([p["x"] for p in pts], dtype=np.float64) |
| 22 | if ys.max() - ys.min() < 30: |
| 23 | return f"REJECT dy={ys.max()-ys.min():.0f}<30" |
| 24 | if ys[-1] - ys[0] <= 0: |
| 25 | return f"REJECT not moving down (y0={ys[0]:.0f} -> yN={ys[-1]:.0f})" |
| 26 | |
| 27 | t0 = pts[0]["ts_ns"] |
| 28 | ts = np.array([(p["ts_ns"] - t0) / 1e9 for p in pts], dtype=np.float64) |
| 29 | try: |
| 30 | ay, by, cy = np.polyfit(ts, ys, 2) |
| 31 | bx, cx = np.polyfit(ts, xs, 1) |
| 32 | except Exception as e: |
| 33 | return f"FIT_ERR {e}" |
| 34 | |
| 35 | disc = by * by - 4 * ay * (cy - plate_y_px) |
| 36 | if disc < 0: |
| 37 | return f"REJECT disc<0 (ay={ay:.1f} by={by:.1f} cy={cy:.1f} plate={plate_y_px:.0f})" |
| 38 | if abs(ay) < 1e-6: |
| 39 | return f"REJECT ay~0 ({ay:.3e})" |
| 40 | sqrt_d = float(np.sqrt(disc)) |
| 41 | t_cand = [(-by + sqrt_d) / (2 * ay), (-by - sqrt_d) / (2 * ay)] |
| 42 | future = [tc for tc in t_cand if tc > ts[-1]] |
| 43 | if not future: |
| 44 | |
| 45 | past_best = max(tc for tc in t_cand if tc <= ts[-1]) |
| 46 | return f"REJECT crossing in past: t_cand={[f'{t:.3f}' for t in t_cand]} ts[-1]={ts[-1]:.3f} (past_best={past_best:.3f})" |
| 47 | t_cross = min(future) |
| 48 | plate_x = bx * t_cross + cx |
| 49 | eta_ms = (t_cross - ts[-1]) * 1000.0 |
| 50 | if eta_ms > 2000: |
| 51 | return f"REJECT eta={eta_ms:.0f}ms>2000" |
| 52 | return f"FIT OK plate_x={plate_x:.0f} eta={eta_ms:.1f}ms t_cross={t_cross:.3f} ts[-1]={ts[-1]:.3f} ay={ay:.1f} by={by:.1f}" |
| 53 | |
| 54 | def main() -> int: |
| 55 | if len(sys.argv) < 2: |
| 56 | console.print("usage: fit_debug.py <events.jsonl>") |
| 57 | return 2 |
| 58 | |
| 59 | cfg = load_config() |
| 60 | plate_y_px = 1080 * float(cfg["cv"]["plate_y_frac"]) |
| 61 | console.print(f"plate_y_px = {plate_y_px:.0f} (plate_y_frac={cfg['cv']['plate_y_frac']})\n") |
| 62 | |
| 63 | pitches: dict[int, list[dict]] = defaultdict(list) |
| 64 | with open(sys.argv[1], "r", encoding="utf-8") as f: |
| 65 | for line in f: |
| 66 | try: |
| 67 | ev = json.loads(line) |
| 68 | except Exception: |
| 69 | continue |
| 70 | if ev.get("type") != "ball_track": |
| 71 | continue |
| 72 | pid = int(ev.get("pitch_id", 0)) |
| 73 | pitches[pid].append(ev) |
| 74 | |
| 75 | for pid in sorted(pitches.keys()): |
| 76 | pts = pitches[pid] |
| 77 | if len(pts) < 4: |
| 78 | continue |
| 79 | ys = [p["y"] for p in pts] |
| 80 | xs = [p["x"] for p in pts] |
| 81 | dy = max(ys) - min(ys) |
| 82 | if dy < 100: |
| 83 | continue |
| 84 | res = fit_and_explain(pts, plate_y_px) |
| 85 | console.print(f"pitch #{pid:3d} n={len(pts):2d} dy={dy:4.0f} {res}") |
| 86 | return 0 |
| 87 | |
| 88 | if __name__ == "__main__": |
| 89 | sys.exit(main()) |