st_edits #6

Merged
madrigal merged 14 commits from st_edits into main 2025-10-24 10:21:12 -04:00
6 changed files with 958 additions and 396 deletions
Showing only changes of commit a9f8ad4bee - Show all commits

View File

@ -47,6 +47,21 @@ dependencies = [
"pyzmq (>=27.1.0,<28.0.0)",
]
[project.optional-dependencies]
# SDR hardware-specific dependencies (optional installs)
rtlsdr = ["pyrtlsdr>=0.2.9"]
pluto = ["pyadi-iio>=0.0.14"]
usrp = [] # Requires system UHD installation
hackrf = ["pyhackrf>=0.2.0"]
bladerf = [] # Requires system libbladerf installation
# All SDR hardware support
all-sdr = [
"pyrtlsdr>=0.2.9",
"pyadi-iio>=0.0.14",
"pyhackrf>=0.2.0",
]
[tool.poetry]
packages = [
{ include = "ria_toolkit_oss", from = "src" }

View File

@ -325,8 +325,8 @@ f.argtypes = [p_hackrf_device, POINTER(read_partid_serialno_t)]
# libhackrf.hackrf_set_txvga_gain.argtypes = [POINTER(hackrf_device), c_uint32]
## extern ADDAPI int ADDCALL hackrf_set_antenna_enable(hackrf_device*
## device, const uint8_t value);
# libhackrf.hackrf_set_antenna_enable.restype = c_int
# libhackrf.hackrf_set_antenna_enable.argtypes = [POINTER(hackrf_device), c_uint8]
libhackrf.hackrf_set_antenna_enable.restype = c_int
libhackrf.hackrf_set_antenna_enable.argtypes = [p_hackrf_device, c_uint8]
#
## extern ADDAPI const char* ADDCALL hackrf_error_name(enum hackrf_error errcode);
## libhackrf.hackrf_error_name.restype = POINTER(c_char)
@ -537,6 +537,16 @@ class HackRF(object):
raise IOError("error disabling amp")
return 0
def set_antenna_enable(self, enable):
value = 1 if enable else 0
result = libhackrf.hackrf_set_antenna_enable(self.dev_p, value)
if result != 0:
error_name = get_error_name(result)
raise IOError(f"Error setting antenna bias tee: {error_name} (Code {result})")
state = "enabled" if enable else "disabled"
print(f"HackRF antenna bias tee {state}.")
return 0
# rounds down to multiple of 8 (15 -> 8, 39 -> 32), etc.
# internally, hackrf_set_lna_gain does the same thing
# But we take care of it so we can keep track of the correct gain
@ -582,6 +592,75 @@ class HackRF(object):
if result != 0:
raise IOError("stop_rx failure")
def _rx_capture_callback(self, hackrf_transfer):
"""Instance method callback for RX capture - prevents garbage collection"""
try:
c = hackrf_transfer.contents
# Append bytes to buffer using string_at
from ctypes import string_at
byte_chunk = string_at(c.buffer, c.valid_length)
self._capture_buffer.extend(byte_chunk)
# Check if we have enough
if len(self._capture_buffer) >= self._capture_target:
self._capture_done = True
return 1 # Stop streaming
return 0
except Exception as e:
print(f"Error in RX capture callback: {e}")
import traceback
traceback.print_exc()
self._capture_done = True
return 1
def read_samples(self, num_samples):
"""
Block capture mode for HackRF - captures exactly num_samples.
This is safer than streaming for USB2 and avoids buffer overflow issues.
:param num_samples: Number of complex samples to capture
:return: numpy array of complex64 samples
"""
# Initialize capture state as instance variables
self._capture_buffer = bytearray()
self._capture_target = num_samples * 2 # 2 bytes per complex sample (I+Q as int8)
self._capture_done = False
# Store callback as instance variable to prevent garbage collection (like TX does)
self._rx_cb = _callback(self._rx_capture_callback)
# Start RX with the callback
result = libhackrf.hackrf_start_rx(self.dev_p, self._rx_cb, None)
if result != 0:
raise IOError("start_rx failure during read_samples")
# Wait for capture to complete
import time
timeout = num_samples / self.sample_rate + 5.0 # Add 5 second buffer
start_time = time.time()
while not self._capture_done:
if time.time() - start_time > timeout:
print("HackRF capture timeout!")
break
time.sleep(0.01)
# Stop RX
self.stop_rx()
# Convert bytes to complex samples
byte_data = bytes(self._capture_buffer[:self._capture_target])
all_samples = np.frombuffer(byte_data, dtype=np.int8).astype(np.float32).view(np.complex64)
# Clean up instance variables
del self._capture_buffer
del self._capture_target
del self._capture_done
del self._rx_cb
return all_samples[:num_samples]
# Add transmit gain property
def set_txvga_gain(self, gain):
if gain < 0 or gain > 47:

View File

@ -35,6 +35,22 @@ class Blade(SDR):
super().__init__()
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool, channel: Optional[int] = None):
if channel is None:
channel = getattr(self, "rx_channel", getattr(self, "tx_channel", 0))
try:
bladerf_channel = _bladerf.CHANNEL_RX(channel)
self.device.set_bias_tee(bladerf_channel, bool(enable))
except AttributeError as exc: # pragma: no cover - depends on libbladeRF version
raise NotImplementedError("bladeRF binding lacks bias-tee control") from exc
state = "enabled" if enable else "disabled"
print(f"BladeRF bias tee {state} on channel {channel}.")
def _shutdown(self, error=0, board=None):
print("Shutting down with error code: " + str(error))
if board is not None:
@ -240,6 +256,91 @@ class Blade(SDR):
return Recording(data=store_array[:, :num_samples], metadata=metadata)
def tx_recording(
self,
recording: "Recording | np.ndarray",
num_samples: Optional[int] = None,
tx_time: Optional[int | float] = None,
):
"""
Transmit the given IQ samples from the provided recording.
init_tx() must be called before this function.
:param recording: The recording to transmit.
:type recording: Recording or np.ndarray
:param num_samples: The number of samples to transmit, will repeat or
truncate the recording to this length. Defaults to None.
:type num_samples: int, optional
:param tx_time: The time to transmit, will repeat or truncate the
recording to this length. Defaults to None.
:type tx_time: int or float, optional
"""
import warnings
import time
from ria_toolkit_oss.datatypes.recording import Recording
if num_samples is not None and tx_time is not None:
raise ValueError("Only input one of num_samples or tx_time")
elif num_samples is not None:
tx_time = num_samples / self.tx_sample_rate
elif tx_time is not None:
pass
else:
tx_time = len(recording) / self.tx_sample_rate
if isinstance(recording, np.ndarray):
samples = recording
elif isinstance(recording, Recording):
if len(recording.data) > 1:
warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission")
samples = recording.data[0]
else:
raise TypeError("recording must be np.ndarray or Recording")
samples = samples.astype(np.complex64, copy=False)
# Setup stream
self.device.sync_config(
layout=_bladerf.ChannelLayout.TX_X1,
fmt=_bladerf.Format.SC16_Q11,
num_buffers=16,
buffer_size=self.tx_buffer_size,
num_transfers=8,
stream_timeout=3500,
)
# Enable module
self.tx_ch.enable = True
print("Blade Starting TX...")
# Transmit samples - repeat as needed for the duration
start_time = time.time()
sample_index = 0
try:
while time.time() - start_time < tx_time:
# Get next chunk
chunk_size = min(self.tx_buffer_size, len(samples) - sample_index)
if chunk_size == 0:
# Reached end, loop back
sample_index = 0
chunk_size = min(self.tx_buffer_size, len(samples))
chunk = samples[sample_index:sample_index + chunk_size]
sample_index += chunk_size
# Convert and transmit
byte_array = self._convert_tx_samples(chunk)
self.device.sync_tx(byte_array, len(chunk))
except KeyboardInterrupt:
print("\nTransmission interrupted by user")
# Disable module
print("Blade TX Completed.")
self.tx_ch.enable = False
def _stream_tx(self, callback):
# Setup stream

View File

@ -1,5 +1,6 @@
import time
import warnings
import math
from typing import Optional
import numpy as np
@ -35,10 +36,101 @@ class HackRF(SDR):
super().__init__()
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
try:
self.radio.set_antenna_enable(bool(enable))
except AttributeError as exc: # pragma: no cover - defensive
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc
def init_rx(self, sample_rate, center_frequency, gain, channel, gain_mode):
self._tx_initialized = False
"""
Initializes the HackRF for receiving.
HackRF has 3 gain stages:
- 14 dB front-end amplifier (on/off)
- LNA gain: 0-40 dB in 8 dB steps
- VGA gain: 0-62 dB in 2 dB steps
:param sample_rate: The sample rate for receiving.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The total gain set for receiving on the HackRF (distributed across stages)
:type gain: int
:param channel: The channel the HackRF is set to. (Not actually used)
:type channel: int
:param gain_mode: Gain mode setting. Currently only "absolute" is supported.
:type gain_mode: str
"""
print("Initializing RX")
self.rx_sample_rate = sample_rate
self.radio.sample_rate = int(sample_rate)
print(f"HackRF sample rate = {self.radio.sample_rate}")
self.rx_center_frequency = center_frequency
self.radio.center_freq = int(center_frequency)
print(f"HackRF center frequency = {self.radio.center_freq}")
# Distribute gain across amplifier stages
rx_gain_min = 0
rx_gain_max = 116 # 14 (amp) + 40 (LNA) + 62 (VGA)
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This "
"sets the gain relative to the maximum possible gain."
)
else:
abs_gain = rx_gain_max + gain
else:
abs_gain = gain
if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
abs_gain = min(max(abs_gain, rx_gain_min), rx_gain_max)
print(f"Gain {gain} out of range for HackRF.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
# Distribute gain using the signal-testbed algorithm
enable_amp = False
remaining_gain = abs_gain
# Enable 14 dB pre-amp if gain is high enough
if remaining_gain > 30:
remaining_gain = remaining_gain - 14
enable_amp = True
print("HackRF: 14dB front-end amplifier enabled.")
# Distribute remaining gain between LNA and VGA
# LNA gets 60% of remaining gain, rounded down to 8 dB steps
lna_gain = math.floor(remaining_gain * 0.6)
lna_gain = lna_gain - (lna_gain % 8) # Round to 8 dB steps
if lna_gain > 40:
lna_gain = 40
# VGA gets the rest
vga_gain = remaining_gain - lna_gain
if vga_gain > 62:
vga_gain = 62
# Apply gain settings
if enable_amp:
self.radio.enable_amp()
else:
self.radio.disable_amp()
self.radio.set_lna_gain(lna_gain)
self.radio.set_vga_gain(vga_gain)
self.rx_gain = abs_gain
print(f"HackRF gain distribution: Amp={enable_amp}, LNA={lna_gain}dB, VGA={vga_gain}dB")
self._rx_initialized = True
return NotImplementedError("RX not yet implemented for HackRF")
self._tx_initialized = False
def init_tx(
self,
@ -151,10 +243,87 @@ class HackRF(SDR):
def close(self):
self.radio.close()
def _stream_rx(self, callback):
def record(self, num_samples):
"""
Record a specified number of samples from the HackRF using block capture mode.
This is more reliable than streaming for USB2 connections.
:param num_samples: Number of samples to capture
:type num_samples: int
:return: Recording object containing the captured data
:rtype: Recording
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
return NotImplementedError("RX not yet implemented for HackRF")
raise RuntimeError("RX was not initialized. init_rx() must be called before record()")
print("HackRF Starting RX...")
# Use libhackrf's block capture method
all_samples = self.radio.read_samples(num_samples)
print("HackRF RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, num_samples), dtype=np.complex64)
store_array[0, :] = all_samples
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
def _stream_rx(self, callback):
"""
Stream samples from the HackRF using a callback function.
:param callback: Function to call for each buffer of samples
:type callback: callable
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx()")
print("HackRF Starting RX stream...")
self._enable_rx = True
def rx_callback(hackrf_transfer):
"""Internal callback that wraps the user's callback"""
try:
if not self._enable_rx:
return 1 # Stop
c = hackrf_transfer.contents
# Use ctypes string_at to safely copy the buffer
from ctypes import string_at
byte_data = string_at(c.buffer, c.valid_length)
# Convert bytes to int8, then to float32, then view as complex64
samples = np.frombuffer(byte_data, dtype=np.int8).astype(np.float32).view(np.complex64)
# Call user's callback
callback(buffer=samples, metadata=None)
return 0 if self._enable_rx else 1
except Exception as e:
print(f"Error in rx_callback: {e}")
return 1 # Stop on error
# Start RX
self.radio.start_rx(rx_callback)
# Wait while streaming
while self._enable_rx:
time.sleep(0.1)
# Stop RX
self.radio.stop_rx()
print("HackRF RX stream completed.")
def _stream_tx(self, callback):
return super()._stream_tx(callback)

View File

@ -0,0 +1,190 @@
"""RTL-SDR device integration for the RIA Toolkit."""
import time
import warnings
from typing import Optional
import numpy as np
try:
from rtlsdr import RtlSdr
except ImportError as exc: # pragma: no cover - dependency provided by end user
raise ImportError("pyrtlsdr is required to use the RTLSDR class") from exc
from ria_toolkit_oss.sdr.sdr import SDR
class RTLSDR(SDR):
"""SDR interface for RTL-SDR dongles using pyrtlsdr."""
def __init__(self, identifier: Optional[int | str] = None):
super().__init__()
try:
if identifier is None:
self.radio = RtlSdr()
else:
self.radio = RtlSdr(identifier)
print(f"Initialized RTL-SDR with identifier [{identifier}].")
except Exception as exc:
print(f"Failed to initialize RTL-SDR with identifier [{identifier}].")
raise exc
self.rx_buffer_size = 256_000
self.rx_channel = 0
def supports_bias_tee(self) -> bool:
return True
def set_bias_tee(self, enable: bool):
self.radio.set_bias_tee(bool(enable))
state = "enabled" if enable else "disabled"
print(f"RTL-SDR bias tee {state}.")
def init_rx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: Optional[int],
channel: int,
gain_mode: Optional[str] = "absolute",
buffer_size: Optional[int] = 256_000,
bias_t: bool = False,
):
if channel not in (0, None):
raise ValueError("RTL-SDR supports only channel 0 for RX.")
self.radio.sample_rate = float(sample_rate)
self.rx_sample_rate = self.radio.sample_rate
self.radio.center_freq = float(center_frequency)
self.rx_center_frequency = self.radio.center_freq
available_gains = getattr(self.radio, "gains", [])
if gain is None:
self.radio.gain = "auto"
self.rx_gain = "auto"
else:
if not available_gains:
warnings.warn(
"No gain table reported by RTL-SDR; applying requested gain directly.",
RuntimeWarning,
)
target_gain = gain
else:
if gain_mode == "relative":
if gain > 0:
raise ValueError(
"When gain_mode = 'relative', gain must be < 0. This sets the gain relative to the maximum."
)
target_gain = max(available_gains) + gain
else:
target_gain = gain
min_gain = min(available_gains)
max_gain = max(available_gains)
if target_gain < min_gain or target_gain > max_gain:
print(f"Requested gain {target_gain} dB out of range; clamping to valid span {min_gain}-{max_gain} dB.")
target_gain = min(max(target_gain, min_gain), max_gain)
target_gain = min(available_gains, key=lambda g: abs(g - target_gain))
self.radio.gain = target_gain
self.rx_gain = self.radio.gain
self.rx_buffer_size = int(buffer_size or self.rx_buffer_size)
self.rx_channel = 0
if bias_t:
self.set_bias_tee(True)
time.sleep(1)
self._rx_initialized = True
self._tx_initialized = False
def init_tx(self, *args, **kwargs): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def record(self, num_samples):
"""
Record a fixed number of samples from RTL-SDR.
Args:
num_samples: Number of samples to capture
Returns:
Recording object with captured samples
"""
from ria_toolkit_oss.datatypes.recording import Recording
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before record().")
print("RTL-SDR Starting RX...")
# RTL-SDR has USB buffer limitations - use consistent 256k chunks
# Always read full chunks to avoid USB overflow issues with partial reads
max_samples_per_read = 262144 # 256k samples = stable chunk size
num_full_reads = num_samples // max_samples_per_read
remainder = num_samples % max_samples_per_read
signal = np.array([], dtype=np.complex64)
# Read full chunks
for i in range(num_full_reads):
try:
chunk = self.radio.read_samples(max_samples_per_read)
signal = np.append(signal, chunk)
except Exception as e:
print(f"Error while reading samples: {e}")
break
# Read remainder if needed (round up to power of 2 for USB compatibility)
if remainder > 0 and len(signal) == num_full_reads * max_samples_per_read:
# Round up to next 16k boundary for USB stability
padded_remainder = ((remainder + 16383) // 16384) * 16384
try:
chunk = self.radio.read_samples(padded_remainder)
signal = np.append(signal, chunk[:remainder]) # Only keep what we need
except Exception as e:
print(f"Error while reading final chunk: {e}")
print("RTL-SDR RX Completed.")
# Create 1xN array for single-channel recording
store_array = np.zeros((1, len(signal)), dtype=np.complex64)
store_array[0, :] = signal
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate,
"center_frequency": self.rx_center_frequency,
"gain": self.rx_gain,
}
return Recording(data=store_array, metadata=metadata)
def _stream_rx(self, callback):
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record().")
print("RTL-SDR Starting RX...")
self._enable_rx = True
try:
while self._enable_rx:
samples = self.radio.read_samples(self.rx_buffer_size)
callback(buffer=np.asarray(samples, dtype=np.complex64), metadata=None)
finally:
print("RTL-SDR RX Completed.")
def _stream_tx(self, callback): # pragma: no cover - RTL-SDR is RX only
raise NotImplementedError("RTL-SDR does not support transmit operations")
def close(self):
try:
self.radio.close()
finally:
self._enable_rx = False
self._enable_tx = False
def set_clock_source(self, source): # pragma: no cover - not applicable to RTL-SDR
raise NotImplementedError("RTL-SDR does not support external clock configuration")

View File

@ -304,6 +304,14 @@ class SDR(ABC):
def stop(self):
self.pause_rx()
def supports_bias_tee(self) -> bool:
"""Return True when the radio supports bias-tee control."""
return False
def set_bias_tee(self, enable: bool):
"""Enable or disable bias-tee power when supported by the radio."""
raise NotImplementedError(f"{self.__class__.__name__} does not support bias-tee control")
@abstractmethod
def close(self):
pass