fix(agent): don't probe SDR hardware while a session is active

heartbeat detect_devices() shelled out to uhd_find_devices every 60s. When a
USB SDR (USRP B2x0) was mid-capture, that probe disrupted the live stream and
the device briefly vanished ("No UHD Devices Found"), killing the capture.

detect_devices() gains a probe flag; heartbeat_payload passes probe=False
whenever a session is active, returning the last good enumeration (or a
driver-only list) instead of touching the hardware.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
J jrhughes003 2026-06-05 10:40:54 -04:00
parent 6bead217a3
commit bb1259fefc
2 changed files with 50 additions and 4 deletions

View File

@ -144,18 +144,33 @@ def _detect_devices_uncached() -> list[dict]:
return out return out
def detect_devices(*, use_cache: bool = True) -> list[dict]: def _driver_only_devices() -> list[dict]:
"""Hardware list from importable drivers alone — no device probing."""
return [{"device": d, "identifier": None, "label": _label_for(d), "connected": None} for d in available_devices()]
def detect_devices(*, use_cache: bool = True, probe: bool = True) -> list[dict]:
"""Return enriched ``hardware`` entries for the heartbeat. """Return enriched ``hardware`` entries for the heartbeat.
Results are cached for ``_PROBE_TTL_S`` seconds because enumeration may shell Results are cached for ``_PROBE_TTL_S`` seconds because enumeration may shell
out to hardware tools. Pass ``use_cache=False`` to force a fresh probe. out to hardware tools (e.g. ``uhd_find_devices``). Pass ``use_cache=False``
to force a fresh probe.
``probe=False`` MUST be used while a capture/transmit session is active:
probing a USB SDR (running ``uhd_find_devices``) while it is streaming
disrupts the live stream and makes the device briefly disappear. In that
case we return the last good enumeration if we have one, else a driver-only
list never touching the hardware.
""" """
global _probe_cache global _probe_cache
now = time.monotonic() now = time.monotonic()
if use_cache and _probe_cache is not None: if use_cache and _probe_cache is not None:
ts, cached = _probe_cache ts, cached = _probe_cache
if now - ts < _PROBE_TTL_S: if not probe or (now - ts < _PROBE_TTL_S):
return cached return cached
if not probe:
# No cache yet and we must not touch the hardware mid-stream.
return _driver_only_devices()
devices = _detect_devices_uncached() devices = _detect_devices_uncached()
_probe_cache = (now, devices) _probe_cache = (now, devices)
return devices return devices
@ -179,9 +194,11 @@ def heartbeat_payload(
if c.tx_enabled: if c.tx_enabled:
capabilities.append("tx") capabilities.append("tx")
# Never probe the hardware while a session is active: running
# uhd_find_devices against a streaming USB SDR disrupts the live capture.
payload: dict = { payload: dict = {
"type": "heartbeat", "type": "heartbeat",
"hardware": detect_devices(), "hardware": detect_devices(probe=not bool(sessions)),
"status": status, "status": status,
"capabilities": capabilities, "capabilities": capabilities,
"tx_enabled": bool(c.tx_enabled), "tx_enabled": bool(c.tx_enabled),

View File

@ -55,6 +55,35 @@ def test_detect_devices_cache():
assert _device_names(a) == _device_names(b) assert _device_names(a) == _device_names(b)
def test_detect_devices_probe_false_never_touches_hardware(monkeypatch):
# probe=False must not run the hardware enumerators (uhd_find_devices etc.),
# which would disrupt an active USB capture.
def boom():
raise AssertionError("hardware must not be probed when probe=False")
monkeypatch.setattr(hardware, "_detect_devices_uncached", boom)
monkeypatch.setattr(hardware, "_probe_cache", None)
devices = hardware.detect_devices(probe=False, use_cache=False)
assert all(e.get("connected") is None for e in devices) # driver-only
def test_heartbeat_disables_probe_during_active_session(monkeypatch):
seen = {}
def fake_detect(**kw):
seen.clear()
seen.update(kw)
return []
monkeypatch.setattr(hardware, "detect_devices", fake_detect)
hardware.heartbeat_payload(sessions={"rx": {"app_id": "a", "state": "streaming"}})
assert seen.get("probe") is False # streaming → no hardware probe
hardware.heartbeat_payload(sessions=None)
assert seen.get("probe") is True # idle → probe allowed
def test_heartbeat_payload_tx_capability_from_cfg(): def test_heartbeat_payload_tx_capability_from_cfg():
from ria_toolkit_oss.agent.config import AgentConfig from ria_toolkit_oss.agent.config import AgentConfig