From cc373a89f6c1265e4be74c1d294b3cc77a46d090 Mon Sep 17 00:00:00 2001 From: gillian Date: Mon, 29 Sep 2025 16:24:18 -0400 Subject: [PATCH 1/8] sdr_guides instructions updated for compatibility with radioconda --- docs/source/sdr_guides/blade.rst | 29 ++++++--------- docs/source/sdr_guides/hackrf.rst | 39 +++++++++++++++++--- docs/source/sdr_guides/pluto.rst | 59 ++++++++++++++++++++----------- docs/source/sdr_guides/rtl.rst | 53 +++++++++++++++++++++++++++ docs/source/sdr_guides/usrp.rst | 57 +++++++++++++++++------------ 5 files changed, 172 insertions(+), 65 deletions(-) diff --git a/docs/source/sdr_guides/blade.rst b/docs/source/sdr_guides/blade.rst index 8f9197e..b37c18e 100644 --- a/docs/source/sdr_guides/blade.rst +++ b/docs/source/sdr_guides/blade.rst @@ -40,7 +40,7 @@ Limitations - USB 3.0 connectivity is required for optimal performance; using USB 2.0 will significantly limit data transfer rates. -Set up instructions (Linux) +Set up instructions (Linux, Radioconda) --------------------------- Step 1: Install the base dependencies and drivers ('Easy method') @@ -54,32 +54,25 @@ Step 1: Install the base dependencies and drivers ('Easy method') sudo apt-get install bladerf-fpga-hostedxa4 # Necessary for installation of bladeRF 2.0 Micro A4. -Step 2: Create and/or activate your virtual environment +Step 2: Activate your Radioconda environment. .. code-block:: bash - python3 -m venv venv - pip install -r requirements.txt # If relevant - source venv/bin/activate + conda activate - -Step 3: from within the virtual environment, clone the bladerf host repo, then build and install the wheel for -bladerf python bindings. +Step 3: Install a udev rule by creating a link into your radioconda installation. .. code-block:: bash - cd ~ - mkdir workarea - cd workarea - git clone --depth 1 https://github.com/Nuand/bladeRF.git - cd bladeRF/host - cd libraries/libbladeRF_bindings/python - sudo python3 setup.py bdist_wheel - pip install dist/*.whl + sudo ln -s $CONDA_PREFIX/lib/udev/rules.d/88-nuand-bladerf1.rules /etc/udev/rules.d/88-radioconda-nuand-bladerf1.rules + sudo ln -s $CONDA_PREFIX/lib/udev/rules.d/88-nuand-bladerf2.rules /etc/udev/rules.d/88-radioconda-nuand-bladerf2.rules + sudo ln -s $CONDA_PREFIX/lib/udev/rules.d/88-nuand-bootloader.rules /etc/udev/rules.d/88-radioconda-nuand-bootloader.rules + sudo udevadm control --reload + sudo udevadm trigger Further Information ------------------- - `Official Website `_ -- `BladeRF Documentation `_ -- `GitHub Repository `_ +- `BladeRF GitHub Repository `_ +- `Radioconda Github Repository ` diff --git a/docs/source/sdr_guides/hackrf.rst b/docs/source/sdr_guides/hackrf.rst index f7a1b8f..741e140 100644 --- a/docs/source/sdr_guides/hackrf.rst +++ b/docs/source/sdr_guides/hackrf.rst @@ -39,18 +39,47 @@ Limitations - Bandwidth is limited to 20 MHz. - USB 2.0 connectivity might limit data transfer rates compared to USB 3.0 or Ethernet-based SDRs. -Set up instructions (Linux) +Set up instructions (Linux, Radioconda) +--------------------------- + +Step 1: Activate your Radioconda environment. + +.. code-block:: bash + + conda activate + +Step 2: Install the System Package (Ubuntu / Debian) + +.. code-block:: bash + + sudo apt-get update + sudo apt-get install hackrf + +Step 3: Install a udev rule by creating a link into your radioconda installation. + +.. code-block:: bash + + sudo ln -s $CONDA_PREFIX/lib/udev/rules.d/53-hackrf.rules /etc/udev/rules.d/53-radioconda-hackrf.rules + sudo udevadm control --reload + sudo udevadm trigger + +Make sure your user account belongs to the plugdev group in order to access your device: + +.. code-block:: bash + + sudo usermod -a -G plugdev + +You may have to restart for this change to take effect. + --------------------------- `HackRF Software Installation Guide `_ -.. todo:: - - Addition HackRF installation instructions Further information ------------------- - `Official Website `_ - `Project Documentation `_ -- `GitHub Repository `_ +- `HackRF GitHub Repository `_ +- `Radioconda Github Repository ` diff --git a/docs/source/sdr_guides/pluto.rst b/docs/source/sdr_guides/pluto.rst index f1cf70e..732c4c8 100644 --- a/docs/source/sdr_guides/pluto.rst +++ b/docs/source/sdr_guides/pluto.rst @@ -43,28 +43,49 @@ Limitations affect stability. - USB 2.0 connectivity might limit data transfer rates compared to USB 3.0 or Ethernet-based SDRs. -Set up instructions (Linux) +Set up instructions (Linux, Radioconda) --------------------------- -The PlutoSDR Python API can be installed via pip. To build and install the drivers from source, see the instructions below: +Step 1: Activate your Radioconda environment. .. code-block:: bash - # Install required packages - sudo apt-get update - sudo apt-get install -y \ - build-essential \ - git \ - libxml2-dev \ - bison \ - flex \ - libcdk5-dev \ - cmake \ - python3-pip \ - libusb-1.0-0-dev \ - libavahi-client-dev \ - libavahi-common-dev \ - libaio-dev + conda activate + +Step 2: Install system dependancies + +.. code-block:: bash + + sudo apt-get update + sudo apt-get install -y \ + build-essential \ + git \ + libxml2-dev \ + bison \ + flex \ + libcdk5-dev \ + cmake \ + libusb-1.0-0-dev \ + libavahi-client-dev \ + libavahi-common-dev \ + libaio-dev + + +Step 3: Install a udev rule by creating a link into your radioconda installation. + +.. code-block:: bash + + sudo ln -s $CONDA_PREFIX/lib/udev/rules.d/90-libiio.rules /etc/udev/rules.d/90-radioconda-libiio.rules + sudo udevadm control --reload + sudo udevadm trigger + +Once you can talk to the hardware, you may want to perform the post-install steps detailed on the PlutoSDR documentation below. + +Step 4 (Optional): Building libiio or libad9361-iio from source. + +Only needed if you want the very latest version of the libraries not provided in Radioconda: + +.. code-block:: bash # Clone and build libiio cd ~ @@ -87,10 +108,8 @@ The PlutoSDR Python API can be installed via pip. To build and install the drive make -j"$(nproc)" sudo make install - # Install Python bindings - pip install pyadi-iio - Further information ------------------- - `PlutoSDR Documentation `_ +- `Radioconda Github Repository ` \ No newline at end of file diff --git a/docs/source/sdr_guides/rtl.rst b/docs/source/sdr_guides/rtl.rst index d11132a..ed3d8e9 100644 --- a/docs/source/sdr_guides/rtl.rst +++ b/docs/source/sdr_guides/rtl.rst @@ -33,6 +33,59 @@ Limitations - Sensitivity and performance can vary depending on the specific model and components. - Requires external software for signal processing and analysis. +Set up instructions (Linux, Radioconda) +--------------------------- + +Step 1: Activate your Radioconda environment. + +.. code-block:: bash + + conda activate + +Step 2: Purge drivers. + +If you already have some other drivers installed, purge them from your system. + +.. code-block:: bash + + sudo apt purge ^librtlsdr + sudo rm -rvf /usr/lib/librtlsdr* + sudo rm -rvf /usr/include/rtl-sdr* + sudo rm -rvf /usr/local/lib/librtlsdr* + sudo rm -rvf /usr/local/include/rtl-sdr* + sudo rm -rvf /usr/local/include/rtl_* + sudo rm -rvf /usr/local/bin/rtl_* + +Step 3: Install RTL-SDR Blog drivers. + +.. code-block:: bash + + sudo apt-get install libusb-1.0-0-dev git cmake pkg-config build-essential + git clone https://github.com/rtlsdrblog/rtl-sdr-blog + cd rtl-sdr-blog/ + mkdir build + cd build + cmake ../ -DINSTALL_UDEV_RULES=ON + make + sudo make install + sudo cp ../rtl-sdr.rules /etc/udev/rules.d/ + sudo ldconfig + +Step 4: Blacklist the DVB-T modules that would otherwise claim the device: + +.. code-block:: bash + + sudo ln -s $CONDA_PREFIX/etc/modprobe.d/rtl-sdr-blacklist.conf /etc/modprobe.d/radioconda-rtl-sdr-blacklist.conf + sudo modprobe -r $(cat $CONDA_PREFIX/etc/modprobe.d/rtl-sdr-blacklist.conf | sed -n -e 's/^blacklist //p') + +Step 5: Install a udev rule by creating a link into your radioconda installation. + +.. code-block:: bash + + sudo ln -s $CONDA_PREFIX/lib/udev/rules.d/rtl-sdr.rules /etc/udev/rules.d/radioconda-rtl-sdr.rules + sudo udevadm control --reload + sudo udevadm trigger + Further Information ------------------- diff --git a/docs/source/sdr_guides/usrp.rst b/docs/source/sdr_guides/usrp.rst index b3f36a2..d88775b 100644 --- a/docs/source/sdr_guides/usrp.rst +++ b/docs/source/sdr_guides/usrp.rst @@ -41,40 +41,53 @@ Limitations - Compatibility with certain software tools may vary depending on the version of the UHD. - Price range can be a consideration, especially for high-end models. -Set up instructions (Linux) +Set up instructions (Linux, Radioconda) --------------------------- -1. Install the required system packages via APT: - -.. code-block:: bash - - sudo apt-get install libuhd-dev uhd-host python3-uhd - -2. Build and install UHD from source: +Step 1: Activate your Radioconda environment. .. code-block:: bash - sudo apt-get install git cmake libboost-all-dev libusb-1.0-0-dev python3-docutils python3-mako python3-numpy python3-requests python3-ruamel.yaml python3-setuptools build-essential - cd ~ - git clone https://github.com/EttusResearch/uhd.git - cd uhd/host - mkdir build - cd build - cmake -DENABLE_TESTS=OFF -DENABLE_C_API=OFF -DENABLE_MANUAL=OFF .. - make -j8 - sudo make install - sudo ldconfig + conda activate - -3. Find your dist packages and add to `PYTHONPATH`. Example: +Step 2: Install UHD and Python Bindings. .. code-block:: bash - export PYTHONPATH='/usr/local/lib/python3.10/site-packages/' - export PYTHONPATH='/usr/local/lib/python3.10/dist-packages/:$PYTHONPATH' + conda install conda-forge::uhd + +Step 3: Download UHD images. + +.. code-block:: bash + + uhd_images_downloader + +Step 4: Verify Installation. + +To verify access to your device: + +.. code-block:: bash + + uhd_find_devices + +For USB devices only (e.g. B series), install a udev rule by creating a link into your radioconda installation. + +.. code-block:: bash + sudo ln -s $CONDA_PREFIX/lib/uhd/utils/uhd-usrp.rules /etc/udev/rules.d/radioconda-uhd-usrp.rules + sudo udevadm control --reload + sudo udevadm trigger + +Step 5: (optional): Update Firmware/FPGA images + +.. code-block:: bash + + uhd_usrp_probe + +This will ensure your device is running the latest firmware and FPGA versions. Further information ------------------- - `Official Website `_ - `USRP Documentation `_ +- `Radioconda Github Repository `_ -- 2.34.1 From 0f5f36b1035dedfa37e6ec0b1157fd77beff428f Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 2 Oct 2025 09:44:02 -0400 Subject: [PATCH 2/8] Fixed formatting of tx data for single tx --- src/ria_toolkit_oss/sdr/pluto.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/pluto.py b/src/ria_toolkit_oss/sdr/pluto.py index 2735689..449505e 100644 --- a/src/ria_toolkit_oss/sdr/pluto.py +++ b/src/ria_toolkit_oss/sdr/pluto.py @@ -102,7 +102,7 @@ class Pluto(SDR): if channel == 0: print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}") elif channel == 1: - self.set_tx_gain(gain=abs_gain, channel=0) + self.set_rx_gain(gain=abs_gain, channel=0) print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}") self.radio.rx_buffer_size = 1024 # TODO deal with this for zmq @@ -223,7 +223,7 @@ class Pluto(SDR): print("Pluto Starting RX...") samples = self.radio.rx() - if self.radio.tx_enabled_channels == [0]: + if self.radio.rx_enabled_channels == [0]: samples = self._convert_rx_samples(samples) samples = [samples] else: @@ -244,11 +244,11 @@ class Pluto(SDR): def _format_tx_data(self, recording: Recording | np.ndarray | list): if isinstance(recording, np.ndarray): - data = [self._convert_tx_samples(samples=recording)] + data = self._convert_tx_samples(samples=recording) elif isinstance(recording, Recording): if self.radio.tx_enabled_channels == [0]: samples = recording.data[0] - data = [self._convert_tx_samples(samples=samples)] + data = self._convert_tx_samples(samples=samples) if len(recording.data) > 1: warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission") -- 2.34.1 From 0a27c99cd247e5932846fea678eff8b11022d2f3 Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 2 Oct 2025 10:13:24 -0400 Subject: [PATCH 3/8] Fixed tx normalization --- src/ria_toolkit_oss/sdr/hackrf.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ria_toolkit_oss/sdr/hackrf.py b/src/ria_toolkit_oss/sdr/hackrf.py index 349e4fc..91a84ef 100644 --- a/src/ria_toolkit_oss/sdr/hackrf.py +++ b/src/ria_toolkit_oss/sdr/hackrf.py @@ -135,6 +135,8 @@ class HackRF(SDR): samples = recording.data[0] samples = samples.astype(np.complex64, copy=False) + if np.max(np.abs(samples)) >= 1: + samples = samples / (np.max(np.abs(samples)) + 1e-12) print("HackRF Starting TX...") self.radio.start_tx(samples=samples, repeat=True) -- 2.34.1 From be32efeff280335ac7dccf60f5699d0088c9a44f Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 2 Oct 2025 10:15:21 -0400 Subject: [PATCH 4/8] Added sdr pytests --- tests/sdr/test_blade.py | 178 +++++++++++++++++++++++++++++++++++++ tests/sdr/test_hackrf.py | 91 +++++++++++++++++++ tests/sdr/test_pluto.py | 185 +++++++++++++++++++++++++++++++++++++++ tests/sdr/test_usrp.py | 111 +++++++++++++++++++++++ 4 files changed, 565 insertions(+) create mode 100644 tests/sdr/test_blade.py create mode 100644 tests/sdr/test_hackrf.py create mode 100644 tests/sdr/test_pluto.py create mode 100644 tests/sdr/test_usrp.py diff --git a/tests/sdr/test_blade.py b/tests/sdr/test_blade.py new file mode 100644 index 0000000..fbc102b --- /dev/null +++ b/tests/sdr/test_blade.py @@ -0,0 +1,178 @@ +import subprocess +import numpy as np # type: ignore +import pytest # type: ignore + +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.sdr.blade import Blade + + +SAMPLE_RATE = int(1e6) +CENTER_FREQUENCY = int(3440e6) +CHANNEL = 0 +ABS_GAIN = 10 +REL_GAIN = -50 +t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) +angular_frequency = 2 * np.pi * 1 +SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) + + +def radio_connected() -> bool: + try: + # Example: check if a specific USB device is present + result = subprocess.run( + ["lsusb"], + capture_output=True, + text=True, + check=True + ) + return "bladeRF" in result.stdout + except Exception: + return False + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_rx_init(): + try: + rx_radio = Blade() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + assert str(rx_radio.rx_ch) == "Channel RX1" + assert int(rx_radio.rx_ch.sample_rate) == SAMPLE_RATE + assert int(rx_radio.rx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) + assert int(rx_radio.rx_ch.gain) == ABS_GAIN + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_tx_init(): + try: + tx_radio = Blade() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + assert str(tx_radio.tx_ch) == "Channel TX1" + assert int(tx_radio.tx_ch.sample_rate) == SAMPLE_RATE + assert int(tx_radio.tx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) + assert int(tx_radio.tx_ch.gain) == ABS_GAIN + finally: + tx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_rx_setters(): + try: + rx_radio = Blade() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + rx_radio._set_rx_channel(channel=1) + assert str(rx_radio.rx_ch) == "Channel RX2" + rx_radio._set_rx_buffer_size(buffer_size=4096) + assert int(rx_radio.rx_buffer_size) == 4096 + rx_radio._set_rx_center_frequency(center_frequency=int(3500e6)) + assert int(rx_radio.rx_ch.frequency) == pytest.approx(int(3500e6), abs=5) + rx_radio._set_rx_gain(channel=1, gain=20, gain_mode='absolute') + assert int(rx_radio.rx_ch.gain) == 20 + rx_radio._set_rx_sample_rate(sample_rate=int(2e6)) + assert int(rx_radio.rx_ch.sample_rate) == int(2e6) + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_tx_setters(): + try: + tx_radio = Blade() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + tx_radio._set_tx_channel(channel=1) + assert str(tx_radio.tx_ch) == "Channel TX2" + tx_radio._set_tx_buffer_size(buffer_size=4096) + assert int(tx_radio.tx_buffer_size) == 4096 + tx_radio._set_tx_center_frequency(center_frequency=int(3500e6)) + assert int(tx_radio.tx_ch.frequency) == pytest.approx(int(3500e6), abs=5) + tx_radio._set_tx_gain(channel=1, gain=20, gain_mode='absolute') + assert int(tx_radio.tx_ch.gain) == 20 + tx_radio._set_tx_sample_rate(sample_rate=int(2e6)) + assert int(tx_radio.tx_ch.sample_rate) == int(2e6) + finally: + tx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_relative_mode(): + try: + radio = Blade() + radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert int(radio.rx_ch.gain) == ABS_GAIN + radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert int(radio.tx_ch.gain) == ABS_GAIN + finally: + radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_rx(): + try: + print('Beginning test of Blade rx...') + rx_radio = Blade() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = rx_radio.record(num_samples=SAMPLE_RATE) + assert type(recording) is Recording + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_blade_tx(): + try: + tx_radio = Blade() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + tx_radio.tx_recording( + recording=recording, + num_samples=SAMPLE_RATE + ) + assert True + finally: + tx_radio.close() diff --git a/tests/sdr/test_hackrf.py b/tests/sdr/test_hackrf.py new file mode 100644 index 0000000..eb5b628 --- /dev/null +++ b/tests/sdr/test_hackrf.py @@ -0,0 +1,91 @@ +import subprocess +import numpy as np # type: ignore +import pytest # type: ignore + +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.sdr.hackrf import HackRF + + +SAMPLE_RATE = int(1e6) +CENTER_FREQUENCY = int(3440e6) +CHANNEL = 0 +ABS_GAIN = 10 +REL_GAIN = -37 +t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) +angular_frequency = 2 * np.pi * 1 +SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) + + +def radio_connected() -> bool: + try: + # Example: check if a specific USB device is present + result = subprocess.run( + ["lsusb"], + capture_output=True, + text=True, + check=True + ) + return "hackrf" in result.stdout.lower() + except Exception: + return False + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_hackrf_relative_mode(): + try: + radio = HackRF() + radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert int(radio.radio.txvga_gain) == ABS_GAIN + finally: + radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_hackrf_rx(): + try: + rx_radio = HackRF() + try: + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + gain_mode='absolute' + ) + except NotImplementedError: + assert True + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_hackrf_tx(): + try: + tx_radio = HackRF() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + + max_val = np.max(np.abs(SINE_WAVE)) + data = SINE_WAVE / (max_val * 1.01) + recording = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + + tx_radio.tx_recording( + recording=recording, + num_samples=SAMPLE_RATE + ) + assert True + finally: + tx_radio.close() diff --git a/tests/sdr/test_pluto.py b/tests/sdr/test_pluto.py new file mode 100644 index 0000000..4d9f73a --- /dev/null +++ b/tests/sdr/test_pluto.py @@ -0,0 +1,185 @@ +import subprocess +import numpy as np # type: ignore +import pytest # type: ignore + +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.sdr.pluto import Pluto + + +SAMPLE_RATE = int(1e6) +CENTER_FREQUENCY = int(3440e6) +CHANNEL = 0 +ABS_GAIN = 10 +REL_GAIN = -50 +t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) +angular_frequency = 2 * np.pi * 1 +SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) +CONSTANT_TONE = np.ones((1000), dtype=np.complex64) + + +def radio_connected() -> bool: + try: + # Example: check if a specific USB device is present + result = subprocess.run( + ["lsusb"], + capture_output=True, + text=True, + check=True + ) + return "pluto" in result.stdout.lower() + except Exception: + return False + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_rx_setters(): + try: + rx_radio = Pluto() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + assert int(rx_radio.radio.sample_rate) == SAMPLE_RATE + assert int(rx_radio.radio.rx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5) + assert rx_radio.radio.rx_enabled_channels == [0] + assert rx_radio.radio.rx_hardwaregain_chan0 == ABS_GAIN + + # rx_radio.set_rx_channel(channel=1) + # assert rx_radio.radio.rx_enabled_channels == [0, 1] + rx_radio.set_rx_center_frequency(center_frequency=int(3500e6)) + assert int(rx_radio.radio.rx_lo) == pytest.approx(int(3500e6), abs=5) + rx_radio.set_rx_gain(channel=0, gain=20) + assert rx_radio.radio.rx_hardwaregain_chan0 == 20 + rx_radio.set_rx_sample_rate(sample_rate=int(2e6)) + assert int(rx_radio.radio.sample_rate) == int(2e6) + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_tx_setters(): + try: + print('Beginning test of Pluto tx setters...') + tx_radio = Pluto() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=-ABS_GAIN, + ) + assert int(tx_radio.radio.sample_rate) == SAMPLE_RATE + assert int(tx_radio.radio.tx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5) + assert tx_radio.radio.tx_enabled_channels == [0] + assert tx_radio.radio.tx_hardwaregain_chan0 == -ABS_GAIN + + try: + tx_radio.set_tx_channel(channel=1) + except NotImplementedError: + assert True + try: + tx_radio.set_tx_buffer_size(buffer_size=4096) + except NotImplementedError: + assert True + tx_radio.set_tx_center_frequency(center_frequency=int(3500e6)) + assert int(tx_radio.radio.tx_lo) == pytest.approx(int(3500e6), abs=5) + tx_radio.set_tx_gain(channel=0, gain=-30) + assert int(tx_radio.radio.tx_hardwaregain_chan0) == -30 + tx_radio.set_tx_sample_rate(sample_rate=int(2e6)) + assert int(tx_radio.radio.sample_rate) == int(2e6) + finally: + tx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_relative_mode(): + try: + radio = Pluto() + radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert radio.radio.rx_hardwaregain_chan0 == (74 + REL_GAIN) + radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + assert radio.radio.tx_hardwaregain_chan0 == REL_GAIN + finally: + radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_rx(): + try: + rx_radio = Pluto() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = rx_radio.record(num_samples=SAMPLE_RATE) + assert type(recording) is Recording + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_tx(): + try: + tx_radio = Pluto() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + tx_radio.tx_recording( + recording=recording, + num_samples=SAMPLE_RATE + ) + assert True + finally: + tx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_pluto_dual_tx(): + try: + tx_radio = Pluto() + try: + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=1, + gain=ABS_GAIN, + ) + except AttributeError: + pytest.skip("Dual tx not available on connected Pluto device") + recording1 = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + recording2 = Recording( + data=CONSTANT_TONE, + metadata={'data': 'constant_tone'} + ) + tx_radio.tx_recording( + recording=[recording1, recording2], + num_samples=SAMPLE_RATE, + ) + assert True + finally: + tx_radio.close() diff --git a/tests/sdr/test_usrp.py b/tests/sdr/test_usrp.py new file mode 100644 index 0000000..78102ce --- /dev/null +++ b/tests/sdr/test_usrp.py @@ -0,0 +1,111 @@ +import subprocess +import numpy as np # type: ignore +import pytest # type: ignore + +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.sdr.usrp import USRP + + +SAMPLE_RATE = int(1e6) +CENTER_FREQUENCY = int(3440e6) +CHANNEL = 0 +ABS_GAIN = 10 +REL_GAIN = -25 +t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) +angular_frequency = 2 * np.pi * 1 +SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) + + +def radio_connected() -> bool: + try: + # Example: check if a specific USB device is present + result = subprocess.run( + ["uhd_find_devices"], + capture_output=True, + text=True, + check=True + ) + return not "No UHD Devices Found" in result.stdout + except Exception: + return False + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_usrp_clock_setter(): + try: + rx_radio = USRP() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + rx_radio.set_clock_source(source='external') + assert rx_radio.usrp.get_clock_source(0) == "external" + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_usrp_relative_mode(): + try: + radio = USRP() + radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + max_gain = radio.usrp.get_rx_gain_range().stop() + assert radio.rx_gain == (max_gain + REL_GAIN) + radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=REL_GAIN, + gain_mode='relative' + ) + max_gain = radio.usrp.get_tx_gain_range().stop() + assert radio.tx_gain == (max_gain + REL_GAIN) + finally: + radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_usrp_rx(): + try: + rx_radio = USRP() + rx_radio.init_rx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = rx_radio.record(num_samples=SAMPLE_RATE) + assert type(recording) is Recording + finally: + rx_radio.close() + + +@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") +def test_usrp_tx(): + try: + tx_radio = USRP() + tx_radio.init_tx( + sample_rate=SAMPLE_RATE, + center_frequency=CENTER_FREQUENCY, + channel=CHANNEL, + gain=ABS_GAIN, + ) + recording = Recording( + data=SINE_WAVE, + metadata={'data': 'sine_wave'} + ) + tx_radio.tx_recording( + recording=recording, + num_samples=SAMPLE_RATE + ) + assert True + finally: + tx_radio.close() -- 2.34.1 From 539d3b8b940fff82a3606037e505c82c4262e75a Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 2 Oct 2025 11:29:40 -0400 Subject: [PATCH 5/8] Made both sample recordings same length to fix dual tx error --- tests/sdr/test_pluto.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sdr/test_pluto.py b/tests/sdr/test_pluto.py index 4d9f73a..7908a89 100644 --- a/tests/sdr/test_pluto.py +++ b/tests/sdr/test_pluto.py @@ -14,7 +14,7 @@ REL_GAIN = -50 t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) angular_frequency = 2 * np.pi * 1 SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) -CONSTANT_TONE = np.ones((1000), dtype=np.complex64) +CONSTANT_TONE = np.ones((int(1e6)), dtype=np.complex64) def radio_connected() -> bool: -- 2.34.1 From 1c39d8fa3c5ac628ebe6c4b9b7b1e7169423e9d1 Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 2 Oct 2025 11:56:00 -0400 Subject: [PATCH 6/8] linting --- tests/sdr/test_blade.py | 29 +++++++++-------------------- tests/sdr/test_hackrf.py | 28 +++++++--------------------- tests/sdr/test_pluto.py | 35 +++++++++-------------------------- tests/sdr/test_usrp.py | 27 ++++++++------------------- 4 files changed, 33 insertions(+), 86 deletions(-) diff --git a/tests/sdr/test_blade.py b/tests/sdr/test_blade.py index fbc102b..43be3ce 100644 --- a/tests/sdr/test_blade.py +++ b/tests/sdr/test_blade.py @@ -1,11 +1,11 @@ import subprocess + import numpy as np # type: ignore import pytest # type: ignore from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.sdr.blade import Blade - SAMPLE_RATE = int(1e6) CENTER_FREQUENCY = int(3440e6) CHANNEL = 0 @@ -19,12 +19,7 @@ SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) def radio_connected() -> bool: try: # Example: check if a specific USB device is present - result = subprocess.run( - ["lsusb"], - capture_output=True, - text=True, - check=True - ) + result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) return "bladeRF" in result.stdout except Exception: return False @@ -82,7 +77,7 @@ def test_blade_rx_setters(): assert int(rx_radio.rx_buffer_size) == 4096 rx_radio._set_rx_center_frequency(center_frequency=int(3500e6)) assert int(rx_radio.rx_ch.frequency) == pytest.approx(int(3500e6), abs=5) - rx_radio._set_rx_gain(channel=1, gain=20, gain_mode='absolute') + rx_radio._set_rx_gain(channel=1, gain=20, gain_mode="absolute") assert int(rx_radio.rx_ch.gain) == 20 rx_radio._set_rx_sample_rate(sample_rate=int(2e6)) assert int(rx_radio.rx_ch.sample_rate) == int(2e6) @@ -106,7 +101,7 @@ def test_blade_tx_setters(): assert int(tx_radio.tx_buffer_size) == 4096 tx_radio._set_tx_center_frequency(center_frequency=int(3500e6)) assert int(tx_radio.tx_ch.frequency) == pytest.approx(int(3500e6), abs=5) - tx_radio._set_tx_gain(channel=1, gain=20, gain_mode='absolute') + tx_radio._set_tx_gain(channel=1, gain=20, gain_mode="absolute") assert int(tx_radio.tx_ch.gain) == 20 tx_radio._set_tx_sample_rate(sample_rate=int(2e6)) assert int(tx_radio.tx_ch.sample_rate) == int(2e6) @@ -123,7 +118,7 @@ def test_blade_relative_mode(): center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, - gain_mode='relative' + gain_mode="relative", ) assert int(radio.rx_ch.gain) == ABS_GAIN radio.init_tx( @@ -131,7 +126,7 @@ def test_blade_relative_mode(): center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, - gain_mode='relative' + gain_mode="relative", ) assert int(radio.tx_ch.gain) == ABS_GAIN finally: @@ -141,7 +136,7 @@ def test_blade_relative_mode(): @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_blade_rx(): try: - print('Beginning test of Blade rx...') + print("Beginning test of Blade rx...") rx_radio = Blade() rx_radio.init_rx( sample_rate=SAMPLE_RATE, @@ -165,14 +160,8 @@ def test_blade_tx(): channel=CHANNEL, gain=ABS_GAIN, ) - recording = Recording( - data=SINE_WAVE, - metadata={'data': 'sine_wave'} - ) - tx_radio.tx_recording( - recording=recording, - num_samples=SAMPLE_RATE - ) + recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) + tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) assert True finally: tx_radio.close() diff --git a/tests/sdr/test_hackrf.py b/tests/sdr/test_hackrf.py index eb5b628..164432a 100644 --- a/tests/sdr/test_hackrf.py +++ b/tests/sdr/test_hackrf.py @@ -1,11 +1,11 @@ import subprocess + import numpy as np # type: ignore import pytest # type: ignore from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.sdr.hackrf import HackRF - SAMPLE_RATE = int(1e6) CENTER_FREQUENCY = int(3440e6) CHANNEL = 0 @@ -19,16 +19,11 @@ SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) def radio_connected() -> bool: try: # Example: check if a specific USB device is present - result = subprocess.run( - ["lsusb"], - capture_output=True, - text=True, - check=True - ) + result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) return "hackrf" in result.stdout.lower() except Exception: return False - + @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_hackrf_relative_mode(): @@ -39,7 +34,7 @@ def test_hackrf_relative_mode(): center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, - gain_mode='relative' + gain_mode="relative", ) assert int(radio.radio.txvga_gain) == ABS_GAIN finally: @@ -56,7 +51,7 @@ def test_hackrf_rx(): center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=ABS_GAIN, - gain_mode='absolute' + gain_mode="absolute", ) except NotImplementedError: assert True @@ -74,18 +69,9 @@ def test_hackrf_tx(): channel=CHANNEL, gain=ABS_GAIN, ) + recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - max_val = np.max(np.abs(SINE_WAVE)) - data = SINE_WAVE / (max_val * 1.01) - recording = Recording( - data=SINE_WAVE, - metadata={'data': 'sine_wave'} - ) - - tx_radio.tx_recording( - recording=recording, - num_samples=SAMPLE_RATE - ) + tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) assert True finally: tx_radio.close() diff --git a/tests/sdr/test_pluto.py b/tests/sdr/test_pluto.py index 7908a89..ed0320d 100644 --- a/tests/sdr/test_pluto.py +++ b/tests/sdr/test_pluto.py @@ -1,11 +1,11 @@ import subprocess + import numpy as np # type: ignore import pytest # type: ignore from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.sdr.pluto import Pluto - SAMPLE_RATE = int(1e6) CENTER_FREQUENCY = int(3440e6) CHANNEL = 0 @@ -20,12 +20,7 @@ CONSTANT_TONE = np.ones((int(1e6)), dtype=np.complex64) def radio_connected() -> bool: try: # Example: check if a specific USB device is present - result = subprocess.run( - ["lsusb"], - capture_output=True, - text=True, - check=True - ) + result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) return "pluto" in result.stdout.lower() except Exception: return False @@ -61,7 +56,7 @@ def test_pluto_rx_setters(): @pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") def test_pluto_tx_setters(): try: - print('Beginning test of Pluto tx setters...') + print("Beginning test of Pluto tx setters...") tx_radio = Pluto() tx_radio.init_tx( sample_rate=SAMPLE_RATE, @@ -101,7 +96,7 @@ def test_pluto_relative_mode(): center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, - gain_mode='relative' + gain_mode="relative", ) assert radio.radio.rx_hardwaregain_chan0 == (74 + REL_GAIN) radio.init_tx( @@ -109,7 +104,7 @@ def test_pluto_relative_mode(): center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, - gain_mode='relative' + gain_mode="relative", ) assert radio.radio.tx_hardwaregain_chan0 == REL_GAIN finally: @@ -142,14 +137,8 @@ def test_pluto_tx(): channel=CHANNEL, gain=ABS_GAIN, ) - recording = Recording( - data=SINE_WAVE, - metadata={'data': 'sine_wave'} - ) - tx_radio.tx_recording( - recording=recording, - num_samples=SAMPLE_RATE - ) + recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) + tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) assert True finally: tx_radio.close() @@ -168,14 +157,8 @@ def test_pluto_dual_tx(): ) except AttributeError: pytest.skip("Dual tx not available on connected Pluto device") - recording1 = Recording( - data=SINE_WAVE, - metadata={'data': 'sine_wave'} - ) - recording2 = Recording( - data=CONSTANT_TONE, - metadata={'data': 'constant_tone'} - ) + recording1 = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) + recording2 = Recording(data=CONSTANT_TONE, metadata={"data": "constant_tone"}) tx_radio.tx_recording( recording=[recording1, recording2], num_samples=SAMPLE_RATE, diff --git a/tests/sdr/test_usrp.py b/tests/sdr/test_usrp.py index 78102ce..4ed26e3 100644 --- a/tests/sdr/test_usrp.py +++ b/tests/sdr/test_usrp.py @@ -1,11 +1,11 @@ import subprocess + import numpy as np # type: ignore import pytest # type: ignore from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.sdr.usrp import USRP - SAMPLE_RATE = int(1e6) CENTER_FREQUENCY = int(3440e6) CHANNEL = 0 @@ -19,13 +19,8 @@ SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) def radio_connected() -> bool: try: # Example: check if a specific USB device is present - result = subprocess.run( - ["uhd_find_devices"], - capture_output=True, - text=True, - check=True - ) - return not "No UHD Devices Found" in result.stdout + result = subprocess.run(["uhd_find_devices"], capture_output=True, text=True, check=True) + return "No UHD Devices Found" not in result.stdout except Exception: return False @@ -40,7 +35,7 @@ def test_usrp_clock_setter(): channel=CHANNEL, gain=ABS_GAIN, ) - rx_radio.set_clock_source(source='external') + rx_radio.set_clock_source(source="external") assert rx_radio.usrp.get_clock_source(0) == "external" finally: rx_radio.close() @@ -55,7 +50,7 @@ def test_usrp_relative_mode(): center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, - gain_mode='relative' + gain_mode="relative", ) max_gain = radio.usrp.get_rx_gain_range().stop() assert radio.rx_gain == (max_gain + REL_GAIN) @@ -64,7 +59,7 @@ def test_usrp_relative_mode(): center_frequency=CENTER_FREQUENCY, channel=CHANNEL, gain=REL_GAIN, - gain_mode='relative' + gain_mode="relative", ) max_gain = radio.usrp.get_tx_gain_range().stop() assert radio.tx_gain == (max_gain + REL_GAIN) @@ -98,14 +93,8 @@ def test_usrp_tx(): channel=CHANNEL, gain=ABS_GAIN, ) - recording = Recording( - data=SINE_WAVE, - metadata={'data': 'sine_wave'} - ) - tx_radio.tx_recording( - recording=recording, - num_samples=SAMPLE_RATE - ) + recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) + tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) assert True finally: tx_radio.close() -- 2.34.1 From b600cd6b727632743a9ff1fc4342639b03e94145 Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 2 Oct 2025 13:36:01 -0400 Subject: [PATCH 7/8] Removed sdr pytests - they'll skip during auto tests and reduce coverage in coverage report --- tests/sdr/test_blade.py | 167 -------------------------------------- tests/sdr/test_hackrf.py | 77 ------------------ tests/sdr/test_pluto.py | 168 --------------------------------------- tests/sdr/test_usrp.py | 100 ----------------------- 4 files changed, 512 deletions(-) delete mode 100644 tests/sdr/test_blade.py delete mode 100644 tests/sdr/test_hackrf.py delete mode 100644 tests/sdr/test_pluto.py delete mode 100644 tests/sdr/test_usrp.py diff --git a/tests/sdr/test_blade.py b/tests/sdr/test_blade.py deleted file mode 100644 index 43be3ce..0000000 --- a/tests/sdr/test_blade.py +++ /dev/null @@ -1,167 +0,0 @@ -import subprocess - -import numpy as np # type: ignore -import pytest # type: ignore - -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.blade import Blade - -SAMPLE_RATE = int(1e6) -CENTER_FREQUENCY = int(3440e6) -CHANNEL = 0 -ABS_GAIN = 10 -REL_GAIN = -50 -t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) -angular_frequency = 2 * np.pi * 1 -SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) - - -def radio_connected() -> bool: - try: - # Example: check if a specific USB device is present - result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) - return "bladeRF" in result.stdout - except Exception: - return False - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_rx_init(): - try: - rx_radio = Blade() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - assert str(rx_radio.rx_ch) == "Channel RX1" - assert int(rx_radio.rx_ch.sample_rate) == SAMPLE_RATE - assert int(rx_radio.rx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) - assert int(rx_radio.rx_ch.gain) == ABS_GAIN - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_tx_init(): - try: - tx_radio = Blade() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - assert str(tx_radio.tx_ch) == "Channel TX1" - assert int(tx_radio.tx_ch.sample_rate) == SAMPLE_RATE - assert int(tx_radio.tx_ch.frequency) == pytest.approx(CENTER_FREQUENCY, abs=5) - assert int(tx_radio.tx_ch.gain) == ABS_GAIN - finally: - tx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_rx_setters(): - try: - rx_radio = Blade() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - rx_radio._set_rx_channel(channel=1) - assert str(rx_radio.rx_ch) == "Channel RX2" - rx_radio._set_rx_buffer_size(buffer_size=4096) - assert int(rx_radio.rx_buffer_size) == 4096 - rx_radio._set_rx_center_frequency(center_frequency=int(3500e6)) - assert int(rx_radio.rx_ch.frequency) == pytest.approx(int(3500e6), abs=5) - rx_radio._set_rx_gain(channel=1, gain=20, gain_mode="absolute") - assert int(rx_radio.rx_ch.gain) == 20 - rx_radio._set_rx_sample_rate(sample_rate=int(2e6)) - assert int(rx_radio.rx_ch.sample_rate) == int(2e6) - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_tx_setters(): - try: - tx_radio = Blade() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - tx_radio._set_tx_channel(channel=1) - assert str(tx_radio.tx_ch) == "Channel TX2" - tx_radio._set_tx_buffer_size(buffer_size=4096) - assert int(tx_radio.tx_buffer_size) == 4096 - tx_radio._set_tx_center_frequency(center_frequency=int(3500e6)) - assert int(tx_radio.tx_ch.frequency) == pytest.approx(int(3500e6), abs=5) - tx_radio._set_tx_gain(channel=1, gain=20, gain_mode="absolute") - assert int(tx_radio.tx_ch.gain) == 20 - tx_radio._set_tx_sample_rate(sample_rate=int(2e6)) - assert int(tx_radio.tx_ch.sample_rate) == int(2e6) - finally: - tx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_relative_mode(): - try: - radio = Blade() - radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert int(radio.rx_ch.gain) == ABS_GAIN - radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert int(radio.tx_ch.gain) == ABS_GAIN - finally: - radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_rx(): - try: - print("Beginning test of Blade rx...") - rx_radio = Blade() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = rx_radio.record(num_samples=SAMPLE_RATE) - assert type(recording) is Recording - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_blade_tx(): - try: - tx_radio = Blade() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) - assert True - finally: - tx_radio.close() diff --git a/tests/sdr/test_hackrf.py b/tests/sdr/test_hackrf.py deleted file mode 100644 index 164432a..0000000 --- a/tests/sdr/test_hackrf.py +++ /dev/null @@ -1,77 +0,0 @@ -import subprocess - -import numpy as np # type: ignore -import pytest # type: ignore - -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.hackrf import HackRF - -SAMPLE_RATE = int(1e6) -CENTER_FREQUENCY = int(3440e6) -CHANNEL = 0 -ABS_GAIN = 10 -REL_GAIN = -37 -t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) -angular_frequency = 2 * np.pi * 1 -SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) - - -def radio_connected() -> bool: - try: - # Example: check if a specific USB device is present - result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) - return "hackrf" in result.stdout.lower() - except Exception: - return False - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_hackrf_relative_mode(): - try: - radio = HackRF() - radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert int(radio.radio.txvga_gain) == ABS_GAIN - finally: - radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_hackrf_rx(): - try: - rx_radio = HackRF() - try: - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - gain_mode="absolute", - ) - except NotImplementedError: - assert True - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_hackrf_tx(): - try: - tx_radio = HackRF() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - - tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) - assert True - finally: - tx_radio.close() diff --git a/tests/sdr/test_pluto.py b/tests/sdr/test_pluto.py deleted file mode 100644 index ed0320d..0000000 --- a/tests/sdr/test_pluto.py +++ /dev/null @@ -1,168 +0,0 @@ -import subprocess - -import numpy as np # type: ignore -import pytest # type: ignore - -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.pluto import Pluto - -SAMPLE_RATE = int(1e6) -CENTER_FREQUENCY = int(3440e6) -CHANNEL = 0 -ABS_GAIN = 10 -REL_GAIN = -50 -t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) -angular_frequency = 2 * np.pi * 1 -SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) -CONSTANT_TONE = np.ones((int(1e6)), dtype=np.complex64) - - -def radio_connected() -> bool: - try: - # Example: check if a specific USB device is present - result = subprocess.run(["lsusb"], capture_output=True, text=True, check=True) - return "pluto" in result.stdout.lower() - except Exception: - return False - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_rx_setters(): - try: - rx_radio = Pluto() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - assert int(rx_radio.radio.sample_rate) == SAMPLE_RATE - assert int(rx_radio.radio.rx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5) - assert rx_radio.radio.rx_enabled_channels == [0] - assert rx_radio.radio.rx_hardwaregain_chan0 == ABS_GAIN - - # rx_radio.set_rx_channel(channel=1) - # assert rx_radio.radio.rx_enabled_channels == [0, 1] - rx_radio.set_rx_center_frequency(center_frequency=int(3500e6)) - assert int(rx_radio.radio.rx_lo) == pytest.approx(int(3500e6), abs=5) - rx_radio.set_rx_gain(channel=0, gain=20) - assert rx_radio.radio.rx_hardwaregain_chan0 == 20 - rx_radio.set_rx_sample_rate(sample_rate=int(2e6)) - assert int(rx_radio.radio.sample_rate) == int(2e6) - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_tx_setters(): - try: - print("Beginning test of Pluto tx setters...") - tx_radio = Pluto() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=-ABS_GAIN, - ) - assert int(tx_radio.radio.sample_rate) == SAMPLE_RATE - assert int(tx_radio.radio.tx_lo) == pytest.approx(CENTER_FREQUENCY, abs=5) - assert tx_radio.radio.tx_enabled_channels == [0] - assert tx_radio.radio.tx_hardwaregain_chan0 == -ABS_GAIN - - try: - tx_radio.set_tx_channel(channel=1) - except NotImplementedError: - assert True - try: - tx_radio.set_tx_buffer_size(buffer_size=4096) - except NotImplementedError: - assert True - tx_radio.set_tx_center_frequency(center_frequency=int(3500e6)) - assert int(tx_radio.radio.tx_lo) == pytest.approx(int(3500e6), abs=5) - tx_radio.set_tx_gain(channel=0, gain=-30) - assert int(tx_radio.radio.tx_hardwaregain_chan0) == -30 - tx_radio.set_tx_sample_rate(sample_rate=int(2e6)) - assert int(tx_radio.radio.sample_rate) == int(2e6) - finally: - tx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_relative_mode(): - try: - radio = Pluto() - radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert radio.radio.rx_hardwaregain_chan0 == (74 + REL_GAIN) - radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - assert radio.radio.tx_hardwaregain_chan0 == REL_GAIN - finally: - radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_rx(): - try: - rx_radio = Pluto() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = rx_radio.record(num_samples=SAMPLE_RATE) - assert type(recording) is Recording - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_tx(): - try: - tx_radio = Pluto() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) - assert True - finally: - tx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_pluto_dual_tx(): - try: - tx_radio = Pluto() - try: - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=1, - gain=ABS_GAIN, - ) - except AttributeError: - pytest.skip("Dual tx not available on connected Pluto device") - recording1 = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - recording2 = Recording(data=CONSTANT_TONE, metadata={"data": "constant_tone"}) - tx_radio.tx_recording( - recording=[recording1, recording2], - num_samples=SAMPLE_RATE, - ) - assert True - finally: - tx_radio.close() diff --git a/tests/sdr/test_usrp.py b/tests/sdr/test_usrp.py deleted file mode 100644 index 4ed26e3..0000000 --- a/tests/sdr/test_usrp.py +++ /dev/null @@ -1,100 +0,0 @@ -import subprocess - -import numpy as np # type: ignore -import pytest # type: ignore - -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.usrp import USRP - -SAMPLE_RATE = int(1e6) -CENTER_FREQUENCY = int(3440e6) -CHANNEL = 0 -ABS_GAIN = 10 -REL_GAIN = -25 -t = np.linspace(0, 1, int(1e6 * 1), endpoint=False) -angular_frequency = 2 * np.pi * 1 -SINE_WAVE = 10 * np.exp(1j * angular_frequency * t) - - -def radio_connected() -> bool: - try: - # Example: check if a specific USB device is present - result = subprocess.run(["uhd_find_devices"], capture_output=True, text=True, check=True) - return "No UHD Devices Found" not in result.stdout - except Exception: - return False - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_usrp_clock_setter(): - try: - rx_radio = USRP() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - rx_radio.set_clock_source(source="external") - assert rx_radio.usrp.get_clock_source(0) == "external" - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_usrp_relative_mode(): - try: - radio = USRP() - radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - max_gain = radio.usrp.get_rx_gain_range().stop() - assert radio.rx_gain == (max_gain + REL_GAIN) - radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=REL_GAIN, - gain_mode="relative", - ) - max_gain = radio.usrp.get_tx_gain_range().stop() - assert radio.tx_gain == (max_gain + REL_GAIN) - finally: - radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_usrp_rx(): - try: - rx_radio = USRP() - rx_radio.init_rx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = rx_radio.record(num_samples=SAMPLE_RATE) - assert type(recording) is Recording - finally: - rx_radio.close() - - -@pytest.mark.skipif(not radio_connected(), reason="Required radio not connected") -def test_usrp_tx(): - try: - tx_radio = USRP() - tx_radio.init_tx( - sample_rate=SAMPLE_RATE, - center_frequency=CENTER_FREQUENCY, - channel=CHANNEL, - gain=ABS_GAIN, - ) - recording = Recording(data=SINE_WAVE, metadata={"data": "sine_wave"}) - tx_radio.tx_recording(recording=recording, num_samples=SAMPLE_RATE) - assert True - finally: - tx_radio.close() -- 2.34.1 From e88148c31248eb4a7deadb6b950f5bd58b9cd1dc Mon Sep 17 00:00:00 2001 From: gillian Date: Thu, 2 Oct 2025 15:19:29 -0400 Subject: [PATCH 8/8] rtl.rst removed until further notice --- docs/source/sdr_guides/blade.rst | 15 +++--- docs/source/sdr_guides/pluto.rst | 2 +- docs/source/sdr_guides/rtl.rst | 93 -------------------------------- 3 files changed, 8 insertions(+), 102 deletions(-) delete mode 100644 docs/source/sdr_guides/rtl.rst diff --git a/docs/source/sdr_guides/blade.rst b/docs/source/sdr_guides/blade.rst index b37c18e..b52ffd6 100644 --- a/docs/source/sdr_guides/blade.rst +++ b/docs/source/sdr_guides/blade.rst @@ -43,7 +43,13 @@ Limitations Set up instructions (Linux, Radioconda) --------------------------- -Step 1: Install the base dependencies and drivers ('Easy method') +Step 1: Activate your Radioconda environment. + +.. code-block:: bash + + conda activate + +Step 2: Install the base dependencies and drivers ('Easy method') .. code-block:: bash @@ -53,13 +59,6 @@ Step 1: Install the base dependencies and drivers ('Easy method') sudo apt-get install libbladerf-dev sudo apt-get install bladerf-fpga-hostedxa4 # Necessary for installation of bladeRF 2.0 Micro A4. - -Step 2: Activate your Radioconda environment. - -.. code-block:: bash - - conda activate - Step 3: Install a udev rule by creating a link into your radioconda installation. .. code-block:: bash diff --git a/docs/source/sdr_guides/pluto.rst b/docs/source/sdr_guides/pluto.rst index 732c4c8..61413b8 100644 --- a/docs/source/sdr_guides/pluto.rst +++ b/docs/source/sdr_guides/pluto.rst @@ -52,7 +52,7 @@ Step 1: Activate your Radioconda environment. conda activate -Step 2: Install system dependancies +Step 2: Install system dependancies. .. code-block:: bash diff --git a/docs/source/sdr_guides/rtl.rst b/docs/source/sdr_guides/rtl.rst deleted file mode 100644 index ed3d8e9..0000000 --- a/docs/source/sdr_guides/rtl.rst +++ /dev/null @@ -1,93 +0,0 @@ -.. _rtl: - -RTL-SDR -======= - -RTL-SDR (RTL2832U Software Defined Radio) is a low-cost USB dongle originally designed for digital TV reception -that has been repurposed as a wideband software-defined radio. RTL-SDR devices are popular for hobbyist use due to -their affordability and wide range of applications. - -The RTL-SDR is based on the Realtek RTL2832U chipset, which features direct sampling and demodulation of RF -signals. These devices are commonly used for tasks such as listening to FM radio, monitoring aircraft traffic -(ADS-B), receiving weather satellite images, and more. - -Supported Models ----------------- - -- **Generic RTL-SDR Dongle:** The most common variant, usually featuring an R820T or R820T2 tuner. -- **RTL-SDR Blog V3:** An enhanced version with additional features like direct sampling mode and a bias tee for - powering external devices. - -Key Features ------------- - -- **Frequency Range:** Typically from 24 MHz to 1.7 GHz, depending on the tuner chip. -- **Bandwidth:** Limited to about 2.4 MHz, making it suitable for narrowband applications. -- **Connectivity:** USB 2.0 interface, plug-and-play on most platforms. -- **Software Support:** Compatible with SDR software like SDR#, GQRX, and GNU Radio. - -Limitations ------------ - -- Narrow bandwidth compared to more expensive SDRs, which may limit some applications. -- Sensitivity and performance can vary depending on the specific model and components. -- Requires external software for signal processing and analysis. - -Set up instructions (Linux, Radioconda) ---------------------------- - -Step 1: Activate your Radioconda environment. - -.. code-block:: bash - - conda activate - -Step 2: Purge drivers. - -If you already have some other drivers installed, purge them from your system. - -.. code-block:: bash - - sudo apt purge ^librtlsdr - sudo rm -rvf /usr/lib/librtlsdr* - sudo rm -rvf /usr/include/rtl-sdr* - sudo rm -rvf /usr/local/lib/librtlsdr* - sudo rm -rvf /usr/local/include/rtl-sdr* - sudo rm -rvf /usr/local/include/rtl_* - sudo rm -rvf /usr/local/bin/rtl_* - -Step 3: Install RTL-SDR Blog drivers. - -.. code-block:: bash - - sudo apt-get install libusb-1.0-0-dev git cmake pkg-config build-essential - git clone https://github.com/rtlsdrblog/rtl-sdr-blog - cd rtl-sdr-blog/ - mkdir build - cd build - cmake ../ -DINSTALL_UDEV_RULES=ON - make - sudo make install - sudo cp ../rtl-sdr.rules /etc/udev/rules.d/ - sudo ldconfig - -Step 4: Blacklist the DVB-T modules that would otherwise claim the device: - -.. code-block:: bash - - sudo ln -s $CONDA_PREFIX/etc/modprobe.d/rtl-sdr-blacklist.conf /etc/modprobe.d/radioconda-rtl-sdr-blacklist.conf - sudo modprobe -r $(cat $CONDA_PREFIX/etc/modprobe.d/rtl-sdr-blacklist.conf | sed -n -e 's/^blacklist //p') - -Step 5: Install a udev rule by creating a link into your radioconda installation. - -.. code-block:: bash - - sudo ln -s $CONDA_PREFIX/lib/udev/rules.d/rtl-sdr.rules /etc/udev/rules.d/radioconda-rtl-sdr.rules - sudo udevadm control --reload - sudo udevadm trigger - -Further Information -------------------- - -- `Official Website `_ -- `RTL-SDR Quick Start Guide `_ -- 2.34.1