# Source: ria-toolkit-oss/src/ria_toolkit_oss/sdr/blade.py (619 lines, 23 KiB, Python)

import gc
M
2025-10-16 15:22:07 -04:00
import warnings
from typing import Optional
import numpy as np
from bladerf import _bladerf
from ria_toolkit_oss.datatypes import Recording
from ria_toolkit_oss.sdr import SDR, SDRError, SDRParameterError
class Blade(SDR):
def __init__(self, identifier=""):
    """
    Locate the attached BladeRF, open it and prepare this object for use.

    :param identifier: Not used for BladeRF. BladeRF devices cannot currently be selected with an
        identifier value; a ``UserWarning`` is emitted when a non-empty value is supplied.
    :type identifier: str
    :raises OSError: Propagated from ``_shutdown`` when no device is detected.
    """
    if identifier != "":
        warnings.warn(f"Blade: Identifier '{identifier}' will be ignored", UserWarning)

    device_string = self._probe_bladerf()
    if device_string is None:
        print("No bladeRFs detected. Exiting.")
        # _shutdown raises for a non-zero error code, so execution stops here.
        self._shutdown(error=-1, board=None)
    print(device_string)

    handle = _bladerf.BladeRF(device_string)
    self.device = handle
    self._print_versions(device=handle)

    # Used when sizing the raw byte buffers handed to sync_rx / sliced for sync_tx.
    self.bytes_per_sample = 4
    super().__init__()
def _shutdown(self, error=0, board=None):
print("Shutting down with error code: " + str(error))
if board is not None:
board.close()
if error != 0:
raise OSError(f"BladeRF shutdown with error code: {error}")
else:
print("BladeRF shutdown successfully")
def _probe_bladerf(self):
    """
    Search for attached bladeRF devices and build the device string for the single one found.

    :return: ``"<backend>:device=<usb_bus>:<usb_addr>"`` when exactly one device is listed,
        otherwise ``None`` (empty list, or the device query raised ``BladeRFError``).
    :raises OSError: Propagated from ``_shutdown`` when more than one device is listed.
    """
    print("Searching for bladeRF devices...")
    try:
        found = _bladerf.get_device_list()
        count = len(found)
        if count == 1:
            info = found[0]._asdict()
            device_string = "{backend}:device={usb_bus}:{usb_addr}".format(**info)
            print(f"Found bladeRF device: {device_string!s}")
            return device_string
        if count > 1:
            print("Unsupported feature: more than one bladeRFs detected.")
            print("\n".join(map(str, found)))
            self._shutdown(error=-1, board=None)
    except _bladerf.BladeRFError:
        print("No bladeRF devices found.")
    return None
def _print_versions(self, device=None):
    """
    Print the libbladeRF version and, when a device is supplied, its firmware and FPGA versions.

    :param device: Optional opened device exposing ``get_fw_version()`` and ``get_fpga_version()``.
    :return: Always 0.
    :rtype: int
    """
    print(f"libbladeRF version:\t{_bladerf.version()!s}")
    if device is None:
        return 0
    print(f"Firmware version:\t{device.get_fw_version()!s}")
    print(f"FPGA version:\t\t{device.get_fpga_version()!s}")
    return 0
def init_rx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
buffer_size: Optional[int] = 8192,
gain_mode: Optional[str] = "absolute",
):
"""
Initializes the BladeRF for receiving.
:param sample_rate: The sample rate for receiving.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The gain set for receiving on the BladeRF.
:type gain: int
:param channel: The channel the BladeRF is set to.
:type channel: int
:param buffer_size: The buffer size during receive. Defaults to 8192.
:type buffer_size: int
:param gain_mode: 'absolute' passes gain directly to the SDR;
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60).
M
2025-10-16 15:22:07 -04:00
:type gain_mode: str
"""
print("Initializing RX")
# Configure BladeRF
self.set_rx_channel(channel)
self.set_rx_sample_rate(sample_rate)
self.set_rx_center_frequency(center_frequency)
self.set_rx_gain(channel, gain, gain_mode)
self.set_rx_buffer_size(buffer_size)
bw = self.rx_sample_rate
if bw < 200000:
bw = 200000
elif bw > 56000000:
bw = 56000000
self.rx_ch.bandwidth = bw
self._rx_initialized = True
self._tx_initialized = False
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
# Setup synchronous stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.RX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.rx_buffer_size,
num_transfers=8,
stream_timeout=3500000000,
)
print("Blade Starting RX...")
self.rx_ch.enable = True
self._enable_rx = True
while self._enable_rx:
# Create receive buffer and read in samples to buffer
# Add them to a list to convert and save after stream is finished
buffer = bytearray(self.rx_buffer_size * self.bytes_per_sample)
self.device.sync_rx(buffer, self.rx_buffer_size)
signal = self._convert_rx_samples(buffer)
self.buffer = buffer
# send callback complex signal
callback(buffer=signal, metadata=None)
# Disable module
print("Blade RX Completed.")
self.rx_ch.enable = False
def record(
    self,
    num_samples: Optional[int] = None,
    rx_time: Optional[int | float] = None,
) -> Recording:
    """
    Create a radio recording (iq samples and metadata) of a given length from the Blade.

    Exactly one of ``num_samples`` or ``rx_time`` must be provided.
    ``init_rx()`` must be called before ``record()``.

    :param num_samples: The number of samples to record.
    :type num_samples: int, optional
    :param rx_time: The time to record; converted to a sample count with the RX sample rate.
    :type rx_time: int or float, optional
    :raises RuntimeError: If ``init_rx()`` has not been called.
    :raises SDRParameterError: If both or neither of ``num_samples`` / ``rx_time`` are given.
    :return: Recording object (iq samples and metadata)
    :rtype: Recording
    """
    if not self._rx_initialized:
        raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")

    if num_samples is not None and rx_time is not None:
        raise SDRParameterError("Only input one of num_samples or rx_time")
    elif num_samples is not None:
        self._num_samples_to_record = num_samples
    elif rx_time is not None:
        self._num_samples_to_record = int(rx_time * self.rx_sample_rate)
    else:
        raise SDRParameterError("Must provide input of one of num_samples or rx_time")

    total_samples = self._num_samples_to_record
    buffer_size = self.rx_buffer_size
    # Ceiling division: read only as many whole buffers as are needed to cover the request
    # (no surplus buffer when the request is an exact multiple of the buffer size).
    num_buffers = -(-total_samples // buffer_size)

    # Setup synchronous stream
    self.device.sync_config(
        layout=_bladerf.ChannelLayout.RX_X1,
        fmt=_bladerf.Format.SC16_Q11,
        num_buffers=16,
        buffer_size=buffer_size,
        num_transfers=8,
        # NOTE(review): the TX paths in this file pass 3500 here; confirm this much larger
        # RX value is intentional.
        stream_timeout=3500000000,
    )

    print("Blade Starting RX...")
    with self._param_lock:
        self._enable_rx = True
        self.rx_ch.enable = True

    store_array = np.zeros((1, num_buffers * buffer_size), dtype=np.complex64)
    # One raw buffer is reused for every read; each read is converted and copied into
    # store_array before the next read overwrites it.
    buffer = bytearray(buffer_size * self.bytes_per_sample)
    try:
        for i in range(num_buffers):
            self.device.sync_rx(buffer, buffer_size)
            store_array[:, i * buffer_size : (i + 1) * buffer_size] = self._convert_rx_samples(buffer)
    finally:
        # Always disable the module, even when a read raised.
        with self._param_lock:
            self.rx_ch.enable = False
    print("Blade RX Completed.")

    metadata = {
        "source": self.__class__.__name__,
        "sample_rate": self.rx_sample_rate,
        "center_frequency": self.rx_center_frequency,
        "gain": self.rx_gain,
    }

    # Trim the whole-buffer capture down to the requested length.
    return Recording(data=store_array[:, :total_samples], metadata=metadata)
def init_tx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
buffer_size: Optional[int] = 32768,
M
2025-10-16 15:22:07 -04:00
gain_mode: Optional[str] = "absolute",
):
"""
Initializes the BladeRF for transmitting.
:param sample_rate: The sample rate for transmitting.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The gain set for transmitting on the BladeRF
:type gain: int
:param channel: The channel the BladeRF is set to.
:type channel: int
:param buffer_size: The buffer size during transmission. Defaults to 8192.
:type buffer_size: int
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (60).
M
2025-10-16 15:22:07 -04:00
:type gain_mode: str
:return: 0 if successful, -1 if there's an error.
:rtype: int
M
2025-10-16 15:22:07 -04:00
"""
# Configure BladeRF
self.set_tx_channel(channel)
self.set_tx_sample_rate(sample_rate)
self.set_tx_center_frequency(center_frequency)
self.set_tx_gain(channel=channel, gain=gain, gain_mode=gain_mode)
self.set_tx_buffer_size(buffer_size)
if self.tx_sample_rate >= 7.5e6 and self.tx_buffer_size < 65536:
warnings.warn(
"Blade: For high sample rates, a buffer size of 65536, 131072, or 262144 is recommended", UserWarning
)
M
2025-10-16 15:22:07 -04:00
bw = self.tx_sample_rate
if bw < 200000:
bw = 200000
elif bw > 56000000:
bw = 56000000
self.tx_ch.bandwidth = bw
if self.device is None:
print("TX: Invalid device handle.")
return -1
if self.tx_channel is None:
print("TX: Invalid channel.")
return -1
self._tx_initialized = True
self._rx_initialized = False
return 0
def _stream_tx(self, callback):
    """
    Transmit samples produced by ``callback`` in a loop while ``self._enable_tx`` is true.

    :param callback: Callable invoked as ``callback(self.tx_buffer_size)``. Its return value is
        converted with ``_convert_tx_samples`` and its ``len()`` is passed to ``sync_tx`` as the
        sample count.
    """
    # Setup stream
    self.device.sync_config(
        layout=_bladerf.ChannelLayout.TX_X1,
        fmt=_bladerf.Format.SC16_Q11,
        num_buffers=16,
        # Honour the configured TX buffer size (as tx_recording does) instead of a fixed 8192.
        buffer_size=self.tx_buffer_size,
        num_transfers=8,
        stream_timeout=3500,
    )

    # Enable module
    self.tx_ch.enable = True
    self._enable_tx = True
    print("Blade Starting TX...")
    try:
        while self._enable_tx:
            buffer = callback(self.tx_buffer_size)
            byte_array = self._convert_tx_samples(buffer)
            self.device.sync_tx(byte_array, len(buffer))
    finally:
        # Always disable the module, even when the callback or the write raised.
        self.tx_ch.enable = False
    print("Blade TX Completed.")
def tx_recording(
    self,
    recording: Recording | np.ndarray,
    num_samples: Optional[int] = None,
    tx_time: Optional[int | float] = None,
):
    """
    Transmit the given IQ samples from the provided recording.

    init_tx() must be called before this function.

    :param recording: The recording to transmit. For a ``Recording`` only channel 0 is used.
    :type recording: Recording or np.ndarray
    :param num_samples: The number of samples to transmit, will repeat or
        truncate the recording to this length. Defaults to None, meaning the recording is
        transmitted once.
    :type num_samples: int, optional
    :param tx_time: The time to transmit, will repeat or truncate the
        recording to this length. Defaults to None.
    :type tx_time: int or float, optional
    :raises SDRParameterError: If both ``num_samples`` and ``tx_time`` are given, if
        ``recording`` has an unsupported type, or if it contains no samples.
    """
    if num_samples is not None and tx_time is not None:
        raise SDRParameterError("Only input one of num_samples or tx_time")

    if isinstance(recording, np.ndarray):
        samples = recording
    elif isinstance(recording, Recording):
        if len(recording.data) > 1:
            warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission")
        samples = recording.data[0]
    else:
        raise SDRParameterError("recording must be np.ndarray or Recording")

    samples = samples.astype(np.complex64, copy=False)
    len_samples = len(samples)
    if len_samples == 0:
        # Nothing to repeat; the wrap-around arithmetic below needs at least one sample.
        raise SDRParameterError("recording must contain at least one sample")

    # Resolve the transmit length from the samples actually sent, not from the container.
    if tx_time is not None:
        num_samples = int(tx_time * self.tx_sample_rate)
    elif num_samples is None:
        num_samples = len_samples

    tx_bytes = self._convert_tx_samples(samples)
    total_bytes = len(tx_bytes)
    bytes_per_sample = self.bytes_per_sample
    chunk_size = self.tx_buffer_size

    # Setup stream
    self.device.sync_config(
        layout=_bladerf.ChannelLayout.TX_X1,
        fmt=_bladerf.Format.SC16_Q11,
        num_buffers=16,
        buffer_size=self.tx_buffer_size,
        num_transfers=8,
        stream_timeout=3500,
    )

    # Enable module
    self.tx_ch.enable = True
    print("Blade Starting TX...")

    # Transmit in chunks, treating the converted recording as a circular buffer.
    samples_sent = 0
    try:
        while samples_sent < num_samples:
            this_chunk_size = min(chunk_size, num_samples - samples_sent)
            start_idx = (samples_sent % len_samples) * bytes_per_sample
            needed = this_chunk_size * bytes_per_sample

            if start_idx + needed <= total_bytes:
                chunk_bytes_arr = tx_bytes[start_idx : start_idx + needed]
            else:
                # The chunk runs past the end of the recording: take the tail, then as many
                # whole repetitions as fit, then the leading part of one more repetition, so
                # the chunk always holds exactly `needed` bytes.
                tail = tx_bytes[start_idx:]
                full_repeats, lead_len = divmod(needed - len(tail), total_bytes)
                chunk_bytes_arr = tail + tx_bytes * full_repeats + tx_bytes[:lead_len]

            self.device.sync_tx(chunk_bytes_arr, this_chunk_size)
            samples_sent += this_chunk_size
    except KeyboardInterrupt:
        print("\nTransmission interrupted by user")
    finally:
        # Always disable the module, including when sync_tx raised.
        self.tx_ch.enable = False
    print("Blade TX Completed.")
def _convert_rx_samples(self, samples):
samples = np.frombuffer(samples, dtype=np.int16).astype(np.float32)
samples /= 2048
samples = samples[::2] + 1j * samples[1::2]
return samples
def _convert_tx_samples(self, samples):
# Normalize to maximum amplitude to prevent overflow
max_val = np.max(np.abs(samples))
if max_val > 0:
samples = samples / max_val # Normalize to [-1, 1]
# Scale to Q11 format (use 2047 instead of 2048 to avoid overflow)
# and interleave I/Q samples
tx_samples = np.zeros(len(samples) * 2, dtype=np.int16)
tx_samples[0::2] = (np.real(samples) * 2047).astype(np.int16) # I samples
tx_samples[1::2] = (np.imag(samples) * 2047).astype(np.int16) # Q samples
byte_array = tx_samples.tobytes()
return byte_array
def set_rx_channel(self, channel):
    """
    Select the receive channel and create its channel handle.

    :param channel: Receive channel index; must be 0 or 1.
    :type channel: int
    :raises SDRParameterError: If ``channel`` is neither 0 nor 1.
    """
    if channel not in (0, 1):
        raise SDRParameterError("Channel must be either 0 or 1.")

    self.rx_channel = channel
    rx_handle = self.device.Channel(_bladerf.CHANNEL_RX(channel))
    self.rx_ch = rx_handle
    print(f"\nBlade channel = {self.rx_ch}")
def set_rx_sample_rate(self, sample_rate):
    """
    Set the sample rate of the receiver.

    Not callable during recording; Blade requires stream stop/restart to change sample rate.

    :param sample_rate: Requested sample rate in samples per second.
    :type sample_rate: int or float
    :raises SDRError: If the RX channel has not been selected yet.
    :raises SDRParameterError: If ``sample_rate`` is outside the range reported by the device.
    """
    with self._param_lock:
        if not hasattr(self, "rx_channel"):
            raise SDRError("Must set channel before setting sample rate")

        # Query the range with the RX channel identifier rather than the bare index, so that
        # index 1 refers to the second RX channel.
        rate_range = self.device.get_sample_rate_range(_bladerf.CHANNEL_RX(self.rx_channel))
        min_rate, max_rate = rate_range[0], rate_range[1]

        if sample_rate < min_rate or sample_rate > max_rate:
            raise SDRParameterError(
                f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
                f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
            )

        self.rx_sample_rate = sample_rate
        self.rx_ch.sample_rate = self.rx_sample_rate
        print(f"Blade sample rate = {self.rx_ch.sample_rate}")
def set_rx_center_frequency(self, center_frequency):
    """
    Set the center frequency of the receiver.

    Not callable during recording; Blade requires stream stop/restart to change center frequency.

    :param center_frequency: Requested center frequency in Hz.
    :type center_frequency: int or float
    :raises SDRError: If the RX channel has not been selected yet.
    :raises SDRParameterError: If ``center_frequency`` is outside the range reported by the device.
    """
    with self._param_lock:
        if not hasattr(self, "rx_channel"):
            raise SDRError("Must set channel before setting center frequency")

        # Query the range with the RX channel identifier rather than the bare index, so that
        # index 1 refers to the second RX channel.
        freq_range = self.device.get_frequency_range(_bladerf.CHANNEL_RX(self.rx_channel))
        min_freq, max_freq = freq_range[0], freq_range[1]

        if center_frequency < min_freq or center_frequency > max_freq:
            raise SDRParameterError(
                f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
                f"out of range: [{min_freq/1e9:.3f} - {max_freq/1e9:.3f} GHz]"
            )

        self.rx_center_frequency = center_frequency
        self.rx_ch.frequency = center_frequency
        print(f"Blade center frequency = {self.rx_ch.frequency}")
def set_rx_gain(self, channel, gain, gain_mode):
    """
    Set the gain of the receiver.

    Not callable during recording; Blade requires stream stop/restart to change gain.
    A resulting gain outside the device's range is clamped to the nearest limit (with a printed
    notice) rather than rejected.

    :param channel: Receive channel index used to look up the gain range.
    :type channel: int
    :param gain: Gain value; interpreted according to ``gain_mode``.
    :param gain_mode: ``"relative"`` adds ``gain`` (which must not be positive) to the maximum
        gain; any other value uses ``gain`` as given.
    :type gain_mode: str
    :raises SDRParameterError: If ``gain_mode`` is ``"relative"`` and ``gain`` is positive.
    """
    with self._param_lock:
        # Query the range with the RX channel identifier rather than the bare index, so that
        # index 1 refers to the second RX channel.
        gain_range = self.device.get_gain_range(_bladerf.CHANNEL_RX(channel))
        rx_gain_min, rx_gain_max = gain_range[0], gain_range[1]

        if gain_mode == "relative":
            if gain > 0:
                raise SDRParameterError(
                    "When gain_mode = 'relative', gain must be <= 0. "
                    "This sets the gain relative to the maximum possible gain."
                )
            abs_gain = rx_gain_max + gain
        else:
            abs_gain = gain

        if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
            # Report the value that was actually requested, then clamp it.
            print(f"Gain {abs_gain} out of range for Blade.")
            print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
            abs_gain = min(max(abs_gain, rx_gain_min), rx_gain_max)

        self.rx_gain = abs_gain
        self.rx_ch.gain = abs_gain
        print(f"Blade gain = {self.rx_ch.gain}")
def set_rx_buffer_size(self, buffer_size):
    """
    Store the receive buffer size used when the RX stream is configured and read.

    :param buffer_size: Buffer size in samples.
    :type buffer_size: int
    """
    self.rx_buffer_size = buffer_size
def set_tx_channel(self, channel):
    """
    Select the transmit channel and create its channel handle.

    :param channel: Transmit channel index; must be 0 or 1.
    :type channel: int
    :raises SDRParameterError: If ``channel`` is neither 0 nor 1.
    """
    if channel not in (0, 1):
        raise SDRParameterError("Channel must be either 0 or 1.")

    self.tx_channel = channel
    tx_handle = self.device.Channel(_bladerf.CHANNEL_TX(self.tx_channel))
    self.tx_ch = tx_handle
    print(f"\nBlade channel = {self.tx_ch}")
def set_tx_sample_rate(self, sample_rate):
    """
    Set the sample rate of the transmitter.

    :param sample_rate: Requested sample rate in samples per second.
    :type sample_rate: int or float
    :raises SDRError: If the TX channel has not been selected yet.
    :raises SDRParameterError: If ``sample_rate`` is outside the range reported by the device.
    """
    if not hasattr(self, "tx_channel"):
        raise SDRError("Must set channel before setting sample rate")

    # Query the range with the TX channel identifier; the bare index would address a
    # different channel.
    rate_range = self.device.get_sample_rate_range(_bladerf.CHANNEL_TX(self.tx_channel))
    min_rate, max_rate = rate_range[0], rate_range[1]

    if sample_rate < min_rate or sample_rate > max_rate:
        raise SDRParameterError(
            f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
            f"out of range: [{min_rate/1e6:.3f} - {max_rate/1e6:.3f} Msps]"
        )

    self.tx_sample_rate = sample_rate
    self.tx_ch.sample_rate = self.tx_sample_rate
    print(f"Blade sample rate = {self.tx_ch.sample_rate}")
def set_tx_center_frequency(self, center_frequency):
    """
    Set the center frequency of the transmitter.

    :param center_frequency: Requested center frequency in Hz.
    :type center_frequency: int or float
    :raises SDRError: If the TX channel has not been selected yet.
    :raises SDRParameterError: If ``center_frequency`` is outside the range reported by the device.
    """
    if not hasattr(self, "tx_channel"):
        raise SDRError("Must set channel before setting center frequency")

    # Query the range with the TX channel identifier; the bare index would address a
    # different channel.
    freq_range = self.device.get_frequency_range(_bladerf.CHANNEL_TX(self.tx_channel))
    min_freq, max_freq = freq_range[0], freq_range[1]

    if center_frequency < min_freq or center_frequency > max_freq:
        raise SDRParameterError(
            f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
            f"out of range: [{min_freq/1e9:.3f} - {max_freq/1e9:.3f} GHz]"
        )

    self.tx_center_frequency = center_frequency
    self.tx_ch.frequency = center_frequency
    print(f"Blade center frequency = {self.tx_ch.frequency}")
def set_tx_gain(self, channel, gain, gain_mode):
    """
    Set the gain of the transmitter.

    A resulting gain outside the device's range is clamped to the nearest limit (with a printed
    notice) rather than rejected.

    :param channel: Transmit channel index used to look up the gain range.
    :type channel: int
    :param gain: Gain value; interpreted according to ``gain_mode``.
    :param gain_mode: ``"relative"`` adds ``gain`` (which must not be positive) to the maximum
        gain; any other value uses ``gain`` as given.
    :type gain_mode: str
    :raises SDRParameterError: If ``gain_mode`` is ``"relative"`` and ``gain`` is positive.
    """
    # Query the range with the TX channel identifier; the bare index would return the limits
    # of a different channel.
    gain_range = self.device.get_gain_range(_bladerf.CHANNEL_TX(channel))
    tx_gain_min, tx_gain_max = gain_range[0], gain_range[1]

    if gain_mode == "relative":
        if gain > 0:
            raise SDRParameterError(
                "When gain_mode = 'relative', gain must be <= 0. "
                "This sets the gain relative to the maximum possible gain."
            )
        abs_gain = tx_gain_max + gain
    else:
        abs_gain = gain

    if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
        # Report the value that was actually requested, then clamp it.
        print(f"Gain {abs_gain} out of range for Blade.")
        print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
        abs_gain = min(max(abs_gain, tx_gain_min), tx_gain_max)

    self.tx_gain = abs_gain
    self.tx_ch.gain = abs_gain
    print(f"Blade gain = {self.tx_ch.gain}")
def set_tx_buffer_size(self, buffer_size):
    """
    Store the transmit buffer size used as the chunk size when transmitting.

    :param buffer_size: Buffer size in samples.
    :type buffer_size: int
    """
    self.tx_buffer_size = buffer_size
def set_clock_source(self, source):
    """
    Enable or disable the device PLL according to the requested clock source.

    :param source: ``"external"`` enables the PLL and ``"internal"`` disables it
        (case-insensitive). Any other value leaves the PLL untouched.
    :type source: str
    """
    mode = source.lower()
    if mode == "external":
        self.device.set_pll_enable(True)
    elif mode == "internal":
        print("Disabling PLL")
        self.device.set_pll_enable(False)

    # The current state is reported regardless of whether anything was changed.
    print(f"Clock source set to {self.device.get_clock_select()}")
    print(f"PLL Reference set to {self.device.get_pll_refclk()}")
M
2025-10-16 15:22:07 -04:00
def supports_bias_tee(self) -> bool:
    """
    Report whether this SDR offers bias-tee control.

    :return: Always ``True`` for this class.
    :rtype: bool
    """
    return True
def set_bias_tee(self, enable: bool, channel: Optional[int] = None):
    """
    Switch the bias tee on or off.

    :param enable: Truthy to enable the bias tee, falsy to disable it.
    :type enable: bool
    :param channel: Channel index. When omitted, ``rx_channel`` is used if set, otherwise
        ``tx_channel`` if set, otherwise 0.
    :type channel: int, optional
    :raises NotImplementedError: If the call raises ``AttributeError``, which is taken to mean
        the bladeRF binding lacks bias-tee control.
    """
    if channel is None:
        channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0))

    try:
        # The index is always mapped through CHANNEL_RX, including for the tx_channel fallback.
        target = _bladerf.CHANNEL_RX(channel)
        self.device.set_bias_tee(target, bool(enable))
    except AttributeError as exc:  # pragma: no cover - depends on libbladeRF version
        raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc

    state = "disabled" if not enable else "enabled"
    print(f"BladeRF bias tee {state} on channel {channel}.")
def close(self):
    """
    Disable any configured channels, close the device and drop the reference to it.

    Does nothing when no device is open. Errors raised while closing are printed as a warning
    instead of propagating; ``self.device`` is reset to ``None`` in either case.
    """
    if getattr(self, "device", None) is None:
        return

    try:
        # Channel handles only exist once the matching set_*_channel has run.
        for handle_name in ("tx_ch", "rx_ch"):
            if hasattr(self, handle_name):
                getattr(self, handle_name).enable = False
        self.device.close()
    except Exception as e:
        print(f"Warning: error closing bladeRF: {e}")
    finally:
        del self.device
        self.device = None
        gc.collect()
def supports_dynamic_updates(self) -> dict:
    """
    Report which parameters can be changed dynamically.

    :return: Mapping of parameter name to ``bool``; every entry is ``False`` for this class.
    :rtype: dict
    """
    return dict.fromkeys(("center_frequency", "sample_rate", "gain"), False)