"""Server-side ZMQ RPC receiver for SDR transmission.

Run this script on the Tx machine. The script binds a ZMQ REP socket and waits
for JSON-RPC commands from a :class:`RemoteTransmitterController`.

Requires: zmq, and ria-toolkit or utils installed for SDR support.
"""

from __future__ import annotations

import argparse
import importlib
import io
import json
import logging
import math
import time
from contextlib import redirect_stderr, redirect_stdout

try:
    import zmq
except ImportError:
    # pyzmq is only needed to actually serve requests. Deferring the failure
    # to _serve() keeps RemoteTransmitter importable (e.g. for unit tests) on
    # machines without pyzmq.
    zmq = None

logger = logging.getLogger(__name__)

# Accepted radio names (lower-case) -> (driver module, driver class name).
# Driver modules are imported lazily so that only the selected SDR library
# has to be installed.
_SDR_DRIVERS = {
    "pluto": ("ria_toolkit_oss.sdr.pluto", "Pluto"),
    "plutosdr": ("ria_toolkit_oss.sdr.pluto", "Pluto"),
    "usrp": ("ria_toolkit_oss.sdr.usrp", "USRP"),
    "hackrf": ("ria_toolkit_oss.sdr.hackrf", "HackRF"),
    "hackrf_one": ("ria_toolkit_oss.sdr.hackrf", "HackRF"),
    "bladerf": ("ria_toolkit_oss.sdr.blade", "Blade"),
    "blade": ("ria_toolkit_oss.sdr.blade", "Blade"),
}

_DEFAULT_GAIN_MODE = "absolute"


def _load_driver_class(module_name: str, class_name: str):
    """Import ``module_name`` and return its ``class_name`` attribute.

    Raises:
        ImportError: If the module cannot be imported or does not define the
            class (mirrors ``from module import Class`` semantics).
    """
    module = importlib.import_module(module_name)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(
            f"cannot import name {class_name!r} from {module_name!r}"
        ) from exc


def _require(params: dict, key: str):
    """Return ``params[key]`` or raise a ``ValueError`` naming the missing key."""
    try:
        return params[key]
    except KeyError:
        raise ValueError(f"Missing required parameter: {key!r}") from None


class RemoteTransmitter:
    """Executes SDR Tx commands received over ZMQ.

    Loads the appropriate SDR driver dynamically so the script can run on
    machines that have only a subset of SDR libraries installed.
    """

    def __init__(self) -> None:
        # Active SDR driver instance, or None until set_radio() succeeds.
        self._sdr = None

    def set_radio(self, radio_str: str, identifier: str = "") -> None:
        """Initialise the SDR radio.

        Any radio opened by an earlier call is closed first so its device
        handle is not leaked.

        Args:
            radio_str: SDR type — pluto | usrp | hackrf | bladerf.
            identifier: Device-specific identifier (IP, serial, etc.).

        Raises:
            ValueError: If ``radio_str`` is not a known SDR type.
            RuntimeError: If the driver for ``radio_str`` is not installed.
        """
        radio_str = radio_str.lower()
        try:
            module_name, class_name = _SDR_DRIVERS[radio_str]
        except KeyError:
            raise ValueError(f"Unknown SDR type: {radio_str!r}") from None

        try:
            driver_cls = _load_driver_class(module_name, class_name)
            # Only release the previous device once we know the new driver is
            # importable; an unavailable driver leaves the old radio in place.
            self.stop()
            # The constructor stays inside the ImportError guard because a
            # driver may import its native backend lazily on construction.
            self._sdr = driver_cls(identifier)
        except ImportError as exc:
            raise RuntimeError(
                f"SDR driver for '{radio_str}' is not installed: {exc}"
            ) from exc

    def init_tx(
        self,
        center_frequency: float,
        sample_rate: float,
        gain: float,
        channel: int = 0,
        gain_mode: str = "absolute",
    ) -> None:
        """Configure the selected radio for transmission.

        Args:
            center_frequency: Tx centre frequency, passed through to the driver.
            sample_rate: Tx sample rate, passed through to the driver.
            gain: Tx gain, passed through to the driver.
            channel: Tx channel index.
            gain_mode: Gain interpretation requested by the caller. Forwarded
                to the driver only when it differs from the default, so that
                drivers whose ``init_tx`` has no ``gain_mode`` keyword keep
                working for the default case, while a non-default request is
                either honoured or fails loudly instead of being ignored.

        Raises:
            RuntimeError: If no radio has been selected with ``set_radio()``.
        """
        if self._sdr is None:
            raise RuntimeError("Call set_radio() before init_tx()")

        driver_kwargs = {
            "center_frequency": center_frequency,
            "sample_rate": sample_rate,
            "gain": gain,
            "channel": channel,
        }
        if gain_mode != _DEFAULT_GAIN_MODE:
            driver_kwargs["gain_mode"] = gain_mode
        self._sdr.init_tx(**driver_kwargs)

    def transmit(self, duration_s: float) -> None:
        """Transmit a continuous wave for ``duration_s`` seconds.

        Raises:
            RuntimeError: If no radio has been selected.
            ValueError: If ``duration_s`` is negative, NaN or infinite (an
                infinite duration would block the request loop forever).
        """
        if self._sdr is None:
            raise RuntimeError("Call set_radio() and init_tx() before transmit()")
        if not math.isfinite(duration_s) or duration_s < 0:
            raise ValueError(
                f"duration_s must be a finite, non-negative number, got {duration_s!r}"
            )

        # Resolve the driver method once. Catching AttributeError around the
        # call itself would also hide AttributeErrors raised *inside* tx_cw().
        tx_cw = getattr(self._sdr, "tx_cw", None)
        if not callable(tx_cw):
            # Best-effort fallback kept from the original behaviour: hold the
            # call open for the requested time even though nothing is keyed.
            logger.warning(
                "%s has no tx_cw(); waiting %.3f s without transmitting",
                type(self._sdr).__name__,
                duration_s,
            )
            time.sleep(duration_s)
            return

        # Transmit in a loop until duration has elapsed
        end = time.monotonic() + duration_s
        while time.monotonic() < end:
            tx_cw()

    def stop(self) -> None:
        """Stop transmission and close the SDR.

        Closing is best-effort: a failure in the driver's ``close()`` is
        logged rather than raised, and the handle is always cleared.
        """
        sdr, self._sdr = self._sdr, None
        if sdr is None:
            return
        try:
            sdr.close()
        except Exception:
            logger.warning("Error while closing SDR; handle discarded", exc_info=True)

    def _dispatch(self, fn: str, params: dict) -> None:
        """Invoke the method named ``fn`` with arguments taken from ``params``."""
        if fn == "set_radio":
            self.set_radio(
                radio_str=_require(params, "radio_str"),
                identifier=params.get("identifier", ""),
            )
        elif fn == "init_tx":
            self.init_tx(
                center_frequency=_require(params, "center_frequency"),
                sample_rate=_require(params, "sample_rate"),
                gain=_require(params, "gain"),
                channel=params.get("channel", 0),
                gain_mode=params.get("gain_mode", _DEFAULT_GAIN_MODE),
            )
        elif fn == "transmit":
            self.transmit(duration_s=params.get("duration_s", 1.0))
        elif fn == "stop":
            self.stop()
        else:
            raise ValueError(f"Unknown function: {fn!r}")

    def run_function(self, command_dict: dict) -> dict:
        """Dispatch a JSON-RPC command and return a response dict.

        Never raises for a bad request: every failure is reported through the
        response so the REP socket can always send a reply.

        Args:
            command_dict: Decoded request. ``function_name`` selects the
                operation; the remaining keys are its arguments.

        Returns:
            ``{"status": bool, "message": str, "error_message": str}`` where
            ``message`` is the stdout captured during the call and
            ``error_message`` is the captured stderr on success or the
            exception text on failure.
        """
        out_buf = io.StringIO()
        err_buf = io.StringIO()
        is_mapping = isinstance(command_dict, dict)
        fn = command_dict.get("function_name", "") if is_mapping else ""
        try:
            if not is_mapping:
                raise TypeError(
                    f"Command must be a JSON object, got {type(command_dict).__name__}"
                )
            # Driver libraries print diagnostics; capture them for the caller.
            with redirect_stdout(out_buf), redirect_stderr(err_buf):
                self._dispatch(fn, command_dict)
            return {
                "status": True,
                "message": out_buf.getvalue(),
                "error_message": err_buf.getvalue(),
            }
        except Exception as exc:
            logger.exception("Error executing %s", fn)
            return {
                "status": False,
                "message": out_buf.getvalue(),
                # Some exceptions stringify to ""; fall back to the type name.
                "error_message": str(exc) or type(exc).__name__,
            }


def _serve(port: int) -> None:
    """Bind a REP socket on ``port`` and serve commands until interrupted.

    Raises:
        RuntimeError: If pyzmq is not installed.
    """
    if zmq is None:
        raise RuntimeError("pyzmq is required to run the Tx server (pip install pyzmq)")

    tx = RemoteTransmitter()
    # NOTE(security): this binds on all interfaces with no authentication, so
    # any host that can reach the port can key the transmitter. Restrict it at
    # the network level or add ZMQ CURVE auth if the link is not trusted.
    with zmq.Context() as context, context.socket(zmq.REP) as socket:
        # Do not let an undelivered reply block context termination on exit.
        socket.setsockopt(zmq.LINGER, 0)
        socket.bind(f"tcp://*:{port}")
        logger.info("RemoteTransmitter listening on port %d", port)
        try:
            while True:
                raw = socket.recv()
                try:
                    cmd = json.loads(raw.decode())
                except ValueError as exc:
                    # Covers both undecodable bytes and malformed JSON. A REP
                    # socket must answer every request, so reply with an error
                    # instead of letting the exception kill the server.
                    logger.warning("Discarding malformed request: %s", exc)
                    response = {
                        "status": False,
                        "message": "",
                        "error_message": f"Invalid JSON request: {exc}",
                    }
                else:
                    response = tx.run_function(cmd)
                socket.send(json.dumps(response).encode())
        except KeyboardInterrupt:
            logger.info("Interrupted; shutting down")
        finally:
            # Release the SDR even when the loop exits abnormally.
            tx.stop()


def main() -> None:
    """Parse command-line arguments and run the server."""
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(description="SDR Tx ZMQ server")
    parser.add_argument("--port", type=int, default=5556)
    args = parser.parse_args()
    _serve(args.port)


if __name__ == "__main__":
    main()