ria-toolkit-oss/src/ria_toolkit_oss/sdr/hackrf.py

392 lines
14 KiB
Python
Raw Normal View History

2025-09-12 11:32:49 -04:00
import time
import warnings
from typing import Optional
import numpy as np
from ria_toolkit_oss.data.recording import Recording
from ria_toolkit_oss.sdr._external.libhackrf import HackRF as hrf
from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError
2025-09-12 11:32:49 -04:00
class HackRF(SDR):
def __init__(self, identifier=""):
"""
Initialize a HackRF device object and connect to the SDR hardware.
:param identifier: Not used for HackRF.
HackRF devices cannot currently be selected with and identifier value.
If there are multiple connected devices, the device in use may be selected randomly.
"""
if identifier != "":
warnings.warn(f"HackRF: Identifier '{identifier}' will be ignored", UserWarning)
2025-09-12 11:32:49 -04:00
print("Initializing HackRF radio.")
try:
super().__init__()
self.radio = hrf()
print("Successfully found HackRF radio.")
except Exception as e:
print("Failed to find HackRF radio.")
raise e
M
2025-10-16 15:22:07 -04:00
def init_rx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
"""
Initializes the HackRF for receiving.
HackRF has 3 gain stages:
- 14 dB front-end amplifier (on/off)
- LNA gain: 0-40 dB in 8 dB steps
- VGA gain: 0-62 dB in 2 dB steps
:param sample_rate: The sample rate for receiving.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
M
2025-10-16 15:22:07 -04:00
:param gain: The LNA gain set for receiving on the HackRF
:type gain: int
:param channel: The channel the HackRF is set to. (Not actually used)
:type channel: int
M
2025-10-16 15:22:07 -04:00
:param gain_mode: 'absolute' passes gain directly to the sdr,
'relative' means that gain should be a negative value, and it will be subtracted from the max gain (40).
:type gain_mode: str
"""
print("Initializing RX")
self.set_sample_rate(sample_rate=sample_rate)
self.set_center_frequency(center_frequency=center_frequency)
# Distribute gain across amplifier stages
rx_gain_min = 0
M
2025-10-16 15:22:07 -04:00
rx_gain_max = 40 # (LNA)
if gain_mode == "relative":
if gain > 0:
raise SDRParameterError(
"When gain_mode = 'relative', gain must be < 0. This "
"sets the gain relative to the maximum possible gain."
)
else:
abs_gain = rx_gain_max + gain
else:
abs_gain = gain
if abs_gain < rx_gain_min or abs_gain > rx_gain_max:
abs_gain = min(max(abs_gain, rx_gain_min), rx_gain_max)
print(f"Gain {gain} out of range for HackRF.")
print(f"Gain range: {rx_gain_min} to {rx_gain_max} dB")
M
2025-10-16 15:22:07 -04:00
self.set_gain_amp(False)
self.set_rx_vga_gain(45)
self.set_rx_lna_gain(abs_gain)
M
2025-10-23 16:44:43 -04:00
self.rx_gain = abs_gain
M
2025-10-16 15:22:07 -04:00
print(f"HackRF gain distribution: Amp={self.amp_enabled}, LNA={self.rx_lna_gain}dB, VGA={self.rx_vga_gain}dB")
print(
"To individually modify the HackRF gains, use set_gain_amp(), set_rx_lna_gain(), and set_rx_vga_gain().\n"
)
M
2025-10-16 15:22:07 -04:00
self._tx_initialized = False
self._rx_initialized = True
def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None):
"""
Create a radio recording (iq samples and metadata) of a given length from the SDR.
HackRF uses block capture mode, which is more reliable than streaming for USB2 connections.
Either num_samples or rx_time must be provided.
init_rx() must be called before record()
:param num_samples: The number of samples to record.
:type num_samples: int, optional
:param rx_time: The time to record.
:type rx_time: int or float, optional
returns: Recording object (iq samples and metadata)
"""
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None:
raise SDRParameterError("Only input one of num_samples or rx_time")
M
2025-10-16 15:22:07 -04:00
elif num_samples is not None:
self._num_samples_to_record = num_samples
elif rx_time is not None:
self._num_samples_to_record = int(rx_time * self.sample_rate)
else:
raise SDRParameterError("Must provide input of one of num_samples or rx_time")
M
2025-10-16 15:22:07 -04:00
print("HackRF Starting RX...")
M
2025-10-16 15:22:07 -04:00
# Use libhackrf's block capture method
all_samples = self.radio.read_samples(self._num_samples_to_record)
M
2025-10-16 15:22:07 -04:00
print("HackRF RX Completed.")
rx_complex = self.convert_rx_samples(rx_samples=all_samples)
M
2025-10-16 15:22:07 -04:00
metadata = {
"source": self.__class__.__name__,
"sample_rate": self.sample_rate,
"center_frequency": self.center_frequency,
M
2025-10-16 15:22:07 -04:00
"gain": self.rx_gain,
}
return Recording(data=rx_complex, metadata=metadata)
2025-09-12 11:32:49 -04:00
def init_tx(
self,
sample_rate: int | float,
center_frequency: int | float,
gain: int,
channel: int,
gain_mode: Optional[str] = "absolute",
):
"""
Initializes the HackRF for transmitting.
:param sample_rate: The sample rate for transmitting.
:type sample_rate: int or float
:param center_frequency: The center frequency of the recording.
:type center_frequency: int or float
:param gain: The gain set for transmitting on the HackRF
:type gain: int
:param channel: The channel the HackRF is set to. (Not actually used)
:type channel: int
:param buffer_size: The buffer size during transmit. Defaults to 10000.
:type buffer_size: int
"""
print("Initializing TX")
self.set_sample_rate(sample_rate=sample_rate)
self.set_center_frequency(center_frequency=center_frequency)
2025-09-12 11:32:49 -04:00
tx_gain_min = 0
tx_gain_max = 47
if gain_mode == "relative":
if gain > 0:
M
2026-01-30 17:51:01 -05:00
raise SDRParameterError("When gain_mode = 'relative', gain must be < 0. This \
sets the gain relative to the maximum possible gain.")
2025-09-12 11:32:49 -04:00
else:
abs_gain = tx_gain_max + gain
else:
abs_gain = gain
if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
abs_gain = min(max(gain, tx_gain_min), tx_gain_max)
print(f"Gain {gain} out of range for HackRF.")
2025-09-12 11:32:49 -04:00
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
M
2025-10-16 15:22:07 -04:00
self.set_gain_amp(True)
self.set_tx_vga_gain(abs_gain)
M
2025-10-23 16:44:43 -04:00
self.tx_gain = abs_gain
M
2025-10-16 15:22:07 -04:00
print(f"HackRF gain distribution: Amp={self.amp_enabled}, VGA={self.tx_vga_gain}dB")
print("To individually modify the HackRF gains, use set_gain_amp() or set_tx_vga_gain().\n")
2025-09-12 11:32:49 -04:00
self._tx_initialized = True
self._rx_initialized = False
def tx_recording(
self,
recording: Recording | np.ndarray,
num_samples: Optional[int] = None,
tx_time: Optional[int | float] = None,
):
"""
Transmit the given iq samples from the provided recording.
init_tx() must be called before this function.
:param recording: The recording to transmit.
:type recording: Recording or np.ndarray
:param num_samples: The number of samples to transmit, will repeat or
truncate the recording to this length. Defaults to None.
:type num_samples: int, optional
:param tx_time: The time to transmit, will repeat or truncate the
recording to this length. Defaults to None.
:type tx_time: int or float, optional
"""
if num_samples is not None and tx_time is not None:
raise SDRParameterError("Only input one of num_samples or tx_time")
2025-09-12 11:32:49 -04:00
elif num_samples is not None:
tx_time = num_samples / self.sample_rate
2025-09-12 11:32:49 -04:00
elif tx_time is not None:
pass
else:
tx_time = len(recording) / self.sample_rate
2025-09-12 11:32:49 -04:00
if isinstance(recording, np.ndarray):
samples = recording
elif isinstance(recording, Recording):
if len(recording.data) > 1:
warnings.warn("Recording object is multichannel, only channel 0 data was used for transmission")
samples = recording.data[0]
samples = samples.astype(np.complex64, copy=False)
M
2025-10-02 10:13:24 -04:00
if np.max(np.abs(samples)) >= 1:
samples = samples / (np.max(np.abs(samples)) + 1e-12)
2025-09-12 11:32:49 -04:00
print("HackRF Starting TX...")
self.radio.start_tx(samples=samples, repeat=True)
time.sleep(tx_time)
self.radio.stop_tx()
print("HackRF Tx Completed.")
M
2025-10-16 15:22:07 -04:00
def set_gain_amp(self, enable):
if enable:
self.radio.enable_amp()
self.amp_enabled = True
else:
self.radio.disable_amp()
self.amp_enabled = False
M
2025-10-16 15:22:07 -04:00
def set_rx_lna_gain(self, lna_gain):
self.radio.set_lna_gain(lna_gain)
self.rx_lna_gain = lna_gain
M
2025-10-16 15:22:07 -04:00
def set_rx_vga_gain(self, vga_gain):
self.radio.set_vga_gain(vga_gain)
self.rx_vga_gain = vga_gain
M
2025-10-16 15:22:07 -04:00
def set_tx_vga_gain(self, vga_gain):
self.radio.set_txvga_gain(vga_gain)
self.tx_vga_gain = vga_gain
def set_sample_rate(self, sample_rate):
if sample_rate < 2e6 or sample_rate > 20e6:
raise SDRParameterError(
f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
f"out of range: [{2:.3f} - {20:.3f} Msps]"
)
self.sample_rate = sample_rate
self.radio.sample_rate = int(sample_rate)
print(f"HackRF sample rate = {self.radio.sample_rate}")
def set_rx_sample_rate(self, sample_rate):
"""
Set the sample rate.
Not callable during recording; HackRF requires stream stop/restart to change sample rate.
"""
self.set_sample_rate(sample_rate=sample_rate)
def set_tx_sample_rate(self, sample_rate):
self.set_sample_rate(sample_rate=sample_rate)
def set_center_frequency(self, center_frequency):
with self._param_lock:
if center_frequency < 1e6 or center_frequency > 6e9:
raise SDRParameterError(
f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
f"out of range: [{1e6/1e9:.3f} - {6e9/1e9:.3f} GHz]"
)
self.center_frequency = center_frequency
self.radio.center_freq = int(center_frequency)
print(f"HackRF center frequency = {self.radio.center_freq}")
def set_rx_center_frequency(self, center_frequency):
"""
Set the center frequency. Callable during streaming.
"""
self.set_center_frequency(center_frequency=center_frequency)
def set_tx_center_frequency(self, center_frequency):
self.set_center_frequency(center_frequency=center_frequency)
def convert_rx_samples(self, rx_samples):
# Handle conversion depending on dtype
if np.issubdtype(rx_samples.dtype, np.complexfloating):
# Already complex: just normalize
rx_complex = rx_samples.astype(np.complex64) / 128.0
elif np.issubdtype(rx_samples.dtype, np.integer):
# Raw interleaved I/Q bytes: convert to complex
i_samples = rx_samples[0::2].astype(np.float32)
q_samples = rx_samples[1::2].astype(np.float32)
rx_complex = (i_samples + 1j * q_samples) / 128.0
else:
raise TypeError(f"Unexpected dtype from read_samples: {rx_samples.dtype}")
# Ensure 2D array: 1xN for single channel
return rx_complex.reshape((1, -1))
M
2025-10-16 15:22:07 -04:00
def set_clock_source(self, source):
self.radio.set_clock_source(source)
M
2025-10-16 15:22:07 -04:00
def supports_bias_tee(self) -> bool:
return True
M
2025-10-16 15:22:07 -04:00
def set_bias_tee(self, enable: bool):
try:
self.radio.set_antenna_enable(bool(enable))
except AttributeError as exc: # pragma: no cover - defensive
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc
M
2025-10-16 15:22:07 -04:00
def close(self):
try:
self.radio.close()
del self.radio
finally:
self._enable_rx = False
2025-09-12 11:32:49 -04:00
def _stream_rx(self, callback):
"""
Stream samples from the HackRF using a callback function.
:param callback: Function to call for each buffer of samples
:type callback: callable
"""
2025-09-12 11:32:49 -04:00
if not self._rx_initialized:
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx()")
print("HackRF Starting RX stream...")
self._enable_rx = True
def rx_callback(hackrf_transfer):
"""Internal callback that wraps the user's callback"""
try:
if not self._enable_rx:
return 1 # Stop
c = hackrf_transfer.contents
# Use ctypes string_at to safely copy the buffer
from ctypes import string_at
M
2025-10-16 15:22:07 -04:00
byte_data = string_at(c.buffer, c.valid_length)
# Convert bytes to int8, then to float32, then view as complex64
samples = np.frombuffer(byte_data, dtype=np.int8).astype(np.float32).view(np.complex64)
# Call user's callback
callback(buffer=samples, metadata=None)
return 0 if self._enable_rx else 1
except Exception as e:
print(f"Error in rx_callback: {e}")
return 1 # Stop on error
# Start RX
self.radio.start_rx(rx_callback)
# Wait while streaming
while self._enable_rx:
time.sleep(0.1)
# Stop RX
self.radio.stop_rx()
print("HackRF RX stream completed.")
2025-09-12 11:32:49 -04:00
def _stream_tx(self, callback):
return super()._stream_tx(callback)
def supports_dynamic_updates(self) -> dict:
return {"center_frequency": True, "sample_rate": False, "gain": False}