From be32efeff280335ac7dccf60f5699d0088c9a44f Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 2 Oct 2025 10:15:21 -0400 Subject: [PATCH] Added sdr pytests --- tests/sdr/test_blade.py | 178 +++++++++++++++++++++++++++++++++++++ tests/sdr/test_hackrf.py | 91 +++++++++++++++++++ tests/sdr/test_pluto.py | 185 +++++++++++++++++++++++++++++++++++++++ tests/sdr/test_usrp.py | 111 +++++++++++++++++++++++ 4 files changed, 565 insertions(+) create mode 100644 tests/sdr/test_blade.py create mode 100644 tests/sdr/test_hackrf.py create mode 100644 tests/sdr/test_pluto.py create mode 100644 tests/sdr/test_usrp.py diff --git a/tests/sdr/test_blade.py b/tests/sdr/test_blade.py new file mode 100644 index 0000000..fbc102b --- /dev/null +++ b/tests/sdr/test_blade.py @@ -0,0 +1,178 @@ +import subprocess +import numpy as np # type: ignore +import pytest # type: ignore + +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.sdr.blade import Blade + + +SAMPLE_RATE = int(1e6) +CENTER_FREQUENCY = int(3440e6) +CHANNEL = 0 +ABS_GAIN = 10 +REL_GAIN = -50 +t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) +angular_frequency = 2 * np.pi * 1 +SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) + + +def radio_connected() -> bool: + try: + # Example: check if a specific USB device is present + result = subprocess.run( + ["lsusb"], + capture_output=True, + text=True, + check=True + ) + return "bladeRF" in result.stdout + except Exception: + return False + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_rx_init(): + try: + rx_radio = Blade() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + assert str(rx_radio.rx_ch) == "Channel RX1" + assert int(rx_radio.rx_ch.sample_rate) == SAMPLE_RATE + assert int(rx_radio.rx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) + assert int(rx_radio.rx_ch.gain) == ABS_GAIN + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_tx_init(): + try: + tx_radio = Blade() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + assert str(tx_radio.tx_ch) == "Channel TX1" + assert int(tx_radio.tx_ch.sample_rate) == SAMPLE_RATE + assert int(tx_radio.tx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) + assert int(tx_radio.tx_ch.gain) == ABS_GAIN + finally: + tx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_rx_setters(): + try: + rx_radio = Blade() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + rx_radio._set_rx_channel(channel=1) + assert str(rx_radio.rx_ch) == "Channel RX2" + rx_radio._set_rx_buffer_size(buffer_size=4096) + assert int(rx_radio.rx_buffer_size) == 4096 + rx_radio._set_rx_center_frequency(center_frequency=int(3500e6)) + assert int(rx_radio.rx_ch.frequency) == pytest.approx(int(3500e6), abs=5) + rx_radio._set_rx_gain(channel=1, gain=20, gain_mode='absolute') + assert int(rx_radio.rx_ch.gain) == 20 + rx_radio._set_rx_sample_rate(sample_rate=int(2e6)) + assert int(rx_radio.rx_ch.sample_rate) == int(2e6) + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_tx_setters(): + try: + tx_radio = Blade() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + tx_radio._set_tx_channel(channel=1) + assert str(tx_radio.tx_ch) == "Channel TX2" + tx_radio._set_tx_buffer_size(buffer_size=4096) + assert int(tx_radio.tx_buffer_size) == 4096 + tx_radio._set_tx_center_frequency(center_frequency=int(3500e6)) + assert int(tx_radio.tx_ch.frequency) == pytest.approx(int(3500e6), abs=5) + tx_radio._set_tx_gain(channel=1, gain=20, gain_mode='absolute') + assert int(tx_radio.tx_ch.gain) == 20 + tx_radio._set_tx_sample_rate(sample_rate=int(2e6)) + assert int(tx_radio.tx_ch.sample_rate) == int(2e6) + finally: + tx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_relative_mode(): + try: + radio = Blade() + radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert int(radio.rx_ch.gain) == ABS_GAIN + radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert int(radio.tx_ch.gain) == ABS_GAIN + finally: + radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_rx(): + try: + print('Beginning test of Blade rx...') + rx_radio = Blade() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = rx_radio.record(num_samples=SAMPLE_RATE) + assert type(recording) is Recording + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_tx(): + try: + tx_radio = Blade() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + tx_radio.tx_recording( + recording=recording, + num_samples=SAMPLE_RATE + ) + assert True + finally: + tx_radio.close() diff --git a/tests/sdr/test_hackrf.py b/tests/sdr/test_hackrf.py new file mode 100644 index 0000000..eb5b628 --- /dev/null +++ b/tests/sdr/test_hackrf.py @@ -0,0 +1,91 @@ +import subprocess +import numpy as np # type: ignore +import pytest # type: ignore + +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.sdr.hackrf import HackRF + + +SAMPLE_RATE = int(1e6) +CENTER_FREQUENCY = int(3440e6) +CHANNEL = 0 +ABS_GAIN = 10 +REL_GAIN = -37 +t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) +angular_frequency = 2 * np.pi * 1 +SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) + + +def radio_connected() -> bool: + try: + # Example: check if a specific USB device is present + result = subprocess.run( + ["lsusb"], + capture_output=True, + text=True, + check=True + ) + return "hackrf" in result.stdout.lower() + except Exception: + return False + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_hackrf_relative_mode(): + try: + radio = HackRF() + radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert int(radio.radio.txvga_gain) == ABS_GAIN + finally: + radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_hackrf_rx(): + try: + rx_radio = HackRF() + try: + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + gain_mode='absolute' + ) + except NotImplementedError: + assert True + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_hackrf_tx(): + try: + tx_radio = HackRF() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + + max_val = np.max(np.abs(SINE_WAVE)) + data = SINE_WAVE / (max_val * 1.01) + recording = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + + tx_radio.tx_recording( + recording=recording, + num_samples=SAMPLE_RATE + ) + assert True + finally: + tx_radio.close() diff --git a/tests/sdr/test_pluto.py b/tests/sdr/test_pluto.py new file mode 100644 index 0000000..4d9f73a --- /dev/null +++ b/tests/sdr/test_pluto.py @@ -0,0 +1,185 @@ +import subprocess +import numpy as np # type: ignore +import pytest # type: ignore + +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.sdr.pluto import Pluto + + +SAMPLE_RATE = int(1e6) +CENTER_FREQUENCY = int(3440e6) +CHANNEL = 0 +ABS_GAIN = 10 +REL_GAIN = -50 +t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) +angular_frequency = 2 * np.pi * 1 +SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) +CONSTANT_TONE = np.ones((1000), dtype=np.complex64) + + +def radio_connected() -> bool: + try: + # Example: check if a specific USB device is present + result = subprocess.run( + ["lsusb"], + capture_output=True, + text=True, + check=True + ) + return "pluto" in result.stdout.lower() + except Exception: + return False + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_rx_setters(): + try: + rx_radio = Pluto() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + assert int(rx_radio.radio.sample_rate) == SAMPLE_RATE + assert int(rx_radio.radio.rx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5) + assert rx_radio.radio.rx_enabled_channels == [0] + assert rx_radio.radio.rx_hardwaregain_chan0 == ABS_GAIN + + # rx_radio.set_rx_channel(channel=1) + # assert rx_radio.radio.rx_enabled_channels == [0, 1] + rx_radio.set_rx_center_frequency(center_frequency=int(3500e6)) + assert int(rx_radio.radio.rx_lo) == pytest.approx(int(3500e6), abs=5) + rx_radio.set_rx_gain(channel=0, gain=20) + assert rx_radio.radio.rx_hardwaregain_chan0 == 20 + rx_radio.set_rx_sample_rate(sample_rate=int(2e6)) + assert int(rx_radio.radio.sample_rate) == int(2e6) + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_tx_setters(): + try: + print('Beginning test of Pluto tx setters...') + tx_radio = Pluto() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=-ABS_GAIN, + ) + assert int(tx_radio.radio.sample_rate) == SAMPLE_RATE + assert int(tx_radio.radio.tx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5) + assert tx_radio.radio.tx_enabled_channels == [0] + assert tx_radio.radio.tx_hardwaregain_chan0 == -ABS_GAIN + + try: + tx_radio.set_tx_channel(channel=1) + except NotImplementedError: + assert True + try: + tx_radio.set_tx_buffer_size(buffer_size=4096) + except NotImplementedError: + assert True + tx_radio.set_tx_center_frequency(center_frequency=int(3500e6)) + assert int(tx_radio.radio.tx_lo) == pytest.approx(int(3500e6), abs=5) + tx_radio.set_tx_gain(channel=0, gain=-30) + assert int(tx_radio.radio.tx_hardwaregain_chan0) == -30 + tx_radio.set_tx_sample_rate(sample_rate=int(2e6)) + assert int(tx_radio.radio.sample_rate) == int(2e6) + finally: + tx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_relative_mode(): + try: + radio = Pluto() + radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert radio.radio.rx_hardwaregain_chan0 == (74 + REL_GAIN) + radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert radio.radio.tx_hardwaregain_chan0 == REL_GAIN + finally: + radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_rx(): + try: + rx_radio = Pluto() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = rx_radio.record(num_samples=SAMPLE_RATE) + assert type(recording) is Recording + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_tx(): + try: + tx_radio = Pluto() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + tx_radio.tx_recording( + recording=recording, + num_samples=SAMPLE_RATE + ) + assert True + finally: + tx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_dual_tx(): + try: + tx_radio = Pluto() + try: + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=1, + gain=ABS_GAIN, + ) + except AttributeError: + pytest.skip("Dual tx not available on connected Pluto device") + recording1 = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + recording2 = Recording( + data=CONSTANT_TONE, + metadata={'data': 'constant_tone'} + ) + tx_radio.tx_recording( + recording=[recording1, recording2], + num_samples=SAMPLE_RATE, + ) + assert True + finally: + tx_radio.close() diff --git a/tests/sdr/test_usrp.py b/tests/sdr/test_usrp.py new file mode 100644 index 0000000..78102ce --- /dev/null +++ b/tests/sdr/test_usrp.py @@ -0,0 +1,111 @@ +import subprocess +import numpy as np # type: ignore +import pytest # type: ignore + +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.sdr.usrp import USRP + + +SAMPLE_RATE = int(1e6) +CENTER_FREQUENCY = int(3440e6) +CHANNEL = 0 +ABS_GAIN = 10 +REL_GAIN = -25 +t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) +angular_frequency = 2 * np.pi * 1 +SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) + + +def radio_connected() -> bool: + try: + # Example: check if a specific USB device is present + result = subprocess.run( + ["uhd_find_devices"], + capture_output=True, + text=True, + check=True + ) + return not "No UHD Devices Found" in result.stdout + except Exception: + return False + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_usrp_clock_setter(): + try: + rx_radio = USRP() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + rx_radio.set_clock_source(source='external') + assert rx_radio.usrp.get_clock_source(0) == "external" + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_usrp_relative_mode(): + try: + radio = USRP() + radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + max_gain = radio.usrp.get_rx_gain_range().stop() + assert radio.rx_gain == (max_gain + REL_GAIN) + radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + max_gain = radio.usrp.get_tx_gain_range().stop() + assert radio.tx_gain == (max_gain + REL_GAIN) + finally: + radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_usrp_rx(): + try: + rx_radio = USRP() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = rx_radio.record(num_samples=SAMPLE_RATE) + assert type(recording) is Recording + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_usrp_tx(): + try: + tx_radio = USRP() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + tx_radio.tx_recording( + recording=recording, + num_samples=SAMPLE_RATE + ) + assert True + finally: + tx_radio.close()