| | @@ -0,0 +1,278 @@ |
| + | #pragma METAINFO("pitch_tracker_cv_gcv", 1, 0, "pitch-tracker-cv") |
| + | |
| + | main {"""GCVWorker for MLB The Show 26 aim assist. |
| + | |
| + | Runs inside Gtuner IV's Computer Vision (GCV) module. Gtuner IV captures video |
| + | from the Monster capture card at 60fps and calls GCVWorker.process(frame) each |
| + | frame. This class: |
| + | |
| + | 1. Detects the baseball and PCI (Plate Coverage Indicator) using classical CV. |
| + | 2. Predicts plate-crossing x from a rolling trajectory fit. |
| + | 3. Packs an aim-assist command payload into gcvdata (≤255 bytes) for the |
| + | paired GPC script (mlb26_bridge.gpc) to read via gcv_ready()/gcv_read(). |
| + | |
| + | gcvdata layout: |
| + | offset type name notes |
| + | 0 fix32 aim_stick_x left-stick X deflection (-100..100) |
| + | 4 fix32 aim_stick_y left-stick Y deflection (-100..100) |
| + | 8 int16 armed 0 = passthrough, 1 = aim active |
| + | 10 int16 in_flight 1 = ball currently tracked |
| + | 12 int16 press_contact 1 = press X (contact swing) THIS FRAME |
| + | 14 int16 press_power 1 = press A (power swing) THIS FRAME |
| + | 16 int16 eta_ms predicted ms until ball crosses plate |
| + | 18 int16 debug_flags bit0=pci_found, bit1=ball_found, bit2=pred_good |
| + | |
| + | All fix32 values are 32-bit fixed-point (Titan Two native). We pack them as |
| + | little-endian 4-byte signed integers scaled by 65536 (16.16). |
| + | """ |
| + | from __future__ import annotations |
| + | |
| + | import struct |
| + | import time |
| + | from collections import deque |
| + | |
| + | import cv2 |
| + | import numpy as np |
| + | |
| + | BALL_HSV_LOW = np.array([0, 0, 190], dtype=np.uint8) |
| + | BALL_HSV_HIGH = np.array([180, 60, 255], dtype=np.uint8) |
| + | BALL_MIN_R = 3.0 |
| + | BALL_MAX_R = 30.0 |
| + | BALL_MIN_CIRC = 0.6 |
| + | |
| + | PCI_HSV_LOW = np.array([45, 140, 140], dtype=np.uint8) |
| + | PCI_HSV_HIGH = np.array([75, 255, 255], dtype=np.uint8) |
| + | PCI_SEARCH_X = (0.35, 0.65) |
| + | PCI_SEARCH_Y = (0.35, 0.75) |
| + | PCI_MIN_GREEN_PX = 150 |
| + | |
| + | PLATE_Y_FRAC = 0.85 |
| + | TRAJ_WINDOW_S = 0.8 |
| + | MIN_FIT_POINTS = 4 |
| + | FIT_USE_LAST_N = 8 |
| + | PITCH_GAP_MS = 300 |
| + | MAX_STEP_PX = 200 |
| + | PITCH_START_MAX_Y = 500 |
| + | STATIC_WINDOW = 3 |
| + | STATIC_SPREAD_PX = 3.0 |
| + | BAN_ZONE_MS = 1500 |
| + | BAN_ZONE_PX = 6 |
| + | |
| + | AIM_GAIN_X = 0.08 |
| + | AIM_GAIN_Y = 0.06 |
| + | AIM_DEADZONE_PX = 8 |
| + | AIM_MAX_STICK = 100.0 |
| + | |
| + | CONTACT_TRIGGER_ETA_MS = 80 |
| + | CONTACT_MAX_AIM_ERR_PX = 18 |
| + | |
| + | def _fix32(value: float) -> bytes: |
| + | """Encode float as Titan Two fix32 (16.16 signed, little-endian).""" |
| + | v = int(round(value * 65536.0)) |
| + | v = max(-2**31, min(2**31 - 1, v)) |
| + | return struct.pack("<i", v) |
| + | |
| + | def _int16(value: int) -> bytes: |
| + | v = max(-32768, min(32767, int(value))) |
| + | return struct.pack("<h", v) |
| + | |
| + | class _Ball: |
| + | __slots__ = ("ts_ns", "x", "y", "r") |
| + | def __init__(self, ts_ns, x, y, r): |
| + | self.ts_ns, self.x, self.y, self.r = ts_ns, x, y, r |
| + | |
| + | class GCVWorker: |
| + | """Gtuner IV computer-vision worker. Called per frame. |
| + | |
| + | Gtuner IV invokes: GCVWorker(width, height). The (width, height) are the |
| + | captured frame dimensions (e.g. 1920, 1080). |
| + | """ |
| + | |
| + | def __init__(self, width, height): |
| + | import os |
| + | os.chdir(os.path.dirname(__file__)) |
| + | self.width = width |
| + | self.height = height |
| + | self.trail: deque = deque(maxlen=64) |
| + | self.banned: deque = deque(maxlen=32) |
| + | self.last_det_ns: int | None = None |
| + | self.last_pci = None |
| + | self.pitch_id = 0 |
| + | self.gcvdata = bytearray(20) |
| + | |
| + | def __del__(self): |
| + | try: |
| + | del self.gcvdata |
| + | except Exception: |
| + | pass |
| + | |
| + | def _detect_ball(self, frame, ts_ns): |
| + | hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) |
| + | mask = cv2.inRange(hsv, BALL_HSV_LOW, BALL_HSV_HIGH) |
| + | mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), 1) |
| + | contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) |
| + | best, best_score = None, 0.0 |
| + | for c in contours: |
| + | area = cv2.contourArea(c) |
| + | if area < np.pi * BALL_MIN_R * BALL_MIN_R * 0.5: |
| + | continue |
| + | (cx, cy), r = cv2.minEnclosingCircle(c) |
| + | if r < BALL_MIN_R or r > BALL_MAX_R: |
| + | continue |
| + | circ = float(area / (np.pi * r * r)) if r > 0 else 0.0 |
| + | if circ < BALL_MIN_CIRC: |
| + | continue |
| + | if circ > best_score: |
| + | best_score = circ |
| + | best = _Ball(ts_ns, float(cx), float(cy), float(r)) |
| + | return best |
| + | |
| + | def _detect_pci(self, frame): |
| + | h, w = frame.shape[:2] |
| + | hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) |
| + | mask = cv2.inRange(hsv, PCI_HSV_LOW, PCI_HSV_HIGH) |
| + | x0, x1 = int(w * PCI_SEARCH_X[0]), int(w * PCI_SEARCH_X[1]) |
| + | y0, y1 = int(h * PCI_SEARCH_Y[0]), int(h * PCI_SEARCH_Y[1]) |
| + | window = np.zeros_like(mask) |
| + | window[y0:y1, x0:x1] = 255 |
| + | mask = cv2.bitwise_and(mask, window) |
| + | mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), 1) |
| + | ys, xs = np.where(mask > 0) |
| + | if len(xs) < PCI_MIN_GREEN_PX: |
| + | return None |
| + | return float(xs.mean()), float(ys.mean()) |
| + | |
| + | def _try_fit(self, plate_y_px: float): |
| + | trail = list(self.trail)[-FIT_USE_LAST_N:] |
| + | if len(trail) < MIN_FIT_POINTS: |
| + | return None |
| + | ys = np.array([d.y for d in trail], dtype=np.float64) |
| + | if ys.max() - ys.min() < 30: |
| + | return None |
| + | if (ys[-1] - ys[0]) < -30: |
| + | return None |
| + | t0 = trail[0].ts_ns |
| + | ts = np.array([(d.ts_ns - t0) / 1e9 for d in trail], dtype=np.float64) |
| + | xs = np.array([d.x for d in trail], dtype=np.float64) |
| + | try: |
| + | ay, by, cy = np.polyfit(ts, ys, 2) |
| + | bx, cx = np.polyfit(ts, xs, 1) |
| + | except Exception: |
| + | return None |
| + | disc = by * by - 4 * ay * (cy - plate_y_px) |
| + | if disc < 0 or abs(ay) < 1e-6: |
| + | return None |
| + | sqrt_d = float(np.sqrt(disc)) |
| + | candidates = [(-by + sqrt_d) / (2 * ay), (-by - sqrt_d) / (2 * ay)] |
| + | future = [t for t in candidates if t > ts[-1]] |
| + | if not future: |
| + | return None |
| + | t_cross = min(future) |
| + | plate_x = float(bx * t_cross + cx) |
| + | eta_ms = float((t_cross - ts[-1]) * 1000.0) |
| + | if eta_ms < 0 or eta_ms > 2000: |
| + | return None |
| + | return plate_x, eta_ms |
| + | |
| + | def process(self, frame): |
| + | h, w = frame.shape[:2] |
| + | ts_ns = time.time_ns() |
| + | plate_y_px = h * PLATE_Y_FRAC |
| + | |
| + | pci = self._detect_pci(frame) |
| + | if pci is not None: |
| + | self.last_pci = pci |
| + | |
| + | while self.banned and self.banned[0][2] <= ts_ns: |
| + | self.banned.popleft() |
| + | ball = self._detect_ball(frame, ts_ns) |
| + | if ball is not None: |
| + | in_banned = any( |
| + | abs(ball.x - bx) < BAN_ZONE_PX and abs(ball.y - by) < BAN_ZONE_PX |
| + | for (bx, by, _) in self.banned |
| + | ) |
| + | if in_banned: |
| + | ball = None |
| + | |
| + | if ball is not None: |
| + | if self.last_det_ns is not None and (ts_ns - self.last_det_ns) > PITCH_GAP_MS * 1e6: |
| + | self.trail.clear() |
| + | self.pitch_id += 1 |
| + | if self.trail: |
| + | last = self.trail[-1] |
| + | d = ((ball.x - last.x) ** 2 + (ball.y - last.y) ** 2) ** 0.5 |
| + | if d > MAX_STEP_PX: |
| + | self.trail.clear() |
| + | self.pitch_id += 1 |
| + | if not self.trail and ball.y > PITCH_START_MAX_Y: |
| + | ball = None |
| + | if ball is not None: |
| + | self.trail.append(ball) |
| + | self.last_det_ns = ts_ns |
| + | cutoff = ts_ns - int(TRAJ_WINDOW_S * 1e9) |
| + | while self.trail and self.trail[0].ts_ns < cutoff: |
| + | self.trail.popleft() |
| + | if len(self.trail) >= STATIC_WINDOW: |
| + | recent = list(self.trail)[-STATIC_WINDOW:] |
| + | rxs = [d.x for d in recent] |
| + | rys = [d.y for d in recent] |
| + | spread = max(max(rxs) - min(rxs), max(rys) - min(rys)) |
| + | if spread < STATIC_SPREAD_PX: |
| + | cx = sum(rxs) / len(rxs) |
| + | cy_c = sum(rys) / len(rys) |
| + | self.banned.append((cx, cy_c, ts_ns + int(BAN_ZONE_MS * 1e6))) |
| + | self.trail.clear() |
| + | self.last_det_ns = None |
| + | ball = None |
| + | |
| + | pred = self._try_fit(plate_y_px) if ball is not None else None |
| + | |
| + | aim_x = 0.0 |
| + | aim_y = 0.0 |
| + | press_contact = 0 |
| + | press_power = 0 |
| + | eta_ms = 0 |
| + | pred_good = 0 |
| + | if pred is not None and self.last_pci is not None: |
| + | plate_x, eta_ms_f = pred |
| + | eta_ms = int(max(0, min(32767, eta_ms_f))) |
| + | pred_good = 1 |
| + | dx = plate_x - self.last_pci[0] |
| + | dy = plate_y_px - self.last_pci[1] |
| + | if abs(dx) > AIM_DEADZONE_PX: |
| + | aim_x = max(-AIM_MAX_STICK, min(AIM_MAX_STICK, AIM_GAIN_X * dx)) |
| + | if abs(dy) > AIM_DEADZONE_PX: |
| + | aim_y = max(-AIM_MAX_STICK, min(AIM_MAX_STICK, AIM_GAIN_Y * dy)) |
| + | |
| + | residual = (dx ** 2 + dy ** 2) ** 0.5 |
| + | if eta_ms <= CONTACT_TRIGGER_ETA_MS and residual <= CONTACT_MAX_AIM_ERR_PX: |
| + | press_contact = 1 |
| + | |
| + | armed = 1 |
| + | in_flight = 1 if (self.trail and ball is not None) else 0 |
| + | debug_flags = (1 if self.last_pci else 0) | (2 if ball else 0) | (4 if pred_good else 0) |
| + | |
| + | gcvdata = bytearray(20) |
| + | gcvdata[0:4] = _fix32(aim_x) |
| + | gcvdata[4:8] = _fix32(aim_y) |
| + | gcvdata[8:10] = _int16(armed) |
| + | gcvdata[10:12] = _int16(in_flight) |
| + | gcvdata[12:14] = _int16(press_contact) |
| + | gcvdata[14:16] = _int16(press_power) |
| + | gcvdata[16:18] = _int16(eta_ms) |
| + | gcvdata[18:20] = _int16(debug_flags) |
| + | |
| + | if self.last_pci is not None: |
| + | cv2.circle(frame, (int(self.last_pci[0]), int(self.last_pci[1])), 80, (0, 255, 0), 2) |
| + | if ball is not None: |
| + | cv2.circle(frame, (int(ball.x), int(ball.y)), max(6, int(ball.r)), (0, 255, 255), 2) |
| + | if pred is not None: |
| + | px, _ = pred |
| + | cv2.drawMarker(frame, (int(px), int(plate_y_px)), (0, 128, 255), cv2.MARKER_CROSS, 24, 2) |
| + | cv2.putText(frame, f"eta {eta_ms}ms aim=({aim_x:+.1f},{aim_y:+.1f}) contact={press_contact}", |
| + | (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA) |
| + | cv2.line(frame, (0, int(plate_y_px)), (w, int(plate_y_px)), (255, 255, 255), 1) |
| + | |
| + | return frame, bytes(gcvdata) |
| + | |
| + | } |