Zion Boggan
repos/Pitch Tracker CV/cv/_common.py
zionboggan.com ↗
71 lines · python
History for this file →
1
"""Shared helpers for CV subscribers."""
2
from __future__ import annotations
3
 
4
import json
5
from pathlib import Path
6
from typing import Iterator
7
 
8
import cv2
9
import numpy as np
10
import yaml
11
import zmq
12
 
13
ROOT = Path(__file__).resolve().parents[1]
14
CONFIG_PATH = ROOT / "configs" / "runtime.yaml"
15
 
16
def load_config() -> dict:
17
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
18
        return yaml.safe_load(f)
19
 
20
def make_frame_subscriber(endpoint: str, ctx: zmq.Context | None = None) -> zmq.Socket:
21
    ctx = ctx or zmq.Context.instance()
22
    sock = ctx.socket(zmq.SUB)
23
    sock.setsockopt(zmq.RCVHWM, 1)
24
    sock.setsockopt(zmq.SUBSCRIBE, b"")
25
    sock.connect(endpoint)
26
    return sock
27
 
28
def make_pub(endpoint: str, ctx: zmq.Context | None = None) -> zmq.Socket:
29
    ctx = ctx or zmq.Context.instance()
30
    sock = ctx.socket(zmq.PUB)
31
    sock.setsockopt(zmq.SNDHWM, 8)
32
    sock.bind(endpoint)
33
    return sock
34
 
35
def iter_latest_frames(sock: zmq.Socket, timeout_ms: int = 2000) -> Iterator[tuple[dict, np.ndarray]]:
36
    """Yield (meta_dict, decoded_bgr_frame) for each arriving message, dropping stale ones.
37
 
38
    Blocks up to timeout_ms for the next message; raises TimeoutError if idle too long.
39
    """
40
    poller = zmq.Poller()
41
    poller.register(sock, zmq.POLLIN)
42
    while True:
43
        events = dict(poller.poll(timeout_ms))
44
        if sock not in events:
45
            raise TimeoutError(f"No frame within {timeout_ms}ms")
46
 
47
        latest: list[bytes] | None = None
48
        while True:
49
            try:
50
                latest = sock.recv_multipart(flags=zmq.NOBLOCK)
51
            except zmq.Again:
52
                break
53
        if latest is None or len(latest) != 2:
54
            continue
55
        meta = json.loads(latest[0].decode("utf-8"))
56
        arr = np.frombuffer(latest[1], dtype=np.uint8)
57
        frame = cv2.imdecode(arr, cv2.IMREAD_COLOR)
58
        if frame is None:
59
            continue
60
        yield meta, frame
61
 
62
def send_event(sock: zmq.Socket, event: dict) -> None:
63
    sock.send(json.dumps(event).encode("utf-8"))
64
 
65
def event_subscriber(endpoint: str, ctx: zmq.Context | None = None) -> zmq.Socket:
66
    ctx = ctx or zmq.Context.instance()
67
    sock = ctx.socket(zmq.SUB)
68
    sock.setsockopt(zmq.RCVHWM, 64)
69
    sock.setsockopt(zmq.SUBSCRIBE, b"")
70
    sock.connect(endpoint)
71
    return sock