"""Hardware-in-the-loop tests for the ``Pluto`` SDR wrapper.

Every test in this module is skipped unless ``lsusb`` lists a USB device whose
description contains "pluto" (case-insensitive).
"""

import subprocess
from contextlib import closing, suppress

import numpy as np  # type: ignore
import pytest  # type: ignore

from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.sdr.pluto import Pluto

SAMPLE_RATE = int(1e6)
CENTER_FREQUENCY = int(3440e6)
CHANNEL = 0
ABS_GAIN = 10
REL_GAIN = -50

# Test waveforms, one million samples each.
t = np.linspace(0, 1, int(1e6 * 1), endpoint=False)
angular_frequency = 2 * np.pi * 1
# Complex exponential with magnitude 10 that completes one cycle over `t`.
SINE_WAVE = 10 * np.exp(1j * angular_frequency * t)
# Constant complex64 signal of all ones.
CONSTANT_TONE = np.ones((int(1e6)), dtype=np.complex64)


def radio_connected() -> bool:
    """Return True if ``lsusb`` output mentions a Pluto device.

    This is a best-effort probe: if ``lsusb`` is missing, fails, times out, or
    produces undecodable output, the radio is reported as not connected.

    Returns:
        True when "pluto" appears (case-insensitively) in the ``lsusb`` output,
        otherwise False.
    """
    try:
        result = subprocess.run(
            ["lsusb"],
            capture_output=True,
            text=True,
            check=True,
            timeout=10,  # never let a hung lsusb stall test collection
        )
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
        return False
    return "pluto" in result.stdout.lower()


# Probe the USB bus once at import time and share the result between all
# tests, rather than re-running lsusb for every decorator.
requires_pluto = pytest.mark.skipif(
    not radio_connected(), reason="Required radio not connected"
)


@requires_pluto
def test_pluto_rx_setters():
    """Check that RX init and RX setters are reflected on the underlying radio."""
    # The radio is created before entering the cleanup scope so that a failing
    # constructor is reported as-is instead of being masked by close().
    with closing(Pluto()) as rx_radio:
        rx_radio.init_rx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=ABS_GAIN,
        )
        assert int(rx_radio.radio.sample_rate) == SAMPLE_RATE
        assert int(rx_radio.radio.rx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5)
        assert rx_radio.radio.rx_enabled_channels == [0]
        assert rx_radio.radio.rx_hardwaregain_chan0 == ABS_GAIN

        # NOTE(review): the second-channel RX check below is disabled; the
        # reason is not recorded here - confirm before re-enabling.
        # rx_radio.set_rx_channel(channel=1)
        # assert rx_radio.radio.rx_enabled_channels == [0, 1]

        rx_radio.set_rx_center_frequency(center_frequency=int(3500e6))
        assert int(rx_radio.radio.rx_lo) == pytest.approx(int(3500e6), abs=5)

        rx_radio.set_rx_gain(channel=0, gain=20)
        assert rx_radio.radio.rx_hardwaregain_chan0 == 20

        rx_radio.set_rx_sample_rate(sample_rate=int(2e6))
        assert int(rx_radio.radio.sample_rate) == int(2e6)


@requires_pluto
def test_pluto_tx_setters():
    """Check that TX init and TX setters are reflected on the underlying radio."""
    print("Beginning test of Pluto tx setters...")
    with closing(Pluto()) as tx_radio:
        tx_radio.init_tx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=-ABS_GAIN,
        )
        assert int(tx_radio.radio.sample_rate) == SAMPLE_RATE
        assert int(tx_radio.radio.tx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5)
        assert tx_radio.radio.tx_enabled_channels == [0]
        assert tx_radio.radio.tx_hardwaregain_chan0 == -ABS_GAIN

        # These setters are allowed to be unimplemented; any other exception
        # still fails the test.
        with suppress(NotImplementedError):
            tx_radio.set_tx_channel(channel=1)
        with suppress(NotImplementedError):
            tx_radio.set_tx_buffer_size(buffer_size=4096)

        tx_radio.set_tx_center_frequency(center_frequency=int(3500e6))
        assert int(tx_radio.radio.tx_lo) == pytest.approx(int(3500e6), abs=5)

        tx_radio.set_tx_gain(channel=0, gain=-30)
        assert int(tx_radio.radio.tx_hardwaregain_chan0) == -30

        tx_radio.set_tx_sample_rate(sample_rate=int(2e6))
        assert int(tx_radio.radio.sample_rate) == int(2e6)


@requires_pluto
def test_pluto_relative_mode():
    """Check the hardware gains produced by ``gain_mode="relative"``."""
    with closing(Pluto()) as radio:
        radio.init_rx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=REL_GAIN,
            gain_mode="relative",
        )
        # NOTE(review): 74 is presumably the maximum RX gain that relative
        # gains are offset from - confirm against the Pluto wrapper.
        assert radio.radio.rx_hardwaregain_chan0 == (74 + REL_GAIN)

        radio.init_tx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=REL_GAIN,
            gain_mode="relative",
        )
        assert radio.radio.tx_hardwaregain_chan0 == REL_GAIN


@requires_pluto
def test_pluto_rx():
    """Record ``SAMPLE_RATE`` samples and check that a ``Recording`` is returned."""
    with closing(Pluto()) as rx_radio:
        rx_radio.init_rx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=ABS_GAIN,
        )
        recording = rx_radio.record(num_samples=SAMPLE_RATE)
        assert isinstance(recording, Recording)


@requires_pluto
def test_pluto_tx():
    """Transmit the sine-wave recording; passes if no exception is raised."""
    with closing(Pluto()) as tx_radio:
        # NOTE(review): test_pluto_tx_setters passes -ABS_GAIN for TX while a
        # positive gain is used here - confirm this is intended.
        tx_radio.init_tx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=ABS_GAIN,
        )
        recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"})
        tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE)


@requires_pluto
def test_pluto_dual_tx():
    """Transmit two recordings at once; skipped if TX init on channel 1 fails.

    An ``AttributeError`` from ``init_tx`` with ``channel=1`` is treated as
    "dual TX not available" and skips the test. The test otherwise passes if
    no exception is raised.
    """
    with closing(Pluto()) as tx_radio:
        try:
            tx_radio.init_tx(
                sample_rate=SAMPLE_RATE,
                center_frequency=CENTER_FREQUENCY,
                channel=1,
                gain=ABS_GAIN,
            )
        except AttributeError:
            pytest.skip("Dual tx not available on connected Pluto device")

        recording1 = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"})
        recording2 = Recording(data=CONSTANT_TONE, metadata={"data": "constant_tone"})
        tx_radio.tx_recording(
            recording=[recording1, recording2],
            num_samples=SAMPLE_RATE,
        )