From c673967a905a3c1d30a99259ceb4856b4cda6e25 Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 11:20:38 -0500 Subject: [PATCH 1/8] Updated methods, added setters, and created standardized SDRError classes --- src/ria_toolkit_oss/sdr/sdr.py | 197 ++++++++++++++++++++++++--------- 1 file changed, 142 insertions(+), 55 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/sdr.py b/src/ria_toolkit_oss/sdr/sdr.py index c2464bf..489fd48 100644 --- a/src/ria_toolkit_oss/sdr/sdr.py +++ b/src/ria_toolkit_oss/sdr/sdr.py @@ -1,5 +1,6 @@ import math import pickle +import threading import warnings from abc import ABC, abstractmethod from typing import Optional @@ -27,17 +28,21 @@ class SDR(ABC): self._tx_initialized = False self._enable_rx = False self._enable_tx = False + self._accumulated_buffer = None self._max_num_buffers = None self._num_buffers_processed = 0 self._accumulated_buffer = None self._last_buffer = None + self._corrupted_buffer_count = 0 + self.rx_sample_rate = None self.rx_center_frequency = None self.rx_gain = None self.tx_sample_rate = None self.tx_center_frequency = None self.tx_gain = None + self._param_lock = threading.RLock() # Reentrant lock def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None) -> Recording: """ @@ -71,7 +76,6 @@ class SDR(ABC): self._max_num_buffers = num_buffers self._num_buffers_processed = 0 - self._num_buffers_processed = 0 self._last_buffer = None self._accumulated_buffer = None print("Starting stream") @@ -94,6 +98,7 @@ class SDR(ABC): # reset to record again self._accumulated_buffer = None + self._num_buffers_processed = 0 return recording def stream_to_zmq(self, zmq_address, n_samples: int, buffer_size: Optional[int] = 10000): @@ -110,21 +115,23 @@ class SDR(ABC): :return: The trimmed Recording. :rtype: Recording """ + try: + self._previous_buffer = None + self._max_num_buffers = np.inf if n_samples == np.inf else math.ceil(n_samples / buffer_size) + self._num_buffers_processed = 0 + self.zmq_address = _generate_full_zmq_address(str(zmq_address)) + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.bind(self.zmq_address) - self._previous_buffer = None - self._max_num_buffers = np.inf if n_samples == np.inf else math.ceil(n_samples / buffer_size) - self._num_buffers_processed = 0 - self.zmq_address = _generate_full_zmq_address(str(zmq_address)) - self.context = zmq.Context() - self.socket = self.context.socket(zmq.PUB) - self.socket.bind(self.zmq_address) - - self._stream_rx( - self._zmq_bytestream_callback, - ) - - self.context.destroy() - self.socket.close() + self._stream_rx( + self._zmq_bytestream_callback, + ) + finally: + if hasattr(self, "socket"): + self.socket.close() + if hasattr(self, "context"): + self.context.destroy() def _accumulate_buffers_callback(self, buffer, metadata=None): """ @@ -134,62 +141,72 @@ class SDR(ABC): # save the buffer until max reached # return a recording - buffer = np.array(buffer) # make it 1d - if len(buffer.shape) == 1: - buffer = np.array([buffer]) + # Validate buffer + if not self._validate_buffer(buffer): + print("Warning: Corrupted buffer detected, skipping") + self._corrupted_buffer_count += 1 + return # Skip this buffer - # it runs these checks each time, is that an efficiency issue? - - if self._max_num_buffers is None: - # default then - # this should probably print, but that would happen every buffer... - raise ValueError("Number of buffers for block capture not set.") - - # add the given buffer to the pre-allocated buffer - - if metadata is not None: - self.received_metadata = metadata - - # TODO optimize, pre-allocate - if self._accumulated_buffer is not None: - self._accumulated_buffer = np.concatenate((self._accumulated_buffer, buffer), axis=1) + if isinstance(buffer, np.ndarray): + if buffer.ndim == 1: + buffer = buffer[np.newaxis, :] # make shape (1, N) else: - # the first time - self._accumulated_buffer = buffer.copy() + buffer = np.array(buffer) # make it 1d + if len(buffer.shape) == 1: + buffer = np.array([buffer]) - self._num_buffers_processed = self._num_buffers_processed + 1 + # First call: pre-allocate if we know the final size + if self._accumulated_buffer is None: + # Check that _max_num_buffers is set + if self._max_num_buffers is None: + raise ValueError("Number of buffers for block capture not set.") + if self._num_samples_to_record is None: + raise ValueError("Number of samples not set before RX start.") + + if metadata is not None: + self.received_metadata = metadata + + # Preallocate once (avoid np.zeros; use np.empty for speed) + num_channels = buffer.shape[0] + self._accumulated_buffer = np.empty((num_channels, self._num_samples_to_record), dtype=buffer.dtype) + self._write_position = 0 + print(f"Pre-allocated buffer for {self._num_samples_to_record:,} samples.") + + # Write new buffer into pre-allocated array + n = buffer.shape[1] + start = self._write_position + end = min(start + n, self._num_samples_to_record) + samples_to_write = end - start + + if samples_to_write > 0: + self._accumulated_buffer[:, start:end] = buffer[:, : end - start] + self._write_position = end + + # Check if we're done + self._num_buffers_processed += 1 if self._num_buffers_processed >= self._max_num_buffers: self.stop() - if self._last_buffer is not None: - if (buffer == self._last_buffer).all(): - print("\033[93mWarning: Buffer Overflow Detected\033[0m") - self._last_buffer = buffer.copy() - else: - self._last_buffer = buffer.copy() - - # print("Number of buffers received: " + str(self._num_buffers_processed)) + def _validate_buffer(self, buffer): + """Check for obviously corrupt data.""" + # Check for all zeros + if np.all(buffer == 0): + return False + # Check for all same value + if np.all(buffer == buffer[0]): + return False + return True def _zmq_bytestream_callback(self, buffer, metadata=None): # push to ZMQ port data = np.array(buffer).tobytes() # convert to bytes for transport self.socket.send(data) - # print(f"Sent {self._num_buffers_processed} ZMQ buffers to {self.zmq_address}") - self._num_buffers_processed = self._num_buffers_processed + 1 if self._max_num_buffers is not None: if self._num_buffers_processed >= self._max_num_buffers: self.pause_rx() - if self._previous_buffer is not None: - if (buffer == self._previous_buffer).all(): - print("\033[93mWarning: Buffer Overflow Detected\033[0m") - # TODO: I suggest we think about moving this part to the top of this function - # and skip the rest of the function in case of overflow. - # like, it's not necessary to stream repeated IQ data anyways! - self._previous_buffer = buffer.copy() - def pickle_buffer_to_zmq(self, zmq_address, buffer_size, num_buffers): """ Stream samples to a zmq address, packaged in binary buffers using numpy.pickle. @@ -229,7 +246,7 @@ class SDR(ABC): self.stop() if self._last_buffer is not None: - if (buffer == self._last_buffer).all(): + if np.array_equal(buffer, self._last_buffer): print("\033[93mWarning: Buffer Overflow Detected\033[0m") self._last_buffer = buffer.copy() else: @@ -373,6 +390,58 @@ class SDR(ABC): """ return self.tx_gain + def set_rx_sample_rate(self): + """ + Set the sample rate of the receiver. + """ + raise NotImplementedError + + def set_rx_center_frequency(self): + """ + Set the center frequency of the receiver. + """ + raise NotImplementedError + + def set_rx_gain(self): + """ + Set the gain setting of the receiver. + """ + raise NotImplementedError + + def set_tx_sample_rate(self): + """ + Set the sample rate of the transmitter. + """ + raise NotImplementedError + + def set_tx_center_frequency(self): + """ + Set the center frequency of the transmitter. + """ + raise NotImplementedError + + def set_tx_gain(self): + """ + Set the gain setting of the transmitter. + """ + raise NotImplementedError + + def supports_dynamic_updates(self) -> dict: + """ + Report which parameters can be updated during streaming. + + Returns: + dict: {'center_frequency': bool, 'sample_rate': bool, 'gain': bool} + """ + return {"center_frequency": False, "sample_rate": False, "gain": False} + + def __del__(self): + """Cleanup on garbage collection.""" + try: + self.close() + except Exception: + pass + @abstractmethod def close(self): pass @@ -442,3 +511,21 @@ def _verify_sample_format(samples): """ return np.max(np.abs(samples)) <= 1 + + +class SDRError(Exception): + """Base exception for SDR errors.""" + + pass + + +class SDRParameterError(SDRError): + """Invalid parameter (sample rate, freq, gain).""" + + pass + + +class SDROverflowError(SDRError): + """Buffer overflow detected.""" + + pass -- 2.34.1 From 96d864aa0bb4468495a314298365e4cad50bba5d Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 11:24:54 -0500 Subject: [PATCH 2/8] Fixed shutdown and cleanup, standardized setters, and improved TX --- src/ria_toolkit_oss/sdr/blade.py | 304 +++++++++++++++++++++---------- 1 file changed, 210 insertions(+), 94 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/blade.py b/src/ria_toolkit_oss/sdr/blade.py index 6bb0d03..c8e6f3f 100644 --- a/src/ria_toolkit_oss/sdr/blade.py +++ b/src/ria_toolkit_oss/sdr/blade.py @@ -1,4 +1,4 @@ -import time +import gc import warnings from typing import Optional @@ -6,7 +6,7 @@ import numpy as np from bladerf import _bladerf from ria_toolkit_oss.datatypes import Recording -from ria_toolkit_oss.sdr import SDR +from ria_toolkit_oss.sdr import SDR, SDRError, SDRParameterError class Blade(SDR): @@ -22,7 +22,7 @@ class Blade(SDR): """ if identifier != "": - print(f"Warning, radio identifier {identifier} provided for Blade but will not be used.") + warnings.warn(f"Blade: Identifier '{identifier}' will be ignored", UserWarning) uut = self._probe_bladerf() @@ -34,6 +34,7 @@ class Blade(SDR): self.device = _bladerf.BladeRF(uut) self._print_versions(device=self.device) + self.bytes_per_sample = 4 super().__init__() @@ -42,8 +43,10 @@ class Blade(SDR): if board is not None: board.close() - # TODO why does this create an error under any conditions? - raise OSError("Shutdown initiated with error code: {}".format(error)) + if error != 0: + raise OSError(f"BladeRF shutdown with error code: {error}") + else: + print("BladeRF shutdown successfully") def _probe_bladerf(self): device = None @@ -85,24 +88,25 @@ class Blade(SDR): :type sample_rate: int or float :param center_frequency: The center frequency of the recording. :type center_frequency: int or float - :param gain: The gain set for receiving on the BladeRF + :param gain: The gain set for receiving on the BladeRF. :type gain: int :param channel: The channel the BladeRF is set to. :type channel: int :param buffer_size: The buffer size during receive. Defaults to 8192. :type buffer_size: int - :param gain_mode: 'absolute' passes gain directly to the sdr, - 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60). + :param gain_mode: 'absolute' passes gain directly to the SDR; + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60). :type gain_mode: str """ + print("Initializing RX") # Configure BladeRF - self._set_rx_channel(channel) - self._set_rx_sample_rate(sample_rate) - self._set_rx_center_frequency(center_frequency) - self._set_rx_gain(channel, gain, gain_mode) - self._set_rx_buffer_size(buffer_size) + self.set_rx_channel(channel) + self.set_rx_sample_rate(sample_rate) + self.set_rx_center_frequency(center_frequency) + self.set_rx_gain(channel, gain, gain_mode) + self.set_rx_buffer_size(buffer_size) bw = self.rx_sample_rate if bw < 200000: @@ -128,10 +132,8 @@ class Blade(SDR): stream_timeout=3500000000, ) - self.rx_ch.enable = True - self.bytes_per_sample = 4 - print("Blade Starting RX...") + self.rx_ch.enable = True self._enable_rx = True while self._enable_rx: @@ -148,18 +150,34 @@ class Blade(SDR): print("Blade RX Completed.") self.rx_ch.enable = False - def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + def record( + self, + num_samples: Optional[int] = None, + rx_time: Optional[int | float] = None, + ) -> Recording: + """ + Create a radio recording (iq samples and metadata) of a given length from the Blade. + Either num_samples or rx_time must be provided. + init_rx() must be called before record() + + :param num_samples: The number of samples to record. + :type num_samples: int, optional + :param rx_time: The time to record. + :type rx_time: int or float, optional + + returns: Recording object (iq samples and metadata) + """ if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") if num_samples is not None and rx_time is not None: - raise ValueError("Only input one of num_samples or rx_time") + raise SDRParameterError("Only input one of num_samples or rx_time") elif num_samples is not None: self._num_samples_to_record = num_samples elif rx_time is not None: self._num_samples_to_record = int(rx_time * self.rx_sample_rate) else: - raise ValueError("Must provide input of one of num_samples or rx_time") + raise SDRParameterError("Must provide input of one of num_samples or rx_time") # Setup synchronous stream self.device.sync_config( @@ -171,11 +189,10 @@ class Blade(SDR): stream_timeout=3500000000, ) - self.rx_ch.enable = True - self.bytes_per_sample = 4 - print("Blade Starting RX...") - self._enable_rx = True + with self._param_lock: + self._enable_rx = True + self.rx_ch.enable = True store_array = np.zeros( (1, (self._num_samples_to_record // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64 @@ -191,7 +208,8 @@ class Blade(SDR): # Disable module print("Blade RX Completed.") - self.rx_ch.enable = False + with self._param_lock: + self.rx_ch.enable = False metadata = { "source": self.__class__.__name__, "sample_rate": self.rx_sample_rate, @@ -207,7 +225,7 @@ class Blade(SDR): center_frequency: int | float, gain: int, channel: int, - buffer_size: Optional[int] = 8192, + buffer_size: Optional[int] = 32768, gain_mode: Optional[str] = "absolute", ): """ @@ -224,16 +242,24 @@ class Blade(SDR): :param buffer_size: The buffer size during transmission. Defaults to 8192. :type buffer_size: int :param gain_mode: 'absolute' passes gain directly to the sdr, - 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60). + 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60). :type gain_mode: str + + :return: 0 if successful, -1 if there's an error. + :rtype: int """ # Configure BladeRF - self._set_tx_channel(channel) - self._set_tx_sample_rate(sample_rate) - self._set_tx_center_frequency(center_frequency) - self._set_tx_gain(channel=channel, gain=gain, gain_mode=gain_mode) - self._set_tx_buffer_size(buffer_size) + self.set_tx_channel(channel) + self.set_tx_sample_rate(sample_rate) + self.set_tx_center_frequency(center_frequency) + self.set_tx_gain(channel=channel, gain=gain, gain_mode=gain_mode) + self.set_tx_buffer_size(buffer_size) + + if self.tx_sample_rate >= 7.5e6 and self.tx_buffer_size < 65536: + warnings.warn( + "Blade: For high sample rates, a buffer size of 65536, 131072, or 262144 is recommended", UserWarning + ) bw = self.tx_sample_rate if bw < 200000: @@ -302,13 +328,13 @@ class Blade(SDR): """ if num_samples is not None and tx_time is not None: - raise ValueError("Only input one of num_samples or tx_time") + raise SDRParameterError("Only input one of num_samples or tx_time") elif num_samples is not None: - tx_time = num_samples / self.tx_sample_rate - elif tx_time is not None: pass + elif tx_time is not None: + num_samples = int(tx_time * self.tx_sample_rate) else: - tx_time = len(recording) / self.tx_sample_rate + num_samples = len(recording) if isinstance(recording, np.ndarray): samples = recording @@ -317,9 +343,15 @@ class Blade(SDR): warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission") samples = recording.data[0] else: - raise TypeError("recording must be np.ndarray or Recording") + raise SDRParameterError("recording must be np.ndarray or Recording") samples = samples.astype(np.complex64, copy=False) + tx_bytes = self._convert_tx_samples(samples) + + # Transmit in chunks + samples_sent = 0 + len_samples = len(samples) + chunk_size = self.tx_buffer_size # Setup stream self.device.sync_config( @@ -335,26 +367,21 @@ class Blade(SDR): self.tx_ch.enable = True print("Blade Starting TX...") - - # Transmit samples - repeat as needed for the duration - start_time = time.time() - sample_index = 0 - try: - while time.time() - start_time < tx_time: - # Get next chunk - chunk_size = min(self.tx_buffer_size, len(samples) - sample_index) - if chunk_size == 0: - # Reached end, loop back - sample_index = 0 - chunk_size = min(self.tx_buffer_size, len(samples)) + while samples_sent < num_samples: + this_chunk_size = min(chunk_size, num_samples - samples_sent) - chunk = samples[sample_index : sample_index + chunk_size] - sample_index += chunk_size + start_idx = (samples_sent % len_samples) * self.bytes_per_sample + end_idx = start_idx + this_chunk_size * self.bytes_per_sample + end_idx %= len_samples * self.bytes_per_sample - # Convert and transmit - byte_array = self._convert_tx_samples(chunk) - self.device.sync_tx(byte_array, len(chunk)) + if end_idx > start_idx: + chunk_bytes_arr = tx_bytes[start_idx:end_idx] + else: + chunk_bytes_arr = tx_bytes[start_idx:] + tx_bytes[:end_idx] + + self.device.sync_tx(chunk_bytes_arr, this_chunk_size) + samples_sent += this_chunk_size except KeyboardInterrupt: print("\nTransmission interrupted by user") @@ -384,73 +411,146 @@ class Blade(SDR): byte_array = tx_samples.tobytes() return byte_array - def _set_rx_channel(self, channel): + def set_rx_channel(self, channel): + if channel != 0 and channel != 1: + raise SDRParameterError("Channel must be either 0 or 1.") + self.rx_channel = channel self.rx_ch = self.device.Channel(_bladerf.CHANNEL_RX(channel)) print(f"\nBlade channel = {self.rx_ch}") - def _set_rx_sample_rate(self, sample_rate): - self.rx_sample_rate = sample_rate - self.rx_ch.sample_rate = self.rx_sample_rate - print(f"Blade sample rate = {self.rx_ch.sample_rate}") - - def _set_rx_center_frequency(self, center_frequency): - self.rx_center_frequency = center_frequency - self.rx_ch.frequency = center_frequency - print(f"Blade center frequency = {self.rx_ch.frequency}") - - def _set_rx_gain(self, channel, gain, gain_mode): - - rx_gain_min = self.device.get_gain_range(channel)[0] - rx_gain_max = self.device.get_gain_range(channel)[1] - - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets \ - the gain relative to the maximum possible gain." - ) + def set_rx_sample_rate(self, sample_rate): + """ + Set the sample rate of the receiver. + Not callable during recording; Blade requires stream stop/restart to change sample rate. + """ + with self._param_lock: + if hasattr(self, "rx_channel"): + range_list = self.device.get_sample_rate_range(self.rx_channel) + min_rate, max_rate = range_list[0], range_list[1] else: - abs_gain = rx_gain_max + gain - else: - abs_gain = gain + raise SDRError("Must set channel before setting center frequency") - if abs_gain < rx_gain_min or abs_gain > rx_gain_max: - abs_gain = min(max(gain, rx_gain_min), rx_gain_max) - print(f"Gain {abs_gain} out of range for Blade.") - print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) - self.rx_gain = abs_gain - self.rx_ch.gain = abs_gain + self.rx_sample_rate = sample_rate + self.rx_ch.sample_rate = self.rx_sample_rate + print(f"Blade sample rate = {self.rx_ch.sample_rate}") - print(f"Blade gain = {self.rx_ch.gain}") + def set_rx_center_frequency(self, center_frequency): + """ + Set the center frequency of the receiver. + Not callable during recording; Blade requires stream stop/restart to change center frequency. + """ + with self._param_lock: + if hasattr(self, "rx_channel"): + range_list = self.device.get_frequency_range(self.rx_channel) + min_rate, max_rate = range_list[0], range_list[1] + else: + raise SDRError("Must set channel before setting center frequency") - def _set_rx_buffer_size(self, buffer_size): + if center_frequency < min_rate or center_frequency > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]" + ) + + self.rx_center_frequency = center_frequency + self.rx_ch.frequency = center_frequency + print(f"Blade center frequency = {self.rx_ch.frequency}") + + def set_rx_gain(self, channel, gain, gain_mode): + """ + Set the gain of the receiver. + Not callable during recording; Blade requires stream stop/restart to change gain. + """ + with self._param_lock: + rx_gain_min = self.device.get_gain_range(channel)[0] + rx_gain_max = self.device.get_gain_range(channel)[1] + + if gain_mode == "relative": + if gain > 0: + raise SDRParameterError( + "When gain_mode = 'relative', gain must be < 0. This sets \ + the gain relative to the maximum possible gain." + ) + else: + abs_gain = rx_gain_max + gain + else: + abs_gain = gain + + if abs_gain < rx_gain_min or abs_gain > rx_gain_max: + abs_gain = min(max(gain, rx_gain_min), rx_gain_max) + print(f"Gain {abs_gain} out of range for Blade.") + print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") + + self.rx_gain = abs_gain + self.rx_ch.gain = abs_gain + + print(f"Blade gain = {self.rx_ch.gain}") + + def set_rx_buffer_size(self, buffer_size): self.rx_buffer_size = buffer_size - def _set_tx_channel(self, channel): + def set_tx_channel(self, channel): + if channel != 0 and channel != 1: + raise SDRParameterError("Channel must be either 0 or 1.") + self.tx_channel = channel self.tx_ch = self.device.Channel(_bladerf.CHANNEL_TX(self.tx_channel)) print(f"\nBlade channel = {self.tx_ch}") - def _set_tx_sample_rate(self, sample_rate): + def set_tx_sample_rate(self, sample_rate): + if hasattr(self, "tx_channel"): + range_list = self.device.get_sample_rate_range(self.tx_channel) + min_rate, max_rate = range_list[0], range_list[1] + else: + raise SDRError("Must set channel before setting center frequency") + + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) + + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) + self.tx_sample_rate = sample_rate self.tx_ch.sample_rate = self.tx_sample_rate print(f"Blade sample rate = {self.tx_ch.sample_rate}") - def _set_tx_center_frequency(self, center_frequency): + def set_tx_center_frequency(self, center_frequency): + if hasattr(self, "tx_channel"): + range_list = self.device.get_frequency_range(self.tx_channel) + min_rate, max_rate = range_list[0], range_list[1] + else: + raise SDRError("Must set channel before setting center frequency") + + if center_frequency < min_rate or center_frequency > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]" + ) + self.tx_center_frequency = center_frequency self.tx_ch.frequency = center_frequency print(f"Blade center frequency = {self.tx_ch.frequency}") - def _set_tx_gain(self, channel, gain, gain_mode): - + def set_tx_gain(self, channel, gain, gain_mode): tx_gain_min = self.device.get_gain_range(channel)[0] tx_gain_max = self.device.get_gain_range(channel)[1] if gain_mode == "relative": if gain > 0: - raise ValueError( + raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This sets\ the gain relative to the maximum possible gain." ) @@ -469,7 +569,7 @@ class Blade(SDR): print(f"Blade gain = {self.tx_ch.gain}") - def _set_tx_buffer_size(self, buffer_size): + def set_tx_buffer_size(self, buffer_size): self.tx_buffer_size = buffer_size def set_clock_source(self, source): @@ -499,4 +599,20 @@ class Blade(SDR): print(f"BladeRF bias tee {state} on channel {channel}.") def close(self): - self.device.close() + if hasattr(self, "device") and self.device is not None: + try: + if hasattr(self, "tx_ch"): + self.tx_ch.enable = False + if hasattr(self, "rx_ch"): + self.rx_ch.enable = False + + self.device.close() + except Exception as e: + print(f"Warning: error closing bladeRF: {e}") + finally: + del self.device + self.device = None + gc.collect() + + def supports_dynamic_updates(self) -> dict: + return {"center_frequency": False, "sample_rate": False, "gain": False} -- 2.34.1 From bca962d7b278115e971f2024640604207f2014db Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 11:39:57 -0500 Subject: [PATCH 3/8] Added setter methods, fixed rx sample conversion, minor fixes --- src/ria_toolkit_oss/sdr/hackrf.py | 125 +++++++++++++++++++++--------- 1 file changed, 87 insertions(+), 38 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/hackrf.py b/src/ria_toolkit_oss/sdr/hackrf.py index 39e4a47..06b49a7 100644 --- a/src/ria_toolkit_oss/sdr/hackrf.py +++ b/src/ria_toolkit_oss/sdr/hackrf.py @@ -6,7 +6,7 @@ import numpy as np from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.sdr._external.libhackrf import HackRF as hrf -from ria_toolkit_oss.sdr.sdr import SDR +from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError class HackRF(SDR): @@ -21,7 +21,7 @@ class HackRF(SDR): """ if identifier != "": - print(f"Warning, radio identifier {identifier} provided for HackRF but will not be used.") + warnings.warn(f"HackRF: Identifier '{identifier}' will be ignored", UserWarning) print("Initializing HackRF radio.") try: @@ -33,8 +33,6 @@ class HackRF(SDR): print("Failed to find HackRF radio.") raise e - super().__init__() - def init_rx( self, sample_rate: int | float, @@ -64,14 +62,8 @@ class HackRF(SDR): :type gain_mode: str """ print("Initializing RX") - - self.rx_sample_rate = sample_rate - self.radio.sample_rate = int(sample_rate) - print(f"HackRF sample rate = {self.radio.sample_rate}") - - self.rx_center_frequency = center_frequency - self.radio.center_freq = int(center_frequency) - print(f"HackRF center frequency = {self.radio.center_freq}") + self.set_sample_rate(sample_rate=sample_rate) + self.set_center_frequency(center_frequency=center_frequency) # Distribute gain across amplifier stages rx_gain_min = 0 @@ -79,7 +71,7 @@ class HackRF(SDR): if gain_mode == "relative": if gain > 0: - raise ValueError( + raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This " "sets the gain relative to the maximum possible gain." ) @@ -99,7 +91,9 @@ class HackRF(SDR): self.rx_gain = abs_gain print(f"HackRF gain distribution: Amp={self.amp_enabled}, LNA={self.rx_lna_gain}dB, VGA={self.rx_vga_gain}dB") - print("To individually modify the HackRF gains, use set_gain_amp(), set_rx_lna_gain(), and set_rx_vga_gain().") + print( + "To individually modify the HackRF gains, use set_gain_amp(), set_rx_lna_gain(), and set_rx_vga_gain().\n" + ) self._tx_initialized = False self._rx_initialized = True @@ -122,13 +116,13 @@ class HackRF(SDR): raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") if num_samples is not None and rx_time is not None: - raise ValueError("Only input one of num_samples or rx_time") + raise SDRParameterError("Only input one of num_samples or rx_time") elif num_samples is not None: self._num_samples_to_record = num_samples elif rx_time is not None: - self._num_samples_to_record = int(rx_time * self.rx_sample_rate) + self._num_samples_to_record = int(rx_time * self.sample_rate) else: - raise ValueError("Must provide input of one of num_samples or rx_time") + raise SDRParameterError("Must provide input of one of num_samples or rx_time") print("HackRF Starting RX...") @@ -137,18 +131,15 @@ class HackRF(SDR): print("HackRF RX Completed.") - # Create 1xN array for single-channel recording - store_array = np.zeros((1, self._num_samples_to_record), dtype=np.complex64) - store_array[0, :] = all_samples - + rx_complex = self.convert_rx_samples(rx_samples=all_samples) metadata = { "source": self.__class__.__name__, - "sample_rate": self.rx_sample_rate, - "center_frequency": self.rx_center_frequency, + "sample_rate": self.sample_rate, + "center_frequency": self.center_frequency, "gain": self.rx_gain, } - return Recording(data=store_array, metadata=metadata) + return Recording(data=rx_complex, metadata=metadata) def init_tx( self, @@ -174,19 +165,14 @@ class HackRF(SDR): """ print("Initializing TX") - self.tx_sample_rate = sample_rate - self.radio.sample_rate = int(sample_rate) - print(f"HackRF sample rate = {self.radio.sample_rate}") - - self.tx_center_frequency = center_frequency - self.radio.center_freq = int(center_frequency) - print(f"HackRF center frequency = {self.radio.center_freq}") + self.set_sample_rate(sample_rate=sample_rate) + self.set_center_frequency(center_frequency=center_frequency) tx_gain_min = 0 tx_gain_max = 47 if gain_mode == "relative": if gain > 0: - raise ValueError( + raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This \ sets the gain relative to the maximum possible gain." ) @@ -197,14 +183,14 @@ class HackRF(SDR): if abs_gain < tx_gain_min or abs_gain > tx_gain_max: abs_gain = min(max(gain, tx_gain_min), tx_gain_max) - print(f"Gain {gain} out of range for Pluto.") + print(f"Gain {gain} out of range for HackRF.") print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB") self.set_gain_amp(True) self.set_tx_vga_gain(abs_gain) self.tx_gain = abs_gain print(f"HackRF gain distribution: Amp={self.amp_enabled}, VGA={self.tx_vga_gain}dB") - print("To individually modify the HackRF gains, use set_gain_amp() or set_tx_vga_gain().") + print("To individually modify the HackRF gains, use set_gain_amp() or set_tx_vga_gain().\n") self._tx_initialized = True self._rx_initialized = False @@ -229,13 +215,13 @@ class HackRF(SDR): :type tx_time: int or float, optional """ if num_samples is not None and tx_time is not None: - raise ValueError("Only input one of num_samples or tx_time") + raise SDRParameterError("Only input one of num_samples or tx_time") elif num_samples is not None: - tx_time = num_samples / self.tx_sample_rate + tx_time = num_samples / self.sample_rate elif tx_time is not None: pass else: - tx_time = len(recording) / self.tx_sample_rate + tx_time = len(recording) / self.sample_rate if isinstance(recording, np.ndarray): samples = recording @@ -275,6 +261,62 @@ class HackRF(SDR): self.radio.set_txvga_gain(vga_gain) self.tx_vga_gain = vga_gain + def set_sample_rate(self, sample_rate): + if sample_rate < 2e6 or sample_rate > 20e6: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{2:.3f} - {20:.3f} Msps]" + ) + self.sample_rate = sample_rate + self.radio.sample_rate = int(sample_rate) + print(f"HackRF sample rate = {self.radio.sample_rate}") + + def set_rx_sample_rate(self, sample_rate): + """ + Set the sample rate. + Not callable during recording; HackRF requires stream stop/restart to change sample rate. + """ + self.set_sample_rate(sample_rate=sample_rate) + + def set_tx_sample_rate(self, sample_rate): + self.set_sample_rate(sample_rate=sample_rate) + + def set_center_frequency(self, center_frequency): + with self._param_lock: + if center_frequency < 1e6 or center_frequency > 6e9: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range: [{1e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) + self.center_frequency = center_frequency + self.radio.center_freq = int(center_frequency) + print(f"HackRF center frequency = {self.radio.center_freq}") + + def set_rx_center_frequency(self, center_frequency): + """ + Set the center frequency. Callable during streaming. + """ + self.set_center_frequency(center_frequency=center_frequency) + + def set_tx_center_frequency(self, center_frequency): + self.set_center_frequency(center_frequency=center_frequency) + + def convert_rx_samples(self, rx_samples): + # Handle conversion depending on dtype + if np.issubdtype(rx_samples.dtype, np.complexfloating): + # Already complex: just normalize + rx_complex = rx_samples.astype(np.complex64) / 128.0 + elif np.issubdtype(rx_samples.dtype, np.integer): + # Raw interleaved I/Q bytes: convert to complex + i_samples = rx_samples[0::2].astype(np.float32) + q_samples = rx_samples[1::2].astype(np.float32) + rx_complex = (i_samples + 1j * q_samples) / 128.0 + else: + raise TypeError(f"Unexpected dtype from read_samples: {rx_samples.dtype}") + + # Ensure 2D array: 1xN for single channel + return rx_complex.reshape((1, -1)) + def set_clock_source(self, source): self.radio.set_clock_source(source) @@ -288,7 +330,11 @@ class HackRF(SDR): raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc def close(self): - self.radio.close() + try: + self.radio.close() + del self.radio + finally: + self._enable_rx = False def _stream_rx(self, callback): """ @@ -342,3 +388,6 @@ class HackRF(SDR): def _stream_tx(self, callback): return super()._stream_tx(callback) + + def supports_dynamic_updates(self) -> dict: + return {"center_frequency": True, "sample_rate": False, "gain": False} -- 2.34.1 From 0ea81c37ba94e84e82d746626f65b92e5cc7284f Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 11:54:05 -0500 Subject: [PATCH 4/8] Updated setters, removed redundant shutdown, added chunked recording --- src/ria_toolkit_oss/sdr/pluto.py | 399 ++++++++++++++++++------------- 1 file changed, 234 insertions(+), 165 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/pluto.py b/src/ria_toolkit_oss/sdr/pluto.py index 6c617ee..47af8df 100644 --- a/src/ria_toolkit_oss/sdr/pluto.py +++ b/src/ria_toolkit_oss/sdr/pluto.py @@ -8,7 +8,7 @@ import adi import numpy as np from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.sdr import SDR +from ria_toolkit_oss.sdr.sdr import SDR, SDRError, SDRParameterError class Pluto(SDR): @@ -28,6 +28,7 @@ class Pluto(SDR): print(f"Initializing Pluto radio with identifier [{identifier}].") try: super().__init__() + self._tx_lock = threading.Lock() if identifier is None: uri = "ip:pluto.local" @@ -74,10 +75,12 @@ class Pluto(SDR): :type center_frequency: int or float :param gain: The gain set for receiving on the Pluto :type gain: int - :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. + :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 + enables channel 1, 1 enables both channels. :type channel: int :param gain_mode: 'absolute' passes gain directly to the sdr, - 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (74). + 'relative' means that gain should be a negative value, and it will + be subtracted from the max gain (74). :type gain_mode: str """ print("Initializing RX") @@ -88,20 +91,7 @@ class Pluto(SDR): self.set_rx_center_frequency(center_frequency=int(center_frequency)) print(f"Pluto center frequency = {self.radio.rx_lo}") - if channel == 0: - self.radio.rx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") - elif channel == 1: - if not self._mimo_capable: - raise ValueError( - "Dual RX channel requested (channel=1) but hardware is not MIMO-capable. " - "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." - ) - self.radio.rx_enabled_channels = [0, 1] - print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") - else: - raise ValueError("Channel must be either 0 or 1.") - + self.set_rx_channel(channel=channel) self.set_rx_gain(gain=gain, channel=channel, gain_mode=gain_mode) if channel == 0: print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}") @@ -109,8 +99,6 @@ class Pluto(SDR): self.set_rx_gain(gain=gain, channel=0, gain_mode=gain_mode) print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}") - self.set_rx_buffer_size(getattr(self, "rx_buffer_size", 1024)) - self._rx_initialized = True self._tx_initialized = False @@ -134,10 +122,12 @@ class Pluto(SDR): :type center_frequency: int or float :param gain: The gain set for transmitting on the Pluto :type gain: int - :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. + :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 + enables channel 1, 1 enables both channels. :type channel: int :param gain_mode: 'absolute' passes gain directly to the sdr, - 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (0). + 'relative' means that gain should be a negative value, and it will + be subtracted from the max gain (0). :type gain_mode: str """ @@ -149,20 +139,7 @@ class Pluto(SDR): self.set_tx_center_frequency(center_frequency=int(center_frequency)) print(f"Pluto center frequency = {self.radio.tx_lo}") - if channel == 0: - self.radio.tx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") - elif channel == 1: - if not self._mimo_capable: - raise ValueError( - "Dual TX channel requested (channel=1) but hardware is not MIMO-capable. " - "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." - ) - self.radio.tx_enabled_channels = [0, 1] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") - else: - raise ValueError("Channel must be either 0 or 1.") - + self.set_tx_channel(channel=channel) self.set_tx_gain(gain=gain, channel=channel, gain_mode=gain_mode) if channel == 0: print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}") @@ -179,16 +156,74 @@ class Pluto(SDR): if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") - # print("Starting rx...") - self._enable_rx = True while self._enable_rx is True: + # collect complex signa from radio signal = self.radio.rx() - signal = self._convert_rx_samples(signal) + # send callback complex signal callback(buffer=signal, metadata=None) - def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + def _record_fast(self, num_samples): + """Optimized single-buffer capture for ≤16M samples.""" + + self.set_rx_buffer_size(buffer_size=num_samples) + print("Pluto Starting RX...") + samples = self.radio.rx() + + # Handle single/dual channel + if self.radio.rx_enabled_channels == [0]: + samples = [self._convert_rx_samples(samples)] + else: + samples = [self._convert_rx_samples(s) for s in samples] + + print("Pluto RX Completed.") + + metadata = { + "source": self.__class__.__name__, + "sample_rate": self.rx_sample_rate, + "center_frequency": self.rx_center_frequency, + "gain": self.rx_gain, + } + return Recording(data=samples, metadata=metadata) + + def _record_chunked(self, num_samples): + """Chunked streaming capture for >2M samples.""" + + # Use base class streaming with pre-allocation + chunk_size = 2_000_000 # 2M sample chunks (safe size) + self.set_rx_buffer_size(buffer_size=chunk_size) + + self._max_num_buffers = (num_samples // chunk_size) + 1 + self._num_buffers_processed = 0 + self._accumulated_buffer = None + + # Stream with accumulation callback + print("Pluto Starting RX...") + self._stream_rx(callback=self._accumulate_buffers_callback) + print("Pluto RX Completed.") + print(f"Corrupted buffer count: {self._corrupted_buffer_count}") + + # Truncate to exact size + samples = self._accumulated_buffer[:, :num_samples] + samples_list = [self._convert_rx_samples(chan) for chan in samples] + + metadata = { + "source": self.__class__.__name__, + "sample_rate": self.rx_sample_rate, + "center_frequency": self.rx_center_frequency, + "gain": self.rx_gain, + } + + # Reset for next capture + self._accumulated_buffer = None + return Recording(data=samples_list, metadata=metadata) + + def record( + self, + num_samples: Optional[int] = None, + rx_time: Optional[int | float] = None, + ) -> Recording: """ Create a radio recording (iq samples and metadata) of a given length from the SDR. Either num_samples or rx_time must be provided. @@ -205,38 +240,19 @@ class Pluto(SDR): raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") if num_samples is not None and rx_time is not None: - raise ValueError("Only input one of num_samples or rx_time") + raise SDRParameterError("Only input one of num_samples or rx_time") elif num_samples is not None: self._num_samples_to_record = num_samples elif rx_time is not None: self._num_samples_to_record = int(rx_time * self.rx_sample_rate) else: - raise ValueError("Must provide input of one of num_samples or rx_time") + raise SDRParameterError("Must provide input of one of num_samples or rx_time") - if self._num_samples_to_record > 16000000: - raise NotImplementedError("Pluto record for num_samples>16M not implemented yet.") - self.radio.rx_buffer_size = self._num_samples_to_record - - print("Pluto Starting RX...") - samples = self.radio.rx() - if self.radio.rx_enabled_channels == [0]: - samples = self._convert_rx_samples(samples) - samples = [samples] + # Record in one go if there are less than 2,000,000 samples to record, record in chunks otherwise + if self._num_samples_to_record <= 2_000_000: + return self._record_fast(self._num_samples_to_record) else: - channel1 = self._convert_rx_samples(samples[0]) - channel2 = self._convert_rx_samples(samples[1]) - samples = [channel1, channel2] - print("Pluto RX Completed.") - - metadata = { - "source": self.__class__.__name__, - "sample_rate": self.rx_sample_rate, - "center_frequency": self.rx_center_frequency, - "gain": self.rx_gain, - } - - recording = Recording(data=samples, metadata=metadata) - return recording + return self._record_chunked(self._num_samples_to_record) def _format_tx_data(self, recording: Recording | np.ndarray | list): if isinstance(recording, np.ndarray): @@ -289,8 +305,9 @@ class Pluto(SDR): print("Pluto TX Completed.") def interrupt_transmit(self): - self.radio.tx_destroy_buffer() - self.radio.tx_cyclic_buffer = False + with self._tx_lock: + self.radio.tx_destroy_buffer() + self.radio.tx_cyclic_buffer = False print("Pluto TX Completed.") def tx_recording(self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None, mode="timed"): @@ -310,7 +327,7 @@ class Pluto(SDR): :type mode: str, optional """ if num_samples is not None and tx_time is not None: - raise ValueError("Only input one of num_samples or tx_time") + raise SDRParameterError("Only input one of num_samples or tx_time") elif num_samples is not None: tx_time = num_samples / self.tx_sample_rate elif tx_time is not None: @@ -320,82 +337,112 @@ class Pluto(SDR): data = self._format_tx_data(recording=recording) - try: - if self.radio.tx_cyclic_buffer: - print("Destroying existing TX buffer...") - self.radio.tx_destroy_buffer() - self.radio.tx_cyclic_buffer = False - except Exception as e: - print(f"Error while destroying TX buffer: {e}") + with self._tx_lock: + try: + if self.radio.tx_cyclic_buffer: + print("Destroying existing TX buffer...") + self.radio.tx_destroy_buffer() + self.radio.tx_cyclic_buffer = False + except Exception as e: + print(f"Error while destroying TX buffer: {e}") - self.radio.tx_cyclic_buffer = True - print("Pluto Starting TX...") - self.radio.tx(data_np=data) - if mode == "timed": - timeout_thread = threading.Thread(target=self._timeout_cyclic_buffer, args=([tx_time])) - timeout_thread.start() - timeout_thread.join() + self.radio.tx_cyclic_buffer = True + print("Pluto Starting TX...") + self.radio.tx(data_np=data) + if mode == "timed": + timeout_thread = threading.Thread(target=self._timeout_cyclic_buffer, args=([tx_time])) + timeout_thread.start() + timeout_thread.join() def _stream_tx(self, callback): if self._tx_initialized is False: raise RuntimeError("TX was not initialized, init_tx must be called before _stream_tx") - num_samples = 10000 - # TODO remove hardcode + if not hasattr(self, "tx_buffer_size"): + self.tx_buffer_size = 10000 self._enable_tx = True while self._enable_tx is True: - buffer = self._convert_tx_samples(callback(num_samples)) + buffer = self._convert_tx_samples(callback(self.tx_buffer_size)) self.radio.tx(buffer[0]) def set_rx_center_frequency(self, center_frequency): - try: - self.radio.rx_lo = int(center_frequency) - self.rx_center_frequency = center_frequency - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + """ + Set the center frequency of the receiver. Callable during streaming. + """ + with self._param_lock: + if center_frequency < 70e6 or center_frequency > 6e9: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" + f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) + + try: + self.radio.rx_lo = int(center_frequency) + self.rx_center_frequency = center_frequency + except OSError as e: + raise SDRError(e) + except ValueError: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" + f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) def set_rx_sample_rate(self, sample_rate): - self.rx_sample_rate = sample_rate + """ + Set the sample rate of the receiver. Callable during streaming. + """ + with self._param_lock: + min_rate, max_rate = 65.1e3, 61.44e6 + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) - # TODO add logic for limiting sample rate + try: + # set the sample rate + self.radio.sample_rate = int(sample_rate) + self.rx_sample_rate = sample_rate - try: - self.radio.sample_rate = int(sample_rate) - - # set the front end filter width - self.radio.rx_rf_bandwidth = int(sample_rate) - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + # set the front end filter width + self.radio.rx_rf_bandwidth = int(sample_rate) + except OSError as e: + raise SDRError(e) + except ValueError: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) def set_rx_gain(self, gain, channel=0, gain_mode="absolute"): - rx_gain_min = 0 - rx_gain_max = 74 + """ + Set the gain of the receiver. Callable during streaming. + """ + with self._param_lock: + rx_gain_min = 0 + rx_gain_max = 74 - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets \ - the gain relative to the maximum possible gain." - ) + if gain_mode == "relative": + if gain > 0: + raise SDRParameterError( + "When gain_mode = 'relative', gain must be < 0. This sets \ + the gain relative to the maximum possible gain." + ) + else: + abs_gain = rx_gain_max + gain else: - abs_gain = rx_gain_max + gain - else: - abs_gain = gain + abs_gain = gain - if abs_gain < rx_gain_min or abs_gain > rx_gain_max: - abs_gain = min(max(gain, rx_gain_min), rx_gain_max) - print(f"Gain {gain} out of range for Pluto.") - print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") + if abs_gain < rx_gain_min or abs_gain > rx_gain_max: + abs_gain = min(max(gain, rx_gain_min), rx_gain_max) + print(f"Gain {gain} out of range for Pluto.") + print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") - self.rx_gain = abs_gain - try: + self.rx_gain = abs_gain if channel == 0: - if abs_gain is None: self.radio.gain_control_mode_chan0 = "automatic" print("Using Pluto Automatic Gain Control.") @@ -415,63 +462,77 @@ class Pluto(SDR): self.radio.rx_hardwaregain_chan1 = abs_gain # dB except Exception as e: - print("Failed to use channel 1 on the PlutoSDR. \nThis is only available for revC versions.") + print("Failed to use channel 1 on the PlutoSDR.\nThis is only available for revC versions.") raise e else: - raise ValueError(f"Pluto channel must be 0 or 1 but was {channel}.") - - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + raise SDRParameterError(f"Pluto channel must be 0 or 1 but was {channel}.") def set_rx_channel(self, channel): if channel == 0: self.radio.rx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") elif channel == 1: + if not self._mimo_capable: + raise SDRParameterError( + "Dual RX channel requested (channel=1) but hardware is not MIMO-capable. " + "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." + ) self.radio.rx_enabled_channels = [0, 1] - print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") else: - raise ValueError("Channel must be either 0 or 1.") + raise SDRParameterError("Channel must be either 0 or 1.") - def set_rx_buffer_size(self, buffer_size): + print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") + + def set_rx_buffer_size(self, buffer_size: int): if buffer_size is None: - raise ValueError("Buffer_size must be provided.") - buffer_size = int(buffer_size) + raise SDRParameterError("Buffer_size must be provided.") if buffer_size <= 0: - raise ValueError("Buffer_size must be a positive integer.") - - self.rx_buffer_size = buffer_size + raise SDRParameterError("Buffer_size must be a positive integer.") if hasattr(self, "radio"): try: self.radio.rx_buffer_size = buffer_size - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + except Exception as e: + raise SDRError(e) def set_tx_center_frequency(self, center_frequency): + if center_frequency < 70e6 or center_frequency > 6e9: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" + f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) + try: self.radio.tx_lo = int(center_frequency) self.tx_center_frequency = center_frequency - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + raise SDRError(e) + except ValueError: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" + f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) def set_tx_sample_rate(self, sample_rate): + min_rate, max_rate = 65.1e3, 61.44e6 + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) + try: self.radio.sample_rate = sample_rate self.tx_sample_rate = sample_rate - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + raise SDRError(e) + except ValueError: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) def set_tx_gain(self, gain, channel=0, gain_mode="absolute"): tx_gain_min = -89 @@ -479,7 +540,7 @@ class Pluto(SDR): if gain_mode == "relative": if gain > 0: - raise ValueError( + raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This sets\ the gain relative to the maximum possible gain." ) @@ -501,34 +562,39 @@ class Pluto(SDR): elif channel == 1: self.radio.tx_hardwaregain_chan1 = int(abs_gain) else: - raise ValueError(f"Pluto channel must be 0 or 1 but was {channel}.") + raise SDRParameterError(f"Pluto channel must be 0 or 1 but was {channel}.") - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + except Exception as e: + raise SDRError(e) def set_tx_channel(self, channel): - if channel == 1: - self.radio.tx_enabled_channels = [0, 1] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") - elif channel == 0: + if channel == 0: self.radio.tx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") + elif channel == 1: + if not self._mimo_capable: + raise SDRParameterError( + "Dual TX channel requested (channel=1) but hardware is not MIMO-capable. " + "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." + ) + self.radio.tx_enabled_channels = [0, 1] else: - raise ValueError("Channel must be either 0 or 1.") + raise SDRParameterError("Channel must be either 0 or 1.") - def set_tx_buffer_size(self, buffer_size): - raise NotImplementedError + print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") + + def set_tx_buffer_size(self, buffer_size: int): + if buffer_size is None: + raise SDRParameterError("Buffer_size must be provided.") + if buffer_size <= 0: + raise SDRParameterError("Buffer_size must be a positive integer.") + + self.tx_buffer_size = buffer_size def close(self): if self.radio.tx_cyclic_buffer: self.radio.tx_destroy_buffer() del self.radio - def shutdown(self): - del self.radio - def _convert_rx_samples(self, samples): return samples / (2**11) @@ -538,6 +604,9 @@ class Pluto(SDR): def set_clock_source(self, source): raise NotImplementedError + def supports_dynamic_updates(self) -> dict: + return {"center_frequency": True, "sample_rate": True, "gain": True} + def _handle_OSError(e): -- 2.34.1 From c575fa798c6f326ae8f0f693c7a2299345330d47 Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 12:09:45 -0500 Subject: [PATCH 5/8] Updated setters, added buffer size calculation, standardized errors --- src/ria_toolkit_oss/sdr/rtlsdr.py | 139 ++++++++++++++++++++---------- 1 file changed, 93 insertions(+), 46 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/rtlsdr.py b/src/ria_toolkit_oss/sdr/rtlsdr.py index 3368df1..d49ab6e 100644 --- a/src/ria_toolkit_oss/sdr/rtlsdr.py +++ b/src/ria_toolkit_oss/sdr/rtlsdr.py @@ -12,7 +12,7 @@ except ImportError as exc: # pragma: no cover - dependency provided by end user raise ImportError("pyrtlsdr is required to use the RTLSDR class") from exc from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.sdr import SDR +from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError class RTLSDR(SDR): @@ -45,8 +45,7 @@ class RTLSDR(SDR): print(f"Initialized RTL-SDR with identifier [{identifier}].") except Exception as e: - print(f"Failed to find RTL-SDR with identifier [{identifier}].") - raise e + raise RuntimeError(f"RTL-SDR: Failed to find device with identifier '{identifier}'\nError: {e}") def init_rx( self, @@ -55,18 +54,18 @@ class RTLSDR(SDR): gain: Optional[int], channel: int, gain_mode: Optional[str] = "absolute", - buffer_size: Optional[int] = 256_000, bias_t: bool = False, ): if channel not in (0, None): - raise ValueError("RTL-SDR supports only channel 0 for RX.") + raise SDRParameterError("RTL-SDR supports only channel 0 for RX.") self.set_rx_sample_rate(sample_rate=sample_rate) self.set_rx_center_frequency(center_frequency=center_frequency) self.set_rx_gain(gain=gain, gain_mode=gain_mode) - self.rx_buffer_size = int(buffer_size or self.rx_buffer_size) self.rx_channel = 0 + self.rx_buffer_size = self._calculate_optimal_buffer_size(sample_rate) + print(f"RTL-SDR buffer: {self.rx_buffer_size} samples for {sample_rate/1e6:.1f} MS/s") if bias_t: self.set_bias_tee(True) @@ -78,58 +77,102 @@ class RTLSDR(SDR): return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain} def set_rx_sample_rate(self, sample_rate): + """ + Set the sample rate of the receiver. + Not callable during recording; RTL-SDR requires stream stop/restart to change sample rate. + """ + if not ((sample_rate > 230e3 and sample_rate < 300e3) or (sample_rate > 900 and sample_rate < 3.2e6)): + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{2:.3f} - {20:.3f} Msps]" + ) + self.radio.sample_rate = float(sample_rate) self.rx_sample_rate = self.radio.sample_rate print(f"RTL RX Sample Rate = {self.radio.get_sample_rate()}") def set_rx_center_frequency(self, center_frequency): - self.radio.center_freq = float(center_frequency) - self.rx_center_frequency = self.radio.center_freq - print(f"RTL RX Center Frequency = {self.radio.get_center_freq()}") + """ + Set the center frequency of the receiver. + Not callable during recording; RTL-SDR requires stream stop/restart to change center frequency. + """ + with self._param_lock: + min_rate, max_rate = 25e6, 1.75e9 + if center_frequency < min_rate or center_frequency > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]" + ) + + self.radio.center_freq = float(center_frequency) + self.rx_center_frequency = self.radio.center_freq + print(f"RTL RX Center Frequency = {self.radio.get_center_freq()}") def set_rx_gain(self, gain, gain_mode="absolute"): - available_gains = self.radio.get_gains() + """ + Set the gain of the receiver. Callable during streaming. + """ + with self._param_lock: + available_gains = self.radio.get_gains() - if gain is None: - self.radio.gain = "auto" - self.rx_gain = "auto" - else: - if not available_gains: - warnings.warn( - "No gain table reported by RTL-SDR; applying requested gain directly.", - RuntimeWarning, - ) - target_gain = gain + if gain is None: + self.radio.gain = "auto" + self.rx_gain = "auto" else: - min_gain = min(available_gains) - max_gain = max(available_gains) - - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets\ - the gain relative to the maximum possible gain." - ) - target_gain = max_gain + gain - else: - target_gain = gain - - if target_gain < min_gain or target_gain > max_gain: - print( - f"Requested gain {target_gain} dB out of range;\ - clamping to valid span {min_gain}-{max_gain} dB." + if not available_gains: + warnings.warn( + "No gain table reported by RTL-SDR; applying requested gain directly.", + RuntimeWarning, ) - target_gain = min(max(target_gain, min_gain), max_gain) + target_gain = gain + else: + min_gain = min(available_gains) + max_gain = max(available_gains) - target_gain = min(available_gains, key=lambda g: abs(g - target_gain)) + if gain_mode == "relative": + if gain > 0: + raise SDRParameterError( + "When gain_mode = 'relative', gain must be < 0. This sets\ + the gain relative to the maximum possible gain." + ) + target_gain = max_gain + gain + else: + target_gain = gain - self.radio.set_gain(target_gain) - self.rx_gain = self.radio.get_gain() + if target_gain < min_gain or target_gain > max_gain: + print( + f"Requested gain {target_gain} dB out of range;\ + clamping to valid span {min_gain}-{max_gain} dB." + ) + target_gain = min(max(target_gain, min_gain), max_gain) - print(f"RTL RX Gain = {self.radio.get_gain()}") - print(f"Available RTL RX Gains: {available_gains}") + target_gain = min(available_gains, key=lambda g: abs(g - target_gain)) - def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + self.radio.set_gain(target_gain) + self.rx_gain = self.radio.get_gain() + + print(f"RTL RX Gain = {self.radio.get_gain()}") + print(f"Available RTL RX Gains: {available_gains}") + + def _calculate_optimal_buffer_size(self, sample_rate): + """USB packet alignment for stability.""" + # RTL-SDR USB transfers in 16k chunks + min_size = 16384 + max_size = 262144 # 256k + + # Target: 50ms of data per buffer + target = int(sample_rate * 0.05) + + # Round up to 16k boundary + size = ((target + 16383) // 16384) * 16384 + + return max(min_size, min(size, max_size)) + + def record( + self, + num_samples: Optional[int] = None, + rx_time: Optional[int | float] = None, + ) -> Recording: """ Create a radio recording (iq samples and metadata) of a given length from the RTL-SDR. Either num_samples or rx_time must be provided. @@ -147,13 +190,13 @@ class RTLSDR(SDR): raise RuntimeError("RX was not initialized. init_rx() must be called before record().") if num_samples is not None and rx_time is not None: - raise ValueError("Only input one of num_samples or rx_time") + raise SDRParameterError("Only input one of num_samples or rx_time") elif num_samples is not None: pass elif rx_time is not None: num_samples = int(rx_time * self.rx_sample_rate) else: - raise ValueError("Must provide input of one of num_samples or rx_time") + raise SDRParameterError("Must provide input of one of num_samples or rx_time") # RTL-SDR has USB buffer limitations - use consistent 256k chunks # Always read full chunks to avoid USB overflow issues with partial reads @@ -232,6 +275,10 @@ class RTLSDR(SDR): def close(self): try: self.radio.close() + del self.radio finally: self._enable_rx = False self._enable_tx = False + + def supports_dynamic_updates(self) -> dict: + return {"center_frequency": False, "sample_rate": False, "gain": True} -- 2.34.1 From a0d0899eabec5d0d9e598ef25e0bafda6d855a33 Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 13:29:32 -0500 Subject: [PATCH 6/8] Added gain setter, fixed setting sample rate in init_rx --- src/ria_toolkit_oss/sdr/thinkrf.py | 68 ++++++++++++++++++------------ 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/thinkrf.py b/src/ria_toolkit_oss/sdr/thinkrf.py index 67d425d..7d108ab 100644 --- a/src/ria_toolkit_oss/sdr/thinkrf.py +++ b/src/ria_toolkit_oss/sdr/thinkrf.py @@ -36,7 +36,7 @@ except SyntaxError as exc: # pragma: no cover - Python 2/3 compatibility issue print("Manual fix: Run `python scripts/fix_pyrf_python3.py` from ria-toolkit-oss directory") raise exc -from ria_toolkit_oss.sdr.sdr import SDR +from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError class ThinkRF(SDR): @@ -51,7 +51,7 @@ class ThinkRF(SDR): super().__init__() if identifier is None: - raise ValueError("ThinkRF requires an IP address or hostname identifier") + raise SDRParameterError("ThinkRF requires an IP address or hostname identifier") self.identifier = identifier try: @@ -90,7 +90,7 @@ class ThinkRF(SDR): mode = capture_mode.lower() if mode not in {"block", "stream"}: - raise ValueError("capture_mode must be either 'block' or 'stream'") + raise SDRParameterError("capture_mode must be either 'block' or 'stream'") self._rfe_mode = rfe_mode self._attenuation = int(max(0, min(attenuation, 30))) @@ -113,10 +113,12 @@ class ThinkRF(SDR): decimation: Optional[int] = None, ): if channel not in (0, None): - raise ValueError("ThinkRF devices expose a single receive channel") + raise SDRParameterError("ThinkRF supports only channel 0 for RX.") stream_mode = getattr(self, "_capture_mode", "block") == "stream" - actual_decimation, actual_sample_rate = self.set_rx_sample_rate(sample_rate=sample_rate, decimation=decimation) + actual_decimation, _ = self.set_rx_sample_rate( + sample_rate=sample_rate, decimation=decimation, stream_mode=stream_mode + ) self.radio.reset() self.radio.scpiset(":SYSTEM:FLUSH") @@ -127,15 +129,7 @@ class ThinkRF(SDR): self.radio.rfe_mode(self._rfe_mode) self.set_rx_center_frequency(center_frequency=center_frequency) - - attenuation = self._attenuation if gain is None else int(gain) # gain - attenuation = max(0, min(attenuation, 30)) - self.radio.attenuator(attenuation) - - gain_profile = self._gain_profile - if gain_mode and isinstance(gain_mode, str) and gain_mode.upper() in {"LOW", "MEDIUM", "HIGH", "VLOW"}: - gain_profile = gain_mode.upper() - self.radio.gain(gain_profile.lower()) # WSA.gain() expects lowercase + self.set_rx_gain(gain=gain, gain_mode=gain_mode, actual_decimation=actual_decimation) self.radio.decimation(actual_decimation) if stream_mode: @@ -153,14 +147,6 @@ class ThinkRF(SDR): self.radio.scpiset(f":TRACE:BLOCK:PACKETS {self._packets_per_block}") self.radio.scpiset(":TRACE:BLOCK:DATA?") - self.rx_gain = { - "attenuation_dB": attenuation, - "profile": gain_profile, - "decimation": actual_decimation, - "rfe_mode": self._rfe_mode, - "spp": self._samples_per_packet, - "ppb": self._packets_per_block, - } self.rx_buffer_size = self._samples_per_packet self.rx_channel = 0 @@ -168,6 +154,10 @@ class ThinkRF(SDR): self._tx_initialized = False def set_rx_sample_rate(self, sample_rate, decimation, stream_mode): + """ + Set the sample rate of the receiver. + Not callable during recording; ThinkRF requires stream stop/restart to change sample rate. + """ # Enforce sample rate / decimation # Note: decimation parameter takes precedence if provided actual_decimation, actual_sample_rate = self.enforce_sample_rate(sample_rate, decimation) @@ -188,9 +178,32 @@ class ThinkRF(SDR): return actual_decimation, actual_sample_rate def set_rx_center_frequency(self, center_frequency): - self.radio.freq(int(center_frequency)) - self.rx_center_frequency = self.radio.freq - print(f"ThinkRF RX Center Frequency = {self.radio.freq}") + """ + Set the center frequency of the receiver. Callable during streaming. + """ + with self._param_lock: + self.radio.freq(int(center_frequency)) + self.rx_center_frequency = self.radio.freq + print(f"ThinkRF RX Center Frequency = {self.radio.freq}") + + def set_rx_gain(self, gain, gain_mode, actual_decimation): + attenuation = self._attenuation if gain is None else int(gain) # gain + attenuation = max(0, min(attenuation, 30)) + self.radio.attenuator(attenuation) + + gain_profile = self._gain_profile + if gain_mode and isinstance(gain_mode, str) and gain_mode.upper() in {"LOW", "MEDIUM", "HIGH", "VLOW"}: + gain_profile = gain_mode.upper() + self.radio.gain(gain_profile.lower()) # WSA.gain() expects lowercase + + self.rx_gain = { + "attenuation_dB": attenuation, + "profile": gain_profile, + "decimation": actual_decimation, + "rfe_mode": self._rfe_mode, + "spp": self._samples_per_packet, + "ppb": self._packets_per_block, + } def _stream_rx(self, callback): if not self._rx_initialized: @@ -431,7 +444,7 @@ class ThinkRF(SDR): For decimation 1 or 2, block captures are limited by onboard RAM. """ if decimation <= 2 and num_samples > self.MAX_ONBOARD_SAMPLES: - raise ValueError( + raise SDRParameterError( f"ThinkRF: Cannot capture {num_samples} samples at decimation {decimation}. " f"Onboard RAM limit is ~{self.MAX_ONBOARD_SAMPLES} samples for dec 1/2. " f"Either reduce num_samples or use stream mode (increase decimation to >=4)." @@ -446,3 +459,6 @@ class ThinkRF(SDR): "fstop": int(center_frequency) + half, "amplitude": -100, } + + def supports_dynamic_updates(self) -> dict: + return {"center_frequency": True, "sample_rate": False, "gain": False} -- 2.34.1 From 10801ffb5741b9e04308ab27da7ac3d1deb7d611 Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 13:40:40 -0500 Subject: [PATCH 7/8] Implemented close method, minor updates and improvements --- src/ria_toolkit_oss/sdr/usrp.py | 210 ++++++++++++++++++-------------- 1 file changed, 118 insertions(+), 92 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/usrp.py b/src/ria_toolkit_oss/sdr/usrp.py index 7e7f905..f458378 100644 --- a/src/ria_toolkit_oss/sdr/usrp.py +++ b/src/ria_toolkit_oss/sdr/usrp.py @@ -7,7 +7,7 @@ import numpy as np import uhd from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.sdr import SDR +from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError class USRP(SDR): @@ -40,7 +40,7 @@ class USRP(SDR): channel: int, gain: int, gain_mode: Optional[str] = "absolute", - rx_buffer_size: int = 960000, + rx_buffer_size: Optional[int] = None, ): """ Initializes the USRP for receiving. @@ -63,8 +63,6 @@ class USRP(SDR): :rtype: dict """ - self.rx_buffer_size = rx_buffer_size - # build USRP object usrp_args = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict) self.usrp = uhd.usrp.MultiUSRP(usrp_args) @@ -72,7 +70,7 @@ class USRP(SDR): # check if channel arg is valid max_num_channels = self.usrp.get_rx_num_channels() if channel + 1 > max_num_channels: - raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") + raise SDRParameterError(f"Channel {channel} not valid for device with {max_num_channels} channels.") self.set_rx_sample_rate(sample_rate=sample_rate, channel=channel) self.set_rx_center_frequency(center_frequency=center_frequency, channel=channel) @@ -81,6 +79,20 @@ class USRP(SDR): self.rx_channel = channel print(f"USRP RX Channel = {self.rx_channel}") + stream_args = uhd.usrp.StreamArgs("fc32", "sc16") + stream_args.channels = [self.rx_channel] + + self.metadata = uhd.types.RXMetadata() + self.rx_stream = self.usrp.get_rx_stream(stream_args) + + if rx_buffer_size is None: # In case it's none + self.rx_buffer_size = self.rx_stream.get_max_num_samps() + else: + self.rx_buffer_size = rx_buffer_size + + # set timeout based on buffer size and sample rate, with a safety factor of 5 + self.timeout = (self.rx_buffer_size / self.rx_sample_rate) * 5 + # flag to prevent user from calling certain functions before this one. self._rx_initialized = True self._tx_initialized = False @@ -88,68 +100,76 @@ class USRP(SDR): return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain} def set_rx_sample_rate(self, sample_rate, channel=0): + """ + Set the sample rate of the receiver. Callable during streaming. + """ # check if sample rate arg is valid # Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns # the range for the CURRENT master clock, not the maximum possible range. # Skip validation for B-series devices and let UHD handle it. - device_type = self.device_dict.get("type", "").lower() - if device_type not in ["b200", "b210"]: - sample_rate_range = self.usrp.get_rx_rates() - if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): - raise IOError( - f"Sample rate {sample_rate} not valid for this USRP.\nValid\ - range is {sample_rate_range.start()}\ - to {sample_rate_range.stop()}." - ) - self.usrp.set_rx_rate(sample_rate, channel) - self.rx_sample_rate = self.usrp.get_rx_rate(channel) - print(f"USRP RX Sample Rate = {self.rx_sample_rate}") + with self._param_lock: + device_type = self.device_dict.get("type", "").lower() + if device_type not in ["b200", "b210"]: + sample_rate_range = self.usrp.get_rx_rates() + min_rate, max_rate = sample_rate_range.start(), sample_rate_range.stop() + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) + + self.usrp.set_rx_rate(sample_rate, channel) + self.rx_sample_rate = self.usrp.get_rx_rate(channel) + print(f"USRP RX Sample Rate = {self.rx_sample_rate}") def set_rx_center_frequency(self, center_frequency, channel=0): - center_frequency_range = self.usrp.get_rx_freq_range() - if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): - raise IOError( - f"Center frequency {center_frequency} out of range for USRP.\ - \nValid range is {center_frequency_range.start()} \ - to {center_frequency_range.stop()}." - ) - self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel) - self.rx_center_frequency = self.usrp.get_rx_freq(channel) - print(f"USRP RX Center Frequency = {self.rx_center_frequency}") + """ + Set the center frequency of the receiver. Callable during streaming. + """ + with self._param_lock: + center_frequency_range = self.usrp.get_rx_freq_range() + min_rate, max_rate = center_frequency_range.start(), center_frequency_range.stop() + if center_frequency < min_rate or center_frequency > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]" + ) + + self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel) + self.rx_center_frequency = self.usrp.get_rx_freq(channel) + print(f"USRP RX Center Frequency = {self.rx_center_frequency}") def set_rx_gain(self, gain, gain_mode="absolute", channel=0): - # check if gain arg is valid - gain_range = self.usrp.get_rx_gain_range() - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets\ - the gain relative to the maximum possible gain." - ) + """ + Set the gain of the receiver. Callable during streaming. + """ + with self._param_lock: + # check if gain arg is valid + gain_range = self.usrp.get_rx_gain_range() + if gain_mode == "relative": + if gain > 0: + raise SDRParameterError( + "When gain_mode = 'relative', gain must be < 0. This sets\ + the gain relative to the maximum possible gain." + ) + else: + # set gain relative to max + abs_gain = gain_range.stop() + gain else: - # set gain relative to max - abs_gain = gain_range.stop() + gain - else: - abs_gain = gain - if abs_gain < gain_range.start() or abs_gain > gain_range.stop(): - print(f"Gain {abs_gain} out of range for this USRP.") - print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB") - abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) - self.usrp.set_rx_gain(abs_gain, channel) - self.rx_gain = self.usrp.get_rx_gain(channel) - print(f"USRP RX Gain = {self.rx_gain}") + abs_gain = gain + if abs_gain < gain_range.start() or abs_gain > gain_range.stop(): + print(f"Gain {abs_gain} out of range for this USRP.") + print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB") + abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) + self.usrp.set_rx_gain(abs_gain, channel) + self.rx_gain = self.usrp.get_rx_gain(channel) + print(f"USRP RX Gain = {self.rx_gain}") def _stream_rx(self, callback): - if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") - stream_args = uhd.usrp.StreamArgs("fc32", "sc16") - stream_args.channels = [self.rx_channel] - - self.metadata = uhd.types.RXMetadata() - self.rx_stream = self.usrp.get_rx_stream(stream_args) - + # send command to start the rx stream stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_command.stream_now = True self.rx_stream.issue_stream_cmd(stream_command) @@ -160,19 +180,19 @@ class USRP(SDR): receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64) while self._enable_rx: - - # 1 is the timeout #TODO maybe set this intelligently based on the desired sample rate - self.rx_stream.recv(receive_buffer, self.metadata, 1) - + self.rx_stream.recv(receive_buffer, self.metadata, self.timeout) # TODO set metadata correctly, sending real sample rate plus any error codes # sending complex signal callback(buffer=receive_buffer, metadata=self.metadata) if self.metadata.error_code != uhd.types.RXMetadataErrorCode.none: - print(f"Error while receiving samples: {self.metadata.strerror()}") + if self.metadata.error_code == uhd.types.RXMetadataErrorCode.overflow: + print("\033[93mWarning: Buffer Overflow Detected.\033[0m") if self.metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: - print("Stopping receive due to timeout error.") + print("\033[91Stopping receive due to timeout error.\033[0m") self.stop() + + # stop streaming wait_time = 0.1 stop_time = self.usrp.get_time_now() + wait_time stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) @@ -180,10 +200,14 @@ class USRP(SDR): stop_cmd.time_spec = stop_time self.rx_stream.issue_stream_cmd(stop_cmd) time.sleep(wait_time) # TODO figure out what a realistic wait time is here. - del self.rx_stream + print("USRP RX Completed.") - def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + def record( + self, + num_samples: Optional[int] = None, + rx_time: Optional[int | float] = None, + ) -> Recording: """ Create a radio recording (iq samples and metadata) of a given length from the USRP. Either num_samples or rx_time must be provided. @@ -200,41 +224,31 @@ class USRP(SDR): raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") if num_samples is not None and rx_time is not None: - raise ValueError("Only input one of num_samples or rx_time") + raise SDRParameterError("Only input one of num_samples or rx_time") elif num_samples is not None: pass elif rx_time is not None: num_samples = int(rx_time * self.rx_sample_rate) else: - raise ValueError("Must provide input of one of num_samples or rx_time") - - stream_args = uhd.usrp.StreamArgs("fc32", "sc16") - stream_args.channels = [self.rx_channel] - - self.metadata = uhd.types.RXMetadata() - self.rx_stream = self.usrp.get_rx_stream(stream_args) + raise SDRParameterError("Must provide input of one of num_samples or rx_time") + # send command to start the rx stream stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_command.stream_now = True self.rx_stream.issue_stream_cmd(stream_command) # receive loop self._enable_rx = True - print("USRP Starting RX...") store_array = np.zeros((1, (num_samples // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64) receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64) + print("USRP Starting RX...") + + # write complex samples to receive buffer for i in range(num_samples // self.rx_buffer_size + 1): - - # write samples to receive buffer - # they should already be complex - - # 1 is the timeout #TODO maybe set this intelligently based on the desired sample rate - self.rx_stream.recv(receive_buffer, self.metadata, 1) - - # TODO set metadata correctly, sending real sample rate plus any error codes - # sending complex signal + self.rx_stream.recv(receive_buffer, self.metadata, self.timeout) store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = receive_buffer + # stop streaming wait_time = 0.1 stop_time = self.usrp.get_time_now() + wait_time stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) @@ -242,7 +256,7 @@ class USRP(SDR): stop_cmd.time_spec = stop_time self.rx_stream.issue_stream_cmd(stop_cmd) time.sleep(wait_time) # TODO figure out what a realistic wait time is here. - del self.rx_stream + print("USRP RX Completed.") metadata = { "source": self.__class__.__name__, @@ -287,7 +301,7 @@ class USRP(SDR): # check if channel arg is valid max_num_channels = self.usrp.get_rx_num_channels() if channel + 1 > max_num_channels: - raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") + raise SDRParameterError(f"Channel {channel} not valid for device with {max_num_channels} channels.") self.set_tx_sample_rate(sample_rate=sample_rate, channel=channel) self.set_tx_center_frequency(center_frequency=center_frequency, channel=channel) @@ -313,23 +327,26 @@ class USRP(SDR): device_type = self.device_dict.get("type", "").lower() if device_type not in ["b200", "b210"]: sample_rate_range = self.usrp.get_tx_rates() - if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): - raise IOError( - f"Sample rate {sample_rate} not valid for this USRP.\nValid\ - range is {sample_rate_range.start()} to {sample_rate_range.stop()}." + min_rate, max_rate = sample_rate_range.start(), sample_rate_range.stop() + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" ) + self.usrp.set_tx_rate(sample_rate, channel) self.tx_sample_rate = self.usrp.get_tx_rate(channel) print(f"USRP TX Sample Rate = {self.tx_sample_rate}") def set_tx_center_frequency(self, center_frequency, channel=0): center_frequency_range = self.usrp.get_tx_freq_range() - if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): - raise IOError( - f"Center frequency {center_frequency} out of range for USRP.\ - \nValid range is {center_frequency_range.start()}\ - to {center_frequency_range.stop()}." + min_rate, max_rate = center_frequency_range.start(), center_frequency_range.stop() + if center_frequency < min_rate or center_frequency > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]" ) + self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel) self.tx_center_frequency = self.usrp.get_tx_freq(channel) print(f"USRP TX Center Frequency = {self.tx_center_frequency}") @@ -339,7 +356,7 @@ class USRP(SDR): gain_range = self.usrp.get_tx_gain_range() if gain_mode == "relative": if gain > 0: - raise ValueError( + raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This sets\ the gain relative to the maximum possible gain." ) @@ -358,7 +375,13 @@ class USRP(SDR): print(f"USRP TX Gain = {self.tx_gain}") def close(self): - pass + self._tx_initialized = False + self._rx_initialized = False + if hasattr(self, "rx_stream"): + del self.rx_stream + if hasattr(self, "usrp"): + del self.usrp + self.usrp = None def _stream_tx(self, callback): @@ -439,6 +462,9 @@ class USRP(SDR): print(f"USRP clock source set to {self.usrp.get_clock_source(0)}") + def supports_dynamic_updates(self) -> dict: + return {"center_frequency": True, "sample_rate": True, "gain": True} + def _create_device_dict(identifier_value=None): """ -- 2.34.1 From e88cfafc506e117c756dd19c2d38e0803ff8975d Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 13:51:27 -0500 Subject: [PATCH 8/8] Added garbage collection to viewers to prevent crashing, minor fixes to plots --- src/ria_toolkit_oss/view/view_signal.py | 35 +++++++++++-------- .../view/view_signal_simple.py | 5 +++ 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/ria_toolkit_oss/view/view_signal.py b/src/ria_toolkit_oss/view/view_signal.py index b045e0e..2d94efa 100644 --- a/src/ria_toolkit_oss/view/view_signal.py +++ b/src/ria_toolkit_oss/view/view_signal.py @@ -1,3 +1,4 @@ +import gc import os import textwrap from typing import Optional @@ -8,6 +9,7 @@ from matplotlib import gridspec from PIL import Image from scipy.fft import fft, fftshift from scipy.signal import spectrogram +from scipy.signal.windows import hann from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.view.tools import ( @@ -122,7 +124,7 @@ def view_sig( plot_y_indx = plot_y_indx + 2 fft_size = get_fft_size(plot_length=plot_length) - f, t_spec, Sxx = spectrogram( + _, t_spec, Sxx = spectrogram( complex_signal[:plot_length], fs=sample_rate, nperseg=fft_size, @@ -132,14 +134,16 @@ def view_sig( ) # shift frequencies so zero is centered + f_bins = np.fft.fftfreq(fft_size, d=1.0 / sample_rate) + f_bins = np.fft.fftshift(f_bins) + f_bins = f_bins + center_frequency Sxx = np.fft.fftshift(Sxx, axes=0) - f = np.fft.fftshift(f) - sample_rate / 2 + center_frequency spec_ax.imshow( 10 * np.log10(Sxx + 1e-12), aspect="auto", origin="lower", - extent=[t_spec[0], t_spec[-1], f[0], f[-1]], + extent=[t_spec[0], t_spec[-1], f_bins[0], f_bins[-1]], cmap="twilight", ) @@ -169,18 +173,17 @@ def view_sig( freq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :]) plot_y_indx = plot_y_indx + 2 - epsilon = 1e-10 - spectrum = np.abs(fftshift(fft(complex_signal[0:plot_length]))) - freqs = ( - np.linspace(-1 * (sample_rate / 2), (sample_rate / 2), len(complex_signal[0:plot_length])) - + center_frequency - ) + # Apply window to reduce spectral leakage + window = hann(len(complex_signal[:plot_length])) + spectrum = np.abs(fftshift(fft(complex_signal[:plot_length] * window))) - # Use semi-log for the y-axis - freq_ax.semilogy(freqs, spectrum + epsilon, color=COLORS["accent"], linewidth=0.8) - freq_ax.set_xlabel("Frequency") - freq_ax.set_ylabel("Magnitude") - freq_ax.set_title("Frequency Spectrum", fontsize=subtitle_fontsize) + # Convert to dB + spectrum_db = 20 * np.log10(spectrum + 1e-12) # 20*log for magnitude + + freqs = np.linspace(-sample_rate / 2, sample_rate / 2, len(complex_signal[:plot_length])) + center_frequency + freq_ax.plot(freqs, spectrum_db, color=COLORS["accent"], linewidth=0.8) + freq_ax.set_ylabel("Magnitude (dB)") + freq_ax.set_title("Frequency Spectrum (Windowed FFT)", fontsize=subtitle_fontsize) set_spines(freq_ax, spines) if constellation: @@ -255,3 +258,7 @@ def view_sig( output_path, _ = set_path(output_path=output_path) plt.savefig(output_path, dpi=dpi) print(f"Saved signal plot to {output_path}") + + # Garbage collection and clean up to prevent memory overloading + plt.close("all") + gc.collect() diff --git a/src/ria_toolkit_oss/view/view_signal_simple.py b/src/ria_toolkit_oss/view/view_signal_simple.py index ec5570e..cbca651 100644 --- a/src/ria_toolkit_oss/view/view_signal_simple.py +++ b/src/ria_toolkit_oss/view/view_signal_simple.py @@ -2,6 +2,7 @@ from __future__ import annotations +import gc from typing import Optional import matplotlib @@ -318,6 +319,10 @@ def view_simple_sig( return output_path plt.show() + + # Garbage collection and clean up to prevent memory overloading + plt.close("all") + gc.collect() return None -- 2.34.1