diff --git a/src/ria_toolkit_oss/sdr/usrp.py b/src/ria_toolkit_oss/sdr/usrp.py index 3ae7569..44d8842 100644 --- a/src/ria_toolkit_oss/sdr/usrp.py +++ b/src/ria_toolkit_oss/sdr/usrp.py @@ -72,7 +72,7 @@ class USRP(SDR): # build USRP object usrp_args = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict) - self.usrp = uhd.usrp.MultiUSRP(usrp_args) + self.usrp = _open_multi_usrp(usrp_args) # check if channel arg is valid max_num_channels = self.usrp.get_rx_num_channels() @@ -394,7 +394,7 @@ class USRP(SDR): print(f"USRP TX Gain Mode = '{gain_mode}'") config_str = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict) - self.usrp = uhd.usrp.MultiUSRP(config_str) + self.usrp = _open_multi_usrp(config_str) # check if channel arg is valid max_num_channels = self.usrp.get_rx_num_channels() @@ -563,6 +563,32 @@ class USRP(SDR): return {"center_frequency": True, "sample_rate": True, "gain": True} +def _open_multi_usrp(usrp_args, *, attempts=4, settle_s=2.0): + """Construct a ``uhd.usrp.MultiUSRP``, retrying transient B200 USB states. + + On USB USRPs (B200/B210) the ``uhd_find_devices`` enumeration that resolves + the device (see ``_create_device_dict``) runs immediately before the open and + can leave the FX3 USB controller mid-reset, so the first open fails with e.g. + ``RuntimeError: fx3 is in state 5``. The device settles once that + enumeration's USB handle is fully released — and the failed open itself nudges + the FX3 to reload firmware/FPGA — so we retry with a short backoff before + giving up. A non-transient error (bad args, genuinely absent device) is + re-raised immediately. + """ + for attempt in range(1, attempts + 1): + try: + return uhd.usrp.MultiUSRP(usrp_args) + except RuntimeError as exc: + msg = str(exc).lower() + transient = ("fx3" in msg) or ("usb" in msg) or ("no devices found" in msg) + if not transient or attempt == attempts: + raise + print( + f"\033[93mUSRP open attempt {attempt}/{attempts} failed " f"({exc}); retrying in {settle_s}s…\033[0m" + ) + time.sleep(settle_s) + + def _create_device_dict(identifier_value=None): """ Get the dictionary of information corresponding to any unique identifier, diff --git a/tests/agent/test_usrp_open_retry.py b/tests/agent/test_usrp_open_retry.py new file mode 100644 index 0000000..57ddbe4 --- /dev/null +++ b/tests/agent/test_usrp_open_retry.py @@ -0,0 +1,91 @@ +"""Hardware-free tests for _open_multi_usrp's transient-FX3 retry. + +On B200/B210 the `uhd_find_devices` enumeration that runs right before opening +can leave the FX3 USB controller mid-reset, so the first MultiUSRP open fails +with "fx3 is in state 5". _open_multi_usrp retries transient USB states with a +short settle; a non-transient error is re-raised immediately. +""" + +from __future__ import annotations + +import sys +import types + +import pytest + + +@pytest.fixture +def usrp_mod(monkeypatch): + """Import the usrp module against a stub `uhd`, with time.sleep neutered.""" + saved_uhd = sys.modules.get("uhd") + saved_usrp = sys.modules.get("ria_toolkit_oss.sdr.usrp") + + uhd = types.ModuleType("uhd") + uhd.usrp = types.SimpleNamespace(MultiUSRP=None) # set per-test + sys.modules["uhd"] = uhd + sys.modules.pop("ria_toolkit_oss.sdr.usrp", None) + import ria_toolkit_oss.sdr.usrp as mod + + monkeypatch.setattr(mod.time, "sleep", lambda *_a, **_k: None) + + yield mod + + for name, m in (("uhd", saved_uhd), ("ria_toolkit_oss.sdr.usrp", saved_usrp)): + if m is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = m + + +def _flaky_factory(fail_times, exc): + """A MultiUSRP stand-in that raises `exc` the first `fail_times` calls.""" + calls = {"n": 0} + + def make(args): + calls["n"] += 1 + if calls["n"] <= fail_times: + raise exc + return f"usrp<{args}>" + + make.calls = calls + return make + + +def test_retries_transient_fx3_state_then_succeeds(usrp_mod): + factory = _flaky_factory(2, RuntimeError("RuntimeError: fx3 is in state 5")) + usrp_mod.uhd.usrp.MultiUSRP = factory + + out = usrp_mod._open_multi_usrp("name=B210,", attempts=4, settle_s=0) + + assert out == "usrp" + assert factory.calls["n"] == 3 # failed twice, third succeeded + + +def test_gives_up_after_attempts_and_raises_last(usrp_mod): + factory = _flaky_factory(99, RuntimeError("fx3 is in state 5")) + usrp_mod.uhd.usrp.MultiUSRP = factory + + with pytest.raises(RuntimeError, match="fx3 is in state 5"): + usrp_mod._open_multi_usrp("name=B210,", attempts=3, settle_s=0) + + assert factory.calls["n"] == 3 # exactly `attempts` tries, no infinite loop + + +def test_non_transient_error_is_raised_immediately(usrp_mod): + factory = _flaky_factory(99, RuntimeError("EnvironmentError: no UHD images")) + usrp_mod.uhd.usrp.MultiUSRP = factory + + with pytest.raises(RuntimeError, match="no UHD images"): + usrp_mod._open_multi_usrp("name=B210,", attempts=4, settle_s=0) + + assert factory.calls["n"] == 1 # not retried — fails fast + + +def test_success_on_first_try_does_not_retry(usrp_mod): + factory = _flaky_factory(0, RuntimeError("fx3 is in state 5")) + usrp_mod.uhd.usrp.MultiUSRP = factory + + out = usrp_mod._open_multi_usrp("addr=192.168.10.2,", attempts=4, settle_s=0) + + assert out == "usrp" + assert factory.calls["n"] == 1