Push Tracker
ria-toolkit-oss/src/ria_toolkit_oss/orchestration/tx_executor.py
ben 34b67c0c17
All checks were successful
Build Sphinx Docs Set / Build Docs (pull_request) Successful in 13m32s
Build Project / Build Project (3.12) (pull_request) Successful in 13m49s
Build Project / Build Project (3.11) (pull_request) Successful in 15m28s
Build Project / Build Project (3.10) (pull_request) Successful in 15m37s
Test with tox / Test with tox (3.10) (pull_request) Successful in 6m40s
Test with tox / Test with tox (3.11) (pull_request) Successful in 4m27s
Test with tox / Test with tox (3.12) (pull_request) Successful in 7m57s
campaign loop support
2026-04-21 15:56:04 -04:00

300 lines
11 KiB
Python

"""TX campaign executor — synthesises and transmits signals via a local SDR.
The TxExecutor receives a transmitter config dict (matching the
``sdr_agent`` control method's schema) and a step schedule, then for each
step builds a signal chain with the block generator and transmits it via
the local SDR device.
Supported modulations (``modulation`` field in config):
BPSK, QPSK, 8PSK, 16QAM, 64QAM, 256QAM, FSK, OOK, GMSK, OQPSK
Example config dict (matches CampaignConfig transmitter with
``control_method: sdr_agent``)::
{
"id": "synthetic-tx",
"type": "sdr",
"control_method": "sdr_agent",
"sdr_agent": {
"modulation": "QPSK",
"order": 4,
"symbol_rate": 1000000,
"center_frequency": 0.0,
"filter": "rrc",
"rolloff": 0.35
},
"schedule": [
{"label": "step1", "duration": 10, "power_dbm": -10}
]
}
"""
from __future__ import annotations
import logging
import threading
from typing import Any
logger = logging.getLogger(__name__)
def _parse_hz(val: object) -> float:
"""Parse a frequency value that may be a float (Hz) or a string like '2.45GHz'."""
if isinstance(val, (int, float)):
return float(val)
s = str(val).strip()
for suffix, mult in (("GHz", 1e9), ("MHz", 1e6), ("kHz", 1e3), ("Hz", 1.0)):
if s.endswith(suffix):
return float(s[: -len(suffix)]) * mult
return float(s)
def _parse_seconds(val: object) -> float:
"""Parse a duration value that may be a float (seconds) or a string like '5s'."""
if isinstance(val, (int, float)):
return float(val)
s = str(val).strip()
return float(s[:-1]) if s.endswith("s") else float(s)
# Mapping from modulation name → (PSK/QAM order, generator_type)
# 'psk' uses PSKGenerator, 'qam' uses QAMGenerator
_MOD_TABLE: dict[str, tuple[int, str]] = {
"BPSK": (1, "psk"),
"QPSK": (2, "psk"),
"8PSK": (3, "psk"),
"16QAM": (4, "qam"),
"64QAM": (6, "qam"),
"256QAM": (8, "qam"),
}
_SPECIAL_MODS = {"FSK", "OOK", "GMSK", "OQPSK"}
# usrp-uhd-client's tx_recording() streams 2 000-sample chunks and loops the
# source buffer for the full tx_time, so only this many samples ever need to
# be in RAM regardless of step duration or sample rate.
# 50 000 complex64 samples ≈ 400 kB — enough spectral diversity for looping.
_SYNTH_BLOCK_SAMPLES = 50_000
class TxExecutor:
"""Synthesise and transmit a signal campaign via a local SDR.
Args:
config: Transmitter config dict (must have ``sdr_agent`` sub-dict with
modulation params, and ``schedule`` list of step dicts).
sdr_device: SDR device name to open in TX mode (e.g. "pluto", "usrp").
stop_event: External event that aborts the TX loop mid-step.
"""
def __init__(
self,
config: dict,
sdr_device: str = "unknown",
stop_event: threading.Event | None = None,
) -> None:
self.config = config
self.sdr_device = sdr_device
self.stop_event = stop_event or threading.Event()
self._sdr: Any = None
def run(self) -> None:
"""Execute all steps in the schedule, transmitting for each step duration."""
agent_cfg: dict = self.config.get("sdr_agent") or {}
schedule: list[dict] = self.config.get("schedule") or []
if not schedule:
logger.warning("TxExecutor: no schedule steps — nothing to transmit")
return
modulation: str = agent_cfg.get("modulation", "QPSK").upper()
symbol_rate: float = float(agent_cfg.get("symbol_rate", 1e6))
center_freq: float = _parse_hz(agent_cfg.get("center_frequency", 0.0))
filter_type: str = agent_cfg.get("filter", "rrc").lower()
rolloff: float = float(agent_cfg.get("rolloff", 0.35))
loops: int = max(1, int(self.config.get("loops", 1)))
# Upsampling factor: samples_per_symbol, fixed at 8 for SDR compatibility.
sps = 8
sample_rate = symbol_rate * sps
self._init_sdr(sample_rate, center_freq)
try:
for loop_idx in range(loops):
if self.stop_event.is_set():
break
if loops > 1:
logger.info("TX loop %d/%d", loop_idx + 1, loops)
for step in schedule:
if self.stop_event.is_set():
break
looped_step = (
{**step, "label": f"{step.get('label', 'step')}_run{loop_idx + 1:02d}"} if loops > 1 else step
)
self._execute_step(looped_step, modulation, sps, symbol_rate, filter_type, rolloff)
finally:
self._close_sdr()
def _execute_step(
self,
step: dict,
modulation: str,
sps: int,
symbol_rate: float,
filter_type: str,
rolloff: float,
) -> None:
duration: float = _parse_seconds(step.get("duration", 10.0))
label: str = step.get("label", "step")
gain: float = float(step.get("power_dbm") or 0.0)
sample_rate = symbol_rate * sps
logger.info(
"TX step '%s': %.0f s, %s @ %.3f MHz (sps=%d, filter=%s)",
label,
duration,
modulation,
symbol_rate / 1e6,
sps,
filter_type,
)
num_samples = int(duration * sample_rate)
# Synthesise a short representative block. tx_recording() loops this
# buffer for the full tx_time using a 2 000-sample streaming callback,
# so peak memory is O(_SYNTH_BLOCK_SAMPLES) regardless of duration.
block_size = min(num_samples, _SYNTH_BLOCK_SAMPLES)
signal = self._synthesise(modulation, sps, block_size, filter_type, rolloff)
if self._sdr is not None:
try:
# Apply gain update if SDR supports it
if hasattr(self._sdr, "set_tx_gain"):
self._sdr.set_tx_gain(gain)
self._sdr.tx_recording(signal, tx_time=duration)
except Exception as exc:
logger.error("TX step '%s' SDR error: %s", label, exc)
else:
# No SDR available — simulate by sleeping for the step duration.
logger.warning("TX step '%s': no SDR — simulating %.0f s delay", label, duration)
self.stop_event.wait(timeout=duration)
def _synthesise(
self,
modulation: str,
sps: int,
num_samples: int,
filter_type: str,
rolloff: float,
):
"""Build a block-generator chain and return IQ samples as a numpy array."""
try:
import numpy as np
from ria_toolkit_oss.signal.block_generator import (
BinarySource,
GMSKModulator,
Mapper,
OOKModulator,
OQPSKModulator,
RaisedCosineFilter,
RootRaisedCosineFilter,
Upsampling,
)
from ria_toolkit_oss.signal.block_generator.continuous_modulation.fsk_modulator import (
FSKModulator,
)
except ImportError as exc:
raise RuntimeError(f"ria_toolkit_oss block generator not available: {exc}") from exc
# ── Special modulations with their own source-connected modulator ──
if modulation in ("OOK", "GMSK", "OQPSK"):
src = BinarySource()
if modulation == "OOK":
mod = OOKModulator(src, samples_per_symbol=sps)
elif modulation == "GMSK":
mod = GMSKModulator(src, samples_per_symbol=sps)
else:
mod = OQPSKModulator(src, samples_per_symbol=sps)
recording = mod.record(num_samples)
flat = np.asarray(recording.data).flatten().astype(np.complex64)
if len(flat) < num_samples:
flat = np.tile(flat, num_samples // len(flat) + 1)
return flat[:num_samples]
if modulation == "FSK":
symbol_rate = num_samples / sps
bits_per_sym = 1 # 2-FSK
num_bits = max(num_samples // sps, 128) * bits_per_sym
bits = BinarySource()((1, num_bits))
mod = FSKModulator(
num_bits_per_symbol=bits_per_sym,
frequency_spacing=symbol_rate * 0.5,
symbol_duration=1.0 / max(symbol_rate, 1.0),
sampling_frequency=symbol_rate * sps,
)
flat = np.asarray(mod(bits)).flatten().astype(np.complex64)
if len(flat) < num_samples:
flat = np.tile(flat, num_samples // len(flat) + 1)
return flat[:num_samples]
# ── PSK / QAM via Mapper → Upsampling → pulse filter ──────────────
if modulation not in _MOD_TABLE:
logger.warning("Unknown modulation %r — defaulting to QPSK", modulation)
modulation = "QPSK"
bits_per_sym, gen_type = _MOD_TABLE[modulation]
mod_family = "QAM" if gen_type == "qam" else "PSK"
source = BinarySource()
mapper = Mapper(constellation_type=mod_family, num_bits_per_symbol=bits_per_sym)
upsampler = Upsampling(factor=sps)
mapper.connect_input([source])
upsampler.connect_input([mapper])
if filter_type in ("rrc",):
pulse_filter = RootRaisedCosineFilter(span_in_symbols=6, upsampling_factor=sps, beta=rolloff)
pulse_filter.connect_input([upsampler])
recording = pulse_filter.record(num_samples)
elif filter_type in ("rc",):
pulse_filter = RaisedCosineFilter(span_in_symbols=6, upsampling_factor=sps, beta=rolloff)
pulse_filter.connect_input([upsampler])
recording = pulse_filter.record(num_samples)
else:
# "none", "rect", "gaussian" — use upsampler output directly
recording = upsampler.record(num_samples)
flat = np.asarray(recording.data).flatten().astype(np.complex64)
if len(flat) < num_samples:
flat = np.tile(flat, num_samples // len(flat) + 1)
return flat[:num_samples]
def _init_sdr(self, sample_rate: float, center_freq: float) -> None:
try:
from ria_toolkit_oss.sdr import get_sdr_device
self._sdr = get_sdr_device(self.sdr_device)
self._sdr.init_tx(
sample_rate=sample_rate,
center_frequency=center_freq,
gain=0,
channel=0,
gain_mode="manual",
)
logger.info(
"TX SDR initialised: %s @ %.3f MHz, %.1f Msps", self.sdr_device, center_freq / 1e6, sample_rate / 1e6
)
except Exception as exc:
logger.warning("TX SDR init failed (%s) — will simulate: %s", self.sdr_device, exc)
self._sdr = None
def _close_sdr(self) -> None:
if self._sdr is not None:
try:
self._sdr.close()
except Exception as exc:
logger.debug("TX SDR close error: %s", exc)
self._sdr = None