ria-toolkit-oss/src/ria_toolkit_oss/sdr/rtlsdr.py

191 lines
6.8 KiB
Python
Raw Normal View History

"""RTL-SDR device integration for the RIA Toolkit."""
import time
import warnings
from typing import Optional
import numpy as np
try:
from rtlsdr import RtlSdr
except ImportError as exc: # pragma: no cover - dependency provided by end user
raise ImportError("pyrtlsdr is required to use the RTLSDR class") from exc
from ria_toolkit_oss.sdr.sdr import SDR
class RTLSDR(SDR):
    """SDR interface for RTL-SDR dongles using pyrtlsdr.

    RTL-SDR hardware is receive-only: the transmit entry points raise
    ``NotImplementedError``.
    """

    # Chunk size (in samples) used by record() for every full read.
    _MAX_SAMPLES_PER_READ = 262144
    # The final partial read in record() is rounded up to a multiple of this.
    _READ_ALIGNMENT = 16384

    def __init__(self, identifier: Optional[int | str] = None):
        """Open an RTL-SDR device.

        Args:
            identifier: ``None`` opens the default device, an ``int`` is used
                as the device index, and a ``str`` is treated as the device
                serial number.

        Raises:
            Exception: Whatever pyrtlsdr raises when the device cannot be
                opened; it is re-raised unchanged after a message is printed.
        """
        super().__init__()
        try:
            if identifier is None:
                self.radio = RtlSdr()
            elif isinstance(identifier, str):
                # pyrtlsdr's first positional argument is the integer device
                # index, so a serial string must be passed by keyword.
                self.radio = RtlSdr(serial_number=identifier)
            else:
                self.radio = RtlSdr(identifier)
        except Exception:
            print(f"Failed to initialize RTL-SDR with identifier [{identifier}].")
            raise
        else:
            print(f"Initialized RTL-SDR with identifier [{identifier}].")

        self.rx_buffer_size = 256_000
        self.rx_channel = 0

    def supports_bias_tee(self) -> bool:
        """Return ``True``; this class exposes bias-tee control."""
        return True

    def set_bias_tee(self, enable: bool):
        """Switch the bias tee on or off and print the resulting state."""
        self.radio.set_bias_tee(bool(enable))
        state = "enabled" if enable else "disabled"
        print(f"RTL-SDR bias tee {state}.")

    def _resolve_gain(self, gain, gain_mode):
        """Map a requested gain onto a value from the device's gain table.

        Args:
            gain: Requested gain in dB (absolute), or an offset <= 0 from the
                maximum gain when ``gain_mode`` is ``"relative"``.
            gain_mode: ``"relative"`` or anything else for absolute.

        Returns:
            The table entry nearest to the (clamped) target, or ``gain``
            unchanged when the device reports no gain table.

        Raises:
            ValueError: If ``gain_mode`` is ``"relative"`` and ``gain`` > 0.
        """
        # pyrtlsdr publishes the supported gains (in dB) as ``valid_gains_db``;
        # ``gains`` is kept as a fallback for radio objects that provide it.
        available_gains = list(
            getattr(self.radio, "valid_gains_db", None) or getattr(self.radio, "gains", None) or []
        )
        if not available_gains:
            warnings.warn(
                "No gain table reported by RTL-SDR; applying requested gain directly.",
                RuntimeWarning,
            )
            return gain

        min_gain = min(available_gains)
        max_gain = max(available_gains)

        if gain_mode == "relative":
            if gain > 0:
                raise ValueError(
                    "When gain_mode = 'relative', gain must be < 0. This sets the gain relative to the maximum."
                )
            target_gain = max_gain + gain
        else:
            target_gain = gain

        if target_gain < min_gain or target_gain > max_gain:
            print(f"Requested gain {target_gain} dB out of range; clamping to valid span {min_gain}-{max_gain} dB.")
            target_gain = min(max(target_gain, min_gain), max_gain)

        # Snap to the closest gain step the tuner actually supports.
        return min(available_gains, key=lambda g: abs(g - target_gain))

    def init_rx(
        self,
        sample_rate: int | float,
        center_frequency: int | float,
        gain: Optional[int],
        channel: int,
        gain_mode: Optional[str] = "absolute",
        buffer_size: Optional[int] = 256_000,
        bias_t: bool = False,
    ):
        """Configure the radio for reception.

        Args:
            sample_rate: Requested sample rate; the value read back from the
                radio is stored in ``rx_sample_rate``.
            center_frequency: Requested center frequency; the value read back
                from the radio is stored in ``rx_center_frequency``.
            gain: Gain in dB, or ``None`` for automatic gain.
            channel: Must be ``0`` or ``None``.
            gain_mode: ``"absolute"`` (default) or ``"relative"`` (``gain`` is
                an offset <= 0 from the maximum available gain).
            buffer_size: Samples per read in streaming mode; a falsy value
                keeps the current ``rx_buffer_size``.
            bias_t: When true, enable the bias tee and wait one second.

        Raises:
            ValueError: If ``channel`` is not 0/None, or a positive gain is
                given in relative mode.
        """
        if channel not in (0, None):
            raise ValueError("RTL-SDR supports only channel 0 for RX.")

        # Store the values the radio reports back, not the requested ones.
        self.radio.sample_rate = float(sample_rate)
        self.rx_sample_rate = self.radio.sample_rate

        self.radio.center_freq = float(center_frequency)
        self.rx_center_frequency = self.radio.center_freq

        if gain is None:
            self.radio.gain = "auto"
            self.rx_gain = "auto"
        else:
            self.radio.gain = self._resolve_gain(gain, gain_mode)
            self.rx_gain = self.radio.gain

        self.rx_buffer_size = int(buffer_size or self.rx_buffer_size)
        self.rx_channel = 0

        if bias_t:
            self.set_bias_tee(True)
            time.sleep(1)

        self._rx_initialized = True
        self._tx_initialized = False

    def init_tx(self, *args, **kwargs):  # pragma: no cover - RTL-SDR is RX only
        """Always raise: RTL-SDR cannot transmit."""
        raise NotImplementedError("RTL-SDR does not support transmit operations")

    def record(self, num_samples):
        """Record a fixed number of samples from RTL-SDR.

        Read errors are reported with ``print`` and end the capture early, so
        the returned recording may hold fewer samples than requested.

        Args:
            num_samples: Number of samples to capture (non-negative).

        Returns:
            Recording object with captured samples as a 1xN complex64 array.

        Raises:
            RuntimeError: If ``init_rx()`` has not been called.
            ValueError: If ``num_samples`` is negative.
        """
        from ria_toolkit_oss.datatypes.recording import Recording

        if not self._rx_initialized:
            raise RuntimeError("RX was not initialized. init_rx() must be called before record().")

        num_samples = int(num_samples)
        if num_samples < 0:
            # Python's modulo would otherwise turn a negative count into a
            # large positive remainder and silently capture unwanted data.
            raise ValueError("num_samples must be non-negative.")

        print("RTL-SDR Starting RX...")

        # RTL-SDR has USB buffer limitations - use consistent 256k chunks.
        # Always read full chunks to avoid USB overflow issues with partial reads.
        max_samples_per_read = self._MAX_SAMPLES_PER_READ
        num_full_reads, remainder = divmod(num_samples, max_samples_per_read)

        # Collect chunks and join once at the end; appending to an array
        # inside the loop would re-copy everything captured so far each time.
        chunks = []
        captured = 0

        for _ in range(num_full_reads):
            try:
                chunk = np.asarray(self.radio.read_samples(max_samples_per_read), dtype=np.complex64)
            except Exception as e:
                print(f"Error while reading samples: {e}")
                break
            chunks.append(chunk)
            captured += len(chunk)

        # Read the remainder only if every full read delivered what was asked.
        if remainder > 0 and captured == num_full_reads * max_samples_per_read:
            # Round up to the next 16k boundary for USB stability.
            alignment = self._READ_ALIGNMENT
            padded_remainder = ((remainder + alignment - 1) // alignment) * alignment
            try:
                chunk = np.asarray(self.radio.read_samples(padded_remainder), dtype=np.complex64)
            except Exception as e:
                print(f"Error while reading final chunk: {e}")
            else:
                chunks.append(chunk[:remainder])  # Only keep what we need

        print("RTL-SDR RX Completed.")

        signal = np.concatenate(chunks) if chunks else np.array([], dtype=np.complex64)

        # Create 1xN array for single-channel recording
        store_array = np.zeros((1, len(signal)), dtype=np.complex64)
        store_array[0, :] = signal

        metadata = {
            "source": self.__class__.__name__,
            "sample_rate": self.rx_sample_rate,
            "center_frequency": self.rx_center_frequency,
            "gain": self.rx_gain,
        }
        return Recording(data=store_array, metadata=metadata)

    def _stream_rx(self, callback):
        """Read ``rx_buffer_size``-sample buffers until ``_enable_rx`` is cleared.

        Args:
            callback: Called as ``callback(buffer=<complex64 array>, metadata=None)``
                for every buffer read.

        Raises:
            RuntimeError: If ``init_rx()`` has not been called.
        """
        if not self._rx_initialized:
            raise RuntimeError("RX was not initialized. init_rx() must be called before _stream_rx() or record().")

        print("RTL-SDR Starting RX...")
        self._enable_rx = True
        try:
            while self._enable_rx:
                samples = self.radio.read_samples(self.rx_buffer_size)
                callback(buffer=np.asarray(samples, dtype=np.complex64), metadata=None)
        finally:
            # Printed even when the read or the callback raises.
            print("RTL-SDR RX Completed.")

    def _stream_tx(self, callback):  # pragma: no cover - RTL-SDR is RX only
        """Always raise: RTL-SDR cannot transmit."""
        raise NotImplementedError("RTL-SDR does not support transmit operations")

    def close(self):
        """Close the radio and clear the streaming flags, even if closing fails."""
        try:
            self.radio.close()
        finally:
            self._enable_rx = False
            self._enable_tx = False

    def set_clock_source(self, source):  # pragma: no cover - not applicable to RTL-SDR
        """Always raise: external clock configuration is unsupported."""
        raise NotImplementedError("RTL-SDR does not support external clock configuration")