diff --git a/src/ria_toolkit_oss/orchestration/campaign.py b/src/ria_toolkit_oss/orchestration/campaign.py index 5fe1128..6f9be35 100644 --- a/src/ria_toolkit_oss/orchestration/campaign.py +++ b/src/ria_toolkit_oss/orchestration/campaign.py @@ -297,6 +297,7 @@ class CampaignConfig: qa: QAConfig = field(default_factory=QAConfig) output: OutputConfig = field(default_factory=OutputConfig) mode: str = "controlled_testbed" + loops: int = 1 # repeat full schedule this many times; labels get _run{N:02d} suffix # --------------------------------------------------------------------------- # Loaders @@ -324,6 +325,7 @@ class CampaignConfig: return cls( name=safe_name, mode=str(campaign_meta.get("mode", "controlled_testbed")), + loops=max(1, int(campaign_meta.get("loops", 1))), recorder=RecorderConfig.from_dict(raw["recorder"]), transmitters=transmitters, qa=QAConfig.from_dict(raw.get("qa", {})), @@ -388,6 +390,7 @@ class CampaignConfig: return cls( name=safe_name, mode=str(campaign_meta.get("mode", "controlled_testbed")), + loops=max(1, int(campaign_meta.get("loops", 1))), recorder=RecorderConfig.from_dict(raw["recorder"]), transmitters=transmitters, qa=QAConfig.from_dict(raw.get("qa", {})), @@ -490,9 +493,9 @@ class CampaignConfig: ) def total_capture_time_s(self) -> float: - """Sum of all step durations across all transmitters.""" - return sum(step.duration for tx in self.transmitters for step in tx.schedule) + """Sum of all step durations across all transmitters and loops.""" + return sum(step.duration for tx in self.transmitters for step in tx.schedule) * self.loops def total_steps(self) -> int: - """Total number of capture steps across all transmitters.""" - return sum(len(tx.schedule) for tx in self.transmitters) + """Total number of capture steps across all transmitters and loops.""" + return sum(len(tx.schedule) for tx in self.transmitters) * self.loops diff --git a/src/ria_toolkit_oss/orchestration/executor.py b/src/ria_toolkit_oss/orchestration/executor.py index 66c5273..222b348 100644 --- a/src/ria_toolkit_oss/orchestration/executor.py +++ b/src/ria_toolkit_oss/orchestration/executor.py @@ -7,7 +7,7 @@ import logging import subprocess import threading import time -from dataclasses import dataclass, field +from dataclasses import dataclass, field, replace from pathlib import Path from typing import Callable, Optional @@ -236,10 +236,12 @@ class CampaignExecutor: """ result = CampaignResult(campaign_name=self.config.name) + loops = self.config.loops logger.info( f"Starting campaign '{self.config.name}': " - f"{self.config.total_steps()} steps, " - f"~{self.config.total_capture_time_s():.0f}s capture time" + f"{self.config.total_steps()} steps" + + (f" ({self.config.total_steps() // loops} × {loops} loops)" if loops > 1 else "") + + f", ~{self.config.total_capture_time_s():.0f}s capture time" ) self._init_sdr() @@ -248,26 +250,32 @@ class CampaignExecutor: total = self.config.total_steps() step_index = 0 - for transmitter in self.config.transmitters: - logger.info(f"Transmitter: {transmitter.id} ({len(transmitter.schedule)} steps)") - for step in transmitter.schedule: - step_result = self._execute_step(transmitter, step) - result.steps.append(step_result) - step_index += 1 + for loop_idx in range(loops): + if loops > 1: + logger.info(f"Loop {loop_idx + 1}/{loops}") + for transmitter in self.config.transmitters: + logger.info(f"Transmitter: {transmitter.id} ({len(transmitter.schedule)} steps)") + for step in transmitter.schedule: + looped_step = replace(step, label=f"{step.label}_run{loop_idx + 1:02d}") if loops > 1 else step + step_result = self._execute_step(transmitter, looped_step) + result.steps.append(step_result) + step_index += 1 - if self.progress_cb: - self.progress_cb(step_index, total, step_result) + if self.progress_cb: + self.progress_cb(step_index, total, step_result) - if step_result.error: - logger.warning(f"Step '{step.label}' error: {step_result.error}") - elif step_result.qa.flagged: - logger.warning(f"Step '{step.label}' flagged for review: " + "; ".join(step_result.qa.issues)) - else: - logger.info( - f"Step '{step.label}' OK " - f"(SNR {step_result.qa.snr_db:.1f} dB, " - f"{step_result.qa.duration_s:.1f}s)" - ) + if step_result.error: + logger.warning(f"Step '{looped_step.label}' error: {step_result.error}") + elif step_result.qa.flagged: + logger.warning( + f"Step '{looped_step.label}' flagged for review: " + "; ".join(step_result.qa.issues) + ) + else: + logger.info( + f"Step '{looped_step.label}' OK " + f"(SNR {step_result.qa.snr_db:.1f} dB, " + f"{step_result.qa.duration_s:.1f}s)" + ) finally: self._close_sdr() self._close_remote_tx_controllers() diff --git a/src/ria_toolkit_oss/orchestration/tx_executor.py b/src/ria_toolkit_oss/orchestration/tx_executor.py index 2d9e1c1..a3c9bdc 100644 --- a/src/ria_toolkit_oss/orchestration/tx_executor.py +++ b/src/ria_toolkit_oss/orchestration/tx_executor.py @@ -112,6 +112,7 @@ class TxExecutor: center_freq: float = _parse_hz(agent_cfg.get("center_frequency", 0.0)) filter_type: str = agent_cfg.get("filter", "rrc").lower() rolloff: float = float(agent_cfg.get("rolloff", 0.35)) + loops: int = max(1, int(self.config.get("loops", 1))) # Upsampling factor: samples_per_symbol, fixed at 8 for SDR compatibility. sps = 8 @@ -119,10 +120,18 @@ class TxExecutor: self._init_sdr(sample_rate, center_freq) try: - for step in schedule: + for loop_idx in range(loops): if self.stop_event.is_set(): break - self._execute_step(step, modulation, sps, symbol_rate, filter_type, rolloff) + if loops > 1: + logger.info("TX loop %d/%d", loop_idx + 1, loops) + for step in schedule: + if self.stop_event.is_set(): + break + looped_step = ( + {**step, "label": f"{step.get('label', 'step')}_run{loop_idx + 1:02d}"} if loops > 1 else step + ) + self._execute_step(looped_step, modulation, sps, symbol_rate, filter_type, rolloff) finally: self._close_sdr()