diff --git a/poetry.lock b/poetry.lock index f235cb2..9d1e5fe 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3749,4 +3749,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = ">=3.10" -content-hash = "ffde300b2fc93161d2279a6e2b899bc988d3b5eb3833135821830affc9a5fb62" +content-hash = "66c9adf647316db90f963da05e8a83574378bfa4db2c69ce751446b5ee7c408c" diff --git a/pyproject.toml b/pyproject.toml index 00784cc..48a9e1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,7 @@ dependencies = [ "pyyaml (>=6.0.3,<7.0.0)", "click (>=8.1.0,<9.0.0)", "matplotlib (>=3.8.0,<4.0.0)", - "paramiko (>=4.0.0)" + "paramiko (>=3.5.1)" ] # [project.optional-dependencies] Commented out to prevent Tox tests from failing diff --git a/src/ria_toolkit_oss/orchestration/campaign.py b/src/ria_toolkit_oss/orchestration/campaign.py index 027c33f..5fe1128 100644 --- a/src/ria_toolkit_oss/orchestration/campaign.py +++ b/src/ria_toolkit_oss/orchestration/campaign.py @@ -233,6 +233,9 @@ class TransmitterConfig: # For sdr_remote control — keys: host, ssh_user, ssh_key_path, device_type, device_id, zmq_port sdr_remote: Optional[dict] = None + # For sdr_agent control — keys: modulation, order, symbol_rate, center_frequency, filter, rolloff + sdr_agent: Optional[dict] = None + @classmethod def from_dict(cls, d: dict) -> "TransmitterConfig": schedule = [CaptureStep.from_dict(s) for s in d.get("schedule", [])] @@ -244,6 +247,7 @@ class TransmitterConfig: script=d.get("script"), device=d.get("device"), sdr_remote=d.get("sdr_remote"), + sdr_agent=d.get("sdr_agent"), ) diff --git a/src/ria_toolkit_oss/orchestration/executor.py b/src/ria_toolkit_oss/orchestration/executor.py index 3467995..445b16b 100644 --- a/src/ria_toolkit_oss/orchestration/executor.py +++ b/src/ria_toolkit_oss/orchestration/executor.py @@ -5,6 +5,7 @@ from __future__ import annotations import json import logging import subprocess +import threading import time from dataclasses import dataclass, field from pathlib import Path @@ -16,6 +17,7 @@ from ria_toolkit_oss.io.recording import to_sigmf from .campaign import CampaignConfig, CaptureStep, TransmitterConfig from .labeler import build_output_filename, label_recording from .qa import QAResult, check_recording +from .tx_executor import TxExecutor logger = logging.getLogger(__name__) @@ -212,6 +214,7 @@ class CampaignExecutor: self.progress_cb = progress_cb self._sdr = None self._remote_tx_controllers: dict = {} + self._tx_executors: dict[str, tuple] = {} # tx_id → (TxExecutor, stop_event, thread) if verbose: logging.basicConfig(level=logging.DEBUG) @@ -266,6 +269,7 @@ class CampaignExecutor: finally: self._close_sdr() self._close_remote_tx_controllers() + self._close_tx_executors() result.end_time = time.time() logger.info( @@ -340,6 +344,12 @@ class CampaignExecutor: logger.warning(f"Error closing remote Tx controller {tx_id}: {exc}") self._remote_tx_controllers.clear() + def _close_tx_executors(self) -> None: + for tx_id, (_, stop_event, t) in list(self._tx_executors.items()): + stop_event.set() + t.join(timeout=5.0) + self._tx_executors.clear() + def _record(self, duration_s: float) -> Recording: """Capture ``duration_s`` seconds of IQ samples.""" num_samples = int(duration_s * self.config.recorder.sample_rate) @@ -453,6 +463,27 @@ class CampaignExecutor: # Start transmission in background; _record() runs concurrently ctrl.transmit_async(step.duration + 1.0) + elif transmitter.control_method == "sdr_agent": + if not transmitter.sdr_agent: + logger.warning(f"Transmitter '{transmitter.id}' has no sdr_agent config — skipping") + return + step_dict: dict = {"label": step.label, "duration": step.duration + 1.0} + if step.power_dbm is not None: + step_dict["power_dbm"] = step.power_dbm + tx_config = { + "id": transmitter.id, + "sdr_agent": transmitter.sdr_agent, + "schedule": [step_dict], + } + rec = self.config.recorder + tx_device = transmitter.device or rec.device + sdr_device = _DEVICE_ALIASES.get(tx_device.lower(), tx_device.lower()) + stop_event = threading.Event() + executor = TxExecutor(tx_config, sdr_device=sdr_device, stop_event=stop_event) + t = threading.Thread(target=executor.run, daemon=True, name=f"tx-{transmitter.id}") + self._tx_executors[transmitter.id] = (executor, stop_event, t) + t.start() + else: logger.warning(f"Unknown control method '{transmitter.control_method}' — skipping") @@ -475,6 +506,13 @@ class CampaignExecutor: if ctrl is not None: ctrl.wait_transmit(timeout=step.duration + 10.0) + elif transmitter.control_method == "sdr_agent": + entry = self._tx_executors.pop(transmitter.id, None) + if entry is not None: + _, stop_event, t = entry + stop_event.set() + t.join(timeout=step.duration + 10.0) + @staticmethod def _step_params_json(transmitter: TransmitterConfig, step: CaptureStep) -> str: """Serialise step parameters to a JSON string for the control script."""