updates_and_fixes #12

Merged
madrigal merged 9 commits from updates_and_fixes into main 2025-11-18 15:01:25 -05:00
Showing only changes of commit 10801ffb57 - Show all commits

View File

@ -7,7 +7,7 @@ import numpy as np
import uhd import uhd
from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.sdr.sdr import SDR from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError
class USRP(SDR): class USRP(SDR):
@ -40,7 +40,7 @@ class USRP(SDR):
channel: int, channel: int,
gain: int, gain: int,
gain_mode: Optional[str] = "absolute", gain_mode: Optional[str] = "absolute",
rx_buffer_size: int = 960000, rx_buffer_size: Optional[int] = None,
): ):
""" """
Initializes the USRP for receiving. Initializes the USRP for receiving.
@ -63,8 +63,6 @@ class USRP(SDR):
:rtype: dict :rtype: dict
""" """
self.rx_buffer_size = rx_buffer_size
# build USRP object # build USRP object
usrp_args = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict) usrp_args = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict)
self.usrp = uhd.usrp.MultiUSRP(usrp_args) self.usrp = uhd.usrp.MultiUSRP(usrp_args)
@ -72,7 +70,7 @@ class USRP(SDR):
# check if channel arg is valid # check if channel arg is valid
max_num_channels = self.usrp.get_rx_num_channels() max_num_channels = self.usrp.get_rx_num_channels()
if channel + 1 > max_num_channels: if channel + 1 > max_num_channels:
raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") raise SDRParameterError(f"Channel {channel} not valid for device with {max_num_channels} channels.")
self.set_rx_sample_rate(sample_rate=sample_rate, channel=channel) self.set_rx_sample_rate(sample_rate=sample_rate, channel=channel)
self.set_rx_center_frequency(center_frequency=center_frequency, channel=channel) self.set_rx_center_frequency(center_frequency=center_frequency, channel=channel)
@ -81,6 +79,20 @@ class USRP(SDR):
self.rx_channel = channel self.rx_channel = channel
print(f"USRP RX Channel = {self.rx_channel}") print(f"USRP RX Channel = {self.rx_channel}")
stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
stream_args.channels = [self.rx_channel]
self.metadata = uhd.types.RXMetadata()
self.rx_stream = self.usrp.get_rx_stream(stream_args)
if rx_buffer_size is None: # In case it's none
self.rx_buffer_size = self.rx_stream.get_max_num_samps()
else:
self.rx_buffer_size = rx_buffer_size
# set timeout based on buffer size and sample rate, with a safety factor of 5
self.timeout = (self.rx_buffer_size / self.rx_sample_rate) * 5
# flag to prevent user from calling certain functions before this one. # flag to prevent user from calling certain functions before this one.
self._rx_initialized = True self._rx_initialized = True
self._tx_initialized = False self._tx_initialized = False
@ -88,68 +100,76 @@ class USRP(SDR):
return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain} return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain}
def set_rx_sample_rate(self, sample_rate, channel=0): def set_rx_sample_rate(self, sample_rate, channel=0):
"""
Set the sample rate of the receiver. Callable during streaming.
"""
# check if sample rate arg is valid # check if sample rate arg is valid
# Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns # Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns
# the range for the CURRENT master clock, not the maximum possible range. # the range for the CURRENT master clock, not the maximum possible range.
# Skip validation for B-series devices and let UHD handle it. # Skip validation for B-series devices and let UHD handle it.
device_type = self.device_dict.get("type", "").lower() with self._param_lock:
if device_type not in ["b200", "b210"]: device_type = self.device_dict.get("type", "").lower()
sample_rate_range = self.usrp.get_rx_rates() if device_type not in ["b200", "b210"]:
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): sample_rate_range = self.usrp.get_rx_rates()
raise IOError( min_rate, max_rate = sample_rate_range.start(), sample_rate_range.stop()
f"Sample rate {sample_rate} not valid for this USRP.\nValid\ if sample_rate < min_rate or sample_rate > max_rate:
range is {sample_rate_range.start()}\ raise SDRParameterError(
to {sample_rate_range.stop()}." f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
) f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
self.usrp.set_rx_rate(sample_rate, channel) )
self.rx_sample_rate = self.usrp.get_rx_rate(channel)
print(f"USRP RX Sample Rate = {self.rx_sample_rate}") self.usrp.set_rx_rate(sample_rate, channel)
self.rx_sample_rate = self.usrp.get_rx_rate(channel)
print(f"USRP RX Sample Rate = {self.rx_sample_rate}")
def set_rx_center_frequency(self, center_frequency, channel=0): def set_rx_center_frequency(self, center_frequency, channel=0):
center_frequency_range = self.usrp.get_rx_freq_range() """
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): Set the center frequency of the receiver. Callable during streaming.
raise IOError( """
f"Center frequency {center_frequency} out of range for USRP.\ with self._param_lock:
\nValid range is {center_frequency_range.start()} \ center_frequency_range = self.usrp.get_rx_freq_range()
to {center_frequency_range.stop()}." min_rate, max_rate = center_frequency_range.start(), center_frequency_range.stop()
) if center_frequency < min_rate or center_frequency > max_rate:
self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel) raise SDRParameterError(
self.rx_center_frequency = self.usrp.get_rx_freq(channel) f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
print(f"USRP RX Center Frequency = {self.rx_center_frequency}") f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]"
)
self.usrp.set_rx_freq(uhd.libpyuhd.types.tune_request(center_frequency), channel)
self.rx_center_frequency = self.usrp.get_rx_freq(channel)
print(f"USRP RX Center Frequency = {self.rx_center_frequency}")
def set_rx_gain(self, gain, gain_mode="absolute", channel=0): def set_rx_gain(self, gain, gain_mode="absolute", channel=0):
# check if gain arg is valid """
gain_range = self.usrp.get_rx_gain_range() Set the gain of the receiver. Callable during streaming.
if gain_mode == "relative": """
if gain > 0: with self._param_lock:
raise ValueError( # check if gain arg is valid
"When gain_mode = 'relative', gain must be < 0. This sets\ gain_range = self.usrp.get_rx_gain_range()
the gain relative to the maximum possible gain." if gain_mode == "relative":
) if gain > 0:
raise SDRParameterError(
"When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain."
)
else:
# set gain relative to max
abs_gain = gain_range.stop() + gain
else: else:
# set gain relative to max abs_gain = gain
abs_gain = gain_range.stop() + gain if abs_gain < gain_range.start() or abs_gain > gain_range.stop():
else: print(f"Gain {abs_gain} out of range for this USRP.")
abs_gain = gain print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB")
if abs_gain < gain_range.start() or abs_gain > gain_range.stop(): abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop())
print(f"Gain {abs_gain} out of range for this USRP.") self.usrp.set_rx_gain(abs_gain, channel)
print(f"Gain range: {gain_range.start()} to {gain_range.stop()} dB") self.rx_gain = self.usrp.get_rx_gain(channel)
abs_gain = min(max(abs_gain, gain_range.start()), gain_range.stop()) print(f"USRP RX Gain = {self.rx_gain}")
self.usrp.set_rx_gain(abs_gain, channel)
self.rx_gain = self.usrp.get_rx_gain(channel)
print(f"USRP RX Gain = {self.rx_gain}")
def _stream_rx(self, callback): def _stream_rx(self, callback):
if not self._rx_initialized: if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
stream_args = uhd.usrp.StreamArgs("fc32", "sc16") # send command to start the rx stream
stream_args.channels = [self.rx_channel]
self.metadata = uhd.types.RXMetadata()
self.rx_stream = self.usrp.get_rx_stream(stream_args)
stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
stream_command.stream_now = True stream_command.stream_now = True
self.rx_stream.issue_stream_cmd(stream_command) self.rx_stream.issue_stream_cmd(stream_command)
@ -160,19 +180,19 @@ class USRP(SDR):
receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64) receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64)
while self._enable_rx: while self._enable_rx:
self.rx_stream.recv(receive_buffer, self.metadata, self.timeout)
# 1 is the timeout #TODO maybe set this intelligently based on the desired sample rate
self.rx_stream.recv(receive_buffer, self.metadata, 1)
# TODO set metadata correctly, sending real sample rate plus any error codes # TODO set metadata correctly, sending real sample rate plus any error codes
# sending complex signal # sending complex signal
callback(buffer=receive_buffer, metadata=self.metadata) callback(buffer=receive_buffer, metadata=self.metadata)
if self.metadata.error_code != uhd.types.RXMetadataErrorCode.none: if self.metadata.error_code != uhd.types.RXMetadataErrorCode.none:
print(f"Error while receiving samples: {self.metadata.strerror()}") if self.metadata.error_code == uhd.types.RXMetadataErrorCode.overflow:
print("\033[93mWarning: Buffer Overflow Detected.\033[0m")
if self.metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: if self.metadata.error_code == uhd.types.RXMetadataErrorCode.timeout:
print("Stopping receive due to timeout error.") print("\033[91Stopping receive due to timeout error.\033[0m")
self.stop() self.stop()
# stop streaming
wait_time = 0.1 wait_time = 0.1
stop_time = self.usrp.get_time_now() + wait_time stop_time = self.usrp.get_time_now() + wait_time
stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
@ -180,10 +200,14 @@ class USRP(SDR):
stop_cmd.time_spec = stop_time stop_cmd.time_spec = stop_time
self.rx_stream.issue_stream_cmd(stop_cmd) self.rx_stream.issue_stream_cmd(stop_cmd)
time.sleep(wait_time) # TODO figure out what a realistic wait time is here. time.sleep(wait_time) # TODO figure out what a realistic wait time is here.
del self.rx_stream
print("USRP RX Completed.") print("USRP RX Completed.")
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): def record(
self,
num_samples: Optional[int] = None,
rx_time: Optional[int | float] = None,
) -> Recording:
""" """
Create a radio recording (iq samples and metadata) of a given length from the USRP. Create a radio recording (iq samples and metadata) of a given length from the USRP.
Either num_samples or rx_time must be provided. Either num_samples or rx_time must be provided.
@ -200,41 +224,31 @@ class USRP(SDR):
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None: if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time") raise SDRParameterError("Only input one of num_samples or rx_time")
elif num_samples is not None: elif num_samples is not None:
pass pass
elif rx_time is not None: elif rx_time is not None:
num_samples = int(rx_time * self.rx_sample_rate) num_samples = int(rx_time * self.rx_sample_rate)
else: else:
raise ValueError("Must provide input of one of num_samples or rx_time") raise SDRParameterError("Must provide input of one of num_samples or rx_time")
stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
stream_args.channels = [self.rx_channel]
self.metadata = uhd.types.RXMetadata()
self.rx_stream = self.usrp.get_rx_stream(stream_args)
# send command to start the rx stream
stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
stream_command.stream_now = True stream_command.stream_now = True
self.rx_stream.issue_stream_cmd(stream_command) self.rx_stream.issue_stream_cmd(stream_command)
# receive loop # receive loop
self._enable_rx = True self._enable_rx = True
print("USRP Starting RX...")
store_array = np.zeros((1, (num_samples // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64) store_array = np.zeros((1, (num_samples // self.rx_buffer_size + 1) * self.rx_buffer_size), dtype=np.complex64)
receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64) receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64)
print("USRP Starting RX...")
# write complex samples to receive buffer
for i in range(num_samples // self.rx_buffer_size + 1): for i in range(num_samples // self.rx_buffer_size + 1):
self.rx_stream.recv(receive_buffer, self.metadata, self.timeout)
# write samples to receive buffer
# they should already be complex
# 1 is the timeout #TODO maybe set this intelligently based on the desired sample rate
self.rx_stream.recv(receive_buffer, self.metadata, 1)
# TODO set metadata correctly, sending real sample rate plus any error codes
# sending complex signal
store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = receive_buffer store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = receive_buffer
# stop streaming
wait_time = 0.1 wait_time = 0.1
stop_time = self.usrp.get_time_now() + wait_time stop_time = self.usrp.get_time_now() + wait_time
stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
@ -242,7 +256,7 @@ class USRP(SDR):
stop_cmd.time_spec = stop_time stop_cmd.time_spec = stop_time
self.rx_stream.issue_stream_cmd(stop_cmd) self.rx_stream.issue_stream_cmd(stop_cmd)
time.sleep(wait_time) # TODO figure out what a realistic wait time is here. time.sleep(wait_time) # TODO figure out what a realistic wait time is here.
del self.rx_stream
print("USRP RX Completed.") print("USRP RX Completed.")
metadata = { metadata = {
"source": self.__class__.__name__, "source": self.__class__.__name__,
@ -287,7 +301,7 @@ class USRP(SDR):
# check if channel arg is valid # check if channel arg is valid
max_num_channels = self.usrp.get_rx_num_channels() max_num_channels = self.usrp.get_rx_num_channels()
if channel + 1 > max_num_channels: if channel + 1 > max_num_channels:
raise IOError(f"Channel {channel} not valid for device with {max_num_channels} channels.") raise SDRParameterError(f"Channel {channel} not valid for device with {max_num_channels} channels.")
self.set_tx_sample_rate(sample_rate=sample_rate, channel=channel) self.set_tx_sample_rate(sample_rate=sample_rate, channel=channel)
self.set_tx_center_frequency(center_frequency=center_frequency, channel=channel) self.set_tx_center_frequency(center_frequency=center_frequency, channel=channel)
@ -313,23 +327,26 @@ class USRP(SDR):
device_type = self.device_dict.get("type", "").lower() device_type = self.device_dict.get("type", "").lower()
if device_type not in ["b200", "b210"]: if device_type not in ["b200", "b210"]:
sample_rate_range = self.usrp.get_tx_rates() sample_rate_range = self.usrp.get_tx_rates()
if sample_rate < sample_rate_range.start() or sample_rate > sample_rate_range.stop(): min_rate, max_rate = sample_rate_range.start(), sample_rate_range.stop()
raise IOError( if sample_rate < min_rate or sample_rate > max_rate:
f"Sample rate {sample_rate} not valid for this USRP.\nValid\ raise SDRParameterError(
range is {sample_rate_range.start()} to {sample_rate_range.stop()}." f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
) )
self.usrp.set_tx_rate(sample_rate, channel) self.usrp.set_tx_rate(sample_rate, channel)
self.tx_sample_rate = self.usrp.get_tx_rate(channel) self.tx_sample_rate = self.usrp.get_tx_rate(channel)
print(f"USRP TX Sample Rate = {self.tx_sample_rate}") print(f"USRP TX Sample Rate = {self.tx_sample_rate}")
def set_tx_center_frequency(self, center_frequency, channel=0): def set_tx_center_frequency(self, center_frequency, channel=0):
center_frequency_range = self.usrp.get_tx_freq_range() center_frequency_range = self.usrp.get_tx_freq_range()
if center_frequency < center_frequency_range.start() or center_frequency > center_frequency_range.stop(): min_rate, max_rate = center_frequency_range.start(), center_frequency_range.stop()
raise IOError( if center_frequency < min_rate or center_frequency > max_rate:
f"Center frequency {center_frequency} out of range for USRP.\ raise SDRParameterError(
\nValid range is {center_frequency_range.start()}\ f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
to {center_frequency_range.stop()}." f"out of range: [{min_rate/1e9:.3f} - {max_rate/1e9:.3f} GHz]"
) )
self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel) self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel)
self.tx_center_frequency = self.usrp.get_tx_freq(channel) self.tx_center_frequency = self.usrp.get_tx_freq(channel)
print(f"USRP TX Center Frequency = {self.tx_center_frequency}") print(f"USRP TX Center Frequency = {self.tx_center_frequency}")
@ -339,7 +356,7 @@ class USRP(SDR):
gain_range = self.usrp.get_tx_gain_range() gain_range = self.usrp.get_tx_gain_range()
if gain_mode == "relative": if gain_mode == "relative":
if gain > 0: if gain > 0:
raise ValueError( raise SDRParameterError(
"When gain_mode = 'relative', gain must be < 0. This sets\ "When gain_mode = 'relative', gain must be < 0. This sets\
the gain relative to the maximum possible gain." the gain relative to the maximum possible gain."
) )
@ -358,7 +375,13 @@ class USRP(SDR):
print(f"USRP TX Gain = {self.tx_gain}") print(f"USRP TX Gain = {self.tx_gain}")
def close(self): def close(self):
pass self._tx_initialized = False
self._rx_initialized = False
if hasattr(self, "rx_stream"):
del self.rx_stream
if hasattr(self, "usrp"):
del self.usrp
self.usrp = None
def _stream_tx(self, callback): def _stream_tx(self, callback):
@ -439,6 +462,9 @@ class USRP(SDR):
print(f"USRP clock source set to {self.usrp.get_clock_source(0)}") print(f"USRP clock source set to {self.usrp.get_clock_source(0)}")
def supports_dynamic_updates(self) -> dict:
return {"center_frequency": True, "sample_rate": True, "gain": True}
def _create_device_dict(identifier_value=None): def _create_device_dict(identifier_value=None):
""" """