From c2b47ead95657b7f64cbf02d19e07eaf931ae54f Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 16 Oct 2025 15:22:07 -0400 Subject: [PATCH] Updated, edited, and cleaned up SDR files --- src/ria_toolkit_oss/sdr/blade.py | 248 +++++++++++++------------ src/ria_toolkit_oss/sdr/hackrf.py | 183 ++++++++++--------- src/ria_toolkit_oss/sdr/pluto.py | 174 +++++++++++------- src/ria_toolkit_oss/sdr/rtlsdr.py | 216 ++++++++++++++-------- src/ria_toolkit_oss/sdr/sdr.py | 29 +-- src/ria_toolkit_oss/sdr/thinkrf.py | 170 +++++++++-------- src/ria_toolkit_oss/sdr/usrp.py | 283 +++++++++++++++++------------ 7 files changed, 758 insertions(+), 545 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/blade.py b/src/ria_toolkit_oss/sdr/blade.py index 7fa6231..6bb0d03 100644 --- a/src/ria_toolkit_oss/sdr/blade.py +++ b/src/ria_toolkit_oss/sdr/blade.py @@ -1,3 +1,5 @@ +import time +import warnings from typing import Optional import numpy as np @@ -35,22 +37,6 @@ class Blade(SDR): super().__init__() - def supports_bias_tee(self) -> bool: - return True - - def set_bias_tee(self, enable: bool, channel: Optional[int] = None): - if channel is None: - channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0)) - - try: - bladerf_channel = _bladerf.CHANNEL_RX(channel) - self.device.set_bias_tee(bladerf_channel, bool(enable)) - except AttributeError as exc: # pragma: no cover - depends on libbladeRF version - raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc - - state = "enabled" if enable else "disabled" - print(f"BladeRF bias tee {state} on channel {channel}.") - def _shutdown(self, error=0, board=None): print("Shutting down with error code: " + str(error)) if board is not None: @@ -83,9 +69,6 @@ class Blade(SDR): print("FPGA version:\t\t" + str(device.get_fpga_version())) return 0 - def close(self): - self.device.close() - def init_rx( self, sample_rate: int | float, @@ -108,6 +91,9 @@ class Blade(SDR): :type channel: int :param buffer_size: The buffer size during receive. Defaults to 8192. :type buffer_size: int + :param gain_mode: 'absolute' passes gain directly to the sdr, + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60). + :type gain_mode: str """ print("Initializing RX") @@ -128,6 +114,93 @@ class Blade(SDR): self._rx_initialized = True self._tx_initialized = False + def _stream_rx(self, callback): + if not self._rx_initialized: + raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") + + # Setup synchronous stream + self.device.sync_config( + layout=_bladerf.ChannelLayout.RX_X1, + fmt=_bladerf.Format.SC16_Q11, + num_buffers=16, + buffer_size=self.rx_buffer_size, + num_transfers=8, + stream_timeout=3500000000, + ) + + self.rx_ch.enable = True + self.bytes_per_sample = 4 + + print("Blade Starting RX...") + self._enable_rx = True + + while self._enable_rx: + # Create receive buffer and read in samples to buffer + # Add them to a list to convert and save after stream is finished + buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample) + self.device.sync_rx(buffer, self.rx_buffer_size) + signal = self._convert_rx_samples(buffer) + self.buffer = buffer + # send callback complex signal + callback(buffer=signal, metadata=None) + + # Disable module + print("Blade RX Completed.") + self.rx_ch.enable = False + + def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + if not self._rx_initialized: + raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") + + if num_samples is not None and rx_time is not None: + raise ValueError("Only input one of num_samples or rx_time") + elif num_samples is not None: + self._num_samples_to_record = num_samples + elif rx_time is not None: + self._num_samples_to_record = int(rx_time * self.rx_sample_rate) + else: + raise ValueError("Must provide input of one of num_samples or rx_time") + + # Setup synchronous stream + self.device.sync_config( + layout=_bladerf.ChannelLayout.RX_X1, + fmt=_bladerf.Format.SC16_Q11, + num_buffers=16, + buffer_size=self.rx_buffer_size, + num_transfers=8, + stream_timeout=3500000000, + ) + + self.rx_ch.enable = True + self.bytes_per_sample = 4 + + print("Blade Starting RX...") + self._enable_rx = True + + store_array = np.zeros( + (1, (self._num_samples_to_record // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64 + ) + + for i in range(self._num_samples_to_record // self.rx_buffer_size + 1): + # Create receive buffer and read in samples to buffer + # Add them to a list to convert and save after stream is finished + buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample) + self.device.sync_rx(buffer, self.rx_buffer_size) + signal = self._convert_rx_samples(buffer) + store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = signal + + # Disable module + print("Blade RX Completed.") + self.rx_ch.enable = False + metadata = { + "source": self.__class__.__name__, + "sample_rate": self.rx_sample_rate, + "center_frequency": self.rx_center_frequency, + "gain": self.rx_gain, + } + + return Recording(data=store_array[:, : self._num_samples_to_record], metadata=metadata) + def init_tx( self, sample_rate: int | float, @@ -150,6 +223,9 @@ class Blade(SDR): :type channel: int :param buffer_size: The buffer size during transmission. Defaults to 8192. :type buffer_size: int + :param gain_mode: 'absolute' passes gain directly to the sdr, + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60). + :type gain_mode: str """ # Configure BladeRF @@ -178,87 +254,36 @@ class Blade(SDR): self._rx_initialized = False return 0 - def _stream_rx(self, callback): - if not self._rx_initialized: - raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") + def _stream_tx(self, callback): - # Setup synchronous stream + # Setup stream self.device.sync_config( - layout=_bladerf.ChannelLayout.RX_X1, + layout=_bladerf.ChannelLayout.TX_X1, fmt=_bladerf.Format.SC16_Q11, num_buffers=16, - buffer_size=self.rx_buffer_size, + buffer_size=8192, num_transfers=8, - stream_timeout=3500000000, + stream_timeout=3500, ) - self.rx_ch.enable = True - self.bytes_per_sample = 4 + # Enable module + self.tx_ch.enable = True + self._enable_tx = True - print("Blade Starting RX...") - self._enable_rx = True + print("Blade Starting TX...") - while self._enable_rx: - # Create receive buffer and read in samples to buffer - # Add them to a list to convert and save after stream is finished - buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample) - self.device.sync_rx(buffer, self.rx_buffer_size) - signal = self._convert_rx_samples(buffer) - # samples = convert_to_2xn(signal) - self.buffer = buffer - # send callback complex signal - callback(buffer=signal, metadata=None) + while self._enable_tx: + buffer = callback(self.tx_buffer_size) # [0] + byte_array = self._convert_tx_samples(buffer) + self.device.sync_tx(byte_array, len(buffer)) # Disable module - print("Blade RX Completed.") - self.rx_ch.enable = False - - def record(self, num_samples): - if not self._rx_initialized: - raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") - - # Setup synchronous stream - self.device.sync_config( - layout=_bladerf.ChannelLayout.RX_X1, - fmt=_bladerf.Format.SC16_Q11, - num_buffers=16, - buffer_size=self.rx_buffer_size, - num_transfers=8, - stream_timeout=3500000000, - ) - - self.rx_ch.enable = True - self.bytes_per_sample = 4 - - print("Blade Starting RX...") - self._enable_rx = True - - store_array = np.zeros((1, (num_samples // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64) - - for i in range(num_samples // self.rx_buffer_size + 1): - # Create receive buffer and read in samples to buffer - # Add them to a list to convert and save after stream is finished - buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample) - self.device.sync_rx(buffer, self.rx_buffer_size) - signal = self._convert_rx_samples(buffer) - # samples = convert_to_2xn(signal) - store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = signal - - # Disable module - print("Blade RX Completed.") - self.rx_ch.enable = False - metadata = { - "source": self.__class__.__name__, - "sample_rate": self.rx_sample_rate, - "center_frequency": self.rx_center_frequency, - "gain": self.rx_gain, - } - - return Recording(data=store_array[:, :num_samples], metadata=metadata) + print("Blade TX Completed.") + self.tx_ch.enable = False def tx_recording( self, - recording: "Recording | np.ndarray", + recording: Recording | np.ndarray, num_samples: Optional[int] = None, tx_time: Optional[int | float] = None, ): @@ -275,9 +300,6 @@ class Blade(SDR): recording to this length. Defaults to None. :type tx_time: int or float, optional """ - import warnings - import time - from ria_toolkit_oss.datatypes.recording import Recording if num_samples is not None and tx_time is not None: raise ValueError("Only input one of num_samples or tx_time") @@ -327,7 +349,7 @@ class Blade(SDR): sample_index = 0 chunk_size = min(self.tx_buffer_size, len(samples)) - chunk = samples[sample_index:sample_index + chunk_size] + chunk = samples[sample_index : sample_index + chunk_size] sample_index += chunk_size # Convert and transmit @@ -341,33 +363,6 @@ class Blade(SDR): print("Blade TX Completed.") self.tx_ch.enable = False - def _stream_tx(self, callback): - - # Setup stream - self.device.sync_config( - layout=_bladerf.ChannelLayout.TX_X1, - fmt=_bladerf.Format.SC16_Q11, - num_buffers=16, - buffer_size=8192, - num_transfers=8, - stream_timeout=3500, - ) - - # Enable module - self.tx_ch.enable = True - self._enable_tx = True - - print("Blade Starting TX...") - - while self._enable_tx: - buffer = callback(self.tx_buffer_size) # [0] - byte_array = self._convert_tx_samples(buffer) - self.device.sync_tx(byte_array, len(buffer)) - - # Disable module - print("Blade TX Completed.") - self.tx_ch.enable = False - def _convert_rx_samples(self, samples): samples = np.frombuffer(samples, dtype=np.int16).astype(np.float32) samples /= 2048 @@ -486,3 +481,22 @@ class Blade(SDR): print(f"Clock source set to {self.device.get_clock_select()}") print(f"PLL Reference set to {self.device.get_pll_refclk()}") + + def supports_bias_tee(self) -> bool: + return True + + def set_bias_tee(self, enable: bool, channel: Optional[int] = None): + if channel is None: + channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0)) + + try: + bladerf_channel = _bladerf.CHANNEL_RX(channel) + self.device.set_bias_tee(bladerf_channel, bool(enable)) + except AttributeError as exc: # pragma: no cover - depends on libbladeRF version + raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc + + state = "enabled" if enable else "disabled" + print(f"BladeRF bias tee {state} on channel {channel}.") + + def close(self): + self.device.close() diff --git a/src/ria_toolkit_oss/sdr/hackrf.py b/src/ria_toolkit_oss/sdr/hackrf.py index 189a983..dc8c01d 100644 --- a/src/ria_toolkit_oss/sdr/hackrf.py +++ b/src/ria_toolkit_oss/sdr/hackrf.py @@ -1,6 +1,5 @@ import time import warnings -import math from typing import Optional import numpy as np @@ -36,16 +35,14 @@ class HackRF(SDR): super().__init__() - def supports_bias_tee(self) -> bool: - return True - - def set_bias_tee(self, enable: bool): - try: - self.radio.set_antenna_enable(bool(enable)) - except AttributeError as exc: # pragma: no cover - defensive - raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc - - def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode): + def init_rx( + self, + sample_rate: int | float, + center_frequency: int | float, + gain: int, + channel: int, + gain_mode: Optional[str] = "absolute", + ): """ Initializes the HackRF for receiving. @@ -58,11 +55,12 @@ class HackRF(SDR): :type sample_rate: int or float :param center_frequency: The center frequency of the recording. :type center_frequency: int or float - :param gain: The total gain set for receiving on the HackRF (distributed across stages) + :param gain: The LNA gain set for receiving on the HackRF :type gain: int :param channel: The channel the HackRF is set to. (Not actually used) :type channel: int - :param gain_mode: Gain mode setting. Currently only "absolute" is supported. + :param gain_mode: 'absolute' passes gain directly to the sdr, + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (40). :type gain_mode: str """ print("Initializing RX") @@ -77,7 +75,7 @@ class HackRF(SDR): # Distribute gain across amplifier stages rx_gain_min = 0 - rx_gain_max = 116 # 14 (amp) + 40 (LNA) + 62 (VGA) + rx_gain_max = 40 # (LNA) if gain_mode == "relative": if gain > 0: @@ -95,42 +93,61 @@ class HackRF(SDR): print(f"Gain {gain} out of range for HackRF.") print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") - # Distribute gain using the signal-testbed algorithm - enable_amp = False - remaining_gain = abs_gain + self.set_gain_amp(False) + self.set_rx_vga_gain(45) + self.set_rx_lna_gain(abs_gain) - # Enable 14 dB pre-amp if gain is high enough - if remaining_gain > 30: - remaining_gain = remaining_gain - 14 - enable_amp = True - print("HackRF: 14dB front-end amplifier enabled.") + print(f"HackRF gain distribution: Amp={self.amp_enabled}, LNA={self.rx_lna_gain}dB, VGA={self.rx_vga_gain}dB") + print("To individually modify the HackRF gains, use set_gain_amp(), set_rx_lna_gain(), and set_rx_vga_gain().") - # Distribute remaining gain between LNA and VGA - # LNA gets 60% of remaining gain, rounded down to 8 dB steps - lna_gain = math.floor(remaining_gain * 0.6) - lna_gain = lna_gain - (lna_gain % 8) # Round to 8 dB steps - if lna_gain > 40: - lna_gain = 40 - - # VGA gets the rest - vga_gain = remaining_gain - lna_gain - if vga_gain > 62: - vga_gain = 62 - - # Apply gain settings - if enable_amp: - self.radio.enable_amp() - else: - self.radio.disable_amp() - - self.radio.set_lna_gain(lna_gain) - self.radio.set_vga_gain(vga_gain) - - self.rx_gain = abs_gain - print(f"HackRF gain distribution: Amp={enable_amp}, LNA={lna_gain}dB, VGA={vga_gain}dB") - - self._rx_initialized = True self._tx_initialized = False + self._rx_initialized = True + + def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + """ + Create a radio recording (iq samples and metadata) of a given length from the SDR. + HackRF uses block capture mode, which is more reliable than streaming for USB2 connections. + Either num_samples or rx_time must be provided. + init_rx() must be called before record() + + :param num_samples: The number of samples to record. + :type num_samples: int, optional + :param rx_time: The time to record. + :type rx_time: int or float, optional + + returns: Recording object (iq samples and metadata) + """ + if not self._rx_initialized: + raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") + + if num_samples is not None and rx_time is not None: + raise ValueError("Only input one of num_samples or rx_time") + elif num_samples is not None: + self._num_samples_to_record = num_samples + elif rx_time is not None: + self._num_samples_to_record = int(rx_time * self.rx_sample_rate) + else: + raise ValueError("Must provide input of one of num_samples or rx_time") + + print("HackRF Starting RX...") + + # Use libhackrf's block capture method + all_samples = self.radio.read_samples(self._num_samples_to_record) + + print("HackRF RX Completed.") + + # Create 1xN array for single-channel recording + store_array = np.zeros((1, self._num_samples_to_record), dtype=np.complex64) + store_array[0, :] = all_samples + + metadata = { + "source": self.__class__.__name__, + "sample_rate": self.rx_sample_rate, + "center_frequency": self.rx_center_frequency, + "gain": self.rx_gain, + } + + return Recording(data=store_array, metadata=metadata) def init_tx( self, @@ -164,8 +181,6 @@ class HackRF(SDR): self.radio.center_freq = int(center_frequency) print(f"HackRF center frequency = {self.radio.center_freq}") - self.radio.enable_amp() - tx_gain_min = 0 tx_gain_max = 47 if gain_mode == "relative": @@ -184,8 +199,10 @@ class HackRF(SDR): print(f"Gain {gain} out of range for Pluto.") print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB") - self.radio.txvga_gain = abs_gain - print(f"HackRF gain = {self.radio.txvga_gain}") + self.set_gain_amp(True) + self.set_tx_vga_gain(abs_gain) + print(f"HackRF gain distribution: Amp={self.amp_enabled}, VGA={self.tx_vga_gain}dB") + print("To individually modify the HackRF gains, use set_gain_amp() or set_tx_vga_gain().") self._tx_initialized = True self._rx_initialized = False @@ -236,46 +253,41 @@ class HackRF(SDR): self.radio.stop_tx() print("HackRF Tx Completed.") - def set_clock_source(self, source): + def set_gain_amp(self, enable): + if enable: + self.radio.enable_amp() + self.amp_enabled = True + else: + self.radio.disable_amp() + self.amp_enabled = False + def set_rx_lna_gain(self, lna_gain): + self.radio.set_lna_gain(lna_gain) + self.rx_lna_gain = lna_gain + + def set_rx_vga_gain(self, vga_gain): + self.radio.set_vga_gain(vga_gain) + self.rx_vga_gain = vga_gain + + def set_tx_vga_gain(self, vga_gain): + self.radio.set_txvga_gain(vga_gain) + self.tx_vga_gain = vga_gain + + def set_clock_source(self, source): self.radio.set_clock_source(source) + def supports_bias_tee(self) -> bool: + return True + + def set_bias_tee(self, enable: bool): + try: + self.radio.set_antenna_enable(bool(enable)) + except AttributeError as exc: # pragma: no cover - defensive + raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc + def close(self): self.radio.close() - def record(self, num_samples): - """ - Record a specified number of samples from the HackRF using block capture mode. - This is more reliable than streaming for USB2 connections. - - :param num_samples: Number of samples to capture - :type num_samples: int - :return: Recording object containing the captured data - :rtype: Recording - """ - if not self._rx_initialized: - raise RuntimeError("RX was not initialized. init_rx() must be called before record()") - - print("HackRF Starting RX...") - - # Use libhackrf's block capture method - all_samples = self.radio.read_samples(num_samples) - - print("HackRF RX Completed.") - - # Create 1xN array for single-channel recording - store_array = np.zeros((1, num_samples), dtype=np.complex64) - store_array[0, :] = all_samples - - metadata = { - "source": self.__class__.__name__, - "sample_rate": self.rx_sample_rate, - "center_frequency": self.rx_center_frequency, - "gain": self.rx_gain, - } - - return Recording(data=store_array, metadata=metadata) - def _stream_rx(self, callback): """ Stream samples from the HackRF using a callback function. @@ -300,6 +312,7 @@ class HackRF(SDR): # Use ctypes string_at to safely copy the buffer from ctypes import string_at + byte_data = string_at(c.buffer, c.valid_length) # Convert bytes to int8, then to float32, then view as complex64 diff --git a/src/ria_toolkit_oss/sdr/pluto.py b/src/ria_toolkit_oss/sdr/pluto.py index e95ecdd..6c617ee 100644 --- a/src/ria_toolkit_oss/sdr/pluto.py +++ b/src/ria_toolkit_oss/sdr/pluto.py @@ -48,6 +48,7 @@ class Pluto(SDR): print(f"Successfully found MIMO-capable Pluto (Rev C/D) with identifier [{identifier}].") else: # Non-MIMO hardware (Rev B) - use standard Pluto driver + del test_radio self.radio = adi.Pluto(uri) self._mimo_capable = False print(f"Successfully found Pluto (Rev B) with identifier [{identifier}].") @@ -75,8 +76,9 @@ class Pluto(SDR): :type gain: int :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. :type channel: int - :param buffer_size: The buffer size during receive. Defaults to 10000. - :type buffer_size: int + :param gain_mode: 'absolute' passes gain directly to the sdr, + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (74). + :type gain_mode: str """ print("Initializing RX") @@ -100,36 +102,20 @@ class Pluto(SDR): else: raise ValueError("Channel must be either 0 or 1.") - rx_gain_min = 0 - rx_gain_max = 74 - - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets \ - the gain relative to the maximum possible gain." - ) - else: - abs_gain = rx_gain_max + gain - else: - abs_gain = gain - - if abs_gain < rx_gain_min or abs_gain > rx_gain_max: - abs_gain = min(max(gain, rx_gain_min), rx_gain_max) - print(f"Gain {gain} out of range for Pluto.") - print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") - - self.set_rx_gain(gain=abs_gain, channel=channel) + self.set_rx_gain(gain=gain, channel=channel, gain_mode=gain_mode) if channel == 0: print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}") elif channel == 1: - self.set_rx_gain(gain=abs_gain, channel=0) + self.set_rx_gain(gain=gain, channel=0, gain_mode=gain_mode) print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}") - self.radio.rx_buffer_size = 1024 # TODO deal with this for zmq + self.set_rx_buffer_size(getattr(self, "rx_buffer_size", 1024)) + self._rx_initialized = True self._tx_initialized = False + return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain} + def init_tx( self, sample_rate: int | float, @@ -150,8 +136,9 @@ class Pluto(SDR): :type gain: int :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. :type channel: int - :param buffer_size: The buffer size during transmit. Defaults to 10000. - :type buffer_size: int + :param gain_mode: 'absolute' passes gain directly to the sdr, + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (0). + :type gain_mode: str """ print("Initializing TX") @@ -162,7 +149,10 @@ class Pluto(SDR): self.set_tx_center_frequency(center_frequency=int(center_frequency)) print(f"Pluto center frequency = {self.radio.tx_lo}") - if channel == 1: + if channel == 0: + self.radio.tx_enabled_channels = [0] + print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") + elif channel == 1: if not self._mimo_capable: raise ValueError( "Dual TX channel requested (channel=1) but hardware is not MIMO-capable. " @@ -170,41 +160,21 @@ class Pluto(SDR): ) self.radio.tx_enabled_channels = [0, 1] print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") - elif channel == 0: - self.radio.tx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") else: raise ValueError("Channel must be either 0 or 1.") - tx_gain_min = -89 - tx_gain_max = 0 - - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets\ - the gain relative to the maximum possible gain." - ) - else: - abs_gain = tx_gain_max + gain - else: - abs_gain = gain - - if abs_gain < tx_gain_min or abs_gain > tx_gain_max: - abs_gain = min(max(gain, tx_gain_min), tx_gain_max) - print(f"Gain {gain} out of range for Pluto.") - print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB") - - self.set_tx_gain(gain=abs_gain, channel=channel) + self.set_tx_gain(gain=gain, channel=channel, gain_mode=gain_mode) if channel == 0: print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}") elif channel == 1: - self.set_tx_gain(gain=abs_gain, channel=0) + self.set_tx_gain(gain=gain, channel=0, gain_mode=gain_mode) print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}, {self.radio.tx_hardwaregain_chan1}") self._tx_initialized = True self._rx_initialized = False + return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain} + def _stream_rx(self, callback): if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") @@ -323,11 +293,6 @@ class Pluto(SDR): self.radio.tx_cyclic_buffer = False print("Pluto TX Completed.") - def close(self): - if self.radio.tx_cyclic_buffer: - self.radio.tx_destroy_buffer() - del self.radio - def tx_recording(self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None, mode="timed"): """ Transmit the given iq samples from the provided recording. @@ -407,28 +372,47 @@ class Pluto(SDR): except ValueError as e: _handle_OSError(e) - def set_rx_gain(self, gain, channel=0): - self.rx_gain = gain + def set_rx_gain(self, gain, channel=0, gain_mode="absolute"): + rx_gain_min = 0 + rx_gain_max = 74 + + if gain_mode == "relative": + if gain > 0: + raise ValueError( + "When gain_mode = 'relative', gain must be < 0. This sets \ + the gain relative to the maximum possible gain." + ) + else: + abs_gain = rx_gain_max + gain + else: + abs_gain = gain + + if abs_gain < rx_gain_min or abs_gain > rx_gain_max: + abs_gain = min(max(gain, rx_gain_min), rx_gain_max) + print(f"Gain {gain} out of range for Pluto.") + print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") + + self.rx_gain = abs_gain try: if channel == 0: - if gain is None: + if abs_gain is None: self.radio.gain_control_mode_chan0 = "automatic" print("Using Pluto Automatic Gain Control.") else: self.radio.gain_control_mode_chan0 = "manual" - self.radio.rx_hardwaregain_chan0 = gain # dB + self.radio.rx_hardwaregain_chan0 = abs_gain # dB elif channel == 1: try: - if gain is None: + if abs_gain is None: self.radio.gain_control_mode_chan1 = "automatic" print("Using Pluto Automatic Gain Control.") else: self.radio.gain_control_mode_chan1 = "manual" - self.radio.rx_hardwaregain_chan1 = gain # dB + self.radio.rx_hardwaregain_chan1 = abs_gain # dB except Exception as e: print("Failed to use channel 1 on the PlutoSDR. \nThis is only available for revC versions.") @@ -443,10 +427,31 @@ class Pluto(SDR): _handle_OSError(e) def set_rx_channel(self, channel): - self.rx_channel = channel + if channel == 0: + self.radio.rx_enabled_channels = [0] + print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") + elif channel == 1: + self.radio.rx_enabled_channels = [0, 1] + print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") + else: + raise ValueError("Channel must be either 0 or 1.") def set_rx_buffer_size(self, buffer_size): - raise NotImplementedError + if buffer_size is None: + raise ValueError("Buffer_size must be provided.") + buffer_size = int(buffer_size) + if buffer_size <= 0: + raise ValueError("Buffer_size must be a positive integer.") + + self.rx_buffer_size = buffer_size + + if hasattr(self, "radio"): + try: + self.radio.rx_buffer_size = buffer_size + except OSError as e: + _handle_OSError(e) + except ValueError as e: + _handle_OSError(e) def set_tx_center_frequency(self, center_frequency): try: @@ -468,14 +473,33 @@ class Pluto(SDR): except ValueError as e: _handle_OSError(e) - def set_tx_gain(self, gain, channel=0): + def set_tx_gain(self, gain, channel=0, gain_mode="absolute"): + tx_gain_min = -89 + tx_gain_max = 0 + + if gain_mode == "relative": + if gain > 0: + raise ValueError( + "When gain_mode = 'relative', gain must be < 0. This sets\ + the gain relative to the maximum possible gain." + ) + else: + abs_gain = tx_gain_max + gain + else: + abs_gain = gain + + if abs_gain < tx_gain_min or abs_gain > tx_gain_max: + abs_gain = min(max(gain, tx_gain_min), tx_gain_max) + print(f"Gain {gain} out of range for Pluto.") + print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB") + try: - self.tx_gain = gain + self.tx_gain = abs_gain if channel == 0: - self.radio.tx_hardwaregain_chan0 = int(gain) + self.radio.tx_hardwaregain_chan0 = int(abs_gain) elif channel == 1: - self.radio.tx_hardwaregain_chan1 = int(gain) + self.radio.tx_hardwaregain_chan1 = int(abs_gain) else: raise ValueError(f"Pluto channel must be 0 or 1 but was {channel}.") @@ -485,11 +509,23 @@ class Pluto(SDR): _handle_OSError(e) def set_tx_channel(self, channel): - raise NotImplementedError + if channel == 1: + self.radio.tx_enabled_channels = [0, 1] + print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") + elif channel == 0: + self.radio.tx_enabled_channels = [0] + print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") + else: + raise ValueError("Channel must be either 0 or 1.") def set_tx_buffer_size(self, buffer_size): raise NotImplementedError + def close(self): + if self.radio.tx_cyclic_buffer: + self.radio.tx_destroy_buffer() + del self.radio + def shutdown(self): del self.radio diff --git a/src/ria_toolkit_oss/sdr/rtlsdr.py b/src/ria_toolkit_oss/sdr/rtlsdr.py index a4fa8a1..dfe2ed0 100644 --- a/src/ria_toolkit_oss/sdr/rtlsdr.py +++ b/src/ria_toolkit_oss/sdr/rtlsdr.py @@ -11,35 +11,42 @@ try: except ImportError as exc: # pragma: no cover - dependency provided by end user raise ImportError("pyrtlsdr is required to use the RTLSDR class") from exc +from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.sdr.sdr import SDR class RTLSDR(SDR): """SDR interface for RTL-SDR dongles using pyrtlsdr.""" - def __init__(self, identifier: Optional[int | str] = None): - super().__init__() + def __init__(self, identifier: Optional[str] = None): + """ + Initialize a Pluto SDR device object and connect to the SDR hardware. + This software supports the ADALM Pluto SDR created by Analog Devices. + + :param identifier: The value of the parameter that identifies the device. + :type identifier: str = "192.168.3.1", "pluto.local", etc + + If no identifier is provided, it will select the first device found, with a warning. + If more than one device is found with the identifier, it will select the first of those devices. + """ + print(f"Initializing Pluto radio with identifier [{identifier}].") try: + super().__init__() + if identifier is None: self.radio = RtlSdr() else: self.radio = RtlSdr(identifier) + + self.rx_buffer_size = 256_000 + self.rx_channel = 0 + print(f"Initialized RTL-SDR with identifier [{identifier}].") - except Exception as exc: - print(f"Failed to initialize RTL-SDR with identifier [{identifier}].") - raise exc - self.rx_buffer_size = 256_000 - self.rx_channel = 0 - - def supports_bias_tee(self) -> bool: - return True - - def set_bias_tee(self, enable: bool): - self.radio.set_bias_tee(bool(enable)) - state = "enabled" if enable else "disabled" - print(f"RTL-SDR bias tee {state}.") + except Exception as e: + print(f"Failed to find RTL-SDR with identifier [{identifier}].") + raise e def init_rx( self, @@ -54,43 +61,9 @@ class RTLSDR(SDR): if channel not in (0, None): raise ValueError("RTL-SDR supports only channel 0 for RX.") - self.radio.sample_rate = float(sample_rate) - self.rx_sample_rate = self.radio.sample_rate - - self.radio.center_freq = float(center_frequency) - self.rx_center_frequency = self.radio.center_freq - - available_gains = getattr(self.radio, "gains", []) - if gain is None: - self.radio.gain = "auto" - self.rx_gain = "auto" - else: - if not available_gains: - warnings.warn( - "No gain table reported by RTL-SDR; applying requested gain directly.", - RuntimeWarning, - ) - target_gain = gain - else: - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets the gain relative to the maximum." - ) - target_gain = max(available_gains) + gain - else: - target_gain = gain - - min_gain = min(available_gains) - max_gain = max(available_gains) - if target_gain < min_gain or target_gain > max_gain: - print(f"Requested gain {target_gain} dB out of range; clamping to valid span {min_gain}-{max_gain} dB.") - target_gain = min(max(target_gain, min_gain), max_gain) - - target_gain = min(available_gains, key=lambda g: abs(g - target_gain)) - - self.radio.gain = target_gain - self.rx_gain = self.radio.gain + self.set_rx_sample_rate(sample_rate=sample_rate) + self.set_rx_center_frequency(center_frequency=center_frequency) + self.set_rx_gain(gain=gain, gain_mode=gain_mode) self.rx_buffer_size = int(buffer_size or self.rx_buffer_size) self.rx_channel = 0 @@ -102,25 +75,112 @@ class RTLSDR(SDR): self._rx_initialized = True self._tx_initialized = False - def init_tx(self, *args, **kwargs): # pragma: no cover - RTL-SDR is RX only - raise NotImplementedError("RTL-SDR does not support transmit operations") + return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain} - def record(self, num_samples): + def get_rx_sample_rate(self): """ - Record a fixed number of samples from RTL-SDR. - - Args: - num_samples: Number of samples to capture + Retrieve the current sample rate of the receiver. Returns: - Recording object with captured samples + float: The receiver's sample rate in samples per second (Hz). + """ + return self.rx_sample_rate + + def get_rx_center_frequency(self): + """ + Retrieve the current center frequency of the receiver. + + Returns: + float: The receiver's center frequency in Hertz (Hz). + """ + return self.rx_center_frequency + + def get_rx_gain(self): + """ + Retrieve the current gain setting of the receiver. + + Returns: + float: The receiver's gain in decibels (dB). + """ + return self.rx_gain + + def set_rx_sample_rate(self, sample_rate): + self.radio.sample_rate = float(sample_rate) + self.rx_sample_rate = self.radio.sample_rate + print(f"RTL RX Sample Rate = {self.radio.get_sample_rate()}") + + def set_rx_center_frequency(self, center_frequency): + self.radio.center_freq = float(center_frequency) + self.rx_center_frequency = self.radio.center_freq + print(f"RTL RX Center Frequency = {self.radio.get_center_freq()}") + + def set_rx_gain(self, gain, gain_mode="absolute"): + available_gains = self.radio.get_gains() + + if gain is None: + self.radio.gain = "auto" + self.rx_gain = "auto" + else: + if not available_gains: + warnings.warn( + "No gain table reported by RTL-SDR; applying requested gain directly.", + RuntimeWarning, + ) + target_gain = gain + else: + min_gain = min(available_gains) + max_gain = max(available_gains) + + if gain_mode == "relative": + if gain > 0: + raise ValueError( + "When gain_mode = 'relative', gain must be < 0. This sets\ + the gain relative to the maximum possible gain." + ) + target_gain = max_gain + gain + else: + target_gain = gain + + if target_gain < min_gain or target_gain > max_gain: + print( + f"Requested gain {target_gain} dB out of range;\ + clamping to valid span {min_gain}-{max_gain} dB." + ) + target_gain = min(max(target_gain, min_gain), max_gain) + + target_gain = min(available_gains, key=lambda g: abs(g - target_gain)) + + self.radio.set_gain(target_gain) + self.rx_gain = self.radio.get_gain() + + print(f"RTL RX Gain = {self.radio.get_gain()}") + print(f"Available RTL RX Gains: {available_gains}") + + def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + """ + Create a radio recording (iq samples and metadata) of a given length from the RTL-SDR. + Either num_samples or rx_time must be provided. + init_rx() must be called before record() + + :param num_samples: The number of samples to record. + :type num_samples: int, optional + :param rx_time: The time to record. + :type rx_time: int or float, optional + + returns: Recording object (iq samples and metadata) """ - from ria_toolkit_oss.datatypes.recording import Recording if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before record().") - print("RTL-SDR Starting RX...") + if num_samples is not None and rx_time is not None: + raise ValueError("Only input one of num_samples or rx_time") + elif num_samples is not None: + pass + elif rx_time is not None: + num_samples = int(rx_time * self.rx_sample_rate) + else: + raise ValueError("Must provide input of one of num_samples or rx_time") # RTL-SDR has USB buffer limitations - use consistent 256k chunks # Always read full chunks to avoid USB overflow issues with partial reads @@ -129,8 +189,10 @@ class RTLSDR(SDR): remainder = num_samples % max_samples_per_read signal = np.array([], dtype=np.complex64) + print("RTL-SDR Starting RX...") + # Read full chunks - for i in range(num_full_reads): + for _ in range(num_full_reads): try: chunk = self.radio.read_samples(max_samples_per_read) signal = np.append(signal, chunk) @@ -150,10 +212,6 @@ class RTLSDR(SDR): print("RTL-SDR RX Completed.") - # Create 1xN array for single-channel recording - store_array = np.zeros((1, len(signal)), dtype=np.complex64) - store_array[0, :] = signal - metadata = { "source": self.__class__.__name__, "sample_rate": self.rx_sample_rate, @@ -161,7 +219,7 @@ class RTLSDR(SDR): "gain": self.rx_gain, } - return Recording(data=store_array, metadata=metadata) + return Recording(data=signal, metadata=metadata) def _stream_rx(self, callback): if not self._rx_initialized: @@ -179,12 +237,28 @@ class RTLSDR(SDR): def _stream_tx(self, callback): # pragma: no cover - RTL-SDR is RX only raise NotImplementedError("RTL-SDR does not support transmit operations") + def init_tx(self, *args, **kwargs): # pragma: no cover - RTL-SDR is RX only + raise NotImplementedError("RTL-SDR does not support transmit operations") + + def tx_recording( + self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None + ): # pragma: no cover - RTL-SDR is RX only + raise NotImplementedError("RTL-SDR does not support transmit operations") + + def supports_bias_tee(self) -> bool: + return True + + def set_bias_tee(self, enable: bool): + self.radio.set_bias_tee(bool(enable)) + state = "enabled" if enable else "disabled" + print(f"RTL-SDR bias tee {state}.") + + def set_clock_source(self, source): # pragma: no cover - not applicable to RTL-SDR + raise NotImplementedError("RTL-SDR does not support external clock configuration") + def close(self): try: self.radio.close() finally: self._enable_rx = False self._enable_tx = False - - def set_clock_source(self, source): # pragma: no cover - not applicable to RTL-SDR - raise NotImplementedError("RTL-SDR does not support external clock configuration") diff --git a/src/ria_toolkit_oss/sdr/sdr.py b/src/ria_toolkit_oss/sdr/sdr.py index 7f70ff9..c191e97 100644 --- a/src/ria_toolkit_oss/sdr/sdr.py +++ b/src/ria_toolkit_oss/sdr/sdr.py @@ -295,26 +295,27 @@ class SDR(ABC): return samples + def supports_bias_tee(self) -> bool: + """Return True when the radio supports bias-tee control.""" + return False + + def set_bias_tee(self, enable: bool): + """Enable or disable bias-tee power when supported by the radio.""" + raise NotImplementedError(f"{self.__class__.__name__} does not support bias-tee control") + def pause_rx(self): self._enable_rx = False def pause_tx(self): self._enable_tx = False - def stop(self): - self.pause_rx() - - def supports_bias_tee(self) -> bool: - """Return True when the radio supports bias-tee control.""" - return False - - def set_bias_tee(self, enable: bool): - """Enable or disable bias-tee power when supported by the radio.""" - raise NotImplementedError(f"{self.__class__.__name__} does not support bias-tee control") - - @abstractmethod - def close(self): - pass + def stop(self): + self.pause_rx() + self.pause_tx() + + @abstractmethod + def close(self): + pass @abstractmethod def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode): diff --git a/src/ria_toolkit_oss/sdr/thinkrf.py b/src/ria_toolkit_oss/sdr/thinkrf.py index ccbe9e9..93de2cd 100644 --- a/src/ria_toolkit_oss/sdr/thinkrf.py +++ b/src/ria_toolkit_oss/sdr/thinkrf.py @@ -116,21 +116,7 @@ class ThinkRF(SDR): raise ValueError("ThinkRF devices expose a single receive channel") stream_mode = getattr(self, "_capture_mode", "block") == "stream" - - # Enforce sample rate / decimation - # Note: decimation parameter takes precedence if provided - actual_decimation, actual_sample_rate = self.enforce_sample_rate(sample_rate, decimation) - - if stream_mode and actual_decimation < self._min_stream_decimation: - enforced = self._min_stream_decimation - print( - "Requested ThinkRF sample rate exceeds typical GigE throughput; " - f"enforcing decimation {enforced} for streaming." - ) - actual_decimation = enforced - actual_sample_rate = self.BASE_SAMPLE_RATE / actual_decimation - - self._decimation = actual_decimation + actual_decimation, actual_sample_rate = self.set_rx_sample_rate(sample_rate=sample_rate, decimation=decimation) self.radio.reset() self.radio.scpiset(":SYSTEM:FLUSH") @@ -138,9 +124,11 @@ class ThinkRF(SDR): self.radio.scpiset(":TRACE:STREAM:STOP") except Exception: pass + self.radio.rfe_mode(self._rfe_mode) - self.radio.freq(int(center_frequency)) - attenuation = self._attenuation if gain is None else int(gain) + self.set_rx_center_frequency(center_frequency=center_frequency) + + attenuation = self._attenuation if gain is None else int(gain) # gain attenuation = max(0, min(attenuation, 30)) self.radio.attenuator(attenuation) @@ -159,12 +147,12 @@ class ThinkRF(SDR): if stream_mode: self._streaming_active = False else: - print(f"ThinkRF: Configuring block capture - SPP={self._samples_per_packet}, PPB={self._packets_per_block}") + print( + f"ThinkRF: Configuring block capture - SPP={self._samples_per_packet}, PPB={self._packets_per_block}" + ) self.radio.scpiset(f":TRACE:BLOCK:PACKETS {self._packets_per_block}") self.radio.scpiset(":TRACE:BLOCK:DATA?") - self.rx_sample_rate = actual_sample_rate - self.rx_center_frequency = center_frequency self.rx_gain = { "attenuation_dB": attenuation, "profile": gain_profile, @@ -179,21 +167,35 @@ class ThinkRF(SDR): self._rx_initialized = True self._tx_initialized = False - def init_tx( - self, - sample_rate: int | float, - center_frequency: int | float, - gain: int, - channel: int, - gain_mode: Optional[str] = "absolute", - ): - raise NotImplementedError("ThinkRF devices do not support transmit operations") + def set_rx_sample_rate(self, sample_rate, decimation, stream_mode): + # Enforce sample rate / decimation + # Note: decimation parameter takes precedence if provided + actual_decimation, actual_sample_rate = self.enforce_sample_rate(sample_rate, decimation) + + if stream_mode and actual_decimation < self._min_stream_decimation: + enforced = self._min_stream_decimation + print( + "Requested ThinkRF sample rate exceeds typical GigE throughput; " + f"enforcing decimation {enforced} for streaming." + ) + actual_decimation = enforced + actual_sample_rate = self.BASE_SAMPLE_RATE / actual_decimation + + self._decimation = actual_decimation + self.rx_sample_rate = actual_sample_rate + print(f"ThinkRF RX Sample Rate = {actual_sample_rate}") + + return actual_decimation, actual_sample_rate + + def set_rx_center_frequency(self, center_frequency): + self.radio.freq(int(center_frequency)) + self.rx_center_frequency = self.radio.freq + print(f"ThinkRF RX Center Frequency = {self.radio.freq}") def _stream_rx(self, callback): if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record().") - print("ThinkRF Starting RX...") self._enable_rx = True packets_processed = 0 stream_mode = getattr(self, "_capture_mode", "block") == "stream" @@ -206,18 +208,9 @@ class ThinkRF(SDR): print(f"Failed to start ThinkRF stream: {exc}") return + print("ThinkRF Starting RX...") while self._enable_rx: - try: - packet = self.radio.read() - except Exception as exc: - # In block mode, reaching end of block can cause exceptions - # This is normal - just stop reading - if not stream_mode and packets_processed > 0: - # Got some packets in block mode, finish gracefully - print(f"ThinkRF: Block read complete ({packets_processed} packets received)") - break - print(f"ThinkRF read error: {exc}") - break + packet = self._safe_read(stream_mode, packets_processed) if packet is None: # No more packets available @@ -234,32 +227,13 @@ class ThinkRF(SDR): # Unknown packet type - skip continue - # packet.data is an iterable IQData object that yields (I, Q) tuples - # Convert to numpy array: collect all [I, Q] pairs - try: - # Iterate through packet.data to get all IQ pairs - iq_pairs = list(packet.data) # List of (I, Q) tuples - if not iq_pairs: - continue - - # Convert to numpy array [N, 2] - iq_array = np.array(iq_pairs, dtype=np.float32) - - # Extract I and Q channels and create complex buffer - complex_buffer = (iq_array[:, 0] + 1j * iq_array[:, 1]).astype(np.complex64) - except Exception as e: - print(f"Error extracting IQ from packet.data: {e}") + metadata = metadata = self._extract_metadata(packet) + complex_buffer = self._extract_iq(packet) + if complex_buffer is None: continue - metadata = None - if hasattr(packet, "fields"): - metadata = packet.fields - if metadata.get("sample_loss"): - print("\033[93mWarning: ThinkRF sample overflow detected\033[0m") - # Send packet data to callback (accumulation handled by parent) callback(buffer=complex_buffer, metadata=metadata) - packets_processed += 1 # In block mode, stop after receiving all packets in the block @@ -269,14 +243,61 @@ class ThinkRF(SDR): print("ThinkRF RX Completed.") if stream_mode and self._streaming_active: - try: - self.radio.scpiset(":TRACE:STREAM:STOP") - except Exception: - pass - self._streaming_active = False + self._stop_stream() self.radio.scpiset(":SYSTEM:FLUSH") + def _safe_read(self, stream_mode, packets_processed): + packet = None + try: + packet = self.radio.read() + except Exception as e: + # In block mode, reaching end of block can cause exceptions + if not stream_mode and packets_processed > 0: + # We got some packets in block mode, so finish gracefully + print(f"ThinkRF: Block read complete ({packets_processed} packets received)") + else: + print(f"ThinkRF read error: {e}") + return packet + + def _extract_iq(self, packet): + # packet.data is an iterable IQData object that yields (I, Q) tuples + # Convert to numpy array: collect all [I, Q] pairs + try: + iq_pairs = list(packet.data) + if not iq_pairs: + return None + iq_array = np.array(iq_pairs, dtype=np.float32) + return (iq_array[:, 0] + 1j * iq_array[:, 1]).astype(np.complex64) + except Exception as e: + print(f"Error extracting IQ from packet.data: {e}") + return None + + def _extract_metadata(self, packet): + if not hasattr(packet, "fields"): + return None + metadata = packet.fields + if metadata.get("sample_loss"): + print("\033[93mWarning: ThinkRF sample overflow detected\033[0m") + return metadata + + def _stop_stream(self): + try: + self.radio.scpiset(":TRACE:STREAM:STOP") + except Exception: + pass + self._streaming_active = False + + def init_tx( + self, + sample_rate: int | float, + center_frequency: int | float, + gain: int, + channel: int, + gain_mode: Optional[str] = "absolute", + ): + raise NotImplementedError("ThinkRF devices do not support transmit operations") + def _stream_tx(self, callback): raise NotImplementedError("ThinkRF devices do not support transmit operations") @@ -333,7 +354,9 @@ class ThinkRF(SDR): return int(best) - def enforce_sample_rate(self, requested_sample_rate: int | float, decimation: Optional[int] = None) -> tuple[int, float]: + def enforce_sample_rate( + self, requested_sample_rate: int | float, decimation: Optional[int] = None + ) -> tuple[int, float]: """ Enforce valid sample rate and decimation. @@ -356,7 +379,10 @@ class ThinkRF(SDR): actual_sample_rate = self.BASE_SAMPLE_RATE / decimation if abs(actual_sample_rate - requested_sample_rate) > 1e3: # More than 1 kHz difference - print(f"ThinkRF: Requested {requested_sample_rate/1e6:.2f} MS/s → Using decimation={decimation} ({actual_sample_rate/1e6:.2f} MS/s)") + print( + f"ThinkRF: Requested {requested_sample_rate/1e6:.2f} MS/s → \ + Using decimation={decimation} ({actual_sample_rate/1e6:.2f} MS/s)" + ) return decimation, actual_sample_rate @@ -391,7 +417,9 @@ class ThinkRF(SDR): actual_samples = actual_spp * ppb if actual_samples != num_samples: - print(f"ThinkRF: Requested {num_samples} samples → Capturing {actual_samples} (SPP={actual_spp}, PPB={ppb})") + print( + f"ThinkRF: Requested {num_samples} samples → Capturing {actual_samples} (SPP={actual_spp}, PPB={ppb})" + ) return actual_spp, ppb diff --git a/src/ria_toolkit_oss/sdr/usrp.py b/src/ria_toolkit_oss/sdr/usrp.py index 4169c0a..1dd03c4 100644 --- a/src/ria_toolkit_oss/sdr/usrp.py +++ b/src/ria_toolkit_oss/sdr/usrp.py @@ -17,11 +17,11 @@ class USRP(SDR): This software supports all USRP SDRs created by Ettus Research. - :param identifier: Identifier of the device. Can be an IP address (e.g. "192.168.0.0"), - a device name (e.g. "MyB210"), or any name/address found via ``uhd_find_devices``. - If not provided, the first available device is selected with a warning. - If multiple devices match the identifier, the first one is selected. - :type identifier: str, optional + :param identifier: The value of the parameter that identifies the device. + :type identifier: str = "192.168.0.0", "MyB210", name or address found in uhd_find_devices + + If no identifier is provided, it will select the first device found, with a warning. + If more than one device is found with the identifier, it will select the first of those devices. """ super().__init__() @@ -43,29 +43,23 @@ class USRP(SDR): rx_buffer_size: int = 960000, ): """ - Initialize the USRP for receiving. + Initializes the USRP for receiving. :param sample_rate: The sample rate for receiving. :type sample_rate: int or float - :param center_frequency: The center frequency of the recording. :type center_frequency: int or float - + :param gain: The gain set for receiving on the USRP + :type gain: int :param channel: The channel the USRP is set to. :type channel: int - - :param gain: The gain set for receiving on the USRP. - :type gain: int - - :param gain_mode: Gain mode setting. ``"absolute"`` passes gain directly to the SDR. - ``"relative"`` means gain should be a negative value, which will be subtracted - from the maximum gain. + :param gain_mode: 'absolute' passes gain directly to the sdr, + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain. :type gain_mode: str - :param rx_buffer_size: Internal buffer size for receiving samples. Defaults to 960000. :type rx_buffer_size: int - :return: Dictionary with the actual RX parameters after configuration. + :return: A dictionary with the actual RX parameters after configuration. :rtype: dict """ @@ -80,59 +74,12 @@ class USRP(SDR): if channel + 1 > max_num_channels: raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") - # check if gain arg is valid - gain_range = self.usrp.get_rx_gain_range() - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets\ - the gain relative to the maximum possible gain." - ) - else: - # set gain relative to max - abs_gain = gain_range.stop() + gain - else: - abs_gain = gain - if abs_gain < gain_range.start() or abs_gain > gain_range.stop(): - print(f"Gain {abs_gain} out of range for this USRP.") - print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB") - abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) - self.usrp.set_rx_gain(abs_gain, channel) + self.set_rx_sample_rate(sample_rate=sample_rate, channel=channel) + self.set_rx_center_frequency(center_frequency=center_frequency, channel=channel) + self.set_rx_gain(gain=gain, gain_mode=gain_mode, channel=channel) - # check if sample rate arg is valid - # Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns - # the range for the CURRENT master clock, not the maximum possible range. - # Skip validation for B-series devices and let UHD handle it. - device_type = self.device_dict.get("type", "").lower() - if device_type not in ["b200", "b210"]: - sample_rate_range = self.usrp.get_rx_rates() - if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): - raise IOError( - f"Sample rate {sample_rate} not valid for this USRP.\nValid\ - range is {sample_rate_range.start()}\ - to {sample_rate_range.stop()}." - ) - self.usrp.set_rx_rate(sample_rate, channel) - - center_frequency_range = self.usrp.get_rx_freq_range() - if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): - raise IOError( - f"Center frequency {center_frequency} out of range for USRP.\ - \nValid range is {center_frequency_range.start()} \ - to {center_frequency_range.stop()}." - ) - self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel) - - # set internal variables for metadata - self.rx_sample_rate = self.usrp.get_rx_rate(channel) - self.rx_gain = self.usrp.get_rx_gain(channel) - self.rx_center_frequency = self.usrp.get_rx_freq(channel) self.rx_channel = channel - - print(f"USRP RX Sample Rate = {self.rx_sample_rate}") - print(f"USRP RX Center Frequency = {self.rx_center_frequency}") print(f"USRP RX Channel = {self.rx_channel}") - print(f"USRP RX Gain = {self.rx_gain}") # flag to prevent user from calling certain functions before this one. self._rx_initialized = True @@ -167,6 +114,58 @@ class USRP(SDR): """ return self.rx_gain + def set_rx_sample_rate(self, sample_rate, channel=0): + # check if sample rate arg is valid + # Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns + # the range for the CURRENT master clock, not the maximum possible range. + # Skip validation for B-series devices and let UHD handle it. + device_type = self.device_dict.get("type", "").lower() + if device_type not in ["b200", "b210"]: + sample_rate_range = self.usrp.get_rx_rates() + if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): + raise IOError( + f"Sample rate {sample_rate} not valid for this USRP.\nValid\ + range is {sample_rate_range.start()}\ + to {sample_rate_range.stop()}." + ) + self.usrp.set_rx_rate(sample_rate, channel) + self.rx_sample_rate = self.usrp.get_rx_rate(channel) + print(f"USRP RX Sample Rate = {self.rx_sample_rate}") + + def set_rx_center_frequency(self, center_frequency, channel=0): + center_frequency_range = self.usrp.get_rx_freq_range() + if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): + raise IOError( + f"Center frequency {center_frequency} out of range for USRP.\ + \nValid range is {center_frequency_range.start()} \ + to {center_frequency_range.stop()}." + ) + self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel) + self.rx_center_frequency = self.usrp.get_rx_freq(channel) + print(f"USRP RX Center Frequency = {self.rx_center_frequency}") + + def set_rx_gain(self, gain, gain_mode="absolute", channel=0): + # check if gain arg is valid + gain_range = self.usrp.get_rx_gain_range() + if gain_mode == "relative": + if gain > 0: + raise ValueError( + "When gain_mode = 'relative', gain must be < 0. This sets\ + the gain relative to the maximum possible gain." + ) + else: + # set gain relative to max + abs_gain = gain_range.stop() + gain + else: + abs_gain = gain + if abs_gain < gain_range.start() or abs_gain > gain_range.stop(): + print(f"Gain {abs_gain} out of range for this USRP.") + print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB") + abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) + self.usrp.set_rx_gain(abs_gain, channel) + self.rx_gain = self.usrp.get_rx_gain(channel) + print(f"USRP RX Gain = {self.rx_gain}") + def _stream_rx(self, callback): if not self._rx_initialized: @@ -211,10 +210,31 @@ class USRP(SDR): del self.rx_stream print("USRP RX Completed.") - def record(self, num_samples): + def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + """ + Create a radio recording (iq samples and metadata) of a given length from the USRP. + Either num_samples or rx_time must be provided. + init_rx() must be called before record() + + :param num_samples: The number of samples to record. + :type num_samples: int, optional + :param rx_time: The time to record. + :type rx_time: int or float, optional + + returns: Recording object (iq samples and metadata) + """ if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") + if num_samples is not None and rx_time is not None: + raise ValueError("Only input one of num_samples or rx_time") + elif num_samples is not None: + pass + elif rx_time is not None: + num_samples = int(rx_time * self.rx_sample_rate) + else: + raise ValueError("Must provide input of one of num_samples or rx_time") + stream_args = uhd.usrp.StreamArgs("fc32", "sc16") stream_args.channels = [self.rx_channel] @@ -269,23 +289,18 @@ class USRP(SDR): gain_mode: Optional[str] = "absolute", ): """ - Initialize the USRP for transmitting. + Initializes the USRP for transmitting. :param sample_rate: The sample rate for transmitting. :type sample_rate: int or float - :param center_frequency: The center frequency of the recording. :type center_frequency: int or float - - :param gain: The gain set for transmitting on the USRP. + :param gain: The gain set for transmitting on the USRP :type gain: int - :param channel: The channel the USRP is set to. :type channel: int - - :param gain_mode: Gain mode setting. ``"absolute"`` passes gain directly to the SDR. - ``"relative"`` means gain should be a negative value, which will be subtracted - from the maximum gain. + :param gain_mode: 'absolute' passes gain directly to the sdr, + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain. :type gain_mode: str """ @@ -301,6 +316,79 @@ class USRP(SDR): if channel + 1 > max_num_channels: raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") + self.set_tx_sample_rate(sample_rate=sample_rate, channel=channel) + self.set_tx_center_frequency(center_frequency=center_frequency, channel=channel) + self.set_tx_gain(gain=gain, gain_mode=gain_mode, channel=channel) + + self.tx_channel = channel + print(f"USRP TX Channel = {self.tx_channel}") + + self.usrp.set_clock_source("internal") + self.usrp.set_time_source("internal") + self.usrp.set_tx_antenna("TX/RX", channel) + + self._tx_initialized = True + self._rx_initialized = False + + return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain} + + def get_tx_sample_rate(self): + """ + Retrieve the current sample rate of the transmitter. + + Returns: + float: The transmitter's sample rate in samples per second (Hz). + """ + return self.tx_sample_rate + + def get_tx_center_frequency(self): + """ + Retrieve the current center frequency of the transmitter. + + Returns: + float: The transmitter's center frequency in Hertz (Hz). + """ + return self.tx_center_frequency + + def get_tx_gain(self): + """ + Retrieve the current gain setting of the transmitter. + + Returns: + float: The transmitter's gain in decibels (dB). + """ + return self.tx_gain + + def set_tx_sample_rate(self, sample_rate, channel=0): + # check if sample rate arg is valid + # Note: B200/B210 devices auto-adjust master clock rate, so get_tx_rates() returns + # the range for the CURRENT master clock, not the maximum possible range. + # Skip validation for B-series devices and let UHD handle it. + device_type = self.device_dict.get("type", "").lower() + if device_type not in ["b200", "b210"]: + sample_rate_range = self.usrp.get_tx_rates() + if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): + raise IOError( + f"Sample rate {sample_rate} not valid for this USRP.\nValid\ + range is {sample_rate_range.start()} to {sample_rate_range.stop()}." + ) + self.usrp.set_tx_rate(sample_rate, channel) + self.tx_sample_rate = self.usrp.get_tx_rate(channel) + print(f"USRP TX Sample Rate = {self.tx_sample_rate}") + + def set_tx_center_frequency(self, center_frequency, channel=0): + center_frequency_range = self.usrp.get_tx_freq_range() + if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): + raise IOError( + f"Center frequency {center_frequency} out of range for USRP.\ + \nValid range is {center_frequency_range.start()}\ + to {center_frequency_range.stop()}." + ) + self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel) + self.tx_center_frequency = self.usrp.get_tx_freq(channel) + print(f"USRP TX Center Frequency = {self.tx_center_frequency}") + + def set_tx_gain(self, gain, gain_mode="absolute", channel=0): # Ensure gain is within valid range gain_range = self.usrp.get_tx_gain_range() if gain_mode == "relative": @@ -320,50 +408,9 @@ class USRP(SDR): abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) self.usrp.set_tx_gain(abs_gain, channel) - - # check if sample rate arg is valid - # Note: B200/B210 devices auto-adjust master clock rate, so get_tx_rates() returns - # the range for the CURRENT master clock, not the maximum possible range. - # Skip validation for B-series devices and let UHD handle it. - device_type = self.device_dict.get("type", "").lower() - if device_type not in ["b200", "b210"]: - sample_rate_range = self.usrp.get_tx_rates() - if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): - raise IOError( - f"Sample rate {sample_rate} not valid for this USRP.\nValid\ - range is {sample_rate_range.start()} to {sample_rate_range.stop()}." - ) - self.usrp.set_tx_rate(sample_rate, channel) - - center_frequency_range = self.usrp.get_tx_freq_range() - if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): - raise IOError( - f"Center frequency {center_frequency} out of range for USRP.\ - \nValid range is {center_frequency_range.start()}\ - to {center_frequency_range.stop()}." - ) - self.usrp.set_tx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel) - - self.usrp.set_clock_source("internal") - self.usrp.set_time_source("internal") - self.usrp.set_tx_rate(sample_rate) - self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel) - self.usrp.set_tx_antenna("TX/RX", channel) - - # set internal variables for metadata - self.tx_sample_rate = self.usrp.get_tx_rate(channel) self.tx_gain = self.usrp.get_tx_gain(channel) - self.tx_center_frequency = self.usrp.get_tx_freq(channel) - self.tx_channel = channel - - print(f"USRP TX Sample Rate = {self.tx_sample_rate}") - print(f"USRP TX Center Frequency = {self.tx_center_frequency}") - print(f"USRP TX Channel = {self.tx_channel}") print(f"USRP TX Gain = {self.tx_gain}") - self._tx_initialized = True - self._rx_initialized = False - def close(self): pass