diff --git a/src/ria_toolkit_oss/agent/streamer.py b/src/ria_toolkit_oss/agent/streamer.py index 1e82a5f..6146a5a 100644 --- a/src/ria_toolkit_oss/agent/streamer.py +++ b/src/ria_toolkit_oss/agent/streamer.py @@ -249,9 +249,16 @@ class Streamer: await self._send_error(app_id, "start missing radio_config.device") return + # Open the SDR in a thread, never inline. The open is blocking and can be + # slow — a USRP shells out to uhd_find_devices and loads its FPGA, which + # takes seconds — and doing it on the event loop freezes the WebSocket + # keepalive long enough that the hub drops the agent and stops the app. + # (A Pluto opens fast enough to slip under the timeout, which is why it + # worked where a USRP hung.) + loop = asyncio.get_running_loop() try: - sdr, device_key = self._registry.acquire(device, identifier) - _apply_sdr_config(sdr, radio_config) + sdr, device_key = await loop.run_in_executor(None, self._registry.acquire, device, identifier) + await loop.run_in_executor(None, _apply_sdr_config, sdr, radio_config) except Exception as exc: logger.exception("Failed to open SDR %r", device) await self._send_error(app_id, f"SDR init failed: {_friendly_sdr_error(device, exc)}") @@ -385,42 +392,51 @@ class Streamer: await self._send_tx_status(app_id, "error", "tx_start missing radio_config.device") return - device_key: tuple[str, str | None] | None = None - sdr: Any = None - try: - sdr, device_key = self._registry.acquire(device, identifier) - _apply_sdr_config(sdr, radio_config) - # init_tx is mandatory for any driver that exposes it: drivers - # that gate _stream_tx on _tx_initialized (Pluto, HackRF, USRP, - # …) crash with a confusing "TX was not initialized" error 2 s - # later in the executor thread if we skip it. Treat the three - # required keys as a hard contract — a missing one is a hub-side - # manifest bug and we want it surfaced immediately, not papered - # over with stale radio state. - if hasattr(sdr, "init_tx"): - init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")} - missing = [f"tx_{k}" for k, v in init_args.items() if v is None] - if missing: - raise ValueError(f"tx_start missing required radio_config keys: {missing}") - sdr.init_tx( - sample_rate=init_args["sample_rate"], - center_frequency=init_args["center_frequency"], - gain=init_args["gain"], - channel=radio_config.get("tx_channel", 0), - gain_mode=radio_config.get("tx_gain_mode", "manual"), - ) - except Exception as exc: - if device_key is not None: - if self._registry.release(device_key): + # Open + init the SDR in a thread, never inline — the open is blocking and + # slow on a USRP (uhd_find_devices + FPGA load), and freezing the event + # loop stalls the WebSocket keepalive until the hub drops us. Cleanup on + # failure (release/close) stays inside the thread so a partial open never + # leaks a device handle. + def _open_and_init_tx() -> tuple[Any, tuple[str, str | None]]: + sdr_local, key_local = self._registry.acquire(device, identifier) + try: + _apply_sdr_config(sdr_local, radio_config) + # init_tx is mandatory for any driver that exposes it: drivers + # that gate _stream_tx on _tx_initialized (Pluto, HackRF, USRP, + # …) crash with a confusing "TX was not initialized" error 2 s + # later in the executor thread if we skip it. Treat the three + # required keys as a hard contract — a missing one is a hub-side + # manifest bug and we want it surfaced immediately, not papered + # over with stale radio state. + if hasattr(sdr_local, "init_tx"): + init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")} + missing = [f"tx_{k}" for k, v in init_args.items() if v is None] + if missing: + raise ValueError(f"tx_start missing required radio_config keys: {missing}") + sdr_local.init_tx( + sample_rate=init_args["sample_rate"], + center_frequency=init_args["center_frequency"], + gain=init_args["gain"], + channel=radio_config.get("tx_channel", 0), + gain_mode=radio_config.get("tx_gain_mode", "manual"), + ) + except Exception: + if self._registry.release(key_local): try: - sdr.close() + sdr_local.close() except Exception: pass + raise + return sdr_local, key_local + + self._loop = asyncio.get_running_loop() + try: + sdr, device_key = await self._loop.run_in_executor(None, _open_and_init_tx) + except Exception as exc: logger.exception("Failed to init TX on %r", device) await self._send_tx_status(app_id, "error", f"tx init failed: {_friendly_sdr_error(device, exc)}") return - self._loop = asyncio.get_running_loop() session = TxSession( app_id=app_id, sdr=sdr, diff --git a/src/ria_toolkit_oss/agent/ws_client.py b/src/ria_toolkit_oss/agent/ws_client.py index a33991d..ab5bd98 100644 --- a/src/ria_toolkit_oss/agent/ws_client.py +++ b/src/ria_toolkit_oss/agent/ws_client.py @@ -119,9 +119,15 @@ class WsClient: await asyncio.sleep(self.reconnect_pause) async def _heartbeat_loop(self, heartbeat: HeartbeatBuilder) -> None: + loop = asyncio.get_running_loop() while True: try: - await self.send_json(heartbeat()) + # Build off the event loop: a heartbeat can probe SDR hardware + # (e.g. uhd_find_devices on a USRP), which blocks for seconds and + # would otherwise freeze the WebSocket keepalive long enough for + # the hub to drop the agent. + payload = await loop.run_in_executor(None, heartbeat) + await self.send_json(payload) except Exception as exc: logger.debug("Heartbeat send failed: %s", exc) return