diff --git a/tests/sdr/test_blade.py b/tests/sdr/test_blade.py deleted file mode 100644 index 43be3ce..0000000 --- a/tests/sdr/test_blade.py +++ /dev/null @@ -1,167 +0,0 @@ -import subprocess - -import numpy as np # type: ignore -import pytest # type: ignore - -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.blade import Blade - -SAMPLE_RATE = int(1e6) -CENTER_FREQUENCY = int(3440e6) -CHANNEL = 0 -ABS_GAIN = 10 -REL_GAIN = -50 -t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) -angular_frequency = 2 * np.pi * 1 -SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) - - -def radio_connected() -> bool: - try: - # Example: check if a specific USB device is present - result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) - return "bladeRF" in result.stdout - except Exception: - return False - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_rx_init(): - try: - rx_radio = Blade() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - assert str(rx_radio.rx_ch) == "Channel RX1" - assert int(rx_radio.rx_ch.sample_rate) == SAMPLE_RATE - assert int(rx_radio.rx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) - assert int(rx_radio.rx_ch.gain) == ABS_GAIN - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_tx_init(): - try: - tx_radio = Blade() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - assert str(tx_radio.tx_ch) == "Channel TX1" - assert int(tx_radio.tx_ch.sample_rate) == SAMPLE_RATE - assert int(tx_radio.tx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) - assert int(tx_radio.tx_ch.gain) == ABS_GAIN - finally: - tx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_rx_setters(): - try: - rx_radio = Blade() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - rx_radio._set_rx_channel(channel=1) - assert str(rx_radio.rx_ch) == "Channel RX2" - rx_radio._set_rx_buffer_size(buffer_size=4096) - assert int(rx_radio.rx_buffer_size) == 4096 - rx_radio._set_rx_center_frequency(center_frequency=int(3500e6)) - assert int(rx_radio.rx_ch.frequency) == pytest.approx(int(3500e6), abs=5) - rx_radio._set_rx_gain(channel=1, gain=20, gain_mode="absolute") - assert int(rx_radio.rx_ch.gain) == 20 - rx_radio._set_rx_sample_rate(sample_rate=int(2e6)) - assert int(rx_radio.rx_ch.sample_rate) == int(2e6) - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_tx_setters(): - try: - tx_radio = Blade() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - tx_radio._set_tx_channel(channel=1) - assert str(tx_radio.tx_ch) == "Channel TX2" - tx_radio._set_tx_buffer_size(buffer_size=4096) - assert int(tx_radio.tx_buffer_size) == 4096 - tx_radio._set_tx_center_frequency(center_frequency=int(3500e6)) - assert int(tx_radio.tx_ch.frequency) == pytest.approx(int(3500e6), abs=5) - tx_radio._set_tx_gain(channel=1, gain=20, gain_mode="absolute") - assert int(tx_radio.tx_ch.gain) == 20 - tx_radio._set_tx_sample_rate(sample_rate=int(2e6)) - assert int(tx_radio.tx_ch.sample_rate) == int(2e6) - finally: - tx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_relative_mode(): - try: - radio = Blade() - radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert int(radio.rx_ch.gain) == ABS_GAIN - radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert int(radio.tx_ch.gain) == ABS_GAIN - finally: - radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_rx(): - try: - print("Beginning test of Blade rx...") - rx_radio = Blade() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = rx_radio.record(num_samples=SAMPLE_RATE) - assert type(recording) is Recording - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_tx(): - try: - tx_radio = Blade() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) - assert True - finally: - tx_radio.close() diff --git a/tests/sdr/test_hackrf.py b/tests/sdr/test_hackrf.py deleted file mode 100644 index 164432a..0000000 --- a/tests/sdr/test_hackrf.py +++ /dev/null @@ -1,77 +0,0 @@ -import subprocess - -import numpy as np # type: ignore -import pytest # type: ignore - -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.hackrf import HackRF - -SAMPLE_RATE = int(1e6) -CENTER_FREQUENCY = int(3440e6) -CHANNEL = 0 -ABS_GAIN = 10 -REL_GAIN = -37 -t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) -angular_frequency = 2 * np.pi * 1 -SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) - - -def radio_connected() -> bool: - try: - # Example: check if a specific USB device is present - result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) - return "hackrf" in result.stdout.lower() - except Exception: - return False - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_hackrf_relative_mode(): - try: - radio = HackRF() - radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert int(radio.radio.txvga_gain) == ABS_GAIN - finally: - radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_hackrf_rx(): - try: - rx_radio = HackRF() - try: - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - gain_mode="absolute", - ) - except NotImplementedError: - assert True - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_hackrf_tx(): - try: - tx_radio = HackRF() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - - tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) - assert True - finally: - tx_radio.close() diff --git a/tests/sdr/test_pluto.py b/tests/sdr/test_pluto.py deleted file mode 100644 index ed0320d..0000000 --- a/tests/sdr/test_pluto.py +++ /dev/null @@ -1,168 +0,0 @@ -import subprocess - -import numpy as np # type: ignore -import pytest # type: ignore - -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.pluto import Pluto - -SAMPLE_RATE = int(1e6) -CENTER_FREQUENCY = int(3440e6) -CHANNEL = 0 -ABS_GAIN = 10 -REL_GAIN = -50 -t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) -angular_frequency = 2 * np.pi * 1 -SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) -CONSTANT_TONE = np.ones((int(1e6)), dtype=np.complex64) - - -def radio_connected() -> bool: - try: - # Example: check if a specific USB device is present - result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) - return "pluto" in result.stdout.lower() - except Exception: - return False - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_rx_setters(): - try: - rx_radio = Pluto() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - assert int(rx_radio.radio.sample_rate) == SAMPLE_RATE - assert int(rx_radio.radio.rx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5) - assert rx_radio.radio.rx_enabled_channels == [0] - assert rx_radio.radio.rx_hardwaregain_chan0 == ABS_GAIN - - # rx_radio.set_rx_channel(channel=1) - # assert rx_radio.radio.rx_enabled_channels == [0, 1] - rx_radio.set_rx_center_frequency(center_frequency=int(3500e6)) - assert int(rx_radio.radio.rx_lo) == pytest.approx(int(3500e6), abs=5) - rx_radio.set_rx_gain(channel=0, gain=20) - assert rx_radio.radio.rx_hardwaregain_chan0 == 20 - rx_radio.set_rx_sample_rate(sample_rate=int(2e6)) - assert int(rx_radio.radio.sample_rate) == int(2e6) - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_tx_setters(): - try: - print("Beginning test of Pluto tx setters...") - tx_radio = Pluto() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=-ABS_GAIN, - ) - assert int(tx_radio.radio.sample_rate) == SAMPLE_RATE - assert int(tx_radio.radio.tx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5) - assert tx_radio.radio.tx_enabled_channels == [0] - assert tx_radio.radio.tx_hardwaregain_chan0 == -ABS_GAIN - - try: - tx_radio.set_tx_channel(channel=1) - except NotImplementedError: - assert True - try: - tx_radio.set_tx_buffer_size(buffer_size=4096) - except NotImplementedError: - assert True - tx_radio.set_tx_center_frequency(center_frequency=int(3500e6)) - assert int(tx_radio.radio.tx_lo) == pytest.approx(int(3500e6), abs=5) - tx_radio.set_tx_gain(channel=0, gain=-30) - assert int(tx_radio.radio.tx_hardwaregain_chan0) == -30 - tx_radio.set_tx_sample_rate(sample_rate=int(2e6)) - assert int(tx_radio.radio.sample_rate) == int(2e6) - finally: - tx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_relative_mode(): - try: - radio = Pluto() - radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert radio.radio.rx_hardwaregain_chan0 == (74 + REL_GAIN) - radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert radio.radio.tx_hardwaregain_chan0 == REL_GAIN - finally: - radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_rx(): - try: - rx_radio = Pluto() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = rx_radio.record(num_samples=SAMPLE_RATE) - assert type(recording) is Recording - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_tx(): - try: - tx_radio = Pluto() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) - assert True - finally: - tx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_dual_tx(): - try: - tx_radio = Pluto() - try: - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=1, - gain=ABS_GAIN, - ) - except AttributeError: - pytest.skip("Dual tx not available on connected Pluto device") - recording1 = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - recording2 = Recording(data=CONSTANT_TONE, metadata={"data": "constant_tone"}) - tx_radio.tx_recording( - recording=[recording1, recording2], - num_samples=SAMPLE_RATE, - ) - assert True - finally: - tx_radio.close() diff --git a/tests/sdr/test_usrp.py b/tests/sdr/test_usrp.py deleted file mode 100644 index 4ed26e3..0000000 --- a/tests/sdr/test_usrp.py +++ /dev/null @@ -1,100 +0,0 @@ -import subprocess - -import numpy as np # type: ignore -import pytest # type: ignore - -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.usrp import USRP - -SAMPLE_RATE = int(1e6) -CENTER_FREQUENCY = int(3440e6) -CHANNEL = 0 -ABS_GAIN = 10 -REL_GAIN = -25 -t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) -angular_frequency = 2 * np.pi * 1 -SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) - - -def radio_connected() -> bool: - try: - # Example: check if a specific USB device is present - result = subprocess.run(["uhd_find_devices"], capture_output=True, text=True, check=True) - return "No UHD Devices Found" not in result.stdout - except Exception: - return False - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_usrp_clock_setter(): - try: - rx_radio = USRP() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - rx_radio.set_clock_source(source="external") - assert rx_radio.usrp.get_clock_source(0) == "external" - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_usrp_relative_mode(): - try: - radio = USRP() - radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - max_gain = radio.usrp.get_rx_gain_range().stop() - assert radio.rx_gain == (max_gain + REL_GAIN) - radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - max_gain = radio.usrp.get_tx_gain_range().stop() - assert radio.tx_gain == (max_gain + REL_GAIN) - finally: - radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_usrp_rx(): - try: - rx_radio = USRP() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = rx_radio.record(num_samples=SAMPLE_RATE) - assert type(recording) is Recording - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_usrp_tx(): - try: - tx_radio = USRP() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) - assert True - finally: - tx_radio.close()