import subprocess import numpy as np # type: ignore import pytest # type: ignore from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.sdr.blade import Blade SAMPLE_RATE = int(1e6) CENTER_FREQUENCY = int(3440e6) CHANNEL = 0 ABS_GAIN = 10 REL_GAIN = -50 t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) angular_frequency = 2 * np.pi * 1 SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) def radio_connected() -> bool: try: # Example: check if a specific USB device is present result = subprocess.run( ["lsusb"], capture_output=True, text=True, check=True ) return "bladeRF" in result.stdout except Exception: return False @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_blade_rx_init(): try: rx_radio = Blade() rx_radio.init_rx( sample_rate=SAMPLE_RATE, center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=ABS_GAIN, ) assert str(rx_radio.rx_ch) == "Channel RX1" assert int(rx_radio.rx_ch.sample_rate) == SAMPLE_RATE assert int(rx_radio.rx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) assert int(rx_radio.rx_ch.gain) == ABS_GAIN finally: rx_radio.close() @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_blade_tx_init(): try: tx_radio = Blade() tx_radio.init_tx( sample_rate=SAMPLE_RATE, center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=ABS_GAIN, ) assert str(tx_radio.tx_ch) == "Channel TX1" assert int(tx_radio.tx_ch.sample_rate) == SAMPLE_RATE assert int(tx_radio.tx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) assert int(tx_radio.tx_ch.gain) == ABS_GAIN finally: tx_radio.close() @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_blade_rx_setters(): try: rx_radio = Blade() rx_radio.init_rx( sample_rate=SAMPLE_RATE, center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=ABS_GAIN, ) rx_radio._set_rx_channel(channel=1) assert str(rx_radio.rx_ch) == "Channel RX2" rx_radio._set_rx_buffer_size(buffer_size=4096) assert int(rx_radio.rx_buffer_size) == 4096 rx_radio._set_rx_center_frequency(center_frequency=int(3500e6)) assert int(rx_radio.rx_ch.frequency) == pytest.approx(int(3500e6), abs=5) rx_radio._set_rx_gain(channel=1, gain=20, gain_mode='absolute') assert int(rx_radio.rx_ch.gain) == 20 rx_radio._set_rx_sample_rate(sample_rate=int(2e6)) assert int(rx_radio.rx_ch.sample_rate) == int(2e6) finally: rx_radio.close() @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_blade_tx_setters(): try: tx_radio = Blade() tx_radio.init_tx( sample_rate=SAMPLE_RATE, center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=ABS_GAIN, ) tx_radio._set_tx_channel(channel=1) assert str(tx_radio.tx_ch) == "Channel TX2" tx_radio._set_tx_buffer_size(buffer_size=4096) assert int(tx_radio.tx_buffer_size) == 4096 tx_radio._set_tx_center_frequency(center_frequency=int(3500e6)) assert int(tx_radio.tx_ch.frequency) == pytest.approx(int(3500e6), abs=5) tx_radio._set_tx_gain(channel=1, gain=20, gain_mode='absolute') assert int(tx_radio.tx_ch.gain) == 20 tx_radio._set_tx_sample_rate(sample_rate=int(2e6)) assert int(tx_radio.tx_ch.sample_rate) == int(2e6) finally: tx_radio.close() @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_blade_relative_mode(): try: radio = Blade() radio.init_rx( sample_rate=SAMPLE_RATE, center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, gain_mode='relative' ) assert int(radio.rx_ch.gain) == ABS_GAIN radio.init_tx( sample_rate=SAMPLE_RATE, center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, gain_mode='relative' ) assert int(radio.tx_ch.gain) == ABS_GAIN finally: radio.close() @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_blade_rx(): try: print('Beginning test of Blade rx...') rx_radio = Blade() rx_radio.init_rx( sample_rate=SAMPLE_RATE, center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=ABS_GAIN, ) recording = rx_radio.record(num_samples=SAMPLE_RATE) assert type(recording) is Recording finally: rx_radio.close() @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_blade_tx(): try: tx_radio = Blade() tx_radio.init_tx( sample_rate=SAMPLE_RATE, center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=ABS_GAIN, ) recording = Recording( data=SINE_WAVE, metadata={'data': 'sine_wave'} ) tx_radio.tx_recording( recording=recording, num_samples=SAMPLE_RATE ) assert True finally: tx_radio.close()