M
madrigal
8a66860d33
All checks were successful
Build Sphinx Docs Set / Build Docs (pull_request) Successful in 15m51s
Build Project / Build Project (3.10) (pull_request) Successful in 16m14s
Build Project / Build Project (3.11) (pull_request) Successful in 17m9s
Build Project / Build Project (3.12) (pull_request) Successful in 2m29s
Test with tox / Test with tox (3.12) (pull_request) Successful in 21m28s
Test with tox / Test with tox (3.10) (pull_request) Successful in 22m50s
Test with tox / Test with tox (3.11) (pull_request) Successful in 23m18s
578 lines
24 KiB
Python
578 lines
24 KiB
Python
import subprocess
|
|
import time
|
|
import warnings
|
|
from typing import Optional
|
|
|
|
import numpy as np
|
|
import uhd
|
|
|
|
from ria_toolkit_oss.data.recording import Recording
|
|
from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError
|
|
|
|
|
|
class USRP(SDR):
    """
    SDR driver for USRP devices, built on the UHD Python API (``uhd.usrp.MultiUSRP``).

    A device is selected at construction time from the output of ``uhd_find_devices``.
    ``init_rx()`` or ``init_tx()`` must be called before receiving or transmitting; each of
    them creates a fresh ``MultiUSRP`` object and therefore invalidates the other direction.
    """

    def __init__(self, identifier: Optional[str] = None):
        """
        Initialize a USRP device object and connect to the SDR hardware.

        This software supports all USRP SDRs created by Ettus Research.

        :param identifier: The value of the parameter that identifies the device.
        :type identifier: str = "192.168.0.0", "MyB210", name or address found in uhd_find_devices

        If no identifier is provided, it will select the first device found, with a warning.
        If more than one device is found with the identifier, it will select the first of those devices.
        """
        super().__init__()

        self.default_buffer_size = 8000

        # get all the info from only one of the parameters
        self.device_dict = _create_device_dict(identifier)

        self._rx_initialized = False
        self._tx_initialized = False

    @staticmethod
    def _usrp_check_channel(channel, max_num_channels):
        """Raise SDRParameterError unless ``0 <= channel < max_num_channels``."""
        if channel < 0 or channel >= max_num_channels:
            raise SDRParameterError(f"Channel {channel} not valid for device with {max_num_channels} channels.")

    @staticmethod
    def _usrp_resolve_gain(gain, gain_mode, gain_range):
        """
        Convert a user-supplied gain into an absolute gain clamped to ``gain_range``.

        :param gain: Requested gain. In 'relative' mode this is an offset (<= 0) from the maximum gain.
        :param gain_mode: 'relative' or anything else (treated as 'absolute').
        :param gain_range: Object exposing ``start()`` and ``stop()`` (as returned by UHD).

        :raises SDRParameterError: If gain_mode is 'relative' and gain is positive.
        :return: The absolute gain, clamped into ``[gain_range.start(), gain_range.stop()]``.
        """
        min_gain, max_gain = gain_range.start(), gain_range.stop()

        if gain_mode == "relative":
            if gain > 0:
                raise SDRParameterError(
                    "When gain_mode = 'relative', gain must be <= 0. "
                    "This sets the gain relative to the maximum possible gain."
                )
            # set gain relative to max
            abs_gain = max_gain + gain
        else:
            abs_gain = gain

        if abs_gain < min_gain or abs_gain > max_gain:
            print(f"Gain {abs_gain} out of range for this USRP.")
            print(f"Gain range: {min_gain} to {max_gain} dB")
            abs_gain = min(max(abs_gain, min_gain), max_gain)

        return abs_gain

    def init_rx(
        self,
        sample_rate: int | float,
        center_frequency: int | float,
        channel: int,
        gain: int,
        gain_mode: Optional[str] = "absolute",
        rx_buffer_size: Optional[int] = None,
    ):
        """
        Initializes the USRP for receiving.

        Note that the positional order here is (..., channel, gain), whereas init_tx() takes
        (..., gain, channel).

        :param sample_rate: The sample rate for receiving.
        :type sample_rate: int or float
        :param center_frequency: The center frequency of the recording.
        :type center_frequency: int or float
        :param channel: The channel the USRP is set to.
        :type channel: int
        :param gain: The gain set for receiving on the USRP
        :type gain: int
        :param gain_mode: 'absolute' passes gain directly to the sdr,
            'relative' means that gain should be a negative value, and it will be subtracted from the max gain.
        :type gain_mode: str
        :param rx_buffer_size: Internal buffer size for receiving samples. Defaults to the
            value reported by the RX streamer's ``get_max_num_samps()``.
        :type rx_buffer_size: int

        :raises SDRParameterError: If the channel, sample rate, center frequency or gain is invalid.

        :return: A dictionary with the actual RX parameters after configuration.
        :rtype: dict
        """

        # build USRP object
        usrp_args = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict)
        self.usrp = uhd.usrp.MultiUSRP(usrp_args)

        # check if channel arg is valid
        self._usrp_check_channel(channel, self.usrp.get_rx_num_channels())

        self.set_rx_sample_rate(sample_rate=sample_rate, channel=channel)
        self.set_rx_center_frequency(center_frequency=center_frequency, channel=channel)
        self.set_rx_gain(gain=gain, gain_mode=gain_mode, channel=channel)

        self.rx_channel = channel
        print(f"USRP RX Channel = {self.rx_channel}")

        stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
        stream_args.channels = [self.rx_channel]

        self.metadata = uhd.types.RXMetadata()
        self.rx_stream = self.usrp.get_rx_stream(stream_args)

        if rx_buffer_size is None:
            self.rx_buffer_size = self.rx_stream.get_max_num_samps()
        else:
            self.rx_buffer_size = rx_buffer_size

        # set timeout based on buffer size and sample rate, with a safety factor of 5
        self.timeout = (self.rx_buffer_size / self.rx_sample_rate) * 5

        # flag to prevent user from calling certain functions before this one.
        # A new MultiUSRP object was created above, so any previous TX setup is no longer valid.
        self._rx_initialized = True
        self._tx_initialized = False

        return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain}

    def set_rx_sample_rate(self, sample_rate, channel=0):
        """
        Set the sample rate of the receiver. Callable during streaming.

        :raises SDRParameterError: If the rate is outside the range reported by the device
            (not checked for B200/B210 devices).
        """
        # check if sample rate arg is valid
        # Note: B200/B210 devices auto-adjust master clock rate, so get_rx_rates() returns
        # the range for the CURRENT master clock, not the maximum possible range.
        # Skip validation for B-series devices and let UHD handle it.
        with self._param_lock:
            device_type = self.device_dict.get("type", "").lower()
            if device_type not in ["b200", "b210"]:
                sample_rate_range = self.usrp.get_rx_rates()
                min_rate, max_rate = sample_rate_range.start(), sample_rate_range.stop()
                if sample_rate < min_rate or sample_rate > max_rate:
                    raise SDRParameterError(
                        f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
                        f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
                    )

            self.usrp.set_rx_rate(sample_rate, channel)
            # store the rate the hardware actually reports, which may differ from the request
            self.rx_sample_rate = self.usrp.get_rx_rate(channel)
            print(f"USRP RX Sample Rate = {self.rx_sample_rate}")

    def set_rx_center_frequency(self, center_frequency, channel=0):
        """
        Set the center frequency of the receiver. Callable during streaming.

        :raises SDRParameterError: If the frequency is outside the range reported by the device.
        """
        with self._param_lock:
            center_frequency_range = self.usrp.get_rx_freq_range()
            min_freq, max_freq = center_frequency_range.start(), center_frequency_range.stop()
            if center_frequency < min_freq or center_frequency > max_freq:
                raise SDRParameterError(
                    f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
                    f"out of range: [{min_freq/1e9:.3f} - {max_freq/1e9:.3f} GHz]"
                )

            # use the public TuneRequest alias, matching set_tx_center_frequency()
            self.usrp.set_rx_freq(uhd.types.TuneRequest(center_frequency), channel)
            self.rx_center_frequency = self.usrp.get_rx_freq(channel)
            print(f"USRP RX Center Frequency = {self.rx_center_frequency}")

    def set_rx_gain(self, gain, gain_mode="absolute", channel=0):
        """
        Set the gain of the receiver. Callable during streaming.

        Out-of-range gains are clamped to the device's range (with a printed notice).

        :raises SDRParameterError: If gain_mode is 'relative' and gain is positive.
        """
        with self._param_lock:
            abs_gain = self._usrp_resolve_gain(gain, gain_mode, self.usrp.get_rx_gain_range())
            self.usrp.set_rx_gain(abs_gain, channel)
            self.rx_gain = self.usrp.get_rx_gain(channel)
            print(f"USRP RX Gain = {self.rx_gain}")

    def _usrp_start_rx_stream(self):
        """Issue an immediate continuous-streaming start command on the RX streamer."""
        stream_command = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
        stream_command.stream_now = True
        self.rx_stream.issue_stream_cmd(stream_command)

    def _usrp_stop_rx_stream(self, wait_time=0.1):
        """Schedule a stop of continuous RX streaming ``wait_time`` seconds ahead and wait for it."""
        stop_time = self.usrp.get_time_now() + wait_time
        stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
        stop_cmd.stream_now = False
        stop_cmd.time_spec = stop_time
        self.rx_stream.issue_stream_cmd(stop_cmd)
        time.sleep(wait_time)  # TODO figure out what a realistic wait time is here.

    def _stream_rx(self, callback):
        """
        Receive continuously, handing every filled buffer to ``callback`` until RX is disabled.

        ``callback`` is invoked as ``callback(buffer=<(1, rx_buffer_size) complex64 array>,
        metadata=<RX metadata>)``. The same buffer object is reused for every call.
        A timeout reported by UHD stops reception via ``self.stop()``.

        :raises RuntimeError: If init_rx() has not been called.
        """
        if not self._rx_initialized:
            raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")

        # send command to start the rx stream
        self._usrp_start_rx_stream()

        # receive loop
        self._enable_rx = True
        print("USRP Starting RX...")
        receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64)

        try:
            while self._enable_rx:
                self.rx_stream.recv(receive_buffer, self.metadata, self.timeout)
                # TODO set metadata correctly, sending real sample rate plus any error codes
                # sending complex signal
                callback(buffer=receive_buffer, metadata=self.metadata)

                error_code = self.metadata.error_code
                if error_code == uhd.types.RXMetadataErrorCode.overflow:
                    print("\033[93mWarning: Buffer Overflow Detected.\033[0m")
                elif error_code == uhd.types.RXMetadataErrorCode.timeout:
                    print("\033[91mStopping receive due to timeout error.\033[0m")
                    self.stop()
        finally:
            # always tell the device to stop, even if recv() or the callback raised
            self._usrp_stop_rx_stream()

        print("USRP RX Completed.")

    def record(
        self,
        num_samples: Optional[int] = None,
        rx_time: Optional[int | float] = None,
    ) -> Recording:
        """
        Create a radio recording (iq samples and metadata) of a given length from the USRP.
        Either num_samples or rx_time must be provided.
        init_rx() must be called before record()

        :param num_samples: The number of samples to record.
        :type num_samples: int, optional
        :param rx_time: The time to record.
        :type rx_time: int or float, optional

        :raises RuntimeError: If init_rx() has not been called.
        :raises SDRParameterError: If both or neither of num_samples and rx_time are given.

        returns: Recording object (iq samples and metadata)
        """
        if not self._rx_initialized:
            raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")

        if num_samples is not None and rx_time is not None:
            raise SDRParameterError("Only input one of num_samples or rx_time")
        if num_samples is None:
            if rx_time is None:
                raise SDRParameterError("Must provide input of one of num_samples or rx_time")
            num_samples = int(rx_time * self.rx_sample_rate)

        # send command to start the rx stream
        self._usrp_start_rx_stream()

        # receive loop: capture whole buffers, then trim to num_samples on return
        self._enable_rx = True
        num_buffers = num_samples // self.rx_buffer_size + 1
        store_array = np.zeros((1, num_buffers * self.rx_buffer_size), dtype=np.complex64)
        receive_buffer = np.zeros((1, self.rx_buffer_size), dtype=np.complex64)
        print("USRP Starting RX...")

        try:
            # write complex samples to receive buffer
            # NOTE(review): the sample count returned by recv() and metadata error codes are
            # not inspected here, so a short read leaves stale/zero samples in the recording.
            for i in range(num_buffers):
                self.rx_stream.recv(receive_buffer, self.metadata, self.timeout)
                store_array[:, i * self.rx_buffer_size : (i + 1) * self.rx_buffer_size] = receive_buffer
        finally:
            # always tell the device to stop, even if recv() raised
            self._usrp_stop_rx_stream()

        print("USRP RX Completed.")
        metadata = {
            "source": self.__class__.__name__,
            "sample_rate": self.rx_sample_rate,
            "center_frequency": self.rx_center_frequency,
            "gain": self.rx_gain,
        }

        return Recording(data=store_array[:, :num_samples], metadata=metadata)

    def init_tx(
        self,
        sample_rate: int | float,
        center_frequency: int | float,
        gain: int,
        channel: int,
        gain_mode: Optional[str] = "absolute",
    ):
        """
        Initializes the USRP for transmitting.

        Note that the positional order here is (..., gain, channel), whereas init_rx() takes
        (..., channel, gain).

        :param sample_rate: The sample rate for transmitting.
        :type sample_rate: int or float
        :param center_frequency: The center frequency of the recording.
        :type center_frequency: int or float
        :param gain: The gain set for transmitting on the USRP
        :type gain: int
        :param channel: The channel the USRP is set to.
        :type channel: int
        :param gain_mode: 'absolute' passes gain directly to the sdr,
            'relative' means that gain should be a negative value, and it will be subtracted from the max gain.
        :type gain_mode: str

        :raises SDRParameterError: If the channel, sample rate, center frequency or gain is invalid.

        :return: A dictionary with the actual TX parameters after configuration.
        :rtype: dict
        """

        self.tx_buffer_size = 2000

        print(f"USRP TX Gain Mode = '{gain_mode}'")

        config_str = _generate_usrp_config_string(sample_rate=sample_rate, device_dict=self.device_dict)
        self.usrp = uhd.usrp.MultiUSRP(config_str)

        # check if channel arg is valid (against the number of TX channels, not RX channels)
        self._usrp_check_channel(channel, self.usrp.get_tx_num_channels())

        self.set_tx_sample_rate(sample_rate=sample_rate, channel=channel)
        self.set_tx_center_frequency(center_frequency=center_frequency, channel=channel)
        self.set_tx_gain(gain=gain, gain_mode=gain_mode, channel=channel)

        self.tx_channel = channel
        print(f"USRP TX Channel = {self.tx_channel}")

        self.usrp.set_clock_source("internal")
        self.usrp.set_time_source("internal")
        self.usrp.set_tx_antenna("TX/RX", channel)

        # A new MultiUSRP object was created above, so any previous RX setup is no longer valid.
        self._tx_initialized = True
        self._rx_initialized = False

        return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain}

    def set_tx_sample_rate(self, sample_rate, channel=0):
        """
        Set the sample rate of the transmitter.

        :raises SDRParameterError: If the rate is outside the range reported by the device
            (not checked for B200/B210 devices).
        """
        # check if sample rate arg is valid
        # Note: B200/B210 devices auto-adjust master clock rate, so get_tx_rates() returns
        # the range for the CURRENT master clock, not the maximum possible range.
        # Skip validation for B-series devices and let UHD handle it.
        device_type = self.device_dict.get("type", "").lower()
        if device_type not in ["b200", "b210"]:
            sample_rate_range = self.usrp.get_tx_rates()
            min_rate, max_rate = sample_rate_range.start(), sample_rate_range.stop()
            if sample_rate < min_rate or sample_rate > max_rate:
                raise SDRParameterError(
                    f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
                    f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
                )

        self.usrp.set_tx_rate(sample_rate, channel)
        self.tx_sample_rate = self.usrp.get_tx_rate(channel)
        print(f"USRP TX Sample Rate = {self.tx_sample_rate}")

    def set_tx_center_frequency(self, center_frequency, channel=0):
        """
        Set the center frequency of the transmitter.

        :raises SDRParameterError: If the frequency is outside the range reported by the device.
        """
        center_frequency_range = self.usrp.get_tx_freq_range()
        min_freq, max_freq = center_frequency_range.start(), center_frequency_range.stop()
        if center_frequency < min_freq or center_frequency > max_freq:
            raise SDRParameterError(
                f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
                f"out of range: [{min_freq/1e9:.3f} - {max_freq/1e9:.3f} GHz]"
            )

        self.usrp.set_tx_freq(uhd.types.TuneRequest(center_frequency), channel)
        self.tx_center_frequency = self.usrp.get_tx_freq(channel)
        print(f"USRP TX Center Frequency = {self.tx_center_frequency}")

    def set_tx_gain(self, gain, gain_mode="absolute", channel=0):
        """
        Set the gain of the transmitter.

        Out-of-range gains are clamped to the device's range (with a printed notice).

        :raises SDRParameterError: If gain_mode is 'relative' and gain is positive.
        """
        abs_gain = self._usrp_resolve_gain(gain, gain_mode, self.usrp.get_tx_gain_range())

        self.usrp.set_tx_gain(abs_gain, channel)
        self.tx_gain = self.usrp.get_tx_gain(channel)
        print(f"USRP TX Gain = {self.tx_gain}")

    def close(self):
        """Mark RX and TX as uninitialized and drop the RX streamer and MultiUSRP references."""
        self._tx_initialized = False
        self._rx_initialized = False
        if hasattr(self, "rx_stream"):
            del self.rx_stream
        if hasattr(self, "usrp"):
            del self.usrp
            self.usrp = None

    def _stream_tx(self, callback):
        """
        Transmit continuously, pulling each buffer from ``callback(self.tx_buffer_size)``
        until TX is disabled (``self._enable_tx`` becomes falsy).

        :raises RuntimeError: If init_tx() has not been called.
        """
        if not self._tx_initialized:
            raise RuntimeError("TX was not initialized. init_tx() must be called before _stream_tx() or tx_recording()")

        stream_args = uhd.usrp.StreamArgs("fc32", "sc16")  # wire and cpu data formats
        stream_args.channels = [self.tx_channel]
        tx_stream = self.usrp.get_tx_stream(stream_args)

        metadata = uhd.types.TXMetadata()

        metadata.start_of_burst = True
        metadata.end_of_burst = False
        self._enable_tx = True
        print("USRP Starting TX...")

        while self._enable_tx:
            buffer = callback(self.tx_buffer_size)
            tx_stream.send(buffer, metadata)
            # only the very first packet is flagged as the start of the burst
            metadata.start_of_burst = False

        print("USRP TX Completed.")

    def tx_recording(
        self,
        recording: Recording | np.ndarray,
        num_samples: Optional[int] = None,
        tx_time: Optional[int | float] = None,
    ):
        """
        Transmit the given iq samples from the provided recording.
        init_tx() must be called before this function.

        :param recording: The recording to transmit.
        :type recording: Recording or np.ndarray
        :param num_samples: The number of samples to transmit, will repeat or
            truncate the recording to this length. Defaults to None.
        :type num_samples: int, optional
        :param tx_time: The time to transmit, will repeat or truncate the
            recording to this length. Defaults to None.
        :type tx_time: int or float, optional

        :raises RuntimeError: If init_tx() has not been called.
        :raises ValueError: If both num_samples and tx_time are given.
        :raises TypeError: If recording is neither a Recording nor a numpy array.
        """
        if not self._tx_initialized:
            raise RuntimeError("TX was not initialized. init_tx() must be called before _stream_tx() or tx_recording()")

        if num_samples is not None and tx_time is not None:
            raise ValueError("Only input one of num_samples or tx_time")

        if isinstance(recording, np.ndarray):
            samples = recording
        elif isinstance(recording, Recording):
            if len(recording.data) > 1:
                warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission")
            samples = recording.data[0]
        else:
            raise TypeError(f"recording must be a Recording or np.ndarray, got {type(recording).__name__}")

        if num_samples is not None:
            self._num_samples_to_transmit = num_samples
        elif tx_time is not None:
            self._num_samples_to_transmit = int(tx_time * self.tx_sample_rate)
        else:
            self._num_samples_to_transmit = len(recording)

        # This is extremely important:
        # the streamer needs native-endian complex64 in a contiguous buffer.
        # astype() to np.complex64 always yields native byte order, so no byteswap is needed.
        samples = samples.astype(np.complex64, copy=False)
        samples = np.ascontiguousarray(samples)

        self._samples_to_transmit = samples
        self._num_samples_transmitted = 0

        self._stream_tx(self._loop_recording_callback)

    def set_clock_source(self, source):
        """
        Switch the device to an external clock source and print the active source.

        Only ``"external"`` (case-insensitive) changes the device setting; any other value
        leaves the clock source untouched and just prints the current one.
        """
        source = source.lower()
        if source == "external":
            self.usrp.set_clock_source(source)

        print(f"USRP clock source set to {self.usrp.get_clock_source(0)}")

    def supports_dynamic_updates(self) -> dict:
        """Report which parameters this driver declares as changeable while streaming."""
        return {"center_frequency": True, "sample_rate": True, "gain": True}
|
|
|
|
|
|
def _create_device_dict(identifier_value=None):
    """
    Look up the device-info dictionary that matches a unique identifier,
    using the devices reported by uhd_find_devices.

    A device matches when any of its values equals ``identifier_value``
    (compared as case-insensitive strings). With no identifier the first
    device found is returned; with several matches the first match is returned.

    :raises IOError: If no device is found at all, or none matches the identifier.
    """

    def _describe(device):
        # one "\tkey: value" line per device field
        return "\n".join([f"\t{key}: {value}" for key, value in device.items()])

    available_devices = _parse_uhd_find_devices()
    print(available_devices)

    if identifier_value is None:
        print("\033[93mWarning: No USRP device identifier provided. Defaulting to the first USRP device found.\033[0m")
        if not available_devices:
            raise IOError("\033[91mError: No USRP devices found.\033[0m")
        first_device = available_devices[0]
        print(f"Device information: \n{_describe(first_device)}")
        return first_device

    wanted = str(identifier_value).lower()
    matches = [
        candidate
        for candidate in available_devices
        if any(str(field).lower() == wanted for field in candidate.values())
    ]

    if not matches:
        raise IOError(f"\033[31mError: No USRP device found for identifier '{identifier_value}'.\033[0m")

    if len(matches) > 1:
        print(f"\033[93mWarning: Found multiple USRP devices with identifier '{identifier_value}'.\033[0m")
        print("\033[93mDefaulting to the first USRP device found with this identifier.\033[0m")
    else:
        print(f"\033[92mSuccessfully found USRP device with identifier '{identifier_value}'\033[0m")

    chosen = matches[0]
    print(f"Device information: \n{_describe(chosen)}")
    return chosen
|
|
|
|
|
|
def _generate_usrp_config_string(sample_rate, device_dict):
|
|
"""
|
|
Create a correctly formatted string as expected by
|
|
uhd.usrp.MultiUSRP constructor
|
|
|
|
If it is a x300 there are two options for internal master clock settings
|
|
master_clock_rate_string = self.force_srate_xseries(sample_rate)
|
|
"""
|
|
|
|
if "type" in device_dict and device_dict["type"] == "x300":
|
|
master_clock_rate_string = _force_srate_xseries(sample_rate)
|
|
else:
|
|
master_clock_rate_string = ""
|
|
|
|
if "addr" in device_dict:
|
|
ip_address_string = f"addr={device_dict['addr']},"
|
|
else:
|
|
ip_address_string = ""
|
|
|
|
if "name" in device_dict:
|
|
name_string = f"name={device_dict['name']},"
|
|
else:
|
|
name_string = ""
|
|
|
|
config_string = ip_address_string + master_clock_rate_string + name_string
|
|
|
|
return config_string
|
|
|
|
|
|
def _force_srate_xseries(sample_rate):
|
|
two_hundred_rates = [200.0e6 / i for i in range(1, 201)] # down to 1MHz wide
|
|
one_eighty_four_rates = [184.32e6 / i for i in range(1, 185)] # down to ~ 1MHz wide
|
|
|
|
diff_two_hundred = min([abs(x - sample_rate) for x in two_hundred_rates])
|
|
diff_one_eighty_four = min([abs(x - sample_rate) for x in one_eighty_four_rates])
|
|
|
|
closest_list = "two_hundred_rates" if diff_two_hundred < diff_one_eighty_four else "one_eighty_four_rates"
|
|
if closest_list == "one_eighty_four_rates":
|
|
mcr_str = "master_clock_rate=184.32e6,"
|
|
# print("MCR set to 184.32 MHz")
|
|
else:
|
|
mcr_str = ""
|
|
return mcr_str
|
|
|
|
|
|
def _parse_uhd_find_devices():
|
|
"""
|
|
Parse the uhd_find_devices subprocess command output into usable data.
|
|
Returns: an array length = num_devices of dicts containing the data.
|
|
"""
|
|
p = subprocess.Popen("uhd_find_devices", stdout=subprocess.PIPE)
|
|
output, err = p.communicate()
|
|
separate_devices = output.rsplit(b"--")
|
|
cleaned_separate_devices = [device for device in separate_devices if len(device) >= 20]
|
|
list_of_dicts = []
|
|
for device_string in cleaned_separate_devices:
|
|
device_as_list = device_string.split(b"\n")
|
|
device_as_list = [device for device in device_as_list if len(device) >= 2]
|
|
for i in range(len(device_as_list)):
|
|
device_as_list[i] = device_as_list[i].strip(b" ")
|
|
|
|
device_dict = {}
|
|
for i in range(len(device_as_list)):
|
|
[key, value] = device_as_list[i].split(b":")
|
|
key = key.strip()
|
|
value = value.strip()
|
|
key = key.decode("utf-8") # cast to string
|
|
value = value.decode("utf-8")
|
|
device_dict.update({key: value})
|
|
|
|
list_of_dicts.append(device_dict)
|
|
return list_of_dicts
|