"""Hardware-free tests for the USRP continuous-streaming rx(). `uhd` isn't importable without the UHD install, so we stub the bits USRP.rx() touches and drive it with a scripted fake rx_stream. The point is to prove the capture is gapless across rx() calls — the property that fixes the choppy / black-banded spectrogram caused by the old start/stop-per-buffer record(). """ from __future__ import annotations import sys import types import numpy as np import pytest def _install_fake_uhd(): uhd = types.ModuleType("uhd") class StreamCMD: def __init__(self, mode): self.mode = mode self.stream_now = False self.time_spec = None uhd.types = types.SimpleNamespace( StreamCMD=StreamCMD, StreamMode=types.SimpleNamespace(start_cont="start_cont", stop_cont="stop_cont"), RXMetadataErrorCode=types.SimpleNamespace(none="none", overflow="overflow", timeout="timeout"), ) uhd.usrp = types.SimpleNamespace() sys.modules["uhd"] = uhd return uhd @pytest.fixture def USRP(): # Snapshot so the fake uhd / freshly-imported usrp don't leak into other # tests (e.g. detect_available() would otherwise think usrp is importable). saved_uhd = sys.modules.get("uhd") saved_usrp = sys.modules.get("ria_toolkit_oss.sdr.usrp") _install_fake_uhd() sys.modules.pop("ria_toolkit_oss.sdr.usrp", None) from ria_toolkit_oss.sdr.usrp import USRP as _USRP yield _USRP for name, mod in (("uhd", saved_uhd), ("ria_toolkit_oss.sdr.usrp", saved_usrp)): if mod is None: sys.modules.pop(name, None) else: sys.modules[name] = mod class _FakeStream: """Delivers a contiguous ramp of samples; ``real`` part is the sample index. ``script`` is a list of (count, error_code) the recv loop walks through. """ def __init__(self, script, metadata): self._script = list(script) self._metadata = metadata self._counter = 0 self.issued = [] def issue_stream_cmd(self, cmd): self.issued.append(cmd.mode) def recv(self, buffer, metadata, timeout): count, err = self._script.pop(0) metadata.error_code = err if count > 0: idx = np.arange(self._counter, self._counter + count, dtype=np.float32) buffer[0, :count] = idx.astype(np.complex64) self._counter += count return count def _make_usrp(USRP, script, rx_buffer_size=4): u = USRP.__new__(USRP) u._rx_initialized = True u._rx_streaming = False u._rx_residual = np.empty(0, dtype=np.complex64) u.rx_buffer_size = rx_buffer_size u.timeout = 0.1 u._enable_rx = False u.metadata = types.SimpleNamespace(error_code="none") u.rx_stream = _FakeStream(script, u.metadata) return u def test_rx_is_gapless_across_calls(USRP): # rx_buffer_size=4; each recv yields 4 fresh samples. Two rx(6) calls must # return a contiguous 0..11 ramp — the over-read remainder is carried over. script = [(4, "none")] * 4 u = _make_usrp(USRP, script) first = u.rx(6) second = u.rx(6) assert first.dtype == np.complex64 and len(first) == 6 combined = np.concatenate([first, second]).real assert np.array_equal(combined, np.arange(12, dtype=np.float32)) # no drops, no zeros assert "start_cont" in u.rx_stream.issued # stream started exactly via start_cont assert u.rx_stream.issued.count("start_cont") == 1 # ...and only once def test_rx_starts_stream_only_once(USRP): u = _make_usrp(USRP, [(4, "none")] * 6) u.rx(4) u.rx(4) assert u.rx_stream.issued.count("start_cont") == 1 def test_rx_keeps_going_on_overflow(USRP): # Overflow samples are still valid — they must be used, not dropped. script = [(2, "none"), (2, "overflow"), (2, "none")] u = _make_usrp(USRP, script) out = u.rx(6).real assert np.array_equal(out, np.arange(6, dtype=np.float32)) def test_rx_raises_on_persistent_timeout(USRP): from ria_toolkit_oss.sdr.sdr import SdrDisconnectedError u = _make_usrp(USRP, [(0, "timeout")] * 10) with pytest.raises(SdrDisconnectedError): u.rx(4) def test_stop_rx_stream_resets_state(USRP): u = _make_usrp(USRP, [(4, "none")] * 4) u.rx(6) # leaves a 2-sample residual, stream running assert u._rx_streaming is True assert u._rx_residual.size == 2 u._stop_rx_stream() assert u._rx_streaming is False assert u._rx_residual.size == 0 assert "stop_cont" in u.rx_stream.issued