92 lines
2.9 KiB
Python
92 lines
2.9 KiB
Python
|
J
|
"""Hardware-free tests for _open_multi_usrp's transient-FX3 retry.
|
||
|
|
|
||
|
|
On B200/B210 the `uhd_find_devices` enumeration that runs right before opening
|
||
|
|
can leave the FX3 USB controller mid-reset, so the first MultiUSRP open fails
|
||
|
|
with "fx3 is in state 5". _open_multi_usrp retries transient USB states with a
|
||
|
|
short settle; a non-transient error is re-raised immediately.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import sys
|
||
|
|
import types
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def usrp_mod(monkeypatch):
    """Import the usrp module against a stub `uhd`, with time.sleep neutered.

    Yields the freshly imported ``ria_toolkit_oss.sdr.usrp`` module. The stub
    exposes ``uhd.usrp.MultiUSRP`` as ``None``; each test assigns its own
    stand-in before calling into the module.

    Whatever ``sys.modules`` held for ``uhd`` and ``ria_toolkit_oss.sdr.usrp``
    beforehand is put back afterwards. The restore sits in a ``finally`` so it
    also runs when setup itself raises (for example if the import fails):
    pytest skips the code after ``yield`` when a fixture errors before
    yielding, which would otherwise leave the stub ``uhd`` registered.
    """
    saved = {
        name: sys.modules.get(name)
        for name in ("uhd", "ria_toolkit_oss.sdr.usrp")
    }

    uhd = types.ModuleType("uhd")
    uhd.usrp = types.SimpleNamespace(MultiUSRP=None)  # set per-test

    try:
        sys.modules["uhd"] = uhd
        # Drop any cached copy so the import below executes the module again
        # with the stub `uhd` in place.
        sys.modules.pop("ria_toolkit_oss.sdr.usrp", None)
        import ria_toolkit_oss.sdr.usrp as mod

        monkeypatch.setattr(mod.time, "sleep", lambda *_a, **_k: None)

        yield mod
    finally:
        for name, original in saved.items():
            if original is None:
                # Nothing was registered before the fixture ran: remove ours.
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = original
|
||
|
|
|
||
|
|
|
||
|
|
def _flaky_factory(fail_times, exc):
    """Build a MultiUSRP stand-in whose first `fail_times` calls raise `exc`.

    Every later call returns the string ``"usrp<ARGS>"`` for the argument it
    was given. The running call count is exposed as ``calls["n"]`` on the
    returned callable so tests can assert how many opens were attempted.
    """
    counter = {"n": 0}

    def opener(args):
        counter["n"] += 1
        still_failing = counter["n"] <= fail_times
        if not still_failing:
            return f"usrp<{args}>"
        raise exc

    opener.calls = counter
    return opener
|
||
|
|
|
||
|
|
|
||
|
|
def test_retries_transient_fx3_state_then_succeeds(usrp_mod):
    """Two "fx3 is in state 5" failures are retried and the third open wins."""
    transient = RuntimeError("RuntimeError: fx3 is in state 5")
    opener = _flaky_factory(2, transient)
    usrp_mod.uhd.usrp.MultiUSRP = opener

    handle = usrp_mod._open_multi_usrp("name=B210,", attempts=4, settle_s=0)

    assert handle == "usrp<name=B210,>"
    # Two failed opens followed by the successful third.
    assert opener.calls["n"] == 3
|
||
|
|
|
||
|
|
|
||
|
|
def test_gives_up_after_attempts_and_raises_last(usrp_mod):
    """A failure that never clears is raised once `attempts` opens are used up."""
    always_failing = _flaky_factory(99, RuntimeError("fx3 is in state 5"))
    usrp_mod.uhd.usrp.MultiUSRP = always_failing
    open_kwargs = {"attempts": 3, "settle_s": 0}

    with pytest.raises(RuntimeError, match="fx3 is in state 5"):
        usrp_mod._open_multi_usrp("name=B210,", **open_kwargs)

    # Exactly `attempts` opens were made, so the retry loop is bounded.
    assert always_failing.calls["n"] == 3
|
||
|
|
|
||
|
|
|
||
|
|
def test_non_transient_error_is_raised_immediately(usrp_mod):
    """An error that is not a transient USB state propagates without a retry."""
    fatal = RuntimeError("EnvironmentError: no UHD images")
    opener = _flaky_factory(99, fatal)
    usrp_mod.uhd.usrp.MultiUSRP = opener

    with pytest.raises(RuntimeError, match="no UHD images"):
        usrp_mod._open_multi_usrp("name=B210,", attempts=4, settle_s=0)

    # A single open only: the failure is surfaced straight away.
    assert opener.calls["n"] == 1
|
||
|
|
|
||
|
|
|
||
|
|
def test_success_on_first_try_does_not_retry(usrp_mod):
    """When the first open succeeds, the factory is called exactly once."""
    device_args = "addr=192.168.10.2,"
    opener = _flaky_factory(0, RuntimeError("fx3 is in state 5"))
    usrp_mod.uhd.usrp.MultiUSRP = opener

    handle = usrp_mod._open_multi_usrp(device_args, attempts=4, settle_s=0)

    assert handle == "usrp<addr=192.168.10.2,>"
    assert opener.calls["n"] == 1
|