"""Client-side SSH + ZMQ controller for a remote SDR transmitter. Run this on the Rx machine (or hub). It SSH-es into the Tx machine, starts :mod:`remote_transmitter` there, then sends JSON-RPC commands over ZMQ. Requires: paramiko, zmq. """ from __future__ import annotations import json import logging import threading import time logger = logging.getLogger(__name__) _STARTUP_WAIT_S = 2.0 # seconds to wait for remote ZMQ server to bind class RemoteTransmitterController: """SSH into a Tx machine, start the ZMQ server, and send commands. Args: host: IP or hostname of the Tx machine. ssh_user: SSH username. ssh_key_path: Path to SSH private key file. zmq_port: ZMQ port that the remote transmitter will bind on. """ def __init__( self, host: str, ssh_user: str, ssh_key_path: str, zmq_port: int = 5556, ) -> None: self._host = host self._zmq_port = zmq_port self._ssh: paramiko.SSHClient | None = None self._ssh_stdout = None self._context: zmq.Context | None = None self._socket: zmq.Socket | None = None self._tx_thread: threading.Thread | None = None self._lock = threading.Lock() self._connect(host, ssh_user, ssh_key_path, zmq_port) # ------------------------------------------------------------------ # Connection management # ------------------------------------------------------------------ def _connect(self, host: str, ssh_user: str, ssh_key_path: str, zmq_port: int) -> None: """Open SSH tunnel, start remote server, connect ZMQ socket.""" try: import paramiko except ImportError as exc: raise RuntimeError("paramiko is required for remote SDR control: pip install paramiko") from exc try: import zmq except ImportError as exc: raise RuntimeError("pyzmq is required for remote SDR control: pip install pyzmq") from exc logger.info("SSH connecting to %s@%s …", ssh_user, host) self._ssh = paramiko.SSHClient() self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self._ssh.connect(hostname=host, username=ssh_user, key_filename=ssh_key_path) cmd = f"python -m ria_toolkit_oss.remote_control.remote_transmitter --port {zmq_port}" logger.info("Starting remote Tx server: %s", cmd) _, self._ssh_stdout, _ = self._ssh.exec_command(cmd) time.sleep(_STARTUP_WAIT_S) self._context = zmq.Context() self._socket = self._context.socket(zmq.REQ) self._socket.connect(f"tcp://{host}:{zmq_port}") logger.info("ZMQ connected to tcp://%s:%d", host, zmq_port) def close(self) -> None: """Tear down ZMQ and SSH connections.""" if self._socket is not None: try: self._socket.close(linger=0) except Exception: pass self._socket = None if self._context is not None: try: self._context.term() except Exception: pass self._context = None if self._ssh_stdout is not None: try: self._ssh_stdout.channel.close() except Exception: pass self._ssh_stdout = None if self._ssh is not None: try: self._ssh.close() except Exception: pass self._ssh = None logger.info("RemoteTransmitterController closed") # ------------------------------------------------------------------ # ZMQ dispatch # ------------------------------------------------------------------ def _send(self, command: dict) -> dict: """Send a JSON-RPC command and return the response dict (thread-safe).""" with self._lock: if self._socket is None: raise RuntimeError("Controller is closed") self._socket.send(json.dumps(command).encode()) raw = self._socket.recv() reply: dict = json.loads(raw.decode()) if not reply.get("status"): raise RuntimeError( f"Remote command '{command.get('function_name')}' failed: " f"{reply.get('error_message', 'unknown error')}" ) return reply # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def set_radio(self, device_type: str, device_id: str = "") -> None: """Initialise the SDR radio on the Tx machine. Args: device_type: SDR type — ``pluto``, ``usrp``, ``hackrf``, ``bladerf``. device_id: Device-specific identifier (IP, serial, etc.). """ logger.info("set_radio(%s, %r)", device_type, device_id) self._send({"function_name": "set_radio", "radio_str": device_type, "identifier": device_id}) def init_tx( self, center_frequency: float, sample_rate: float, gain: float, channel: int = 0, gain_mode: str = "absolute", ) -> None: """Configure Tx parameters on the remote SDR. Args: center_frequency: Center frequency in Hz. sample_rate: Sample rate in Hz. gain: Tx gain in dB. channel: RF channel index (default 0). gain_mode: ``"absolute"`` (default) or ``"relative"``. """ logger.info( "init_tx: fc=%.3f MHz, fs=%.3f MHz, gain=%.1f dB, ch=%d", center_frequency / 1e6, sample_rate / 1e6, gain, channel, ) self._send({ "function_name": "init_tx", "center_frequency": center_frequency, "sample_rate": sample_rate, "gain": gain, "channel": channel, "gain_mode": gain_mode, }) def transmit_async(self, duration_s: float) -> None: """Start a timed CW transmission in a background thread. Returns immediately. Call :meth:`wait_transmit` after recording to ensure the transmit thread has finished before the next step. Args: duration_s: Transmission duration in seconds. """ logger.info("transmit_async: %.1f s", duration_s) def _run() -> None: try: self._send({"function_name": "transmit", "duration_s": duration_s}) except Exception as exc: logger.warning("Background transmit error: %s", exc) self._tx_thread = threading.Thread(target=_run, daemon=True, name="remote-tx") self._tx_thread.start() def wait_transmit(self, timeout: float | None = None) -> None: """Wait for the background transmit thread to finish. Args: timeout: Maximum seconds to wait. ``None`` = wait indefinitely. """ if self._tx_thread is not None: self._tx_thread.join(timeout=timeout) self._tx_thread = None def stop(self) -> None: """Stop transmission and release the remote SDR, then close connections.""" logger.info("Sending stop to remote Tx") try: self._send({"function_name": "stop"}) except Exception as exc: logger.warning("stop command error (may be normal if connection closed): %s", exc) finally: self.close()