From 10801ffb5741b9e04308ab27da7ac3d1deb7d611 Mon Sep 17 00:00:00 2001 From: madrigal Date: Mon, 17 Nov 2025 13:40:40 -0500 Subject: [PATCH] Implemented close method, minor updates and improvements --- src/ria_toolkit_oss/sdr/usrp.py | 210 ++++++++++++++++++-------------- 1 file changed, 118 insertions(+), 92 deletions(-) diff --git a/src/ria_toolkit_oss/sdr/usrp.py b/src/ria_toolkit_oss/sdr/usrp.py index 7e7f905..f458378 100644 --- a/src/ria_toolkit_oss/sdr/usrp.py +++ b/src/ria_toolkit_oss/sdr/usrp.py @@ -7,7 +7,7 @@ import numpy as np import uhd from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.sdr import SDR +from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError class USRP(SDR): @@ -40,7 +40,7 @@ class USRP(SDR): channel: int, gain: int, gain_mode: Optional[str] = "absolute", - rx_buffer_size: int = 960000, + rx_buffer_size: Optional[int] = None, ): """ Initializes the USRP for receiving. @@ -63,8 +63,6 @@ class USRP(SDR): :rtype: dict """ - self.rx_buffer_size = rx_buffer_size - # build USRP object usrp_args = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict) self.usrp = uhd.usrp.MultiUSRP(usrp_args) @@ -72,7 +70,7 @@ class USRP(SDR): # check if channel arg is valid max_num_channels = self.usrp.get_rx_num_channels() if channel + 1 > max_num_channels: - raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") + raise SDRParameterError(f"Channel {channel} not valid for device with {max_num_channels} channels.") self.set_rx_sample_rate(sample_rate=sample_rate, channel=channel) self.set_rx_center_frequency(center_frequency=center_frequency, channel=channel) @@ -81,6 +79,20 @@ class USRP(SDR): self.rx_channel = channel print(f"USRP RX Channel = {self.rx_channel}") + stream_args = uhd.usrp.StreamArgs("fc32", "sc16") + stream_args.channels = [self.rx_channel] + + self.metadata = uhd.types.RXMetadata() + self.rx_stream = self.usrp.get_rx_stream(stream_args) + + if rx_buffer_size is None: # In case it's none + self.rx_buffer_size = self.rx_stream.get_max_num_samps() + else: + self.rx_buffer_size = rx_buffer_size + + # set timeout based on buffer size and sample rate, with a safety factor of 5 + self.timeout = (self.rx_buffer_size / self.rx_sample_rate) * 5 + # flag to prevent user from calling certain functions before this one. self._rx_initialized = True self._tx_initialized = False @@ -88,68 +100,76 @@ class USRP(SDR): return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain} def set_rx_sample_rate(self, sample_rate, channel=0): + """ + Set the sample rate of the receiver. Callable during streaming. + """ # check if sample rate arg is valid # Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns # the range for the CURRENT master clock, not the maximum possible range. # Skip validation for B-series devices and let UHD handle it. - device_type = self.device_dict.get("type", "").lower() - if device_type not in ["b200", "b210"]: - sample_rate_range = self.usrp.get_rx_rates() - if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): - raise IOError( - f"Sample rate {sample_rate} not valid for this USRP.\nValid\ - range is {sample_rate_range.start()}\ - to {sample_rate_range.stop()}." - ) - self.usrp.set_rx_rate(sample_rate, channel) - self.rx_sample_rate = self.usrp.get_rx_rate(channel) - print(f"USRP RX Sample Rate = {self.rx_sample_rate}") + with self._param_lock: + device_type = self.device_dict.get("type", "").lower() + if device_type not in ["b200", "b210"]: + sample_rate_range = self.usrp.get_rx_rates() + min_rate, max_rate = sample_rate_range.start(), sample_rate_range.stop() + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) + + self.usrp.set_rx_rate(sample_rate, channel) + self.rx_sample_rate = self.usrp.get_rx_rate(channel) + print(f"USRP RX Sample Rate = {self.rx_sample_rate}") def set_rx_center_frequency(self, center_frequency, channel=0): - center_frequency_range = self.usrp.get_rx_freq_range() - if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): - raise IOError( - f"Center frequency {center_frequency} out of range for USRP.\ - \nValid range is {center_frequency_range.start()} \ - to {center_frequency_range.stop()}." - ) - self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel) - self.rx_center_frequency = self.usrp.get_rx_freq(channel) - print(f"USRP RX Center Frequency = {self.rx_center_frequency}") + """ + Set the center frequency of the receiver. Callable during streaming. + """ + with self._param_lock: + center_frequency_range = self.usrp.get_rx_freq_range() + min_rate, max_rate = center_frequency_range.start(), center_frequency_range.stop() + if center_frequency < min_rate or center_frequency > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]" + ) + + self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel) + self.rx_center_frequency = self.usrp.get_rx_freq(channel) + print(f"USRP RX Center Frequency = {self.rx_center_frequency}") def set_rx_gain(self, gain, gain_mode="absolute", channel=0): - # check if gain arg is valid - gain_range = self.usrp.get_rx_gain_range() - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets\ - the gain relative to the maximum possible gain." - ) + """ + Set the gain of the receiver. Callable during streaming. + """ + with self._param_lock: + # check if gain arg is valid + gain_range = self.usrp.get_rx_gain_range() + if gain_mode == "relative": + if gain > 0: + raise SDRParameterError( + "When gain_mode = 'relative', gain must be < 0. This sets\ + the gain relative to the maximum possible gain." + ) + else: + # set gain relative to max + abs_gain = gain_range.stop() + gain else: - # set gain relative to max - abs_gain = gain_range.stop() + gain - else: - abs_gain = gain - if abs_gain < gain_range.start() or abs_gain > gain_range.stop(): - print(f"Gain {abs_gain} out of range for this USRP.") - print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB") - abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) - self.usrp.set_rx_gain(abs_gain, channel) - self.rx_gain = self.usrp.get_rx_gain(channel) - print(f"USRP RX Gain = {self.rx_gain}") + abs_gain = gain + if abs_gain < gain_range.start() or abs_gain > gain_range.stop(): + print(f"Gain {abs_gain} out of range for this USRP.") + print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB") + abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) + self.usrp.set_rx_gain(abs_gain, channel) + self.rx_gain = self.usrp.get_rx_gain(channel) + print(f"USRP RX Gain = {self.rx_gain}") def _stream_rx(self, callback): - if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") - stream_args = uhd.usrp.StreamArgs("fc32", "sc16") - stream_args.channels = [self.rx_channel] - - self.metadata = uhd.types.RXMetadata() - self.rx_stream = self.usrp.get_rx_stream(stream_args) - + # send command to start the rx stream stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_command.stream_now = True self.rx_stream.issue_stream_cmd(stream_command) @@ -160,19 +180,19 @@ class USRP(SDR): receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64) while self._enable_rx: - - # 1 is the timeout #TODO maybe set this intelligently based on the desired sample rate - self.rx_stream.recv(receive_buffer, self.metadata, 1) - + self.rx_stream.recv(receive_buffer, self.metadata, self.timeout) # TODO set metadata correctly, sending real sample rate plus any error codes # sending complex signal callback(buffer=receive_buffer, metadata=self.metadata) if self.metadata.error_code != uhd.types.RXMetadataErrorCode.none: - print(f"Error while receiving samples: {self.metadata.strerror()}") + if self.metadata.error_code == uhd.types.RXMetadataErrorCode.overflow: + print("\033[93mWarning: Buffer Overflow Detected.\033[0m") if self.metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: - print("Stopping receive due to timeout error.") + print("\033[91Stopping receive due to timeout error.\033[0m") self.stop() + + # stop streaming wait_time = 0.1 stop_time = self.usrp.get_time_now() + wait_time stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) @@ -180,10 +200,14 @@ class USRP(SDR): stop_cmd.time_spec = stop_time self.rx_stream.issue_stream_cmd(stop_cmd) time.sleep(wait_time) # TODO figure out what a realistic wait time is here. - del self.rx_stream + print("USRP RX Completed.") - def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + def record( + self, + num_samples: Optional[int] = None, + rx_time: Optional[int | float] = None, + ) -> Recording: """ Create a radio recording (iq samples and metadata) of a given length from the USRP. Either num_samples or rx_time must be provided. @@ -200,41 +224,31 @@ class USRP(SDR): raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") if num_samples is not None and rx_time is not None: - raise ValueError("Only input one of num_samples or rx_time") + raise SDRParameterError("Only input one of num_samples or rx_time") elif num_samples is not None: pass elif rx_time is not None: num_samples = int(rx_time * self.rx_sample_rate) else: - raise ValueError("Must provide input of one of num_samples or rx_time") - - stream_args = uhd.usrp.StreamArgs("fc32", "sc16") - stream_args.channels = [self.rx_channel] - - self.metadata = uhd.types.RXMetadata() - self.rx_stream = self.usrp.get_rx_stream(stream_args) + raise SDRParameterError("Must provide input of one of num_samples or rx_time") + # send command to start the rx stream stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_command.stream_now = True self.rx_stream.issue_stream_cmd(stream_command) # receive loop self._enable_rx = True - print("USRP Starting RX...") store_array = np.zeros((1, (num_samples // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64) receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64) + print("USRP Starting RX...") + + # write complex samples to receive buffer for i in range(num_samples // self.rx_buffer_size + 1): - - # write samples to receive buffer - # they should already be complex - - # 1 is the timeout #TODO maybe set this intelligently based on the desired sample rate - self.rx_stream.recv(receive_buffer, self.metadata, 1) - - # TODO set metadata correctly, sending real sample rate plus any error codes - # sending complex signal + self.rx_stream.recv(receive_buffer, self.metadata, self.timeout) store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = receive_buffer + # stop streaming wait_time = 0.1 stop_time = self.usrp.get_time_now() + wait_time stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) @@ -242,7 +256,7 @@ class USRP(SDR): stop_cmd.time_spec = stop_time self.rx_stream.issue_stream_cmd(stop_cmd) time.sleep(wait_time) # TODO figure out what a realistic wait time is here. - del self.rx_stream + print("USRP RX Completed.") metadata = { "source": self.__class__.__name__, @@ -287,7 +301,7 @@ class USRP(SDR): # check if channel arg is valid max_num_channels = self.usrp.get_rx_num_channels() if channel + 1 > max_num_channels: - raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") + raise SDRParameterError(f"Channel {channel} not valid for device with {max_num_channels} channels.") self.set_tx_sample_rate(sample_rate=sample_rate, channel=channel) self.set_tx_center_frequency(center_frequency=center_frequency, channel=channel) @@ -313,23 +327,26 @@ class USRP(SDR): device_type = self.device_dict.get("type", "").lower() if device_type not in ["b200", "b210"]: sample_rate_range = self.usrp.get_tx_rates() - if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): - raise IOError( - f"Sample rate {sample_rate} not valid for this USRP.\nValid\ - range is {sample_rate_range.start()} to {sample_rate_range.stop()}." + min_rate, max_rate = sample_rate_range.start(), sample_rate_range.stop() + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" ) + self.usrp.set_tx_rate(sample_rate, channel) self.tx_sample_rate = self.usrp.get_tx_rate(channel) print(f"USRP TX Sample Rate = {self.tx_sample_rate}") def set_tx_center_frequency(self, center_frequency, channel=0): center_frequency_range = self.usrp.get_tx_freq_range() - if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): - raise IOError( - f"Center frequency {center_frequency} out of range for USRP.\ - \nValid range is {center_frequency_range.start()}\ - to {center_frequency_range.stop()}." + min_rate, max_rate = center_frequency_range.start(), center_frequency_range.stop() + if center_frequency < min_rate or center_frequency > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]" ) + self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel) self.tx_center_frequency = self.usrp.get_tx_freq(channel) print(f"USRP TX Center Frequency = {self.tx_center_frequency}") @@ -339,7 +356,7 @@ class USRP(SDR): gain_range = self.usrp.get_tx_gain_range() if gain_mode == "relative": if gain > 0: - raise ValueError( + raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This sets\ the gain relative to the maximum possible gain." ) @@ -358,7 +375,13 @@ class USRP(SDR): print(f"USRP TX Gain = {self.tx_gain}") def close(self): - pass + self._tx_initialized = False + self._rx_initialized = False + if hasattr(self, "rx_stream"): + del self.rx_stream + if hasattr(self, "usrp"): + del self.usrp + self.usrp = None def _stream_tx(self, callback): @@ -439,6 +462,9 @@ class USRP(SDR): print(f"USRP clock source set to {self.usrp.get_clock_source(0)}") + def supports_dynamic_updates(self) -> dict: + return {"center_frequency": True, "sample_rate": True, "gain": True} + def _create_device_dict(identifier_value=None): """