ria-toolkit-oss/src/ria_toolkit_oss/sdr/pluto.py

634 lines
26 KiB
Python
Raw Normal View History

2025-09-12 11:32:49 -04:00
import threading
import time
import traceback
import warnings
from typing import Optional
import adi
import numpy as np
from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.sdr.sdr import SDR, SDRError, SDRParameterError
2025-09-12 11:32:49 -04:00
class Pluto(SDR):
def __init__(self, identifier=None):
    """
    Connect to an ADALM Pluto SDR and pick the matching pyadi-iio driver.

    The device is first opened through ``adi.ad9361`` so that its IIO context
    can be inspected. When the ``cf-ad9361-lpc`` device reports at least four
    channels the hardware is treated as MIMO-capable (Rev C/D) and that handle
    is kept. Otherwise the probe handle is dropped and the radio is reopened
    with ``adi.Pluto`` (Rev B, single RX/TX only).

    :param identifier: Host part of the IIO URI, e.g. "192.168.3.1" or
        "pluto.local". When None, "pluto.local" is used.
    :type identifier: str, optional

    :raises Exception: Any error raised while connecting is reported on
        stdout and then re-raised.
    """
    print(f"Initializing Pluto radio with identifier [{identifier}].")

    try:
        super().__init__()
        self._tx_lock = threading.Lock()

        uri = "ip:pluto.local" if identifier is None else f"ip:{identifier}"

        # One-time capability probe: Rev B exposes 2 IIO channels
        # (voltage0, voltage1), Rev C/D exposes 4 (voltage0-3).
        probe = adi.ad9361(uri)
        rx_device = probe.ctx.find_device("cf-ad9361-lpc")
        has_four_channels = rx_device and len(rx_device.channels) >= 4

        if not has_four_channels:
            # Non-MIMO hardware: release the probe and use the standard driver.
            del probe
            self.radio = adi.Pluto(uri)
            self._mimo_capable = False
            print(f"Successfully found Pluto (Rev B) with identifier [{identifier}].")
        else:
            self.radio = probe
            self._mimo_capable = True
            print(f"Successfully found MIMO-capable Pluto (Rev C/D) with identifier [{identifier}].")

    except Exception as err:
        print(f"Failed to find Pluto radio with identifier [{identifier}].")
        raise err
def init_rx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
"""
Initializes the Pluto for receiving.
:param sample_rate: The sample rate for receiving.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The gain set for receiving on the Pluto
:type gain: int
:param channel: The channel the Pluto is set to. Must be 0 or 1. 0
enables channel 1, 1 enables both channels.
2025-09-12 11:32:49 -04:00
:type channel: int
M
2025-10-16 15:22:07 -04:00
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will
be subtracted from the max gain (74).
M
2025-10-16 15:22:07 -04:00
:type gain_mode: str
2025-09-12 11:32:49 -04:00
"""
print("Initializing RX")
self.set_rx_sample_rate(sample_rate=int(sample_rate))
print(f"Pluto sample rate = {self.radio.sample_rate}")
self.set_rx_center_frequency(center_frequency=int(center_frequency))
print(f"Pluto center frequency = {self.radio.rx_lo}")
self.set_rx_channel(channel=channel)
M
2025-10-16 15:22:07 -04:00
self.set_rx_gain(gain=gain, channel=channel, gain_mode=gain_mode)
2025-09-12 11:32:49 -04:00
if channel == 0:
print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}")
elif channel == 1:
M
2025-10-16 15:22:07 -04:00
self.set_rx_gain(gain=gain, channel=0, gain_mode=gain_mode)
2025-09-12 11:32:49 -04:00
print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}")
self._rx_initialized = True
self._tx_initialized = False
M
2025-10-16 15:22:07 -04:00
return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain}
2025-09-12 11:32:49 -04:00
def init_tx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
"""
Initializes the Pluto for transmitting. Will transmit garbage during
center frequency tuning and setting the sample rate.
:param sample_rate: The sample rate for transmitting.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The gain set for transmitting on the Pluto
:type gain: int
:param channel: The channel the Pluto is set to. Must be 0 or 1. 0
enables channel 1, 1 enables both channels.
2025-09-12 11:32:49 -04:00
:type channel: int
M
2025-10-16 15:22:07 -04:00
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will
be subtracted from the max gain (0).
M
2025-10-16 15:22:07 -04:00
:type gain_mode: str
2025-09-12 11:32:49 -04:00
"""
print("Initializing TX")
self.set_tx_sample_rate(sample_rate=int(sample_rate))
print(f"Pluto sample rate = {self.radio.sample_rate}")
self.set_tx_center_frequency(center_frequency=int(center_frequency))
print(f"Pluto center frequency = {self.radio.tx_lo}")
self.set_tx_channel(channel=channel)
M
2025-10-16 15:22:07 -04:00
self.set_tx_gain(gain=gain, channel=channel, gain_mode=gain_mode)
2025-09-12 11:32:49 -04:00
if channel == 0:
print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}")
elif channel == 1:
M
2025-10-16 15:22:07 -04:00
self.set_tx_gain(gain=gain, channel=0, gain_mode=gain_mode)
2025-09-12 11:32:49 -04:00
print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}, {self.radio.tx_hardwaregain_chan1}")
self._tx_initialized = True
self._rx_initialized = False
M
2025-10-16 15:22:07 -04:00
return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain}
2025-09-12 11:32:49 -04:00
def _stream_rx(self, callback):
    """
    Pull buffers from the radio and hand each one to ``callback`` until
    ``self._enable_rx`` stops being True.

    :param callback: Called as ``callback(buffer=<radio.rx() result>, metadata=None)``
        for every received buffer.
    :type callback: callable

    :raises RuntimeError: If init_rx() has not been called.
    """
    if not self._rx_initialized:
        raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")

    self._enable_rx = True
    while True:
        # The flag is re-read on every pass so that it can be cleared
        # (e.g. by the callback) to end the stream.
        if self._enable_rx is not True:
            break

        # Collect the complex signal from the radio and forward it.
        received = self.radio.rx()
        callback(buffer=received, metadata=None)
def _record_fast(self, num_samples):
    """
    Capture ``num_samples`` samples with a single radio buffer.

    The RX buffer is sized to the whole request, one ``radio.rx()`` call is
    made, and every channel is scaled with ``_convert_rx_samples``.

    :param num_samples: Number of samples to capture in one buffer.
    :type num_samples: int

    :return: Recording holding one array per enabled channel plus metadata.
    :rtype: Recording
    """
    self.set_rx_buffer_size(buffer_size=num_samples)

    print("Pluto Starting RX...")
    raw = self.radio.rx()

    # With only channel 0 enabled the raw buffer is treated as a single
    # channel; otherwise it is iterated as one entry per channel.
    single_channel = self.radio.rx_enabled_channels == [0]
    per_channel = [raw] if single_channel else raw
    samples = [self._convert_rx_samples(chan) for chan in per_channel]
    print("Pluto RX Completed.")

    metadata = dict(
        source=self.__class__.__name__,
        sample_rate=self.rx_sample_rate,
        center_frequency=self.rx_center_frequency,
        gain=self.rx_gain,
    )
    return Recording(data=samples, metadata=metadata)
def _record_chunked(self, num_samples):
    """
    Capture ``num_samples`` samples as a stream of 2,000,000-sample buffers.

    Buffers are gathered through ``self._accumulate_buffers_callback`` (not
    defined in this class body; presumably provided by the SDR base class)
    into ``self._accumulated_buffer``, which is then truncated to exactly
    ``num_samples`` samples per channel.

    :param num_samples: Total number of samples to capture.
    :type num_samples: int

    :return: Recording holding one array per channel plus metadata.
    :rtype: Recording
    """
    samples_per_chunk = 2_000_000  # 2M sample chunks (safe size)
    self.set_rx_buffer_size(buffer_size=samples_per_chunk)

    # Accumulation state consumed by the streaming callback.
    self._max_num_buffers = num_samples // samples_per_chunk + 1
    self._num_buffers_processed = 0
    self._accumulated_buffer = None

    print("Pluto Starting RX...")
    self._stream_rx(callback=self._accumulate_buffers_callback)
    print("Pluto RX Completed.")
    print(f"Corrupted buffer count: {self._corrupted_buffer_count}")

    # The stream collects whole chunks, so trim to the requested length.
    trimmed = self._accumulated_buffer[:, :num_samples]
    converted = list(map(self._convert_rx_samples, trimmed))

    metadata = dict(
        source=self.__class__.__name__,
        sample_rate=self.rx_sample_rate,
        center_frequency=self.rx_center_frequency,
        gain=self.rx_gain,
    )

    # Reset for the next capture.
    self._accumulated_buffer = None

    return Recording(data=converted, metadata=metadata)
def record(
    self,
    num_samples: Optional[int] = None,
    rx_time: Optional[int | float] = None,
) -> Recording:
    """
    Create a radio recording (iq samples and metadata) of a given length from the SDR.
    Exactly one of num_samples or rx_time must be provided.
    init_rx() must be called before record().

    Requests of up to 2,000,000 samples are captured in a single buffer;
    longer requests are captured in chunks.

    :param num_samples: The number of samples to record.
    :type num_samples: int, optional
    :param rx_time: The time to record; converted to a sample count using
        the current RX sample rate.
    :type rx_time: int or float, optional

    :raises RuntimeError: If init_rx() has not been called.
    :raises SDRParameterError: If both or neither of num_samples and rx_time are given.

    :return: Recording object (iq samples and metadata)
    :rtype: Recording
    """
    if not self._rx_initialized:
        raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")

    have_samples = num_samples is not None
    have_time = rx_time is not None

    if have_samples and have_time:
        raise SDRParameterError("Only input one of num_samples or rx_time")
    if not have_samples and not have_time:
        raise SDRParameterError("Must provide input of one of num_samples or rx_time")

    if have_samples:
        self._num_samples_to_record = num_samples
    else:
        self._num_samples_to_record = int(rx_time * self.rx_sample_rate)

    # Up to 2,000,000 samples fit one buffer; anything larger is chunked.
    if self._num_samples_to_record > 2_000_000:
        return self._record_chunked(self._num_samples_to_record)
    return self._record_fast(self._num_samples_to_record)
2025-09-12 11:32:49 -04:00
def _format_tx_data(self, recording: Recording | np.ndarray | list):
    """
    Convert the supported transmit inputs into the scaled sample data handed
    to ``radio.tx``.

    * ``np.ndarray``: scaled and returned as a single array.
    * ``Recording``: with only TX channel 0 enabled, channel 0 of the
      recording is returned as a single array; otherwise a two-element list
      is returned (a single-channel recording is duplicated on both channels).
    * ``list``: the first two entries are used, one per Pluto channel. Each
      entry may be an ``np.ndarray`` or a ``Recording`` (its channel 0 is used).

    :param recording: The samples to transmit.
    :type recording: Recording, np.ndarray, list[Recording, np.ndarray]

    :raises SDRParameterError: If a list holds fewer than two entries, or if
        the input (or a list entry) is not a supported type.

    :return: One scaled array, or a list of two scaled arrays.
    """
    too_many_msg = (
        "More recordings were provided than channels in the Pluto. "
        "Only the first two recordings will be used"
    )

    if isinstance(recording, np.ndarray):
        return self._convert_tx_samples(samples=recording)

    if isinstance(recording, Recording):
        channels = recording.data

        if self.radio.tx_enabled_channels == [0]:
            data = self._convert_tx_samples(samples=channels[0])
            if len(channels) > 1:
                warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission")
            return data

        if len(channels) == 1:
            warnings.warn("Recording has only 1 channel, the same data will be transmitted over both Pluto channels")
            samples = channels[0]
            return [self._convert_tx_samples(samples), self._convert_tx_samples(samples)]

        # Count channels via .data: tx_recording() treats len(recording) as a
        # sample count, so it cannot be used to detect surplus channels here.
        if len(channels) > 2:
            warnings.warn(too_many_msg)
        return [self._convert_tx_samples(channels[0]), self._convert_tx_samples(channels[1])]

    if isinstance(recording, list):
        if len(recording) < 2:
            raise SDRParameterError(
                f"A list of recordings must hold one entry per Pluto channel (2), but {len(recording)} were given."
            )
        if len(recording) > 2:
            warnings.warn(too_many_msg)

        data = []
        for entry in recording[:2]:
            if isinstance(entry, Recording):
                data.append(self._convert_tx_samples(entry.data[0]))
            elif isinstance(entry, np.ndarray):
                data.append(self._convert_tx_samples(entry))
            else:
                raise SDRParameterError(
                    f"List entries must be Recording or np.ndarray, but got {type(entry).__name__}."
                )
        return data

    # Previously fell through to `return data` with `data` never assigned.
    raise SDRParameterError(
        f"Cannot transmit data of type {type(recording).__name__}; "
        "expected a Recording, np.ndarray, or list of those."
    )
def _timeout_cyclic_buffer(self, timeout):
    """
    Wait ``timeout`` seconds, then destroy the TX buffer and switch cyclic
    transmission off.

    :param timeout: Time to sleep before stopping the transmission, in seconds.
    :type timeout: int or float
    """
    time.sleep(timeout)

    radio = self.radio
    radio.tx_destroy_buffer()
    radio.tx_cyclic_buffer = False

    print("Pluto TX Completed.")
def interrupt_transmit(self):
    """
    Stop an ongoing transmission: while holding the TX lock, destroy the TX
    buffer and switch cyclic transmission off.
    """
    # NOTE(review): the original indentation was lost; the completion message
    # is assumed to be printed while the lock is still held - confirm.
    self._tx_lock.acquire()
    try:
        radio = self.radio
        radio.tx_destroy_buffer()
        radio.tx_cyclic_buffer = False
        print("Pluto TX Completed.")
    finally:
        self._tx_lock.release()
def tx_recording(self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None, mode="timed"):
    """
    Transmit the given iq samples from the provided recording using the
    radio's cyclic TX buffer.
    init_tx() must be called before this function.

    :param recording: The recording(s) to transmit.
    :type recording: Recording, np.ndarray, list[Recording, np.ndarray]
    :param num_samples: The number of samples to transmit; converted to a
        transmit time using the TX sample rate. Defaults to None.
    :type num_samples: int, optional
    :param tx_time: The time to transmit. Defaults to None, in which case
        ``len(recording)`` divided by the TX sample rate is used.
    :type tx_time: int or float, optional
    :param mode: "timed" blocks for the transmit time and then stops the
        transmission; any other value leaves the cyclic buffer running.
        Defaults to timed.
    :type mode: str, optional

    :raises SDRParameterError: If both num_samples and tx_time are given.
    """
    if num_samples is not None and tx_time is not None:
        raise SDRParameterError("Only input one of num_samples or tx_time")

    if num_samples is not None:
        tx_time = num_samples / self.tx_sample_rate
    elif tx_time is None:
        tx_time = len(recording) / self.tx_sample_rate

    data = self._format_tx_data(recording=recording)

    # NOTE(review): the original indentation was lost. Everything below is
    # assumed to run while holding the TX lock, including the timed wait
    # (the stop helper itself does not take the lock) - confirm.
    with self._tx_lock:
        # Best effort: tear down a cyclic buffer left by an earlier call.
        try:
            if self.radio.tx_cyclic_buffer:
                print("Destroying existing TX buffer...")
                self.radio.tx_destroy_buffer()
                self.radio.tx_cyclic_buffer = False
        except Exception as e:
            print(f"Error while destroying TX buffer: {e}")

        self.radio.tx_cyclic_buffer = True
        print("Pluto Starting TX...")
        self.radio.tx(data_np=data)

        if mode == "timed":
            stopper = threading.Thread(target=self._timeout_cyclic_buffer, args=(tx_time,))
            stopper.start()
            stopper.join()
2025-09-12 11:32:49 -04:00
def _stream_tx(self, callback):
    """
    Repeatedly request samples from ``callback`` and transmit them until
    ``self._enable_tx`` stops being True.

    :param callback: Called with ``self.tx_buffer_size`` (10000 when never
        set); its return value is scaled with ``_convert_tx_samples`` and
        element 0 of the result is transmitted.
    :type callback: callable

    :raises RuntimeError: If init_tx() has not been called.
    """
    if self._tx_initialized is False:
        raise RuntimeError("TX was not initialized, init_tx must be called before _stream_tx")

    # Fall back to a default buffer size when none was configured.
    if not hasattr(self, "tx_buffer_size"):
        self.tx_buffer_size = 10000

    self._enable_tx = True
    while True:
        if self._enable_tx is not True:
            break
        requested = callback(self.tx_buffer_size)
        scaled = self._convert_tx_samples(requested)
        self.radio.tx(scaled[0])
def set_rx_center_frequency(self, center_frequency):
    """
    Set the center frequency of the receiver. Callable during streaming.

    :param center_frequency: Requested RX center frequency in Hz; accepted
        between 70 MHz and 6 GHz.
    :type center_frequency: int or float

    :raises SDRParameterError: If the frequency is out of range or the radio
        rejects it with a ValueError.
    :raises SDRError: If the radio raises an OSError.
    """
    lowest, highest = 70e6, 6e9

    def out_of_range():
        # Built lazily so the message is only formatted on failure.
        return SDRParameterError(
            f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
            f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t"
            f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]"
        )

    with self._param_lock:
        if center_frequency < lowest or center_frequency > highest:
            raise out_of_range()

        try:
            self.radio.rx_lo = int(center_frequency)
            self.rx_center_frequency = center_frequency
        except OSError as e:
            raise SDRError(e)
        except ValueError:
            raise out_of_range()
2025-09-12 11:32:49 -04:00
def set_rx_sample_rate(self, sample_rate):
    """
    Set the sample rate of the receiver. Callable during streaming.

    The RX RF bandwidth (front end filter width) is set to the same value.

    :param sample_rate: Requested sample rate in samples per second;
        accepted between 65.1 ksps and 61.44 Msps.
    :type sample_rate: int or float

    :raises SDRParameterError: If the rate is out of range or the radio
        rejects it with a ValueError.
    :raises SDRError: If the radio raises an OSError.
    """
    min_rate, max_rate = 65.1e3, 61.44e6

    def out_of_range():
        # Built lazily so the message is only formatted on failure.
        return SDRParameterError(
            f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
            f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
        )

    with self._param_lock:
        if sample_rate < min_rate or sample_rate > max_rate:
            raise out_of_range()

        rate_hz = int(sample_rate)
        try:
            # Set the sample rate.
            self.radio.sample_rate = rate_hz
            self.rx_sample_rate = sample_rate

            # Set the front end filter width.
            self.radio.rx_rf_bandwidth = rate_hz
        except OSError as e:
            raise SDRError(e)
        except ValueError:
            raise out_of_range()
2025-09-12 11:32:49 -04:00
M
2025-10-16 15:22:07 -04:00
def set_rx_gain(self, gain, channel=0, gain_mode="absolute"):
    """
    Set the gain of the receiver. Callable during streaming.

    :param gain: Gain in dB, or None to hand the channel over to the radio's
        automatic gain control. Values outside 0-74 dB are clamped and a
        message is printed.
    :type gain: int, float or None
    :param channel: RX channel to configure, 0 or 1. Defaults to 0.
    :type channel: int
    :param gain_mode: 'absolute' passes gain directly to the sdr,
        'relative' means gain must be <= 0 and is added to the max gain (74).
    :type gain_mode: str

    :raises SDRParameterError: If channel is not 0 or 1, or if a positive
        gain is given in 'relative' mode.
    """
    with self._param_lock:
        rx_gain_min = 0
        rx_gain_max = 74

        # Validate the channel first so no state is recorded for a request
        # that cannot be applied.
        if channel == 0:
            mode_attr, gain_attr = "gain_control_mode_chan0", "rx_hardwaregain_chan0"
        elif channel == 1:
            mode_attr, gain_attr = "gain_control_mode_chan1", "rx_hardwaregain_chan1"
        else:
            raise SDRParameterError(f"Pluto channel must be 0 or 1 but was {channel}.")

        if gain is None:
            # None selects AGC. It must bypass the numeric checks below,
            # which would otherwise raise TypeError on the comparison.
            abs_gain = None
        else:
            if gain_mode == "relative":
                if gain > 0:
                    raise SDRParameterError(
                        "When gain_mode = 'relative', gain must be <= 0. This sets "
                        "the gain relative to the maximum possible gain."
                    )
                abs_gain = rx_gain_max + gain
            else:
                abs_gain = gain

            if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
                # Clamp the resolved absolute gain, not the raw argument.
                abs_gain = min(max(abs_gain, rx_gain_min), rx_gain_max)
                print(f"Gain {gain} out of range for Pluto.")
                print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")

        self.rx_gain = abs_gain

        try:
            if abs_gain is None:
                # pyadi-iio documents slow_attack, fast_attack and manual as
                # the gain control modes; "automatic" is not among them.
                setattr(self.radio, mode_attr, "slow_attack")
                print("Using Pluto Automatic Gain Control.")
            else:
                setattr(self.radio, mode_attr, "manual")
                setattr(self.radio, gain_attr, abs_gain)  # dB
        except Exception:
            if channel == 1:
                print("Failed to use channel 1 on the PlutoSDR.\nThis is only available for revC versions.")
            raise
2025-09-12 11:32:49 -04:00
def set_rx_channel(self, channel):
    """
    Select which RX channels are enabled on the radio.

    :param channel: 0 enables only RX channel 0; 1 enables RX channels 0 and 1
        (requires MIMO-capable hardware).
    :type channel: int

    :raises SDRParameterError: If channel is not 0 or 1, or if dual-channel
        operation is requested on hardware detected as non-MIMO.
    """
    if channel == 0:
        enabled = [0]
    elif channel == 1:
        if not self._mimo_capable:
            raise SDRParameterError(
                "Dual RX channel requested (channel=1) but hardware is not MIMO-capable. "
                "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)."
            )
        enabled = [0, 1]
    else:
        raise SDRParameterError("Channel must be either 0 or 1.")

    self.radio.rx_enabled_channels = enabled
    print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}")
def set_rx_buffer_size(self, buffer_size: int):
    """
    Set the number of samples the radio returns per RX buffer.

    Nothing is written when this object has no ``radio`` attribute.

    :param buffer_size: Buffer size in samples; must be positive.
    :type buffer_size: int

    :raises SDRParameterError: If buffer_size is None or not positive.
    :raises SDRError: If the radio rejects the value.
    """
    if buffer_size is None:
        raise SDRParameterError("Buffer_size must be provided.")
    if buffer_size <= 0:
        raise SDRParameterError("Buffer_size must be a positive integer.")

    if not hasattr(self, "radio"):
        return

    try:
        self.radio.rx_buffer_size = buffer_size
    except Exception as e:
        raise SDRError(e)
2025-09-12 11:32:49 -04:00
def set_tx_center_frequency(self, center_frequency):
    """
    Set the center frequency of the transmitter.

    :param center_frequency: Requested TX center frequency in Hz; accepted
        between 70 MHz and 6 GHz.
    :type center_frequency: int or float

    :raises SDRParameterError: If the frequency is out of range or the radio
        rejects it with a ValueError.
    :raises SDRError: If the radio raises an OSError.
    """
    lowest, highest = 70e6, 6e9

    def out_of_range():
        # Built lazily so the message is only formatted on failure.
        return SDRParameterError(
            f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
            f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t"
            f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]"
        )

    if center_frequency < lowest or center_frequency > highest:
        raise out_of_range()

    try:
        self.radio.tx_lo = int(center_frequency)
        self.tx_center_frequency = center_frequency
    except OSError as e:
        raise SDRError(e)
    except ValueError:
        raise out_of_range()
2025-09-12 11:32:49 -04:00
def set_tx_sample_rate(self, sample_rate):
    """
    Set the sample rate of the transmitter.

    :param sample_rate: Requested sample rate in samples per second;
        accepted between 65.1 ksps and 61.44 Msps.
    :type sample_rate: int or float

    :raises SDRParameterError: If the rate is out of range or the radio
        rejects it with a ValueError.
    :raises SDRError: If the radio raises an OSError.
    """
    min_rate, max_rate = 65.1e3, 61.44e6
    if sample_rate < min_rate or sample_rate > max_rate:
        raise SDRParameterError(
            f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
            f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
        )

    try:
        # Cast like set_rx_sample_rate does, so that a float such as 2e6 is
        # handed to the radio as an integer number of samples per second.
        self.radio.sample_rate = int(sample_rate)
        self.tx_sample_rate = sample_rate
    except OSError as e:
        raise SDRError(e) from e
    except ValueError as e:
        raise SDRParameterError(
            f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
            f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
        ) from e
2025-09-12 11:32:49 -04:00
M
2025-10-16 15:22:07 -04:00
def set_tx_gain(self, gain, channel=0, gain_mode="absolute"):
    """
    Set the gain of the transmitter.

    :param gain: Gain in dB. Values outside -89 to 0 dB are clamped and a
        message is printed. The value written to the radio is cast to int.
    :type gain: int or float
    :param channel: TX channel to configure, 0 or 1. Defaults to 0.
    :type channel: int
    :param gain_mode: 'absolute' passes gain directly to the sdr,
        'relative' means gain must be <= 0 and is added to the max gain (0).
    :type gain_mode: str

    :raises SDRParameterError: If channel is not 0 or 1, or if a positive
        gain is given in 'relative' mode.
    :raises SDRError: If the radio rejects the value.
    """
    tx_gain_min = -89
    tx_gain_max = 0

    # Validate the channel outside the try block below, so the parameter
    # error is not re-wrapped as a generic SDRError.
    if channel == 0:
        gain_attr = "tx_hardwaregain_chan0"
    elif channel == 1:
        gain_attr = "tx_hardwaregain_chan1"
    else:
        raise SDRParameterError(f"Pluto channel must be 0 or 1 but was {channel}.")

    if gain_mode == "relative":
        if gain > 0:
            raise SDRParameterError(
                "When gain_mode = 'relative', gain must be <= 0. This sets "
                "the gain relative to the maximum possible gain."
            )
        abs_gain = tx_gain_max + gain
    else:
        abs_gain = gain

    if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
        # Clamp the resolved absolute gain, not the raw argument.
        abs_gain = min(max(abs_gain, tx_gain_min), tx_gain_max)
        print(f"Gain {gain} out of range for Pluto.")
        print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")

    try:
        setattr(self.radio, gain_attr, int(abs_gain))
    except Exception as e:
        raise SDRError(e) from e

    # Record the gain only once the radio has accepted it.
    self.tx_gain = abs_gain
2025-09-12 11:32:49 -04:00
def set_tx_channel(self, channel):
    """
    Select which TX channels are enabled on the radio.

    :param channel: 0 enables only TX channel 0; 1 enables TX channels 0 and 1
        (requires MIMO-capable hardware).
    :type channel: int

    :raises SDRParameterError: If channel is not 0 or 1, or if dual-channel
        operation is requested on hardware detected as non-MIMO.
    """
    if channel == 0:
        enabled = [0]
    elif channel == 1:
        if not self._mimo_capable:
            raise SDRParameterError(
                "Dual TX channel requested (channel=1) but hardware is not MIMO-capable. "
                "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)."
            )
        enabled = [0, 1]
    else:
        raise SDRParameterError("Channel must be either 0 or 1.")

    self.radio.tx_enabled_channels = enabled
    print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}")
def set_tx_buffer_size(self, buffer_size: int):
    """
    Store the buffer size requested from the callback when streaming TX.

    :param buffer_size: Buffer size in samples; must be positive.
    :type buffer_size: int

    :raises SDRParameterError: If buffer_size is None or not positive.
    """
    if buffer_size is None:
        raise SDRParameterError("Buffer_size must be provided.")

    is_positive = buffer_size > 0
    if not is_positive:
        raise SDRParameterError("Buffer_size must be a positive integer.")

    self.tx_buffer_size = buffer_size
2025-09-12 11:32:49 -04:00
M
2025-10-16 15:22:07 -04:00
def close(self):
    """
    Release the radio: destroy the TX buffer when cyclic transmission is
    enabled, then drop this object's ``radio`` attribute.
    """
    radio = self.radio
    cyclic_tx_active = radio.tx_cyclic_buffer
    if cyclic_tx_active:
        radio.tx_destroy_buffer()
    del self.radio
2025-09-12 11:32:49 -04:00
def _convert_rx_samples(self, samples):
    """
    Scale received samples by dividing them by 2**11.

    :param samples: Samples as returned by the radio.

    :return: The scaled samples.
    """
    rx_full_scale = 2**11
    return samples / rx_full_scale
def _convert_tx_samples(self, samples):
    """
    Cast samples to complex64 and scale them by 2**14 for transmission.

    :param samples: Samples to transmit; must provide ``astype``.
    :type samples: np.ndarray

    :return: The scaled complex64 samples.
    """
    tx_full_scale = 2**14
    as_complex = samples.astype(np.complex64)
    return as_complex * tx_full_scale
def set_clock_source(self, source):
    """
    Clock source selection is not implemented for the Pluto.

    :param source: Requested clock source (unused).

    :raises NotImplementedError: Always.
    """
    raise NotImplementedError()
def supports_dynamic_updates(self) -> dict:
    """
    Report which parameters this driver declares as updatable dynamically.

    :return: Mapping of "center_frequency", "sample_rate" and "gain" to True.
    :rtype: dict
    """
    dynamic_parameters = ("center_frequency", "sample_rate", "gain")
    return dict.fromkeys(dynamic_parameters, True)
2025-09-12 11:32:49 -04:00
def _handle_OSError(e):
    """
    Turn a hard-to-read radio error into a more intuitive ValueError.

    Prints the valid PlutoSDR argument ranges and the traceback of the
    exception currently being handled, then raises a ValueError naming the
    offending parameter when the traceback text mentions it. Returns None
    when no known parameter name is found.

    :param e: The caught exception. It is not read; the traceback comes from
        ``traceback.format_exc()``.

    :raises ValueError: If the traceback mentions the sample rate, the
        center frequency or the gain.
    """
    usage_lines = (
        "PlutoSDR valid arguments:",
        "Standard: ",
        "\tCenter frequency: 325-3800Mhz",
        "\tSample rate: 521kHz-20Mhz",
        "\tGain: -90-0",
        "Hacked:",
        "\tCenter frequency: 70-6000Mhz",
        "\tSample rate: 521kHz-56Mhz",
        "\tGain: -90-0",
    )
    for line in usage_lines:
        print(line)

    stack_trace = traceback.format_exc()
    print(stack_trace)

    # Checked in order; the first group with a marker in the trace wins.
    known_causes = (
        (("sampling_frequency", "sample rates"), "The sample rate was out of range for the Pluto SDR.\n"),
        (("tx_lo", "rx_lo"), "The center frequency was out of range for the Pluto SDR.\n"),
        (("hardwaregain",), "The gain was out of range for the Pluto SDR.\n"),
    )
    for markers, message in known_causes:
        if any(marker in stack_trace for marker in markers):
            raise ValueError(message)