"""TX campaign executor — synthesises and transmits signals via a local SDR. The TxExecutor receives a transmitter config dict (matching the ``sdr_agent`` control method's schema) and a step schedule, then for each step builds a signal chain with the block generator and transmits it via the local SDR device. Supported modulations (``modulation`` field in config): BPSK, QPSK, 8PSK, 16QAM, 64QAM, 256QAM, FSK, OOK, GMSK, OQPSK Example config dict (matches CampaignConfig transmitter with ``control_method: sdr_agent``):: { "id": "synthetic-tx", "type": "sdr", "control_method": "sdr_agent", "sdr_agent": { "modulation": "QPSK", "order": 4, "symbol_rate": 1000000, "center_frequency": 0.0, "filter": "rrc", "rolloff": 0.35 }, "schedule": [ {"label": "step1", "duration": 10, "power_dbm": -10} ] } """ from __future__ import annotations import logging import threading from typing import Any logger = logging.getLogger(__name__) def _parse_hz(val: object) -> float: """Parse a frequency value that may be a float (Hz) or a string like '2.45GHz'.""" if isinstance(val, (int, float)): return float(val) s = str(val).strip() for suffix, mult in (("GHz", 1e9), ("MHz", 1e6), ("kHz", 1e3), ("Hz", 1.0)): if s.endswith(suffix): return float(s[: -len(suffix)]) * mult return float(s) def _parse_seconds(val: object) -> float: """Parse a duration value that may be a float (seconds) or a string like '5s'.""" if isinstance(val, (int, float)): return float(val) s = str(val).strip() return float(s[:-1]) if s.endswith("s") else float(s) # Mapping from modulation name → (PSK/QAM order, generator_type) # 'psk' uses PSKGenerator, 'qam' uses QAMGenerator _MOD_TABLE: dict[str, tuple[int, str]] = { "BPSK": (1, "psk"), "QPSK": (2, "psk"), "8PSK": (3, "psk"), "16QAM": (4, "qam"), "64QAM": (6, "qam"), "256QAM": (8, "qam"), } _SPECIAL_MODS = {"FSK", "OOK", "GMSK", "OQPSK"} # usrp-uhd-client's tx_recording() streams 2 000-sample chunks and loops the # source buffer for the full tx_time, so only this many samples ever need to # be in RAM regardless of step duration or sample rate. # 50 000 complex64 samples ≈ 400 kB — enough spectral diversity for looping. _SYNTH_BLOCK_SAMPLES = 50_000 class TxExecutor: """Synthesise and transmit a signal campaign via a local SDR. Args: config: Transmitter config dict (must have ``sdr_agent`` sub-dict with modulation params, and ``schedule`` list of step dicts). sdr_device: SDR device name to open in TX mode (e.g. "pluto", "usrp"). stop_event: External event that aborts the TX loop mid-step. """ def __init__( self, config: dict, sdr_device: str = "unknown", stop_event: threading.Event | None = None, ) -> None: self.config = config self.sdr_device = sdr_device self.stop_event = stop_event or threading.Event() self._sdr: Any = None def run(self) -> None: """Execute all steps in the schedule, transmitting for each step duration.""" agent_cfg: dict = self.config.get("sdr_agent") or {} schedule: list[dict] = self.config.get("schedule") or [] if not schedule: logger.warning("TxExecutor: no schedule steps — nothing to transmit") return modulation: str = agent_cfg.get("modulation", "QPSK").upper() symbol_rate: float = float(agent_cfg.get("symbol_rate", 1e6)) center_freq: float = _parse_hz(agent_cfg.get("center_frequency", 0.0)) filter_type: str = agent_cfg.get("filter", "rrc").lower() rolloff: float = float(agent_cfg.get("rolloff", 0.35)) # Upsampling factor: samples_per_symbol, fixed at 8 for SDR compatibility. sps = 8 sample_rate = symbol_rate * sps self._init_sdr(sample_rate, center_freq) try: for step in schedule: if self.stop_event.is_set(): break self._execute_step(step, modulation, sps, symbol_rate, filter_type, rolloff) finally: self._close_sdr() def _execute_step( self, step: dict, modulation: str, sps: int, symbol_rate: float, filter_type: str, rolloff: float, ) -> None: duration: float = _parse_seconds(step.get("duration", 10.0)) label: str = step.get("label", "step") gain: float = float(step.get("power_dbm") or 0.0) sample_rate = symbol_rate * sps logger.info( "TX step '%s': %.0f s, %s @ %.3f MHz (sps=%d, filter=%s)", label, duration, modulation, symbol_rate / 1e6, sps, filter_type, ) num_samples = int(duration * sample_rate) # Synthesise a short representative block. tx_recording() loops this # buffer for the full tx_time using a 2 000-sample streaming callback, # so peak memory is O(_SYNTH_BLOCK_SAMPLES) regardless of duration. block_size = min(num_samples, _SYNTH_BLOCK_SAMPLES) signal = self._synthesise(modulation, sps, block_size, filter_type, rolloff) if self._sdr is not None: try: # Apply gain update if SDR supports it if hasattr(self._sdr, "set_tx_gain"): self._sdr.set_tx_gain(gain) self._sdr.tx_recording(signal, tx_time=duration) except Exception as exc: logger.error("TX step '%s' SDR error: %s", label, exc) else: # No SDR available — simulate by sleeping for the step duration. logger.warning("TX step '%s': no SDR — simulating %.0f s delay", label, duration) self.stop_event.wait(timeout=duration) def _synthesise( self, modulation: str, sps: int, num_samples: int, filter_type: str, rolloff: float, ): """Build a block-generator chain and return IQ samples as a numpy array.""" try: import numpy as np from ria_toolkit_oss.signal.block_generator import ( BinarySource, GMSKModulator, Mapper, OOKModulator, OQPSKModulator, RaisedCosineFilter, RootRaisedCosineFilter, Upsampling, ) from ria_toolkit_oss.signal.block_generator.continuous_modulation.fsk_modulator import ( FSKModulator, ) except ImportError as exc: raise RuntimeError(f"ria_toolkit_oss block generator not available: {exc}") from exc # ── Special modulations with their own source-connected modulator ── if modulation in ("OOK", "GMSK", "OQPSK"): src = BinarySource() if modulation == "OOK": mod = OOKModulator(src, samples_per_symbol=sps) elif modulation == "GMSK": mod = GMSKModulator(src, samples_per_symbol=sps) else: mod = OQPSKModulator(src, samples_per_symbol=sps) recording = mod.record(num_samples) flat = np.asarray(recording.data).flatten().astype(np.complex64) if len(flat) < num_samples: flat = np.tile(flat, num_samples // len(flat) + 1) return flat[:num_samples] if modulation == "FSK": symbol_rate = num_samples / sps bits_per_sym = 1 # 2-FSK num_bits = max(num_samples // sps, 128) * bits_per_sym bits = BinarySource()((1, num_bits)) mod = FSKModulator( num_bits_per_symbol=bits_per_sym, frequency_spacing=symbol_rate * 0.5, symbol_duration=1.0 / max(symbol_rate, 1.0), sampling_frequency=symbol_rate * sps, ) flat = np.asarray(mod(bits)).flatten().astype(np.complex64) if len(flat) < num_samples: flat = np.tile(flat, num_samples // len(flat) + 1) return flat[:num_samples] # ── PSK / QAM via Mapper → Upsampling → pulse filter ────────────── if modulation not in _MOD_TABLE: logger.warning("Unknown modulation %r — defaulting to QPSK", modulation) modulation = "QPSK" bits_per_sym, gen_type = _MOD_TABLE[modulation] mod_family = "QAM" if gen_type == "qam" else "PSK" source = BinarySource() mapper = Mapper(constellation_type=mod_family, num_bits_per_symbol=bits_per_sym) upsampler = Upsampling(factor=sps) mapper.connect_input([source]) upsampler.connect_input([mapper]) if filter_type in ("rrc",): pulse_filter = RootRaisedCosineFilter(span_in_symbols=6, upsampling_factor=sps, beta=rolloff) pulse_filter.connect_input([upsampler]) recording = pulse_filter.record(num_samples) elif filter_type in ("rc",): pulse_filter = RaisedCosineFilter(span_in_symbols=6, upsampling_factor=sps, beta=rolloff) pulse_filter.connect_input([upsampler]) recording = pulse_filter.record(num_samples) else: # "none", "rect", "gaussian" — use upsampler output directly recording = upsampler.record(num_samples) flat = np.asarray(recording.data).flatten().astype(np.complex64) if len(flat) < num_samples: flat = np.tile(flat, num_samples // len(flat) + 1) return flat[:num_samples] def _init_sdr(self, sample_rate: float, center_freq: float) -> None: try: from ria_toolkit_oss.sdr import get_sdr_device self._sdr = get_sdr_device(self.sdr_device) self._sdr.init_tx( sample_rate=sample_rate, center_frequency=center_freq, gain=0, channel=0, gain_mode="manual", ) logger.info( "TX SDR initialised: %s @ %.3f MHz, %.1f Msps", self.sdr_device, center_freq / 1e6, sample_rate / 1e6 ) except Exception as exc: logger.warning("TX SDR init failed (%s) — will simulate: %s", self.sdr_device, exc) self._sdr = None def _close_sdr(self) -> None: if self._sdr is not None: try: self._sdr.close() except Exception as exc: logger.debug("TX SDR close error: %s", exc) self._sdr = None