From bb1259fefc8e65955bccd72c3037e7950398fb2c Mon Sep 17 00:00:00 2001 From: jrhughes003 Date: Fri, 5 Jun 2026 10:40:54 -0400 Subject: [PATCH] fix(agent): don't probe SDR hardware while a session is active heartbeat detect_devices() shelled out to uhd_find_devices every 60s. When a USB SDR (USRP B2x0) was mid-capture, that probe disrupted the live stream and the device briefly vanished ("No UHD Devices Found"), killing the capture. detect_devices() gains a probe flag; heartbeat_payload passes probe=False whenever a session is active, returning the last good enumeration (or a driver-only list) instead of touching the hardware. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/ria_toolkit_oss/agent/hardware.py | 25 +++++++++++++++++++---- tests/agent/test_hardware.py | 29 +++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/src/ria_toolkit_oss/agent/hardware.py b/src/ria_toolkit_oss/agent/hardware.py index 6882e92..dfb61f4 100644 --- a/src/ria_toolkit_oss/agent/hardware.py +++ b/src/ria_toolkit_oss/agent/hardware.py @@ -144,18 +144,33 @@ def _detect_devices_uncached() -> list[dict]: return out -def detect_devices(*, use_cache: bool = True) -> list[dict]: +def _driver_only_devices() -> list[dict]: + """Hardware list from importable drivers alone — no device probing.""" + return [{"device": d, "identifier": None, "label": _label_for(d), "connected": None} for d in available_devices()] + + +def detect_devices(*, use_cache: bool = True, probe: bool = True) -> list[dict]: """Return enriched ``hardware`` entries for the heartbeat. Results are cached for ``_PROBE_TTL_S`` seconds because enumeration may shell - out to hardware tools. Pass ``use_cache=False`` to force a fresh probe. + out to hardware tools (e.g. ``uhd_find_devices``). Pass ``use_cache=False`` + to force a fresh probe. + + ``probe=False`` MUST be used while a capture/transmit session is active: + probing a USB SDR (running ``uhd_find_devices``) while it is streaming + disrupts the live stream and makes the device briefly disappear. In that + case we return the last good enumeration if we have one, else a driver-only + list — never touching the hardware. """ global _probe_cache now = time.monotonic() if use_cache and _probe_cache is not None: ts, cached = _probe_cache - if now - ts < _PROBE_TTL_S: + if not probe or (now - ts < _PROBE_TTL_S): return cached + if not probe: + # No cache yet and we must not touch the hardware mid-stream. + return _driver_only_devices() devices = _detect_devices_uncached() _probe_cache = (now, devices) return devices @@ -179,9 +194,11 @@ def heartbeat_payload( if c.tx_enabled: capabilities.append("tx") + # Never probe the hardware while a session is active: running + # uhd_find_devices against a streaming USB SDR disrupts the live capture. payload: dict = { "type": "heartbeat", - "hardware": detect_devices(), + "hardware": detect_devices(probe=not bool(sessions)), "status": status, "capabilities": capabilities, "tx_enabled": bool(c.tx_enabled), diff --git a/tests/agent/test_hardware.py b/tests/agent/test_hardware.py index 9e2af07..0ad07ce 100644 --- a/tests/agent/test_hardware.py +++ b/tests/agent/test_hardware.py @@ -55,6 +55,35 @@ def test_detect_devices_cache(): assert _device_names(a) == _device_names(b) +def test_detect_devices_probe_false_never_touches_hardware(monkeypatch): + # probe=False must not run the hardware enumerators (uhd_find_devices etc.), + # which would disrupt an active USB capture. + def boom(): + raise AssertionError("hardware must not be probed when probe=False") + + monkeypatch.setattr(hardware, "_detect_devices_uncached", boom) + monkeypatch.setattr(hardware, "_probe_cache", None) + devices = hardware.detect_devices(probe=False, use_cache=False) + assert all(e.get("connected") is None for e in devices) # driver-only + + +def test_heartbeat_disables_probe_during_active_session(monkeypatch): + seen = {} + + def fake_detect(**kw): + seen.clear() + seen.update(kw) + return [] + + monkeypatch.setattr(hardware, "detect_devices", fake_detect) + + hardware.heartbeat_payload(sessions={"rx": {"app_id": "a", "state": "streaming"}}) + assert seen.get("probe") is False # streaming → no hardware probe + + hardware.heartbeat_payload(sessions=None) + assert seen.get("probe") is True # idle → probe allowed + + def test_heartbeat_payload_tx_capability_from_cfg(): from ria_toolkit_oss.agent.config import AgentConfig