fix(agent): open SDRs off the event loop so a slow USRP open can't drop the hub
rx/tx start handlers called the registry's blocking SDR open (and TX init) directly on the asyncio loop. A USRP open shells out to uhd_find_devices and loads its FPGA — several seconds — freezing the WebSocket keepalive long enough for the hub to drop the agent and stop the app, with the agent terminal hung in the blocking call. A Pluto opens fast enough to slip under the timeout, which is why it worked where a USRP did not. Run acquire/_apply_sdr_config (rx) and acquire/config/init_tx (tx) in a thread via run_in_executor, keeping release/close-on-failure inside the thread. Also build the heartbeat off the loop, since it can probe hardware (uhd_find_devices) while idle and block the keepalive the same way. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
54b66b64c4
commit
1ea865f5f8
|
|
@ -249,9 +249,16 @@ class Streamer:
|
||||||
await self._send_error(app_id, "start missing radio_config.device")
|
await self._send_error(app_id, "start missing radio_config.device")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Open the SDR in a thread, never inline. The open is blocking and can be
|
||||||
|
# slow — a USRP shells out to uhd_find_devices and loads its FPGA, which
|
||||||
|
# takes seconds — and doing it on the event loop freezes the WebSocket
|
||||||
|
# keepalive long enough that the hub drops the agent and stops the app.
|
||||||
|
# (A Pluto opens fast enough to slip under the timeout, which is why it
|
||||||
|
# worked where a USRP hung.)
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
try:
|
try:
|
||||||
sdr, device_key = self._registry.acquire(device, identifier)
|
sdr, device_key = await loop.run_in_executor(None, self._registry.acquire, device, identifier)
|
||||||
_apply_sdr_config(sdr, radio_config)
|
await loop.run_in_executor(None, _apply_sdr_config, sdr, radio_config)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.exception("Failed to open SDR %r", device)
|
logger.exception("Failed to open SDR %r", device)
|
||||||
await self._send_error(app_id, f"SDR init failed: {_friendly_sdr_error(device, exc)}")
|
await self._send_error(app_id, f"SDR init failed: {_friendly_sdr_error(device, exc)}")
|
||||||
|
|
@ -385,42 +392,51 @@ class Streamer:
|
||||||
await self._send_tx_status(app_id, "error", "tx_start missing radio_config.device")
|
await self._send_tx_status(app_id, "error", "tx_start missing radio_config.device")
|
||||||
return
|
return
|
||||||
|
|
||||||
device_key: tuple[str, str | None] | None = None
|
# Open + init the SDR in a thread, never inline — the open is blocking and
|
||||||
sdr: Any = None
|
# slow on a USRP (uhd_find_devices + FPGA load), and freezing the event
|
||||||
try:
|
# loop stalls the WebSocket keepalive until the hub drops us. Cleanup on
|
||||||
sdr, device_key = self._registry.acquire(device, identifier)
|
# failure (release/close) stays inside the thread so a partial open never
|
||||||
_apply_sdr_config(sdr, radio_config)
|
# leaks a device handle.
|
||||||
# init_tx is mandatory for any driver that exposes it: drivers
|
def _open_and_init_tx() -> tuple[Any, tuple[str, str | None]]:
|
||||||
# that gate _stream_tx on _tx_initialized (Pluto, HackRF, USRP,
|
sdr_local, key_local = self._registry.acquire(device, identifier)
|
||||||
# …) crash with a confusing "TX was not initialized" error 2 s
|
try:
|
||||||
# later in the executor thread if we skip it. Treat the three
|
_apply_sdr_config(sdr_local, radio_config)
|
||||||
# required keys as a hard contract — a missing one is a hub-side
|
# init_tx is mandatory for any driver that exposes it: drivers
|
||||||
# manifest bug and we want it surfaced immediately, not papered
|
# that gate _stream_tx on _tx_initialized (Pluto, HackRF, USRP,
|
||||||
# over with stale radio state.
|
# …) crash with a confusing "TX was not initialized" error 2 s
|
||||||
if hasattr(sdr, "init_tx"):
|
# later in the executor thread if we skip it. Treat the three
|
||||||
init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")}
|
# required keys as a hard contract — a missing one is a hub-side
|
||||||
missing = [f"tx_{k}" for k, v in init_args.items() if v is None]
|
# manifest bug and we want it surfaced immediately, not papered
|
||||||
if missing:
|
# over with stale radio state.
|
||||||
raise ValueError(f"tx_start missing required radio_config keys: {missing}")
|
if hasattr(sdr_local, "init_tx"):
|
||||||
sdr.init_tx(
|
init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")}
|
||||||
sample_rate=init_args["sample_rate"],
|
missing = [f"tx_{k}" for k, v in init_args.items() if v is None]
|
||||||
center_frequency=init_args["center_frequency"],
|
if missing:
|
||||||
gain=init_args["gain"],
|
raise ValueError(f"tx_start missing required radio_config keys: {missing}")
|
||||||
channel=radio_config.get("tx_channel", 0),
|
sdr_local.init_tx(
|
||||||
gain_mode=radio_config.get("tx_gain_mode", "manual"),
|
sample_rate=init_args["sample_rate"],
|
||||||
)
|
center_frequency=init_args["center_frequency"],
|
||||||
except Exception as exc:
|
gain=init_args["gain"],
|
||||||
if device_key is not None:
|
channel=radio_config.get("tx_channel", 0),
|
||||||
if self._registry.release(device_key):
|
gain_mode=radio_config.get("tx_gain_mode", "manual"),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
if self._registry.release(key_local):
|
||||||
try:
|
try:
|
||||||
sdr.close()
|
sdr_local.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
raise
|
||||||
|
return sdr_local, key_local
|
||||||
|
|
||||||
|
self._loop = asyncio.get_running_loop()
|
||||||
|
try:
|
||||||
|
sdr, device_key = await self._loop.run_in_executor(None, _open_and_init_tx)
|
||||||
|
except Exception as exc:
|
||||||
logger.exception("Failed to init TX on %r", device)
|
logger.exception("Failed to init TX on %r", device)
|
||||||
await self._send_tx_status(app_id, "error", f"tx init failed: {_friendly_sdr_error(device, exc)}")
|
await self._send_tx_status(app_id, "error", f"tx init failed: {_friendly_sdr_error(device, exc)}")
|
||||||
return
|
return
|
||||||
|
|
||||||
self._loop = asyncio.get_running_loop()
|
|
||||||
session = TxSession(
|
session = TxSession(
|
||||||
app_id=app_id,
|
app_id=app_id,
|
||||||
sdr=sdr,
|
sdr=sdr,
|
||||||
|
|
|
||||||
|
|
@ -119,9 +119,15 @@ class WsClient:
|
||||||
await asyncio.sleep(self.reconnect_pause)
|
await asyncio.sleep(self.reconnect_pause)
|
||||||
|
|
||||||
async def _heartbeat_loop(self, heartbeat: HeartbeatBuilder) -> None:
|
async def _heartbeat_loop(self, heartbeat: HeartbeatBuilder) -> None:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
await self.send_json(heartbeat())
|
# Build off the event loop: a heartbeat can probe SDR hardware
|
||||||
|
# (e.g. uhd_find_devices on a USRP), which blocks for seconds and
|
||||||
|
# would otherwise freeze the WebSocket keepalive long enough for
|
||||||
|
# the hub to drop the agent.
|
||||||
|
payload = await loop.run_in_executor(None, heartbeat)
|
||||||
|
await self.send_json(payload)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug("Heartbeat send failed: %s", exc)
|
logger.debug("Heartbeat send failed: %s", exc)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user