From 195db4a27db894f14a70c5fc36afa8e007441026 Mon Sep 17 00:00:00 2001 From: ben Date: Tue, 14 Apr 2026 10:45:54 -0400 Subject: [PATCH 1/4] quick fix --- poetry.toml | 2 ++ src/ria_toolkit_oss/sdr/sdr.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 poetry.toml diff --git a/poetry.toml b/poetry.toml new file mode 100644 index 0000000..25758d2 --- /dev/null +++ b/poetry.toml @@ -0,0 +1,2 @@ +[virtualenvs.options] +system-site-packages = true diff --git a/src/ria_toolkit_oss/sdr/sdr.py b/src/ria_toolkit_oss/sdr/sdr.py index 36e26f7..f2ea9f4 100644 --- a/src/ria_toolkit_oss/sdr/sdr.py +++ b/src/ria_toolkit_oss/sdr/sdr.py @@ -43,6 +43,13 @@ class SDR(ABC): self.tx_gain = None self._param_lock = threading.RLock() # Reentrant lock + # Pending config consumed by rx() on first call and by _apply_sdr_config + # in the agent inference loop. Subclasses that need different defaults + # (e.g. MockSDR) can overwrite these in their own __init__. + self.center_freq: float = 2.4e9 + self.sample_rate: float = 10e6 + self.gain: float = 40.0 + def record(self, num_samples: Optional[int] = None, rx_time: Optional[int | float] = None) -> Recording: """ Create a radio recording of a given length. Either ``num_samples`` or ``rx_time`` must be provided. @@ -100,6 +107,32 @@ class SDR(ABC): self._num_buffers_processed = 0 return recording + def rx(self, num_samples: int) -> "np.ndarray": + """Return *num_samples* complex IQ samples as a 1-D complex64 array. + + This is the interface used by the agent inference loop. On first call, + ``init_rx()`` is invoked automatically using the values stored in + ``center_freq``, ``sample_rate``, and ``gain`` (set beforehand by + ``_apply_sdr_config``). Subsequent calls stream directly. + + Subclasses may override this for hardware-native capture APIs (e.g. + ``MockSDR`` uses AWGN generation; ``PlutoSDR`` could use + ``self.radio.rx()``). + """ + if not self._rx_initialized: + gain = self.gain if isinstance(self.gain, (int, float)) else 40.0 + self.init_rx( + sample_rate=self.sample_rate, + center_frequency=self.center_freq, + gain=gain, + channel=0, + ) + recording = self.record(num_samples=num_samples) + # Recording.data is either a list of 1-D arrays (one per channel) or a + # 2-D ndarray (channels × samples). Either way, index 0 is channel 0. + data = recording.data + return data[0] if hasattr(data, "__getitem__") else data + def stream_to_zmq(self, zmq_address, n_samples: int, buffer_size: Optional[int] = 10000): """ Stream iq samples as interleaved bytes via zmq. From efc09481104e9c1fddccbab27a8bc1ce9fce7ed8 Mon Sep 17 00:00:00 2001 From: ben Date: Fri, 17 Apr 2026 09:43:59 -0400 Subject: [PATCH 2/4] ria composer support --- src/ria_toolkit_oss/orchestration/campaign.py | 6 +- src/ria_toolkit_oss/orchestration/executor.py | 61 ++- .../remote_control/__init__.py | 6 + .../remote_control/remote_transmitter.py | 147 +++++++ .../remote_transmitter_controller.py | 210 ++++++++++ tests/remote_control/__init__.py | 0 .../remote_control/test_remote_transmitter.py | 266 ++++++++++++ .../test_remote_transmitter_controller.py | 294 +++++++++++++ .../test_sdr_remote_integration.py | 391 ++++++++++++++++++ 9 files changed, 1379 insertions(+), 2 deletions(-) create mode 100644 src/ria_toolkit_oss/remote_control/__init__.py create mode 100644 src/ria_toolkit_oss/remote_control/remote_transmitter.py create mode 100644 src/ria_toolkit_oss/remote_control/remote_transmitter_controller.py create mode 100644 tests/remote_control/__init__.py create mode 100644 tests/remote_control/test_remote_transmitter.py create mode 100644 tests/remote_control/test_remote_transmitter_controller.py create mode 100644 tests/remote_control/test_sdr_remote_integration.py diff --git a/src/ria_toolkit_oss/orchestration/campaign.py b/src/ria_toolkit_oss/orchestration/campaign.py index 9d96c96..027c33f 100644 --- a/src/ria_toolkit_oss/orchestration/campaign.py +++ b/src/ria_toolkit_oss/orchestration/campaign.py @@ -223,13 +223,16 @@ class TransmitterConfig: id: str type: str # "wifi", "bluetooth", "sdr", "external" - control_method: str # "external_script" | "sdr" + control_method: str # "external_script" | "sdr" | "sdr_remote" schedule: list[CaptureStep] # For external_script control script: Optional[str] = None # path to control script device: Optional[str] = None # e.g. "/dev/wlan0" + # For sdr_remote control — keys: host, ssh_user, ssh_key_path, device_type, device_id, zmq_port + sdr_remote: Optional[dict] = None + @classmethod def from_dict(cls, d: dict) -> "TransmitterConfig": schedule = [CaptureStep.from_dict(s) for s in d.get("schedule", [])] @@ -240,6 +243,7 @@ class TransmitterConfig: schedule=schedule, script=d.get("script"), device=d.get("device"), + sdr_remote=d.get("sdr_remote"), ) diff --git a/src/ria_toolkit_oss/orchestration/executor.py b/src/ria_toolkit_oss/orchestration/executor.py index 629c0d8..1bdd4d8 100644 --- a/src/ria_toolkit_oss/orchestration/executor.py +++ b/src/ria_toolkit_oss/orchestration/executor.py @@ -196,6 +196,7 @@ class CampaignExecutor: self.config = config self.progress_cb = progress_cb self._sdr = None + self._remote_tx_controllers: dict = {} if verbose: logging.basicConfig(level=logging.DEBUG) @@ -222,6 +223,7 @@ class CampaignExecutor: ) self._init_sdr() + self._init_remote_tx_controllers() try: total = self.config.total_steps() step_index = 0 @@ -248,6 +250,7 @@ class CampaignExecutor: ) finally: self._close_sdr() + self._close_remote_tx_controllers() result.end_time = time.time() logger.info( @@ -287,6 +290,41 @@ class CampaignExecutor: logger.warning(f"SDR close error: {e}") self._sdr = None + # ------------------------------------------------------------------ + # Remote Tx controller management + # ------------------------------------------------------------------ + + def _init_remote_tx_controllers(self) -> None: + """Open SSH+ZMQ connections for all sdr_remote transmitters.""" + from ria_toolkit_oss.remote_control import RemoteTransmitterController + + for tx in self.config.transmitters: + if tx.control_method != "sdr_remote": + continue + cfg = tx.sdr_remote + if not cfg: + raise RuntimeError(f"Transmitter '{tx.id}' uses sdr_remote but has no sdr_remote config") + logger.info(f"Connecting remote Tx controller for {tx.id} → {cfg['host']}") + ctrl = RemoteTransmitterController( + host=cfg["host"], + ssh_user=cfg["ssh_user"], + ssh_key_path=cfg["ssh_key_path"], + zmq_port=int(cfg.get("zmq_port", 5556)), + ) + ctrl.set_radio( + device_type=cfg["device_type"], + device_id=cfg.get("device_id", ""), + ) + self._remote_tx_controllers[tx.id] = ctrl + + def _close_remote_tx_controllers(self) -> None: + for tx_id, ctrl in list(self._remote_tx_controllers.items()): + try: + ctrl.close() + except Exception as exc: + logger.warning(f"Error closing remote Tx controller {tx_id}: {exc}") + self._remote_tx_controllers.clear() + def _record(self, duration_s: float) -> Recording: """Capture ``duration_s`` seconds of IQ samples.""" num_samples = int(duration_s * self.config.recorder.sample_rate) @@ -372,7 +410,8 @@ class CampaignExecutor: traffic, etc. The script is responsible for applying the configuration and returning promptly (i.e. not blocking for the capture duration). - For SDR transmitters this is a no-op placeholder (TX not yet implemented). + For ``sdr_remote`` the remote ZMQ controller calls ``init_tx`` then + starts a background transmit thread that runs for the step duration. """ if transmitter.control_method == "external_script": if not transmitter.script: @@ -384,6 +423,20 @@ class CampaignExecutor: elif transmitter.control_method == "sdr": logger.debug("SDR TX not yet implemented — skipping start") + elif transmitter.control_method == "sdr_remote": + ctrl = self._remote_tx_controllers.get(transmitter.id) + if ctrl is None: + raise RuntimeError(f"No remote Tx controller found for transmitter '{transmitter.id}'") + gain = step.power_dbm if step.power_dbm is not None else 0.0 + ctrl.init_tx( + center_frequency=self.config.recorder.center_freq, + sample_rate=self.config.recorder.sample_rate, + gain=gain, + channel=step.channel or 0, + ) + # Start transmission in background; _record() runs concurrently + ctrl.transmit_async(step.duration + 1.0) + else: logger.warning(f"Unknown control method '{transmitter.control_method}' — skipping") @@ -391,6 +444,7 @@ class CampaignExecutor: """Signal the transmitter to stop. Calls ``