updates_and_fixes #12

Merged
madrigal merged 9 commits from updates_and_fixes into main 2025-11-18 15:01:25 -05:00
Showing only changes of commit bca962d7b2 - Show all commits

View File

@ -6,7 +6,7 @@ import numpy as np
from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.sdr._external.libhackrf import HackRF as hrf from ria_toolkit_oss.sdr._external.libhackrf import HackRF as hrf
from ria_toolkit_oss.sdr.sdr import SDR from ria_toolkit_oss.sdr.sdr import SDR, SDRParameterError
class HackRF(SDR): class HackRF(SDR):
@ -21,7 +21,7 @@ class HackRF(SDR):
""" """
if identifier != "": if identifier != "":
print(f"Warning, radio identifier {identifier} provided for HackRF but will not be used.") warnings.warn(f"HackRF: Identifier '{identifier}' will be ignored", UserWarning)
print("Initializing HackRF radio.") print("Initializing HackRF radio.")
try: try:
@ -33,8 +33,6 @@ class HackRF(SDR):
print("Failed to find HackRF radio.") print("Failed to find HackRF radio.")
raise e raise e
super().__init__()
def init_rx( def init_rx(
self, self,
sample_rate: int | float, sample_rate: int | float,
@ -64,14 +62,8 @@ class HackRF(SDR):
:type gain_mode: str :type gain_mode: str
""" """
print("Initializing RX") print("Initializing RX")
self.set_sample_rate(sample_rate=sample_rate)
self.rx_sample_rate = sample_rate self.set_center_frequency(center_frequency=center_frequency)
self.radio.sample_rate = int(sample_rate)
print(f"HackRF sample rate = {self.radio.sample_rate}")
self.rx_center_frequency = center_frequency
self.radio.center_freq = int(center_frequency)
print(f"HackRF center frequency = {self.radio.center_freq}")
# Distribute gain across amplifier stages # Distribute gain across amplifier stages
rx_gain_min = 0 rx_gain_min = 0
@ -79,7 +71,7 @@ class HackRF(SDR):
if gain_mode == "relative": if gain_mode == "relative":
if gain > 0: if gain > 0:
raise ValueError( raise SDRParameterError(
"When gain_mode = 'relative', gain must be < 0. This " "When gain_mode = 'relative', gain must be < 0. This "
"sets the gain relative to the maximum possible gain." "sets the gain relative to the maximum possible gain."
) )
@ -99,7 +91,9 @@ class HackRF(SDR):
self.rx_gain = abs_gain self.rx_gain = abs_gain
print(f"HackRF gain distribution: Amp={self.amp_enabled}, LNA={self.rx_lna_gain}dB, VGA={self.rx_vga_gain}dB") print(f"HackRF gain distribution: Amp={self.amp_enabled}, LNA={self.rx_lna_gain}dB, VGA={self.rx_vga_gain}dB")
print("To individually modify the HackRF gains, use set_gain_amp(), set_rx_lna_gain(), and set_rx_vga_gain().") print(
"To individually modify the HackRF gains, use set_gain_amp(), set_rx_lna_gain(), and set_rx_vga_gain().\n"
)
self._tx_initialized = False self._tx_initialized = False
self._rx_initialized = True self._rx_initialized = True
@ -122,13 +116,13 @@ class HackRF(SDR):
raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()") raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record()")
if num_samples is not None and rx_time is not None: if num_samples is not None and rx_time is not None:
raise ValueError("Only input one of num_samples or rx_time") raise SDRParameterError("Only input one of num_samples or rx_time")
elif num_samples is not None: elif num_samples is not None:
self._num_samples_to_record = num_samples self._num_samples_to_record = num_samples
elif rx_time is not None: elif rx_time is not None:
self._num_samples_to_record = int(rx_time * self.rx_sample_rate) self._num_samples_to_record = int(rx_time * self.sample_rate)
else: else:
raise ValueError("Must provide input of one of num_samples or rx_time") raise SDRParameterError("Must provide input of one of num_samples or rx_time")
print("HackRF Starting RX...") print("HackRF Starting RX...")
@ -137,18 +131,15 @@ class HackRF(SDR):
print("HackRF RX Completed.") print("HackRF RX Completed.")
# Create 1xN array for single-channel recording rx_complex = self.convert_rx_samples(rx_samples=all_samples)
store_array = np.zeros((1, self._num_samples_to_record), dtype=np.complex64)
store_array[0, :] = all_samples
metadata = { metadata = {
"source": self.__class__.__name__, "source": self.__class__.__name__,
"sample_rate": self.rx_sample_rate, "sample_rate": self.sample_rate,
"center_frequency": self.rx_center_frequency, "center_frequency": self.center_frequency,
"gain": self.rx_gain, "gain": self.rx_gain,
} }
return Recording(data=store_array, metadata=metadata) return Recording(data=rx_complex, metadata=metadata)
def init_tx( def init_tx(
self, self,
@ -174,19 +165,14 @@ class HackRF(SDR):
""" """
print("Initializing TX") print("Initializing TX")
self.tx_sample_rate = sample_rate self.set_sample_rate(sample_rate=sample_rate)
self.radio.sample_rate = int(sample_rate) self.set_center_frequency(center_frequency=center_frequency)
print(f"HackRF sample rate = {self.radio.sample_rate}")
self.tx_center_frequency = center_frequency
self.radio.center_freq = int(center_frequency)
print(f"HackRF center frequency = {self.radio.center_freq}")
tx_gain_min = 0 tx_gain_min = 0
tx_gain_max = 47 tx_gain_max = 47
if gain_mode == "relative": if gain_mode == "relative":
if gain > 0: if gain > 0:
raise ValueError( raise SDRParameterError(
"When gain_mode = 'relative', gain must be < 0. This \ "When gain_mode = 'relative', gain must be < 0. This \
sets the gain relative to the maximum possible gain." sets the gain relative to the maximum possible gain."
) )
@ -197,14 +183,14 @@ class HackRF(SDR):
if abs_gain < tx_gain_min or abs_gain > tx_gain_max: if abs_gain < tx_gain_min or abs_gain > tx_gain_max:
abs_gain = min(max(gain, tx_gain_min), tx_gain_max) abs_gain = min(max(gain, tx_gain_min), tx_gain_max)
print(f"Gain {gain} out of range for Pluto.") print(f"Gain {gain} out of range for HackRF.")
print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB") print(f"Gain range: {tx_gain_min} to {tx_gain_max} dB")
self.set_gain_amp(True) self.set_gain_amp(True)
self.set_tx_vga_gain(abs_gain) self.set_tx_vga_gain(abs_gain)
self.tx_gain = abs_gain self.tx_gain = abs_gain
print(f"HackRF gain distribution: Amp={self.amp_enabled}, VGA={self.tx_vga_gain}dB") print(f"HackRF gain distribution: Amp={self.amp_enabled}, VGA={self.tx_vga_gain}dB")
print("To individually modify the HackRF gains, use set_gain_amp() or set_tx_vga_gain().") print("To individually modify the HackRF gains, use set_gain_amp() or set_tx_vga_gain().\n")
self._tx_initialized = True self._tx_initialized = True
self._rx_initialized = False self._rx_initialized = False
@ -229,13 +215,13 @@ class HackRF(SDR):
:type tx_time: int or float, optional :type tx_time: int or float, optional
""" """
if num_samples is not None and tx_time is not None: if num_samples is not None and tx_time is not None:
raise ValueError("Only input one of num_samples or tx_time") raise SDRParameterError("Only input one of num_samples or tx_time")
elif num_samples is not None: elif num_samples is not None:
tx_time = num_samples / self.tx_sample_rate tx_time = num_samples / self.sample_rate
elif tx_time is not None: elif tx_time is not None:
pass pass
else: else:
tx_time = len(recording) / self.tx_sample_rate tx_time = len(recording) / self.sample_rate
if isinstance(recording, np.ndarray): if isinstance(recording, np.ndarray):
samples = recording samples = recording
@ -275,6 +261,62 @@ class HackRF(SDR):
self.radio.set_txvga_gain(vga_gain) self.radio.set_txvga_gain(vga_gain)
self.tx_vga_gain = vga_gain self.tx_vga_gain = vga_gain
def set_sample_rate(self, sample_rate):
if sample_rate < 2e6 or sample_rate > 20e6:
raise SDRParameterError(
f"{self.__class__.__name__}: Sample rate {sample_rate/1e6:.3f} Msps "
f"out of range: [{2:.3f} - {20:.3f} Msps]"
)
self.sample_rate = sample_rate
self.radio.sample_rate = int(sample_rate)
print(f"HackRF sample rate = {self.radio.sample_rate}")
def set_rx_sample_rate(self, sample_rate):
"""
Set the sample rate.
Not callable during recording; HackRF requires stream stop/restart to change sample rate.
"""
self.set_sample_rate(sample_rate=sample_rate)
def set_tx_sample_rate(self, sample_rate):
self.set_sample_rate(sample_rate=sample_rate)
def set_center_frequency(self, center_frequency):
with self._param_lock:
if center_frequency < 1e6 or center_frequency > 6e9:
raise SDRParameterError(
f"{self.__class__.__name__}: Center frequency {center_frequency/1e9:.3f} GHz "
f"out of range: [{1e6/1e9:.3f} - {6e9/1e9:.3f} GHz]"
)
self.center_frequency = center_frequency
self.radio.center_freq = int(center_frequency)
print(f"HackRF center frequency = {self.radio.center_freq}")
def set_rx_center_frequency(self, center_frequency):
"""
Set the center frequency. Callable during streaming.
"""
self.set_center_frequency(center_frequency=center_frequency)
def set_tx_center_frequency(self, center_frequency):
self.set_center_frequency(center_frequency=center_frequency)
def convert_rx_samples(self, rx_samples):
# Handle conversion depending on dtype
if np.issubdtype(rx_samples.dtype, np.complexfloating):
# Already complex: just normalize
rx_complex = rx_samples.astype(np.complex64) / 128.0
elif np.issubdtype(rx_samples.dtype, np.integer):
# Raw interleaved I/Q bytes: convert to complex
i_samples = rx_samples[0::2].astype(np.float32)
q_samples = rx_samples[1::2].astype(np.float32)
rx_complex = (i_samples + 1j * q_samples) / 128.0
else:
raise TypeError(f"Unexpected dtype from read_samples: {rx_samples.dtype}")
# Ensure 2D array: 1xN for single channel
return rx_complex.reshape((1, -1))
def set_clock_source(self, source): def set_clock_source(self, source):
self.radio.set_clock_source(source) self.radio.set_clock_source(source)
@ -288,7 +330,11 @@ class HackRF(SDR):
raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc raise NotImplementedError("Underlying HackRF interface lacks bias-tee control") from exc
def close(self): def close(self):
self.radio.close() try:
self.radio.close()
del self.radio
finally:
self._enable_rx = False
def _stream_rx(self, callback): def _stream_rx(self, callback):
""" """
@ -342,3 +388,6 @@ class HackRF(SDR):
def _stream_tx(self, callback): def _stream_tx(self, callback):
return super()._stream_tx(callback) return super()._stream_tx(callback)
def supports_dynamic_updates(self) -> dict:
return {"center_frequency": True, "sample_rate": False, "gain": False}