diff --git a/src/ria_toolkit_oss/sdr/pluto.py b/src/ria_toolkit_oss/sdr/pluto.py index 6c617ee..47af8df 100644 --- a/src/ria_toolkit_oss/sdr/pluto.py +++ b/src/ria_toolkit_oss/sdr/pluto.py @@ -8,7 +8,7 @@ import adi import numpy as np from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.sdr import SDR +from ria_toolkit_oss.sdr.sdr import SDR, SDRError, SDRParameterError class Pluto(SDR): @@ -28,6 +28,7 @@ class Pluto(SDR): print(f"Initializing Pluto radio with identifier [{identifier}].") try: super().__init__() + self._tx_lock = threading.Lock() if identifier is None: uri = "ip:pluto.local" @@ -74,10 +75,12 @@ class Pluto(SDR): :type center_frequency: int or float :param gain: The gain set for receiving on the Pluto :type gain: int - :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. + :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 + enables channel 1, 1 enables both channels. :type channel: int :param gain_mode: 'absolute' passes gain directly to the sdr, - 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (74). + 'relative' means that gain should be a negative value, and it will + be subtracted from the max gain (74). :type gain_mode: str """ print("Initializing RX") @@ -88,20 +91,7 @@ class Pluto(SDR): self.set_rx_center_frequency(center_frequency=int(center_frequency)) print(f"Pluto center frequency = {self.radio.rx_lo}") - if channel == 0: - self.radio.rx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") - elif channel == 1: - if not self._mimo_capable: - raise ValueError( - "Dual RX channel requested (channel=1) but hardware is not MIMO-capable. " - "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." - ) - self.radio.rx_enabled_channels = [0, 1] - print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") - else: - raise ValueError("Channel must be either 0 or 1.") - + self.set_rx_channel(channel=channel) self.set_rx_gain(gain=gain, channel=channel, gain_mode=gain_mode) if channel == 0: print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}") @@ -109,8 +99,6 @@ class Pluto(SDR): self.set_rx_gain(gain=gain, channel=0, gain_mode=gain_mode) print(f"Pluto gain = {self.radio.rx_hardwaregain_chan0}, {self.radio.rx_hardwaregain_chan1}") - self.set_rx_buffer_size(getattr(self, "rx_buffer_size", 1024)) - self._rx_initialized = True self._tx_initialized = False @@ -134,10 +122,12 @@ class Pluto(SDR): :type center_frequency: int or float :param gain: The gain set for transmitting on the Pluto :type gain: int - :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 enables channel 1, 1 enables both channels. + :param channel: The channel the Pluto is set to. Must be 0 or 1. 0 + enables channel 1, 1 enables both channels. :type channel: int :param gain_mode: 'absolute' passes gain directly to the sdr, - 'relative' means that gain should be a negative value, and it will be subtracted from the max gain (0). + 'relative' means that gain should be a negative value, and it will + be subtracted from the max gain (0). :type gain_mode: str """ @@ -149,20 +139,7 @@ class Pluto(SDR): self.set_tx_center_frequency(center_frequency=int(center_frequency)) print(f"Pluto center frequency = {self.radio.tx_lo}") - if channel == 0: - self.radio.tx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") - elif channel == 1: - if not self._mimo_capable: - raise ValueError( - "Dual TX channel requested (channel=1) but hardware is not MIMO-capable. " - "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." - ) - self.radio.tx_enabled_channels = [0, 1] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") - else: - raise ValueError("Channel must be either 0 or 1.") - + self.set_tx_channel(channel=channel) self.set_tx_gain(gain=gain, channel=channel, gain_mode=gain_mode) if channel == 0: print(f"Pluto gain = {self.radio.tx_hardwaregain_chan0}") @@ -179,16 +156,74 @@ class Pluto(SDR): if not self._rx_initialized: raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") - # print("Starting rx...") - self._enable_rx = True while self._enable_rx is True: + # collect complex signa from radio signal = self.radio.rx() - signal = self._convert_rx_samples(signal) + # send callback complex signal callback(buffer=signal, metadata=None) - def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None): + def _record_fast(self, num_samples): + """Optimized single-buffer capture for ≤16M samples.""" + + self.set_rx_buffer_size(buffer_size=num_samples) + print("Pluto Starting RX...") + samples = self.radio.rx() + + # Handle single/dual channel + if self.radio.rx_enabled_channels == [0]: + samples = [self._convert_rx_samples(samples)] + else: + samples = [self._convert_rx_samples(s) for s in samples] + + print("Pluto RX Completed.") + + metadata = { + "source": self.__class__.__name__, + "sample_rate": self.rx_sample_rate, + "center_frequency": self.rx_center_frequency, + "gain": self.rx_gain, + } + return Recording(data=samples, metadata=metadata) + + def _record_chunked(self, num_samples): + """Chunked streaming capture for >2M samples.""" + + # Use base class streaming with pre-allocation + chunk_size = 2_000_000 # 2M sample chunks (safe size) + self.set_rx_buffer_size(buffer_size=chunk_size) + + self._max_num_buffers = (num_samples // chunk_size) + 1 + self._num_buffers_processed = 0 + self._accumulated_buffer = None + + # Stream with accumulation callback + print("Pluto Starting RX...") + self._stream_rx(callback=self._accumulate_buffers_callback) + print("Pluto RX Completed.") + print(f"Corrupted buffer count: {self._corrupted_buffer_count}") + + # Truncate to exact size + samples = self._accumulated_buffer[:, :num_samples] + samples_list = [self._convert_rx_samples(chan) for chan in samples] + + metadata = { + "source": self.__class__.__name__, + "sample_rate": self.rx_sample_rate, + "center_frequency": self.rx_center_frequency, + "gain": self.rx_gain, + } + + # Reset for next capture + self._accumulated_buffer = None + return Recording(data=samples_list, metadata=metadata) + + def record( + self, + num_samples: Optional[int] = None, + rx_time: Optional[int | float] = None, + ) -> Recording: """ Create a radio recording (iq samples and metadata) of a given length from the SDR. Either num_samples or rx_time must be provided. @@ -205,38 +240,19 @@ class Pluto(SDR): raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") if num_samples is not None and rx_time is not None: - raise ValueError("Only input one of num_samples or rx_time") + raise SDRParameterError("Only input one of num_samples or rx_time") elif num_samples is not None: self._num_samples_to_record = num_samples elif rx_time is not None: self._num_samples_to_record = int(rx_time * self.rx_sample_rate) else: - raise ValueError("Must provide input of one of num_samples or rx_time") + raise SDRParameterError("Must provide input of one of num_samples or rx_time") - if self._num_samples_to_record > 16000000: - raise NotImplementedError("Pluto record for num_samples>16M not implemented yet.") - self.radio.rx_buffer_size = self._num_samples_to_record - - print("Pluto Starting RX...") - samples = self.radio.rx() - if self.radio.rx_enabled_channels == [0]: - samples = self._convert_rx_samples(samples) - samples = [samples] + # Record in one go if there are less than 2,000,000 samples to record, record in chunks otherwise + if self._num_samples_to_record <= 2_000_000: + return self._record_fast(self._num_samples_to_record) else: - channel1 = self._convert_rx_samples(samples[0]) - channel2 = self._convert_rx_samples(samples[1]) - samples = [channel1, channel2] - print("Pluto RX Completed.") - - metadata = { - "source": self.__class__.__name__, - "sample_rate": self.rx_sample_rate, - "center_frequency": self.rx_center_frequency, - "gain": self.rx_gain, - } - - recording = Recording(data=samples, metadata=metadata) - return recording + return self._record_chunked(self._num_samples_to_record) def _format_tx_data(self, recording: Recording | np.ndarray | list): if isinstance(recording, np.ndarray): @@ -289,8 +305,9 @@ class Pluto(SDR): print("Pluto TX Completed.") def interrupt_transmit(self): - self.radio.tx_destroy_buffer() - self.radio.tx_cyclic_buffer = False + with self._tx_lock: + self.radio.tx_destroy_buffer() + self.radio.tx_cyclic_buffer = False print("Pluto TX Completed.") def tx_recording(self, recording: Recording | np.ndarray | list, num_samples=None, tx_time=None, mode="timed"): @@ -310,7 +327,7 @@ class Pluto(SDR): :type mode: str, optional """ if num_samples is not None and tx_time is not None: - raise ValueError("Only input one of num_samples or tx_time") + raise SDRParameterError("Only input one of num_samples or tx_time") elif num_samples is not None: tx_time = num_samples / self.tx_sample_rate elif tx_time is not None: @@ -320,82 +337,112 @@ class Pluto(SDR): data = self._format_tx_data(recording=recording) - try: - if self.radio.tx_cyclic_buffer: - print("Destroying existing TX buffer...") - self.radio.tx_destroy_buffer() - self.radio.tx_cyclic_buffer = False - except Exception as e: - print(f"Error while destroying TX buffer: {e}") + with self._tx_lock: + try: + if self.radio.tx_cyclic_buffer: + print("Destroying existing TX buffer...") + self.radio.tx_destroy_buffer() + self.radio.tx_cyclic_buffer = False + except Exception as e: + print(f"Error while destroying TX buffer: {e}") - self.radio.tx_cyclic_buffer = True - print("Pluto Starting TX...") - self.radio.tx(data_np=data) - if mode == "timed": - timeout_thread = threading.Thread(target=self._timeout_cyclic_buffer, args=([tx_time])) - timeout_thread.start() - timeout_thread.join() + self.radio.tx_cyclic_buffer = True + print("Pluto Starting TX...") + self.radio.tx(data_np=data) + if mode == "timed": + timeout_thread = threading.Thread(target=self._timeout_cyclic_buffer, args=([tx_time])) + timeout_thread.start() + timeout_thread.join() def _stream_tx(self, callback): if self._tx_initialized is False: raise RuntimeError("TX was not initialized, init_tx must be called before _stream_tx") - num_samples = 10000 - # TODO remove hardcode + if not hasattr(self, "tx_buffer_size"): + self.tx_buffer_size = 10000 self._enable_tx = True while self._enable_tx is True: - buffer = self._convert_tx_samples(callback(num_samples)) + buffer = self._convert_tx_samples(callback(self.tx_buffer_size)) self.radio.tx(buffer[0]) def set_rx_center_frequency(self, center_frequency): - try: - self.radio.rx_lo = int(center_frequency) - self.rx_center_frequency = center_frequency - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + """ + Set the center frequency of the receiver. Callable during streaming. + """ + with self._param_lock: + if center_frequency < 70e6 or center_frequency > 6e9: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" + f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) + + try: + self.radio.rx_lo = int(center_frequency) + self.rx_center_frequency = center_frequency + except OSError as e: + raise SDRError(e) + except ValueError: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" + f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) def set_rx_sample_rate(self, sample_rate): - self.rx_sample_rate = sample_rate + """ + Set the sample rate of the receiver. Callable during streaming. + """ + with self._param_lock: + min_rate, max_rate = 65.1e3, 61.44e6 + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) - # TODO add logic for limiting sample rate + try: + # set the sample rate + self.radio.sample_rate = int(sample_rate) + self.rx_sample_rate = sample_rate - try: - self.radio.sample_rate = int(sample_rate) - - # set the front end filter width - self.radio.rx_rf_bandwidth = int(sample_rate) - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + # set the front end filter width + self.radio.rx_rf_bandwidth = int(sample_rate) + except OSError as e: + raise SDRError(e) + except ValueError: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) def set_rx_gain(self, gain, channel=0, gain_mode="absolute"): - rx_gain_min = 0 - rx_gain_max = 74 + """ + Set the gain of the receiver. Callable during streaming. + """ + with self._param_lock: + rx_gain_min = 0 + rx_gain_max = 74 - if gain_mode == "relative": - if gain > 0: - raise ValueError( - "When gain_mode = 'relative', gain must be < 0. This sets \ - the gain relative to the maximum possible gain." - ) + if gain_mode == "relative": + if gain > 0: + raise SDRParameterError( + "When gain_mode = 'relative', gain must be < 0. This sets \ + the gain relative to the maximum possible gain." + ) + else: + abs_gain = rx_gain_max + gain else: - abs_gain = rx_gain_max + gain - else: - abs_gain = gain + abs_gain = gain - if abs_gain < rx_gain_min or abs_gain > rx_gain_max: - abs_gain = min(max(gain, rx_gain_min), rx_gain_max) - print(f"Gain {gain} out of range for Pluto.") - print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") + if abs_gain < rx_gain_min or abs_gain > rx_gain_max: + abs_gain = min(max(gain, rx_gain_min), rx_gain_max) + print(f"Gain {gain} out of range for Pluto.") + print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB") - self.rx_gain = abs_gain - try: + self.rx_gain = abs_gain if channel == 0: - if abs_gain is None: self.radio.gain_control_mode_chan0 = "automatic" print("Using Pluto Automatic Gain Control.") @@ -415,63 +462,77 @@ class Pluto(SDR): self.radio.rx_hardwaregain_chan1 = abs_gain # dB except Exception as e: - print("Failed to use channel 1 on the PlutoSDR. \nThis is only available for revC versions.") + print("Failed to use channel 1 on the PlutoSDR.\nThis is only available for revC versions.") raise e else: - raise ValueError(f"Pluto channel must be 0 or 1 but was {channel}.") - - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + raise SDRParameterError(f"Pluto channel must be 0 or 1 but was {channel}.") def set_rx_channel(self, channel): if channel == 0: self.radio.rx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") elif channel == 1: + if not self._mimo_capable: + raise SDRParameterError( + "Dual RX channel requested (channel=1) but hardware is not MIMO-capable. " + "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." + ) self.radio.rx_enabled_channels = [0, 1] - print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") else: - raise ValueError("Channel must be either 0 or 1.") + raise SDRParameterError("Channel must be either 0 or 1.") - def set_rx_buffer_size(self, buffer_size): + print(f"Pluto channel(s) = {self.radio.rx_enabled_channels}") + + def set_rx_buffer_size(self, buffer_size: int): if buffer_size is None: - raise ValueError("Buffer_size must be provided.") - buffer_size = int(buffer_size) + raise SDRParameterError("Buffer_size must be provided.") if buffer_size <= 0: - raise ValueError("Buffer_size must be a positive integer.") - - self.rx_buffer_size = buffer_size + raise SDRParameterError("Buffer_size must be a positive integer.") if hasattr(self, "radio"): try: self.radio.rx_buffer_size = buffer_size - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + except Exception as e: + raise SDRError(e) def set_tx_center_frequency(self, center_frequency): + if center_frequency < 70e6 or center_frequency > 6e9: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" + f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) + try: self.radio.tx_lo = int(center_frequency) self.tx_center_frequency = center_frequency - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + raise SDRError(e) + except ValueError: + raise SDRParameterError( + f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz " + f"out of range:\nStandard:\t[{325e6/1e9:.3f} - {3.8e9/1e9:.3f} GHz]\nHacked:\t" + f"[{70e6/1e9:.3f} - {6e9/1e9:.3f} GHz]" + ) def set_tx_sample_rate(self, sample_rate): + min_rate, max_rate = 65.1e3, 61.44e6 + if sample_rate < min_rate or sample_rate > max_rate: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) + try: self.radio.sample_rate = sample_rate self.tx_sample_rate = sample_rate - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + raise SDRError(e) + except ValueError: + raise SDRParameterError( + f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps " + f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]" + ) def set_tx_gain(self, gain, channel=0, gain_mode="absolute"): tx_gain_min = -89 @@ -479,7 +540,7 @@ class Pluto(SDR): if gain_mode == "relative": if gain > 0: - raise ValueError( + raise SDRParameterError( "When gain_mode = 'relative', gain must be < 0. This sets\ the gain relative to the maximum possible gain." ) @@ -501,34 +562,39 @@ class Pluto(SDR): elif channel == 1: self.radio.tx_hardwaregain_chan1 = int(abs_gain) else: - raise ValueError(f"Pluto channel must be 0 or 1 but was {channel}.") + raise SDRParameterError(f"Pluto channel must be 0 or 1 but was {channel}.") - except OSError as e: - _handle_OSError(e) - except ValueError as e: - _handle_OSError(e) + except Exception as e: + raise SDRError(e) def set_tx_channel(self, channel): - if channel == 1: - self.radio.tx_enabled_channels = [0, 1] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") - elif channel == 0: + if channel == 0: self.radio.tx_enabled_channels = [0] - print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") + elif channel == 1: + if not self._mimo_capable: + raise SDRParameterError( + "Dual TX channel requested (channel=1) but hardware is not MIMO-capable. " + "Dual RX/TX requires Pluto Rev C/D. Detected hardware: Rev B (single channel only)." + ) + self.radio.tx_enabled_channels = [0, 1] else: - raise ValueError("Channel must be either 0 or 1.") + raise SDRParameterError("Channel must be either 0 or 1.") - def set_tx_buffer_size(self, buffer_size): - raise NotImplementedError + print(f"Pluto channel(s) = {self.radio.tx_enabled_channels}") + + def set_tx_buffer_size(self, buffer_size: int): + if buffer_size is None: + raise SDRParameterError("Buffer_size must be provided.") + if buffer_size <= 0: + raise SDRParameterError("Buffer_size must be a positive integer.") + + self.tx_buffer_size = buffer_size def close(self): if self.radio.tx_cyclic_buffer: self.radio.tx_destroy_buffer() del self.radio - def shutdown(self): - del self.radio - def _convert_rx_samples(self, samples): return samples / (2**11) @@ -538,6 +604,9 @@ class Pluto(SDR): def set_clock_source(self, source): raise NotImplementedError + def supports_dynamic_updates(self) -> dict: + return {"center_frequency": True, "sample_rate": True, "gain": True} + def _handle_OSError(e):