2026-04-20 12:33:14 -04:00
|
|
|
"""Tests for TxExecutor — signal synthesis and step execution."""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import threading
|
2026-04-20 16:49:52 -04:00
|
|
|
from unittest.mock import patch
|
2026-04-20 12:33:14 -04:00
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
from ria_toolkit_oss.orchestration.tx_executor import TxExecutor
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _cfg(modulation="QPSK", symbol_rate=100_000, steps=None):
|
|
|
|
|
return {
|
|
|
|
|
"id": "test-tx",
|
|
|
|
|
"type": "sdr",
|
|
|
|
|
"control_method": "sdr_agent",
|
|
|
|
|
"sdr_agent": {
|
|
|
|
|
"modulation": modulation,
|
|
|
|
|
"symbol_rate": symbol_rate,
|
|
|
|
|
"center_frequency": 0.0,
|
|
|
|
|
"filter": "rrc",
|
|
|
|
|
"rolloff": 0.35,
|
|
|
|
|
},
|
|
|
|
|
"schedule": steps or [{"label": "step1", "duration": 0.001, "power_dbm": -10}],
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Initialisation
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestTxExecutorInit:
    """Constructor checks: stored device name and stop-event wiring."""

    def test_stores_sdr_device(self):
        """The ``sdr_device`` argument is readable back from the instance."""
        executor = TxExecutor(_cfg(), sdr_device="pluto")
        assert executor.sdr_device == "pluto"

    def test_stop_event_created_when_not_supplied(self):
        """Without a ``stop_event`` argument, an unset Event is provided."""
        executor = TxExecutor(_cfg())
        assert isinstance(executor.stop_event, threading.Event)
        assert not executor.stop_event.is_set()

    def test_accepts_external_stop_event(self):
        """A caller-supplied Event is kept by identity, not copied."""
        external_event = threading.Event()
        executor = TxExecutor(_cfg(), stop_event=external_event)
        assert executor.stop_event is external_event
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# run() — schedule iteration
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestTxExecutorRun:
    """Behaviour of ``TxExecutor.run()`` while iterating the schedule."""

    def test_empty_schedule_returns_immediately(self):
        """run() on a config with no schedule entries must not raise or block."""
        cfg = _cfg(steps=[])
        # Pin the schedule to an empty list explicitly so this test really
        # exercises the empty case, however the helper treats a falsy
        # ``steps`` argument.
        cfg["schedule"] = []
        ex = TxExecutor(cfg)
        ex.run()  # must not raise or block

    def test_pre_set_stop_event_skips_all_steps(self):
        """A stop event that is already set prevents any step from executing."""
        ev = threading.Event()
        ev.set()
        # Replace the step executor with a mock so the "never called"
        # expectation is actually asserted rather than only described.
        with patch.object(TxExecutor, "_execute_step") as execute_step:
            ex = TxExecutor(_cfg(), stop_event=ev)
            # run() should return cleanly without attempting synthesis.
            ex.run()
        execute_step.assert_not_called()

    def test_no_sdr_falls_back_to_simulation(self, monkeypatch):
        """Without SDR hardware TxExecutor simulates by calling stop_event.wait."""
        cfg = _cfg(steps=[{"label": "s", "duration": 0.001, "power_dbm": 0}])
        waited = []
        real_ev = threading.Event()

        def _fake_wait(timeout=None):
            # Record the requested timeout and report "not set" without sleeping.
            waited.append(timeout)
            return False

        monkeypatch.setattr(real_ev, "wait", _fake_wait)

        # Patch SDR init to always fail (forces simulation path)
        with patch.object(TxExecutor, "_init_sdr", lambda self, *a, **kw: setattr(self, "_sdr", None)):
            ex = TxExecutor(cfg, sdr_device="nonexistent_xyz", stop_event=real_ev)
            ex.run()

        assert len(waited) >= 1, "expected stop_event.wait to be called for simulation"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# _synthesise — all modulation types and filter types
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestSynthesise:
    """Dtype, length and finiteness checks for ``TxExecutor._synthesise``."""

    @pytest.fixture(autouse=True)
    def _ex(self):
        """Attach a freshly built executor to each test as ``self.ex``."""
        self.ex = TxExecutor(_cfg())

    def _synth(self, mod, num_samples=256):
        """Call ``_synthesise`` for *mod* with the fixed settings used by these tests."""
        fixed_settings = {
            "sps": 4,
            "num_samples": num_samples,
            "filter_type": "rrc",
            "rolloff": 0.35,
        }
        return self.ex._synthesise(mod, **fixed_settings)

    @pytest.mark.parametrize("mod", ["BPSK", "QPSK", "8PSK", "16QAM", "64QAM", "256QAM"])
    def test_psk_qam_returns_complex64_array(self, mod):
        """PSK/QAM output is complex64 and has the requested length."""
        waveform = self._synth(mod)
        assert waveform.dtype == np.complex64
        assert len(waveform) == 256

    def test_fsk_returns_correct_length(self):
        """FSK output has the requested number of samples."""
        waveform = self._synth("FSK")
        assert len(waveform) == 256

    def test_ook_returns_correct_length(self):
        """OOK output has the requested number of samples."""
        waveform = self._synth("OOK")
        assert len(waveform) == 256

    def test_gmsk_returns_correct_length(self):
        """GMSK output has the requested number of samples."""
        waveform = self._synth("GMSK")
        assert len(waveform) == 256

    def test_oqpsk_returns_correct_length(self):
        """OQPSK output has the requested number of samples."""
        waveform = self._synth("OQPSK")
        assert len(waveform) == 256

    @pytest.mark.parametrize("mod", ["BPSK", "QPSK", "16QAM", "FSK", "OOK", "GMSK"])
    def test_samples_are_finite(self, mod):
        """Neither the real nor the imaginary part contains NaN or inf."""
        waveform = self._synth(mod)
        real_is_finite = np.isfinite(waveform.real)
        assert np.all(real_is_finite), f"{mod}: non-finite real samples"
        imag_is_finite = np.isfinite(waveform.imag)
        assert np.all(imag_is_finite), f"{mod}: non-finite imag samples"

    def test_unknown_modulation_defaults_to_qpsk(self):
        """An unrecognised modulation name still yields a complex64 array of the requested length."""
        waveform = self._synth("UNKNOWN_MOD_XYZ")
        assert len(waveform) == 256
        assert waveform.dtype == np.complex64

    @pytest.mark.parametrize("filter_type", ["rrc", "rc", "gaussian", "rect", "none"])
    def test_all_filter_types(self, filter_type):
        """Every listed filter type produces the requested number of samples."""
        waveform = self.ex._synthesise(
            "QPSK",
            sps=4,
            num_samples=128,
            filter_type=filter_type,
            rolloff=0.35,
        )
        assert len(waveform) == 128

    @pytest.mark.parametrize("n", [64, 128, 512, 1024])
    def test_output_length_matches_requested_samples(self, n):
        """Output length follows ``num_samples`` across several sizes."""
        waveform = self._synth("QPSK", num_samples=n)
        assert len(waveform) == n

    def test_bpsk_output_is_complex_not_real(self):
        """BPSK output is stored as complex64."""
        waveform = self._synth("BPSK")
        # complex64 always has imag part; just check dtype
        assert waveform.dtype == np.complex64

    def test_256qam_correct_length(self):
        """256QAM output has the requested number of samples."""
        waveform = self._synth("256QAM")
        assert len(waveform) == 256
|