112 lines
3.1 KiB
Python
112 lines
3.1 KiB
Python
|
import subprocess
|
||
|
import numpy as np # type: ignore
|
||
|
import pytest # type: ignore
|
||
|
|
||
|
from ria_toolkit_oss.datatypes.recording import Recording
|
||
|
from ria_toolkit_oss.sdr.usrp import USRP
|
||
|
|
||
|
|
||
|
# --- Radio configuration shared by every test in this module ---------------
SAMPLE_RATE = 10**6  # passed as ``sample_rate`` and as ``num_samples``
CENTER_FREQUENCY = 3440 * 10**6  # passed as ``center_frequency``
CHANNEL = 0  # passed as ``channel``
ABS_GAIN = 10  # gain used when no ``gain_mode`` is given
REL_GAIN = -25  # gain used with ``gain_mode='relative'`` (added to the max gain)

# --- Transmit test signal ----------------------------------------------------
# 1_000_000 evenly spaced points covering [0, 1); the count equals SAMPLE_RATE.
t = np.linspace(0, 1, int(1e6 * 1), endpoint=False)
# One full rotation (2*pi radians) over the span of ``t``.
angular_frequency = 2 * np.pi * 1
# Complex exponential with magnitude 10 sampled at the points in ``t``.
SINE_WAVE = 10 * np.exp(1j * angular_frequency * t)
|
||
|
|
||
|
|
||
|
def radio_connected() -> bool:
    """Report whether ``uhd_find_devices`` finds at least one device.

    Runs the ``uhd_find_devices`` command and inspects its standard output.
    The result is used by the ``skipif`` markers in this module, so this
    function must never raise: any failure to run the probe counts as
    "no radio".

    Returns:
        ``True`` when the command exits with status 0 and its standard
        output does not contain ``"No UHD Devices Found"``. ``False`` when
        the marker is present, the command exits with a non-zero status,
        or the command cannot be run.
    """
    try:
        result = subprocess.run(
            ["uhd_find_devices"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: the executable is missing or not runnable.
        # SubprocessError: non-zero exit status (raised because check=True).
        # ValueError: output could not be decoded as text.
        return False
    return "No UHD Devices Found" not in result.stdout
|
||
|
|
||
|
|
||
|
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
def test_usrp_clock_setter():
    """Check that setting the clock source to ``'external'`` takes effect.

    Initialises the radio for receive, calls ``set_clock_source`` and then
    reads the clock source back through the wrapped ``usrp`` object.
    Skipped when ``radio_connected()`` reports no radio.
    """
    # Construct outside the ``try``: if the constructor raises, there is
    # nothing to close, and the original error must not be masked by an
    # UnboundLocalError from the ``finally`` clause.
    rx_radio = USRP()
    try:
        rx_radio.init_rx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=ABS_GAIN,
        )
        rx_radio.set_clock_source(source="external")
        assert rx_radio.usrp.get_clock_source(0) == "external"
    finally:
        # Always release the device, even when an assertion fails.
        rx_radio.close()
|
||
|
|
||
|
|
||
|
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
def test_usrp_relative_mode():
    """Check that relative gain mode offsets from the device's maximum gain.

    For both receive and transmit, the radio is initialised with
    ``gain=REL_GAIN`` and ``gain_mode='relative'``. The resulting gain
    attribute is expected to equal the top of the device's gain range plus
    ``REL_GAIN``. Skipped when ``radio_connected()`` reports no radio.
    """
    # Construct outside the ``try``: if the constructor raises, there is
    # nothing to close, and the original error must not be masked by an
    # UnboundLocalError from the ``finally`` clause.
    radio = USRP()
    try:
        # Receive path.
        radio.init_rx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=REL_GAIN,
            gain_mode="relative",
        )
        max_rx_gain = radio.usrp.get_rx_gain_range().stop()
        assert radio.rx_gain == (max_rx_gain + REL_GAIN)

        # Transmit path.
        radio.init_tx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=REL_GAIN,
            gain_mode="relative",
        )
        max_tx_gain = radio.usrp.get_tx_gain_range().stop()
        assert radio.tx_gain == (max_tx_gain + REL_GAIN)
    finally:
        # Always release the device, even when an assertion fails.
        radio.close()
|
||
|
|
||
|
|
||
|
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
def test_usrp_rx():
    """Check that recording from the radio yields a ``Recording``.

    Initialises the radio for receive, records ``SAMPLE_RATE`` samples and
    verifies the type of the returned object. Skipped when
    ``radio_connected()`` reports no radio.
    """
    # Construct outside the ``try``: if the constructor raises, there is
    # nothing to close, and the original error must not be masked by an
    # UnboundLocalError from the ``finally`` clause.
    rx_radio = USRP()
    try:
        rx_radio.init_rx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=ABS_GAIN,
        )
        recording = rx_radio.record(num_samples=SAMPLE_RATE)
        # isinstance rather than an exact type comparison, so a Recording
        # subclass would also be accepted.
        assert isinstance(recording, Recording)
    finally:
        # Always release the device, even when an assertion fails.
        rx_radio.close()
|
||
|
|
||
|
|
||
|
@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected")
def test_usrp_tx():
    """Check that transmitting a recording completes without raising.

    Initialises the radio for transmit, wraps ``SINE_WAVE`` in a
    ``Recording`` and transmits ``SAMPLE_RATE`` samples of it. There is no
    value to assert on: the test passes when no exception is raised.
    Skipped when ``radio_connected()`` reports no radio.
    """
    # Construct outside the ``try``: if the constructor raises, there is
    # nothing to close, and the original error must not be masked by an
    # UnboundLocalError from the ``finally`` clause.
    tx_radio = USRP()
    try:
        tx_radio.init_tx(
            sample_rate=SAMPLE_RATE,
            center_frequency=CENTER_FREQUENCY,
            channel=CHANNEL,
            gain=ABS_GAIN,
        )
        recording = Recording(
            data=SINE_WAVE,
            metadata={"data": "sine_wave"},
        )
        tx_radio.tx_recording(
            recording=recording,
            num_samples=SAMPLE_RATE,
        )
    finally:
        # Always release the device, even when transmission fails.
        tx_radio.close()
|