import threading import time import traceback import warnings from typing import Optional import adi import numpy as np from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.sdr.sdr import SDR, SDRError, SDRParameterError class Pluto(SDR): def __init__(self, identifier=None): """ Initialize a Pluto SDR device object and connect to the SDR hardware. This software supports the ADALM Pluto SDR created by Analog Devices. :param identifier: The value of the parameter that identifies the device. :type identifier: str = "192.168.3.1", "pluto.local", etc If no identifier is provided, it will select the first device found, with a warning. If more than one device is found with the identifier, it will select the first of those devices. """ print(f"Initializing Pluto radio with identifier [{identifier}].") try: super().__init__() self._tx_lock = threading.Lock() if identifier is None: uri = "ip:pluto.local" else: uri = f"ip:{identifier}" # Detect MIMO capability by checking IIO channels (one-time, during init) # Rev B: 2 channels (voltage0, voltage1) - single RX/TX only # Rev C/D: 4 channels (voltage0-3) - dual RX/TX capable test_radio = adi.ad9361(uri) ctx = test_radio.ctx dev = ctx.find_device("cf-ad9361-lpc") if dev and len(dev.channels) >= 4: # MIMO-capable hardware (Rev C/D) self.radio = test_radio self._mimo_capable = True print(f"Successfully found MIMO-capable Pluto (Rev C/D) with identifier [{identifier}].") else: # Non-MIMO hardware (Rev B) - use standard Pluto driver del test_radio self.radio = adi.Pluto(uri) self._mimo_capable = False print(f"Successfully found Pluto (Rev B) with identifier [{identifier}].") except Exception as e: print(f"Failed to find Pluto radio with identifier [{identifier}].") raise e def init_rx( self, sample_rate: int | float, center_frequency: int | float, gain: int, channel: int, gain_mode: Optional[str] = "absolute", ): """ Initializes the Pluto for receiving. :param sample_rate: The sample rate for receiving. :type sample_rate: int or float :param center_frequency: The center frequency of the recording. :type center_frequency: int or float :param gain: The gain set for receiving on the Pluto :type gain: int :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. :type channel: int :param gain_mode: 'absolute' passes gain directly to the sdr, 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (74). :type gain_mode: str """ print("Initializing RX") self.set_rx_sample_rate(sample_rate=int(sample_rate)) print(f"Pluto sample rate = {self.radio.sample_rate}") self.set_rx_center_frequency(center_frequency=int(center_frequency)) print(f"Pluto center frequency = {self.radio.rx_lo}") self.set_rx_channel(channel=channel) self.set_rx_gain(gain=gain, channel=channel, gain_mode=gain_mode) if channel == 0: print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}") elif channel == 1: self.set_rx_gain(gain=gain, channel=0, gain_mode=gain_mode) print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}") self._rx_initialized = True self._tx_initialized = False return {"sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain} def init_tx( self, sample_rate: int | float, center_frequency: int | float, gain: int, channel: int, gain_mode: Optional[str] = "absolute", ): """ Initializes the Pluto for transmitting. Will transmit garbage during center frequency tuning and setting the sample rate. :param sample_rate: The sample rate for transmitting. :type sample_rate: int or float :param center_frequency: The center frequency of the recording. :type center_frequency: int or float :param gain: The gain set for transmitting on the Pluto :type gain: int :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. :type channel: int :param gain_mode: 'absolute' passes gain directly to the sdr, 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (0). :type gain_mode: str """ print("Initializing TX") self.set_tx_sample_rate(sample_rate=int(sample_rate)) print(f"Pluto sample rate = {self.radio.sample_rate}") self.set_tx_center_frequency(center_frequency=int(center_frequency)) print(f"Pluto center frequency = {self.radio.tx_lo}") self.set_tx_channel(channel=channel) self.set_tx_gain(gain=gain, channel=channel, gain_mode=gain_mode) if channel == 0: print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}") elif channel == 1: self.set_tx_gain(gain=gain, channel=0, gain_mode=gain_mode) print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}, {self.radio.tx_hardwaregain_chan1}") self._tx_initialized = True self._rx_initialized = False return {"sample_rate": self.tx_sample_rate, "center_frequency": self.tx_center_frequency, "gain": self.tx_gain} def _stream_rx(self, callback): if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") self._enable_rx = True while self._enable_rx is True: # collect complex signa from radio signal = self.radio.rx() # send callback complex signal callback(buffer=signal, metadata=None) def _record_fast(self, num_samples): """Optimized single-buffer capture for ≤16M samples.""" self.set_rx_buffer_size(buffer_size=num_samples) print("Pluto Starting RX...") samples = self.radio.rx() # Handle single/dual channel if self.radio.rx_enabled_channels == [0]: samples = [self._convert_rx_samples(samples)] else: samples = [self._convert_rx_samples(s) for s in samples] print("Pluto RX Completed.") metadata = { "source": self.__class__.__name__, "sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain, } return Recording(data=samples, metadata=metadata) def _record_chunked(self, num_samples): """Chunked streaming capture for >2M samples.""" # Use base class streaming with pre-allocation chunk_size = 2_000_000 # 2M sample chunks (safe size) self.set_rx_buffer_size(buffer_size=chunk_size) self._max_num_buffers = (num_samples // chunk_size) + 1 self._num_buffers_processed = 0 self._accumulated_buffer = None # Stream with accumulation callback print("Pluto Starting RX...") self._stream_rx(callback=self._accumulate_buffers_callback) print("Pluto RX Completed.") print(f"Corrupted buffer count: {self._corrupted_buffer_count}") # Truncate to exact size samples = self._accumulated_buffer[:, :num_samples] samples_list = [self._convert_rx_samples(chan) for chan in samples] metadata = { "source": self.__class__.__name__, "sample_rate": self.rx_sample_rate, "center_frequency": self.rx_center_frequency, "gain": self.rx_gain, } # Reset for next capture self._accumulated_buffer = None return Recording(data=samples_list, metadata=metadata) def record( self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None, ) -> Recording: """ Create a radio recording (iq samples and metadata) of a given length from the SDR. Either num_samples or rx_time must be provided. init_rx() must be called before record() :param num_samples: The number of samples to record. Pluto max = 16M. :type num_samples: int, optional :param rx_time: The time to record. :type rx_time: int or float, optional returns: Recording object (iq samples and metadata) """ if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") if num_samples is not None and rx_time is not None: raise SDRParameterError("Only input one of num_samples or rx_time") elif num_samples is not None: self._num_samples_to_record = num_samples elif rx_time is not None: self._num_samples_to_record = int(rx_time * self.rx_sample_rate) else: raise SDRParameterError("Must provide input of one of num_samples or rx_time") # Record in one go if there are less than 2,000,000 samples to record, record in chunks otherwise if self._num_samples_to_record <= 2_000_000: return self._record_fast(self._num_samples_to_record) else: return self._record_chunked(self._num_samples_to_record) def _format_tx_data(self, recording: Recording | np.ndarray | list): if isinstance(recording, np.ndarray): data = self._convert_tx_samples(samples=recording) elif isinstance(recording, Recording): if self.radio.tx_enabled_channels == [0]: samples = recording.data[0] data = self._convert_tx_samples(samples=samples) if len(recording.data) > 1: warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission") else: if len(recording.data) == 1: warnings.warn( "Recording has only 1 channel, the same data will be transmitted over both Pluto channels" ) samples = recording.data[0] data = [self._convert_tx_samples(samples), self._convert_tx_samples(samples)] else: if len(recording) > 2: warnings.warn( "More recordings were provided than channels in the Pluto. \ Only the first two recordings will be used" ) sample0 = self._convert_tx_samples(recording.data[0]) sample1 = self._convert_tx_samples(recording.data[1]) data = [sample0, sample1] elif isinstance(recording, list): if len(recording) > 2: warnings.warn( "More recordings were provided than channels in the Pluto. \ Only the first two recordings will be used" ) if isinstance(recording[0], np.ndarray): data = [self._convert_tx_samples(recording[0]), self._convert_tx_samples(recording[1])] elif isinstance(recording[0], Recording): sample0 = self._convert_tx_samples(recording[0].data[0]) sample1 = self._convert_tx_samples(recording[1].data[0]) data = [sample0, sample1] return data def _timeout_cyclic_buffer(self, timeout): time.sleep(timeout) self.radio.tx_destroy_buffer() self.radio.tx_cyclic_buffer = False print("Pluto TX Completed.") def interrupt_transmit(self): with self._tx_lock: self.radio.tx_destroy_buffer() self.radio.tx_cyclic_buffer = False print("Pluto TX Completed.") def tx_recording(self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None, mode="timed"): """ Transmit the given iq samples from the provided recording. init_tx() must be called before this function. :param recording: The recording(s) to transmit. :type recording: Recording, np.ndarray, list[Recording, np.ndarray] :param num_samples: The number of samples to transmit, will repeat or truncate the recording to this length. Defaults to None. :type num_samples: int, optional :param tx_time: The time to transmit, will repeat or truncate the recording to this length. Defaults to None. :type tx_time: int or float, optional :param mode: The mode of transmission, either timed or continuous. Defaults to timed. :type mode: str, optional """ if num_samples is not None and tx_time is not None: raise SDRParameterError("Only input one of num_samples or tx_time") elif num_samples is not None: tx_time = num_samples / self.tx_sample_rate elif tx_time is not None: pass else: tx_time = len(recording) / self.tx_sample_rate data = self._format_tx_data(recording=recording) with self._tx_lock: try: if self.radio.tx_cyclic_buffer: print("Destroying existing TX buffer...") self.radio.tx_destroy_buffer() self.radio.tx_cyclic_buffer = False except Exception as e: print(f"Error while destroying TX buffer: {e}") self.radio.tx_cyclic_buffer = True print("Pluto Starting TX...") self.radio.tx(data_np=data) if mode == "timed": timeout_thread = threading.Thread(target=self._timeout_cyclic_buffer, args=([tx_time])) timeout_thread.start() timeout_thread.join() def _stream_tx(self, callback): if self._tx_initialized is False: raise RuntimeError("TX was not initialized, init_tx must be called before _stream_tx") if not hasattr(self, "tx_buffer_size"): self.tx_buffer_size = 10000 self._enable_tx = True while self._enable_tx is True: buffer = self._convert_tx_samples(callback(self.tx_buffer_size)) self.radio.tx(buffer[0]) def set_rx_center_frequency(self, center_frequency): """ Set the center frequency of the receiver. Callable during streaming. """ with self._param_lock: if center_frequency < 70e6 or center_frequency > 6e9: raise SDRParameterError( f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" ) try: self.radio.rx_lo = int(center_frequency) self.rx_center_frequency = center_frequency except OSError as e: raise SDRError(e) except ValueError: raise SDRParameterError( f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" ) def set_rx_sample_rate(self, sample_rate): """ Set the sample rate of the receiver. Callable during streaming. """ with self._param_lock: min_rate, max_rate = 65.1e3, 61.44e6 if sample_rate < min_rate or sample_rate > max_rate: raise SDRParameterError( f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" ) try: # set the sample rate self.radio.sample_rate = int(sample_rate) self.rx_sample_rate = sample_rate # set the front end filter width self.radio.rx_rf_bandwidth = int(sample_rate) except OSError as e: raise SDRError(e) except ValueError: raise SDRParameterError( f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" ) def set_rx_gain(self, gain, channel=0, gain_mode="absolute"): """ Set the gain of the receiver. Callable during streaming. """ with self._param_lock: rx_gain_min = 0 rx_gain_max = 74 if gain_mode == "relative": if gain > 0: raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This sets \ the gain relative to the maximum possible gain." ) else: abs_gain = rx_gain_max + gain else: abs_gain = gain if abs_gain < rx_gain_min or abs_gain > rx_gain_max: abs_gain = min(max(gain, rx_gain_min), rx_gain_max) print(f"Gain {gain} out of range for Pluto.") print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") self.rx_gain = abs_gain if channel == 0: if abs_gain is None: self.radio.gain_control_mode_chan0 = "automatic" print("Using Pluto Automatic Gain Control.") else: self.radio.gain_control_mode_chan0 = "manual" self.radio.rx_hardwaregain_chan0 = abs_gain # dB elif channel == 1: try: if abs_gain is None: self.radio.gain_control_mode_chan1 = "automatic" print("Using Pluto Automatic Gain Control.") else: self.radio.gain_control_mode_chan1 = "manual" self.radio.rx_hardwaregain_chan1 = abs_gain # dB except Exception as e: print("Failed to use channel 1 on the PlutoSDR.\nThis is only available for revC versions.") raise e else: raise SDRParameterError(f"Pluto channel must be 0 or 1 but was {channel}.") def set_rx_channel(self, channel): if channel == 0: self.radio.rx_enabled_channels = [0] elif channel == 1: if not self._mimo_capable: raise SDRParameterError( "Dual RX channel requested (channel=1) but hardware is not MIMO-capable. " "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." ) self.radio.rx_enabled_channels = [0, 1] else: raise SDRParameterError("Channel must be either 0 or 1.") print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") def set_rx_buffer_size(self, buffer_size: int): if buffer_size is None: raise SDRParameterError("Buffer_size must be provided.") if buffer_size <= 0: raise SDRParameterError("Buffer_size must be a positive integer.") if hasattr(self, "radio"): try: self.radio.rx_buffer_size = buffer_size except Exception as e: raise SDRError(e) def set_tx_center_frequency(self, center_frequency): if center_frequency < 70e6 or center_frequency > 6e9: raise SDRParameterError( f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" ) try: self.radio.tx_lo = int(center_frequency) self.tx_center_frequency = center_frequency except OSError as e: raise SDRError(e) except ValueError: raise SDRParameterError( f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" ) def set_tx_sample_rate(self, sample_rate): min_rate, max_rate = 65.1e3, 61.44e6 if sample_rate < min_rate or sample_rate > max_rate: raise SDRParameterError( f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" ) try: self.radio.sample_rate = sample_rate self.tx_sample_rate = sample_rate except OSError as e: raise SDRError(e) except ValueError: raise SDRParameterError( f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" ) def set_tx_gain(self, gain, channel=0, gain_mode="absolute"): tx_gain_min = -89 tx_gain_max = 0 if gain_mode == "relative": if gain > 0: raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This sets\ the gain relative to the maximum possible gain." ) else: abs_gain = tx_gain_max + gain else: abs_gain = gain if abs_gain < tx_gain_min or abs_gain > tx_gain_max: abs_gain = min(max(gain, tx_gain_min), tx_gain_max) print(f"Gain {gain} out of range for Pluto.") print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB") try: self.tx_gain = abs_gain if channel == 0: self.radio.tx_hardwaregain_chan0 = int(abs_gain) elif channel == 1: self.radio.tx_hardwaregain_chan1 = int(abs_gain) else: raise SDRParameterError(f"Pluto channel must be 0 or 1 but was {channel}.") except Exception as e: raise SDRError(e) def set_tx_channel(self, channel): if channel == 0: self.radio.tx_enabled_channels = [0] elif channel == 1: if not self._mimo_capable: raise SDRParameterError( "Dual TX channel requested (channel=1) but hardware is not MIMO-capable. " "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." ) self.radio.tx_enabled_channels = [0, 1] else: raise SDRParameterError("Channel must be either 0 or 1.") print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") def set_tx_buffer_size(self, buffer_size: int): if buffer_size is None: raise SDRParameterError("Buffer_size must be provided.") if buffer_size <= 0: raise SDRParameterError("Buffer_size must be a positive integer.") self.tx_buffer_size = buffer_size def close(self): if self.radio.tx_cyclic_buffer: self.radio.tx_destroy_buffer() del self.radio def _convert_rx_samples(self, samples): return samples / (2**11) def _convert_tx_samples(self, samples): return samples.astype(np.complex64) * (2**14) def set_clock_source(self, source): raise NotImplementedError def supports_dynamic_updates(self) -> dict: return {"center_frequency": True, "sample_rate": True, "gain": True} def _handle_OSError(e): # process a common difficult to read error message into a more intuitive format print("PlutoSDR valid arguments:") print("Standard: ") print("\tCenter frequency: 325-3800Mhz") print("\tSample rate: 521kHz-20Mhz") print("\tGain: -90-0") print("Hacked:") print("\tCenter frequency: 70-6000Mhz") print("\tSample rate: 521kHz-56Mhz") print("\tGain: -90-0") stack_trace = traceback.format_exc() print(stack_trace) if "sampling_frequency" in stack_trace or "sample rates" in stack_trace: raise ValueError("The sample rate was out of range for the Pluto SDR.\n") if "tx_lo" in stack_trace or "rx_lo" in stack_trace: raise ValueError("The center frequency was out of range for the Pluto SDR.\n") if "hardwaregain" in stack_trace: raise ValueError("The gain was out of range for the Pluto SDR.\n")