From 4aea2841bed3b0fc84e1dcb30107cb3f25d27fb5 Mon Sep 17 00:00:00 2001 From: ben Date: Tue, 21 Apr 2026 14:09:36 -0400 Subject: [PATCH] two-machine TX/RX --- src/ria_toolkit_oss/agent/legacy_executor.py | 9 +++++---- src/ria_toolkit_oss/orchestration/executor.py | 5 +++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/ria_toolkit_oss/agent/legacy_executor.py b/src/ria_toolkit_oss/agent/legacy_executor.py index d8a56d6..91221e1 100644 --- a/src/ria_toolkit_oss/agent/legacy_executor.py +++ b/src/ria_toolkit_oss/agent/legacy_executor.py @@ -261,9 +261,10 @@ class NodeAgent: if command == "run_campaign": campaign_id: str = cmd.get("campaign_id") or str(uuid.uuid4()) config_dict: dict = cmd.get("payload") or {} + skip_local_tx: bool = bool(cmd.get("skip_local_tx", False)) threading.Thread( target=self._run_campaign, - args=(campaign_id, config_dict), + args=(campaign_id, config_dict, skip_local_tx), daemon=True, name=f"campaign-{campaign_id[:8]}", ).start() @@ -303,7 +304,7 @@ class NodeAgent: # Campaign execution # ------------------------------------------------------------------ - def _run_campaign(self, campaign_id: str, config_dict: dict) -> None: + def _run_campaign(self, campaign_id: str, config_dict: dict, skip_local_tx: bool = False) -> None: try: from ria_toolkit_oss.orchestration.campaign import CampaignConfig from ria_toolkit_oss.orchestration.executor import CampaignExecutor @@ -315,10 +316,10 @@ class NodeAgent: ) return - logger.info("Campaign %s starting", campaign_id[:8]) + logger.info("Campaign %s starting (skip_local_tx=%s)", campaign_id[:8], skip_local_tx) try: config = CampaignConfig.from_dict(config_dict) - executor = CampaignExecutor(config) + executor = CampaignExecutor(config, skip_local_tx=skip_local_tx) result = executor.run() logger.info("Campaign %s completed — uploading recordings", campaign_id[:8]) self._upload_recordings(campaign_id, config, result) diff --git a/src/ria_toolkit_oss/orchestration/executor.py b/src/ria_toolkit_oss/orchestration/executor.py index 445b16b..66c5273 100644 --- a/src/ria_toolkit_oss/orchestration/executor.py +++ b/src/ria_toolkit_oss/orchestration/executor.py @@ -209,9 +209,11 @@ class CampaignExecutor: config: CampaignConfig, progress_cb: Optional[Callable[[int, int, StepResult], None]] = None, verbose: bool = False, + skip_local_tx: bool = False, ): self.config = config self.progress_cb = progress_cb + self.skip_local_tx = skip_local_tx self._sdr = None self._remote_tx_controllers: dict = {} self._tx_executors: dict[str, tuple] = {} # tx_id → (TxExecutor, stop_event, thread) @@ -464,6 +466,9 @@ class CampaignExecutor: ctrl.transmit_async(step.duration + 1.0) elif transmitter.control_method == "sdr_agent": + if self.skip_local_tx: + logger.debug(f"skip_local_tx — TX for '{transmitter.id}' delegated to TX agent node") + return if not transmitter.sdr_agent: logger.warning(f"Transmitter '{transmitter.id}' has no sdr_agent config — skipping") return