2026-04-20 12:33:14 -04:00
|
|
|
"""TX campaign executor — synthesises and transmits signals via a local SDR.
|
|
|
|
|
|
|
|
|
|
The TxExecutor receives a transmitter config dict (matching the
|
|
|
|
|
``sdr_agent`` control method's schema) and a step schedule, then for each
|
|
|
|
|
step builds a signal chain with the block generator and transmits it via
|
|
|
|
|
the local SDR device.
|
|
|
|
|
|
|
|
|
|
Supported modulations (``modulation`` field in config):
|
|
|
|
|
BPSK, QPSK, 8PSK, 16QAM, 64QAM, 256QAM, FSK, OOK, GMSK, OQPSK
|
|
|
|
|
|
|
|
|
|
Example config dict (matches CampaignConfig transmitter with
|
|
|
|
|
``control_method: sdr_agent``)::
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
"id": "synthetic-tx",
|
|
|
|
|
"type": "sdr",
|
|
|
|
|
"control_method": "sdr_agent",
|
|
|
|
|
"sdr_agent": {
|
|
|
|
|
"modulation": "QPSK",
|
|
|
|
|
"order": 4,
|
|
|
|
|
"symbol_rate": 1000000,
|
|
|
|
|
"center_frequency": 0.0,
|
|
|
|
|
"filter": "rrc",
|
|
|
|
|
"rolloff": 0.35
|
|
|
|
|
},
|
|
|
|
|
"schedule": [
|
|
|
|
|
{"label": "step1", "duration": 10, "power_dbm": -10}
|
|
|
|
|
]
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
|
import threading
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
2026-04-21 14:34:48 -04:00
|
|
|
|
|
|
|
|
def _parse_hz(val: object) -> float:
|
|
|
|
|
"""Parse a frequency value that may be a float (Hz) or a string like '2.45GHz'."""
|
|
|
|
|
if isinstance(val, (int, float)):
|
|
|
|
|
return float(val)
|
|
|
|
|
s = str(val).strip()
|
|
|
|
|
for suffix, mult in (("GHz", 1e9), ("MHz", 1e6), ("kHz", 1e3), ("Hz", 1.0)):
|
|
|
|
|
if s.endswith(suffix):
|
|
|
|
|
return float(s[: -len(suffix)]) * mult
|
|
|
|
|
return float(s)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_seconds(val: object) -> float:
|
|
|
|
|
"""Parse a duration value that may be a float (seconds) or a string like '5s'."""
|
|
|
|
|
if isinstance(val, (int, float)):
|
|
|
|
|
return float(val)
|
|
|
|
|
s = str(val).strip()
|
|
|
|
|
return float(s[:-1]) if s.endswith("s") else float(s)
|
|
|
|
|
|
|
|
|
|
|
2026-04-20 12:33:14 -04:00
|
|
|
# Mapping from modulation name → (PSK/QAM order, generator_type)
|
|
|
|
|
# 'psk' uses PSKGenerator, 'qam' uses QAMGenerator
|
|
|
|
|
_MOD_TABLE: dict[str, tuple[int, str]] = {
|
2026-04-20 16:49:52 -04:00
|
|
|
"BPSK": (1, "psk"),
|
|
|
|
|
"QPSK": (2, "psk"),
|
|
|
|
|
"8PSK": (3, "psk"),
|
|
|
|
|
"16QAM": (4, "qam"),
|
|
|
|
|
"64QAM": (6, "qam"),
|
2026-04-20 12:33:14 -04:00
|
|
|
"256QAM": (8, "qam"),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_SPECIAL_MODS = {"FSK", "OOK", "GMSK", "OQPSK"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TxExecutor:
|
|
|
|
|
"""Synthesise and transmit a signal campaign via a local SDR.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config: Transmitter config dict (must have ``sdr_agent`` sub-dict with
|
|
|
|
|
modulation params, and ``schedule`` list of step dicts).
|
|
|
|
|
sdr_device: SDR device name to open in TX mode (e.g. "pluto", "usrp").
|
|
|
|
|
stop_event: External event that aborts the TX loop mid-step.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
config: dict,
|
|
|
|
|
sdr_device: str = "unknown",
|
|
|
|
|
stop_event: threading.Event | None = None,
|
|
|
|
|
) -> None:
|
|
|
|
|
self.config = config
|
|
|
|
|
self.sdr_device = sdr_device
|
|
|
|
|
self.stop_event = stop_event or threading.Event()
|
|
|
|
|
self._sdr: Any = None
|
|
|
|
|
|
|
|
|
|
def run(self) -> None:
|
|
|
|
|
"""Execute all steps in the schedule, transmitting for each step duration."""
|
|
|
|
|
agent_cfg: dict = self.config.get("sdr_agent") or {}
|
|
|
|
|
schedule: list[dict] = self.config.get("schedule") or []
|
|
|
|
|
|
|
|
|
|
if not schedule:
|
|
|
|
|
logger.warning("TxExecutor: no schedule steps — nothing to transmit")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
modulation: str = agent_cfg.get("modulation", "QPSK").upper()
|
|
|
|
|
symbol_rate: float = float(agent_cfg.get("symbol_rate", 1e6))
|
2026-04-21 14:34:48 -04:00
|
|
|
center_freq: float = _parse_hz(agent_cfg.get("center_frequency", 0.0))
|
2026-04-20 12:33:14 -04:00
|
|
|
filter_type: str = agent_cfg.get("filter", "rrc").lower()
|
|
|
|
|
rolloff: float = float(agent_cfg.get("rolloff", 0.35))
|
|
|
|
|
|
|
|
|
|
# Upsampling factor: samples_per_symbol, fixed at 8 for SDR compatibility.
|
|
|
|
|
sps = 8
|
|
|
|
|
sample_rate = symbol_rate * sps
|
|
|
|
|
|
|
|
|
|
self._init_sdr(sample_rate, center_freq)
|
|
|
|
|
try:
|
|
|
|
|
for step in schedule:
|
|
|
|
|
if self.stop_event.is_set():
|
|
|
|
|
break
|
|
|
|
|
self._execute_step(step, modulation, sps, symbol_rate, filter_type, rolloff)
|
|
|
|
|
finally:
|
|
|
|
|
self._close_sdr()
|
|
|
|
|
|
|
|
|
|
def _execute_step(
|
|
|
|
|
self,
|
|
|
|
|
step: dict,
|
|
|
|
|
modulation: str,
|
|
|
|
|
sps: int,
|
|
|
|
|
symbol_rate: float,
|
|
|
|
|
filter_type: str,
|
|
|
|
|
rolloff: float,
|
|
|
|
|
) -> None:
|
2026-04-21 14:34:48 -04:00
|
|
|
duration: float = _parse_seconds(step.get("duration", 10.0))
|
2026-04-20 12:33:14 -04:00
|
|
|
label: str = step.get("label", "step")
|
|
|
|
|
gain: float = float(step.get("power_dbm") or 0.0)
|
|
|
|
|
sample_rate = symbol_rate * sps
|
|
|
|
|
|
|
|
|
|
logger.info(
|
|
|
|
|
"TX step '%s': %.0f s, %s @ %.3f MHz (sps=%d, filter=%s)",
|
2026-04-20 16:49:52 -04:00
|
|
|
label,
|
|
|
|
|
duration,
|
|
|
|
|
modulation,
|
|
|
|
|
symbol_rate / 1e6,
|
|
|
|
|
sps,
|
|
|
|
|
filter_type,
|
2026-04-20 12:33:14 -04:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
num_samples = int(duration * sample_rate)
|
|
|
|
|
signal = self._synthesise(modulation, sps, num_samples, filter_type, rolloff)
|
|
|
|
|
|
|
|
|
|
if self._sdr is not None:
|
|
|
|
|
try:
|
|
|
|
|
# Apply gain update if SDR supports it
|
|
|
|
|
if hasattr(self._sdr, "set_tx_gain"):
|
|
|
|
|
self._sdr.set_tx_gain(gain)
|
|
|
|
|
self._sdr.tx_recording(signal, tx_time=duration)
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
logger.error("TX step '%s' SDR error: %s", label, exc)
|
|
|
|
|
else:
|
|
|
|
|
# No SDR available — simulate by sleeping for the step duration.
|
2026-04-20 16:49:52 -04:00
|
|
|
logger.warning("TX step '%s': no SDR — simulating %.0f s delay", label, duration)
|
2026-04-20 12:33:14 -04:00
|
|
|
self.stop_event.wait(timeout=duration)
|
|
|
|
|
|
|
|
|
|
def _synthesise(
|
|
|
|
|
self,
|
|
|
|
|
modulation: str,
|
|
|
|
|
sps: int,
|
|
|
|
|
num_samples: int,
|
|
|
|
|
filter_type: str,
|
|
|
|
|
rolloff: float,
|
|
|
|
|
):
|
|
|
|
|
"""Build a block-generator chain and return IQ samples as a numpy array."""
|
|
|
|
|
try:
|
|
|
|
|
import numpy as np
|
2026-04-20 16:49:52 -04:00
|
|
|
|
2026-04-20 12:33:14 -04:00
|
|
|
from ria_toolkit_oss.signal.block_generator import (
|
|
|
|
|
BinarySource,
|
|
|
|
|
GMSKModulator,
|
|
|
|
|
Mapper,
|
|
|
|
|
OOKModulator,
|
|
|
|
|
OQPSKModulator,
|
|
|
|
|
RaisedCosineFilter,
|
|
|
|
|
RootRaisedCosineFilter,
|
|
|
|
|
Upsampling,
|
|
|
|
|
)
|
|
|
|
|
from ria_toolkit_oss.signal.block_generator.continuous_modulation.fsk_modulator import (
|
|
|
|
|
FSKModulator,
|
|
|
|
|
)
|
|
|
|
|
except ImportError as exc:
|
|
|
|
|
raise RuntimeError(f"ria_toolkit_oss block generator not available: {exc}") from exc
|
|
|
|
|
|
|
|
|
|
# ── Special modulations with their own source-connected modulator ──
|
|
|
|
|
if modulation in ("OOK", "GMSK", "OQPSK"):
|
|
|
|
|
src = BinarySource()
|
|
|
|
|
if modulation == "OOK":
|
|
|
|
|
mod = OOKModulator(src, samples_per_symbol=sps)
|
|
|
|
|
elif modulation == "GMSK":
|
|
|
|
|
mod = GMSKModulator(src, samples_per_symbol=sps)
|
|
|
|
|
else:
|
|
|
|
|
mod = OQPSKModulator(src, samples_per_symbol=sps)
|
|
|
|
|
recording = mod.record(num_samples)
|
|
|
|
|
flat = np.asarray(recording.data).flatten().astype(np.complex64)
|
|
|
|
|
if len(flat) < num_samples:
|
|
|
|
|
flat = np.tile(flat, num_samples // len(flat) + 1)
|
|
|
|
|
return flat[:num_samples]
|
|
|
|
|
|
|
|
|
|
if modulation == "FSK":
|
|
|
|
|
symbol_rate = num_samples / sps
|
|
|
|
|
bits_per_sym = 1 # 2-FSK
|
|
|
|
|
num_bits = max(num_samples // sps, 128) * bits_per_sym
|
|
|
|
|
bits = BinarySource()((1, num_bits))
|
|
|
|
|
mod = FSKModulator(
|
|
|
|
|
num_bits_per_symbol=bits_per_sym,
|
|
|
|
|
frequency_spacing=symbol_rate * 0.5,
|
|
|
|
|
symbol_duration=1.0 / max(symbol_rate, 1.0),
|
|
|
|
|
sampling_frequency=symbol_rate * sps,
|
|
|
|
|
)
|
|
|
|
|
flat = np.asarray(mod(bits)).flatten().astype(np.complex64)
|
|
|
|
|
if len(flat) < num_samples:
|
|
|
|
|
flat = np.tile(flat, num_samples // len(flat) + 1)
|
|
|
|
|
return flat[:num_samples]
|
|
|
|
|
|
|
|
|
|
# ── PSK / QAM via Mapper → Upsampling → pulse filter ──────────────
|
|
|
|
|
if modulation not in _MOD_TABLE:
|
|
|
|
|
logger.warning("Unknown modulation %r — defaulting to QPSK", modulation)
|
|
|
|
|
modulation = "QPSK"
|
|
|
|
|
|
|
|
|
|
bits_per_sym, gen_type = _MOD_TABLE[modulation]
|
|
|
|
|
mod_family = "QAM" if gen_type == "qam" else "PSK"
|
|
|
|
|
|
|
|
|
|
source = BinarySource()
|
|
|
|
|
mapper = Mapper(constellation_type=mod_family, num_bits_per_symbol=bits_per_sym)
|
|
|
|
|
upsampler = Upsampling(factor=sps)
|
|
|
|
|
|
|
|
|
|
mapper.connect_input([source])
|
|
|
|
|
upsampler.connect_input([mapper])
|
|
|
|
|
|
|
|
|
|
if filter_type in ("rrc",):
|
|
|
|
|
pulse_filter = RootRaisedCosineFilter(span_in_symbols=6, upsampling_factor=sps, beta=rolloff)
|
|
|
|
|
pulse_filter.connect_input([upsampler])
|
|
|
|
|
recording = pulse_filter.record(num_samples)
|
|
|
|
|
elif filter_type in ("rc",):
|
|
|
|
|
pulse_filter = RaisedCosineFilter(span_in_symbols=6, upsampling_factor=sps, beta=rolloff)
|
|
|
|
|
pulse_filter.connect_input([upsampler])
|
|
|
|
|
recording = pulse_filter.record(num_samples)
|
|
|
|
|
else:
|
|
|
|
|
# "none", "rect", "gaussian" — use upsampler output directly
|
|
|
|
|
recording = upsampler.record(num_samples)
|
|
|
|
|
|
|
|
|
|
flat = np.asarray(recording.data).flatten().astype(np.complex64)
|
|
|
|
|
if len(flat) < num_samples:
|
|
|
|
|
flat = np.tile(flat, num_samples // len(flat) + 1)
|
|
|
|
|
return flat[:num_samples]
|
|
|
|
|
|
|
|
|
|
def _init_sdr(self, sample_rate: float, center_freq: float) -> None:
|
|
|
|
|
try:
|
|
|
|
|
from ria_toolkit_oss.sdr import get_sdr_device
|
2026-04-20 16:49:52 -04:00
|
|
|
|
2026-04-20 12:33:14 -04:00
|
|
|
self._sdr = get_sdr_device(self.sdr_device)
|
|
|
|
|
self._sdr.init_tx(
|
|
|
|
|
sample_rate=sample_rate,
|
|
|
|
|
center_frequency=center_freq,
|
|
|
|
|
gain=0,
|
|
|
|
|
channel=0,
|
|
|
|
|
gain_mode="manual",
|
|
|
|
|
)
|
2026-04-20 16:49:52 -04:00
|
|
|
logger.info(
|
|
|
|
|
"TX SDR initialised: %s @ %.3f MHz, %.1f Msps", self.sdr_device, center_freq / 1e6, sample_rate / 1e6
|
|
|
|
|
)
|
2026-04-20 12:33:14 -04:00
|
|
|
except Exception as exc:
|
|
|
|
|
logger.warning("TX SDR init failed (%s) — will simulate: %s", self.sdr_device, exc)
|
|
|
|
|
self._sdr = None
|
|
|
|
|
|
|
|
|
|
def _close_sdr(self) -> None:
|
|
|
|
|
if self._sdr is not None:
|
|
|
|
|
try:
|
|
|
|
|
self._sdr.close()
|
|
|
|
|
except Exception as exc:
|
|
|
|
|
logger.debug("TX SDR close error: %s", exc)
|
|
|
|
|
self._sdr = None
|