zfp-oss #27
|
|
@ -261,9 +261,10 @@ class NodeAgent:
|
|||
if command == "run_campaign":
|
||||
campaign_id: str = cmd.get("campaign_id") or str(uuid.uuid4())
|
||||
config_dict: dict = cmd.get("payload") or {}
|
||||
skip_local_tx: bool = bool(cmd.get("skip_local_tx", False))
|
||||
threading.Thread(
|
||||
target=self._run_campaign,
|
||||
args=(campaign_id, config_dict),
|
||||
args=(campaign_id, config_dict, skip_local_tx),
|
||||
daemon=True,
|
||||
name=f"campaign-{campaign_id[:8]}",
|
||||
).start()
|
||||
|
|
@ -303,7 +304,7 @@ class NodeAgent:
|
|||
# Campaign execution
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _run_campaign(self, campaign_id: str, config_dict: dict) -> None:
|
||||
def _run_campaign(self, campaign_id: str, config_dict: dict, skip_local_tx: bool = False) -> None:
|
||||
try:
|
||||
from ria_toolkit_oss.orchestration.campaign import CampaignConfig
|
||||
from ria_toolkit_oss.orchestration.executor import CampaignExecutor
|
||||
|
|
@ -315,10 +316,10 @@ class NodeAgent:
|
|||
)
|
||||
return
|
||||
|
||||
logger.info("Campaign %s starting", campaign_id[:8])
|
||||
logger.info("Campaign %s starting (skip_local_tx=%s)", campaign_id[:8], skip_local_tx)
|
||||
try:
|
||||
config = CampaignConfig.from_dict(config_dict)
|
||||
executor = CampaignExecutor(config)
|
||||
executor = CampaignExecutor(config, skip_local_tx=skip_local_tx)
|
||||
result = executor.run()
|
||||
logger.info("Campaign %s completed — uploading recordings", campaign_id[:8])
|
||||
self._upload_recordings(campaign_id, config, result)
|
||||
|
|
|
|||
|
|
@ -209,9 +209,11 @@ class CampaignExecutor:
|
|||
config: CampaignConfig,
|
||||
progress_cb: Optional[Callable[[int, int, StepResult], None]] = None,
|
||||
verbose: bool = False,
|
||||
skip_local_tx: bool = False,
|
||||
):
|
||||
self.config = config
|
||||
self.progress_cb = progress_cb
|
||||
self.skip_local_tx = skip_local_tx
|
||||
self._sdr = None
|
||||
self._remote_tx_controllers: dict = {}
|
||||
self._tx_executors: dict[str, tuple] = {} # tx_id → (TxExecutor, stop_event, thread)
|
||||
|
|
@ -464,6 +466,9 @@ class CampaignExecutor:
|
|||
ctrl.transmit_async(step.duration + 1.0)
|
||||
|
||||
elif transmitter.control_method == "sdr_agent":
|
||||
if self.skip_local_tx:
|
||||
logger.debug(f"skip_local_tx — TX for '{transmitter.id}' delegated to TX agent node")
|
||||
return
|
||||
if not transmitter.sdr_agent:
|
||||
logger.warning(f"Transmitter '{transmitter.id}' has no sdr_agent config — skipping")
|
||||
return
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user