From efc09481104e9c1fddccbab27a8bc1ce9fce7ed8 Mon Sep 17 00:00:00 2001 From: ben Date: Fri, 17 Apr 2026 09:43:59 -0400 Subject: [PATCH 1/2] ria composer support --- src/ria_toolkit_oss/orchestration/campaign.py | 6 +- src/ria_toolkit_oss/orchestration/executor.py | 61 ++- .../remote_control/__init__.py | 6 + .../remote_control/remote_transmitter.py | 147 +++++++ .../remote_transmitter_controller.py | 210 ++++++++++ tests/remote_control/__init__.py | 0 .../remote_control/test_remote_transmitter.py | 266 ++++++++++++ .../test_remote_transmitter_controller.py | 294 +++++++++++++ .../test_sdr_remote_integration.py | 391 ++++++++++++++++++ 9 files changed, 1379 insertions(+), 2 deletions(-) create mode 100644 src/ria_toolkit_oss/remote_control/__init__.py create mode 100644 src/ria_toolkit_oss/remote_control/remote_transmitter.py create mode 100644 src/ria_toolkit_oss/remote_control/remote_transmitter_controller.py create mode 100644 tests/remote_control/__init__.py create mode 100644 tests/remote_control/test_remote_transmitter.py create mode 100644 tests/remote_control/test_remote_transmitter_controller.py create mode 100644 tests/remote_control/test_sdr_remote_integration.py diff --git a/src/ria_toolkit_oss/orchestration/campaign.py b/src/ria_toolkit_oss/orchestration/campaign.py index 9d96c96..027c33f 100644 --- a/src/ria_toolkit_oss/orchestration/campaign.py +++ b/src/ria_toolkit_oss/orchestration/campaign.py @@ -223,13 +223,16 @@ class TransmitterConfig: id: str type: str # "wifi", "bluetooth", "sdr", "external" - control_method: str # "external_script" | "sdr" + control_method: str # "external_script" | "sdr" | "sdr_remote" schedule: list[CaptureStep] # For external_script control script: Optional[str] = None # path to control script device: Optional[str] = None # e.g. "/dev/wlan0" + # For sdr_remote control — keys: host, ssh_user, ssh_key_path, device_type, device_id, zmq_port + sdr_remote: Optional[dict] = None + @classmethod def from_dict(cls, d: dict) -> "TransmitterConfig": schedule = [CaptureStep.from_dict(s) for s in d.get("schedule", [])] @@ -240,6 +243,7 @@ class TransmitterConfig: schedule=schedule, script=d.get("script"), device=d.get("device"), + sdr_remote=d.get("sdr_remote"), ) diff --git a/src/ria_toolkit_oss/orchestration/executor.py b/src/ria_toolkit_oss/orchestration/executor.py index 629c0d8..1bdd4d8 100644 --- a/src/ria_toolkit_oss/orchestration/executor.py +++ b/src/ria_toolkit_oss/orchestration/executor.py @@ -196,6 +196,7 @@ class CampaignExecutor: self.config = config self.progress_cb = progress_cb self._sdr = None + self._remote_tx_controllers: dict = {} if verbose: logging.basicConfig(level=logging.DEBUG) @@ -222,6 +223,7 @@ class CampaignExecutor: ) self._init_sdr() + self._init_remote_tx_controllers() try: total = self.config.total_steps() step_index = 0 @@ -248,6 +250,7 @@ class CampaignExecutor: ) finally: self._close_sdr() + self._close_remote_tx_controllers() result.end_time = time.time() logger.info( @@ -287,6 +290,41 @@ class CampaignExecutor: logger.warning(f"SDR close error: {e}") self._sdr = None + # ------------------------------------------------------------------ + # Remote Tx controller management + # ------------------------------------------------------------------ + + def _init_remote_tx_controllers(self) -> None: + """Open SSH+ZMQ connections for all sdr_remote transmitters.""" + from ria_toolkit_oss.remote_control import RemoteTransmitterController + + for tx in self.config.transmitters: + if tx.control_method != "sdr_remote": + continue + cfg = tx.sdr_remote + if not cfg: + raise RuntimeError(f"Transmitter '{tx.id}' uses sdr_remote but has no sdr_remote config") + logger.info(f"Connecting remote Tx controller for {tx.id} → {cfg['host']}") + ctrl = RemoteTransmitterController( + host=cfg["host"], + ssh_user=cfg["ssh_user"], + ssh_key_path=cfg["ssh_key_path"], + zmq_port=int(cfg.get("zmq_port", 5556)), + ) + ctrl.set_radio( + device_type=cfg["device_type"], + device_id=cfg.get("device_id", ""), + ) + self._remote_tx_controllers[tx.id] = ctrl + + def _close_remote_tx_controllers(self) -> None: + for tx_id, ctrl in list(self._remote_tx_controllers.items()): + try: + ctrl.close() + except Exception as exc: + logger.warning(f"Error closing remote Tx controller {tx_id}: {exc}") + self._remote_tx_controllers.clear() + def _record(self, duration_s: float) -> Recording: """Capture ``duration_s`` seconds of IQ samples.""" num_samples = int(duration_s * self.config.recorder.sample_rate) @@ -372,7 +410,8 @@ class CampaignExecutor: traffic, etc. The script is responsible for applying the configuration and returning promptly (i.e. not blocking for the capture duration). - For SDR transmitters this is a no-op placeholder (TX not yet implemented). + For ``sdr_remote`` the remote ZMQ controller calls ``init_tx`` then + starts a background transmit thread that runs for the step duration. """ if transmitter.control_method == "external_script": if not transmitter.script: @@ -384,6 +423,20 @@ class CampaignExecutor: elif transmitter.control_method == "sdr": logger.debug("SDR TX not yet implemented — skipping start") + elif transmitter.control_method == "sdr_remote": + ctrl = self._remote_tx_controllers.get(transmitter.id) + if ctrl is None: + raise RuntimeError(f"No remote Tx controller found for transmitter '{transmitter.id}'") + gain = step.power_dbm if step.power_dbm is not None else 0.0 + ctrl.init_tx( + center_frequency=self.config.recorder.center_freq, + sample_rate=self.config.recorder.sample_rate, + gain=gain, + channel=step.channel or 0, + ) + # Start transmission in background; _record() runs concurrently + ctrl.transmit_async(step.duration + 1.0) + else: logger.warning(f"Unknown control method '{transmitter.control_method}' — skipping") @@ -391,6 +444,7 @@ class CampaignExecutor: """Signal the transmitter to stop. Calls ``