Compare commits

..

2 Commits

Author SHA1 Message Date
ben
22b035dbee format fixes
Some checks failed
Build Sphinx Docs Set / Build Docs (pull_request) Has been cancelled
Test with tox / Test with tox (3.10) (pull_request) Has been cancelled
Test with tox / Test with tox (3.11) (pull_request) Has been cancelled
Test with tox / Test with tox (3.12) (pull_request) Has been cancelled
Build Project / Build Project (3.12) (pull_request) Has been cancelled
Build Project / Build Project (3.11) (pull_request) Has been cancelled
Build Project / Build Project (3.10) (pull_request) Has been cancelled
2026-04-20 13:51:15 -04:00
ben
8e23558d90 Fix flake8 lint errors and regenerate poetry.lock
- Add TYPE_CHECKING guard for paramiko/zmq annotations in remote_transmitter_controller.py
- Remove unused imports (sys, threading, importlib, call) from remote_control tests
- Remove unused mock_ctrl_kwarg variable
- Add noqa C901 to _handle_tx_start (legitimately complex interlock logic)
- Regenerate poetry.lock to sync with pyproject.toml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-20 13:50:59 -04:00
25 changed files with 184 additions and 192 deletions

4
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.3.4 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
[[package]]
name = "alabaster"
@ -1096,7 +1096,7 @@ files = [
[package.dependencies]
attrs = ">=22.2.0"
jsonschema-specifications = ">=2023.3.6"
jsonschema-specifications = ">=2023.03.6"
referencing = ">=0.28.4"
rpds-py = ">=0.25.0"

View File

@ -66,8 +66,9 @@ class LoggingFakeWs:
pass
def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float,
phase_offset: float = 0.0) -> tuple[bytes, float]:
def _make_iq_frame(
buffer_size: int, tone_hz: float, sample_rate: float, phase_offset: float = 0.0
) -> tuple[bytes, float]:
"""Return ``(interleaved_float32_bytes, next_phase)`` for a sine tone.
Emitting one continuous phase-coherent tone requires threading the phase
@ -93,7 +94,9 @@ def _make_pluto_factory(identifier: str | None):
if device != "pluto":
raise ValueError(f"this script only drives pluto; got device={device!r}")
from ria_toolkit_oss.sdr.pluto import Pluto
return Pluto(identifier=identifier)
return factory
@ -130,13 +133,14 @@ async def _run(args: argparse.Namespace) -> int:
# Abort if tx_start was rejected by an interlock (no session → nothing to do).
if streamer._tx is None:
print("tx_start rejected — see [tx_status] line above for the reason.",
file=sys.stderr)
print("tx_start rejected — see [tx_status] line above for the reason.", file=sys.stderr)
return 2
print(f"Transmitting at {args.frequency/1e6:.3f} MHz with "
f"{args.tone/1e3:.1f} kHz baseband tone at gain {args.gain} dB. "
f"{'Running for ' + str(args.duration) + 's' if args.duration > 0 else 'Run until Ctrl-C'}.")
print(
f"Transmitting at {args.frequency/1e6:.3f} MHz with "
f"{args.tone/1e3:.1f} kHz baseband tone at gain {args.gain} dB. "
f"{'Running for ' + str(args.duration) + 's' if args.duration > 0 else 'Run until Ctrl-C'}."
)
# Arrange a clean shutdown on Ctrl-C.
stop = asyncio.Event()
@ -157,12 +161,11 @@ async def _run(args: argparse.Namespace) -> int:
# topped up. The queue's own backpressure keeps us from spinning.
produce_interval = buffer_dt * 0.5
try:
async def producer():
nonlocal phase
while not stop.is_set():
frame, phase = _make_iq_frame(
args.buffer_size, args.tone, args.sample_rate, phase
)
frame, phase = _make_iq_frame(args.buffer_size, args.tone, args.sample_rate, phase)
await streamer.on_binary(frame)
await asyncio.sleep(produce_interval)
@ -193,20 +196,17 @@ def main() -> int:
p = argparse.ArgumentParser(
description="End-to-end TX smoke test: agent → Pluto continuous tone.",
)
p.add_argument("--identifier", default=None,
help="Pluto IP/hostname (default: auto-discover pluto.local)")
p.add_argument("--frequency", type=float, default=3_410_000_000.0,
help="TX LO in Hz (default 2.45 GHz)")
p.add_argument("--gain", type=float, default=-0.0,
help="TX gain in dB; Pluto range [-89, 0] (default -30)")
p.add_argument("--sample-rate", type=float, default=1_000_000.0,
help="Baseband sample rate (default 1 Msps)")
p.add_argument("--tone", type=float, default=100_000.0,
help="Baseband tone offset in Hz; 0 = DC (default 100 kHz)")
p.add_argument("--buffer-size", type=int, default=4096,
help="Complex samples per frame (default 4096)")
p.add_argument("--duration", type=float, default=60.0,
help="Seconds to transmit; 0 = run until Ctrl-C (default 30)")
p.add_argument("--identifier", default=None, help="Pluto IP/hostname (default: auto-discover pluto.local)")
p.add_argument("--frequency", type=float, default=3_410_000_000.0, help="TX LO in Hz (default 2.45 GHz)")
p.add_argument("--gain", type=float, default=-0.0, help="TX gain in dB; Pluto range [-89, 0] (default -30)")
p.add_argument("--sample-rate", type=float, default=1_000_000.0, help="Baseband sample rate (default 1 Msps)")
p.add_argument(
"--tone", type=float, default=100_000.0, help="Baseband tone offset in Hz; 0 = DC (default 100 kHz)"
)
p.add_argument("--buffer-size", type=int, default=4096, help="Complex samples per frame (default 4096)")
p.add_argument(
"--duration", type=float, default=60.0, help="Seconds to transmit; 0 = run until Ctrl-C (default 30)"
)
p.add_argument("--log-level", default="INFO")
args = p.parse_args()

View File

@ -41,8 +41,7 @@ from ria_toolkit_oss.agent.streamer import Streamer
from ria_toolkit_oss.agent.ws_client import WsClient
def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float,
phase_offset: float) -> tuple[bytes, float]:
def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, phase_offset: float) -> tuple[bytes, float]:
n = np.arange(buffer_size, dtype=np.float64)
phase = 2.0 * np.pi * tone_hz / sample_rate * n + phase_offset
amp = 0.7
@ -59,7 +58,9 @@ def _make_pluto_factory(identifier: str | None):
if device != "pluto":
raise ValueError(f"this script only drives pluto; got device={device!r}")
from ria_toolkit_oss.sdr.pluto import Pluto
return Pluto(identifier=identifier)
return factory
@ -73,27 +74,29 @@ async def _mock_hub_handler(ws, args, stop: asyncio.Event):
payload = json.loads(first)
if payload.get("type") == "heartbeat":
caps = payload.get("capabilities")
print(f"[mock-hub] agent heartbeat: capabilities={caps} "
f"tx_enabled={payload.get('tx_enabled')}")
print(f"[mock-hub] agent heartbeat: capabilities={caps} " f"tx_enabled={payload.get('tx_enabled')}")
except asyncio.TimeoutError:
print("[mock-hub] warning: no heartbeat received in first 2s")
# Arm the agent's TX path.
await ws.send(json.dumps({
"type": "tx_start",
"app_id": "ws-smoke",
"radio_config": {
"device": "pluto",
"identifier": args.identifier,
"tx_sample_rate": int(args.sample_rate),
"tx_center_frequency": int(args.frequency),
"tx_gain": int(args.gain),
"buffer_size": int(args.buffer_size),
"underrun_policy": "repeat",
},
}))
print(f"[mock-hub] sent tx_start at {args.frequency/1e6:.3f} MHz, "
f"gain={args.gain} dB")
await ws.send(
json.dumps(
{
"type": "tx_start",
"app_id": "ws-smoke",
"radio_config": {
"device": "pluto",
"identifier": args.identifier,
"tx_sample_rate": int(args.sample_rate),
"tx_center_frequency": int(args.frequency),
"tx_gain": int(args.gain),
"buffer_size": int(args.buffer_size),
"underrun_policy": "repeat",
},
}
)
)
print(f"[mock-hub] sent tx_start at {args.frequency/1e6:.3f} MHz, " f"gain={args.gain} dB")
# Producer: push IQ frames at a steady clip. Use a concurrent receiver so
# tx_status frames show up in real time rather than being queued behind
@ -112,15 +115,11 @@ async def _mock_hub_handler(ws, args, stop: asyncio.Event):
recv_task = asyncio.create_task(receiver())
try:
deadline = None if args.duration <= 0 else (
asyncio.get_event_loop().time() + args.duration
)
deadline = None if args.duration <= 0 else (asyncio.get_event_loop().time() + args.duration)
while not stop.is_set():
if deadline is not None and asyncio.get_event_loop().time() >= deadline:
break
frame, phase = _make_iq_frame(
args.buffer_size, args.tone, args.sample_rate, phase
)
frame, phase = _make_iq_frame(args.buffer_size, args.tone, args.sample_rate, phase)
try:
await ws.send(frame)
except websockets.ConnectionClosed:
@ -204,20 +203,15 @@ def main() -> int:
p = argparse.ArgumentParser(
description="Full-stack TX smoke: localhost mock-hub → WS → agent → Pluto.",
)
p.add_argument("--identifier", default=None,
help="Pluto IP/hostname (default: auto-discover pluto.local)")
p.add_argument("--frequency", type=float, default=2_450_000_000.0,
help="TX LO in Hz (default 2.45 GHz)")
p.add_argument("--gain", type=float, default=0.0,
help="TX gain in dB; Pluto range [-89, 0] (default 0)")
p.add_argument("--sample-rate", type=float, default=1_000_000.0,
help="Baseband sample rate (default 1 Msps)")
p.add_argument("--tone", type=float, default=100_000.0,
help="Baseband tone offset in Hz (default 100 kHz)")
p.add_argument("--buffer-size", type=int, default=4096,
help="Complex samples per frame (default 4096)")
p.add_argument("--duration", type=float, default=30.0,
help="Seconds to transmit; 0 = run until Ctrl-C (default 30)")
p.add_argument("--identifier", default=None, help="Pluto IP/hostname (default: auto-discover pluto.local)")
p.add_argument("--frequency", type=float, default=2_450_000_000.0, help="TX LO in Hz (default 2.45 GHz)")
p.add_argument("--gain", type=float, default=0.0, help="TX gain in dB; Pluto range [-89, 0] (default 0)")
p.add_argument("--sample-rate", type=float, default=1_000_000.0, help="Baseband sample rate (default 1 Msps)")
p.add_argument("--tone", type=float, default=100_000.0, help="Baseband tone offset in Hz (default 100 kHz)")
p.add_argument("--buffer-size", type=int, default=4096, help="Complex samples per frame (default 4096)")
p.add_argument(
"--duration", type=float, default=30.0, help="Seconds to transmit; 0 = run until Ctrl-C (default 30)"
)
p.add_argument("--log-level", default="INFO")
args = p.parse_args()

View File

@ -118,9 +118,9 @@ def _derive_ws_url(hub_url: str, agent_id: str) -> str:
return ""
base = hub_url.rstrip("/")
if base.startswith("https://"):
base = "wss://" + base[len("https://"):]
base = "wss://" + base[len("https://") :]
elif base.startswith("http://"):
base = "ws://" + base[len("http://"):]
base = "ws://" + base[len("http://") :]
suffix = f"/screens/agent/ws?agent_id={agent_id}" if agent_id else "/screens/agent/ws"
return base + suffix

View File

@ -22,6 +22,7 @@ import os
from dataclasses import asdict, dataclass, field
from pathlib import Path
def _resolve_default_path() -> Path:
return Path(os.environ.get("RIA_AGENT_CONFIG", str(Path.home() / ".ria" / "agent.json")))

View File

@ -46,9 +46,7 @@ def heartbeat_payload(
if c.tx_max_duration_s is not None:
payload["tx_max_duration_s"] = float(c.tx_max_duration_s)
if c.tx_allowed_freq_ranges:
payload["tx_allowed_freq_ranges"] = [
[float(lo), float(hi)] for lo, hi in c.tx_allowed_freq_ranges
]
payload["tx_allowed_freq_ranges"] = [[float(lo), float(hi)] for lo, hi in c.tx_allowed_freq_ranges]
if app_id:
payload["app_id"] = app_id
if sessions:

View File

@ -270,9 +270,7 @@ class Streamer:
)
self._rx = session
await self._send_status("streaming", app_id)
session.task = asyncio.create_task(
self._capture_loop(session), name="ria-streamer-capture"
)
session.task = asyncio.create_task(self._capture_loop(session), name="ria-streamer-capture")
async def _handle_rx_stop(self, msg: dict) -> None:
session = self._rx
@ -310,9 +308,7 @@ class Streamer:
logger.warning("Applying configure failed: %s", exc)
try:
samples = await loop.run_in_executor(
None, session.sdr.rx, session.buffer_size
)
samples = await loop.run_in_executor(None, session.sdr.rx, session.buffer_size)
except Exception as exc:
from ria_toolkit_oss.sdr import SdrDisconnectedError
@ -342,7 +338,7 @@ class Streamer:
# ==================================================================
# TX
async def _handle_tx_start(self, msg: dict) -> None:
async def _handle_tx_start(self, msg: dict) -> None: # noqa: C901
app_id = msg.get("app_id") or ""
radio_config = dict(msg.get("radio_config") or {})
@ -383,9 +379,7 @@ class Streamer:
buffer_size = int(radio_config.pop("buffer_size", _DEFAULT_BUFFER_SIZE))
underrun_policy = str(radio_config.pop("underrun_policy", "pause"))
if underrun_policy not in ("pause", "zero", "repeat"):
await self._send_tx_status(
app_id, "error", f"invalid underrun_policy {underrun_policy!r}"
)
await self._send_tx_status(app_id, "error", f"invalid underrun_policy {underrun_policy!r}")
return
if not device:
await self._send_tx_status(app_id, "error", "tx_start missing radio_config.device")
@ -404,15 +398,10 @@ class Streamer:
# manifest bug and we want it surfaced immediately, not papered
# over with stale radio state.
if hasattr(sdr, "init_tx"):
init_args = {
k: radio_config.get(f"tx_{k}")
for k in ("sample_rate", "center_frequency", "gain")
}
init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")}
missing = [f"tx_{k}" for k, v in init_args.items() if v is None]
if missing:
raise ValueError(
f"tx_start missing required radio_config keys: {missing}"
)
raise ValueError(f"tx_start missing required radio_config keys: {missing}")
sdr.init_tx(
sample_rate=init_args["sample_rate"],
center_frequency=init_args["center_frequency"],
@ -498,9 +487,8 @@ class Streamer:
return _silence(n)
# Max-duration watchdog.
if (
session.max_duration_s is not None
and (time.monotonic() - session.started_at) >= float(session.max_duration_s)
if session.max_duration_s is not None and (time.monotonic() - session.started_at) >= float(
session.max_duration_s
):
session.stop_event.set()
try:
@ -528,7 +516,7 @@ class Streamer:
if arr.size < 2 or arr.size % 2 != 0:
logger.warning("Malformed TX frame: %d floats (must be non-zero even count)", arr.size)
return self._underrun_fill(session, n)
samples = (arr[0::2].astype(np.complex64) + 1j * arr[1::2].astype(np.complex64))
samples = arr[0::2].astype(np.complex64) + 1j * arr[1::2].astype(np.complex64)
if samples.size < n:
out = np.zeros(n, dtype=np.complex64)
out[: samples.size] = samples
@ -747,6 +735,7 @@ def _default_sdr_factory(device: str, identifier: str | None):
# ---------------------------------------------------------------------------
# Top-level entry
async def run_streamer(ws_url: str, token: str, *, cfg: AgentConfig | None = None) -> None:
"""Connect to *ws_url* and run the streamer loop until cancelled."""
ws = WsClient(ws_url, token)

View File

@ -37,7 +37,7 @@ def _engine(cfg: _config.AppConfig, sudo_override: bool = False) -> list[str]:
for exe in ("docker", "podman"):
if shutil.which(exe):
use_sudo = sudo_override or cfg.sudo
return (["sudo", exe] if use_sudo else [exe])
return ["sudo", exe] if use_sudo else [exe]
print("error: neither 'docker' nor 'podman' found on PATH", file=sys.stderr)
sys.exit(2)
@ -96,7 +96,9 @@ def _hardware_flags(labels: dict, no_gpu: bool, no_usb: bool, no_host_net: bool)
if _gpu_available():
flags += ["--gpus", "all"]
else:
notes.append("image wants GPU but no NVIDIA runtime detected — skipping --gpus (use --force-gpu to override)")
notes.append(
"image wants GPU but no NVIDIA runtime detected — skipping --gpus (use --force-gpu to override)"
)
if hw_items & {"pluto", "rtlsdr", "hackrf", "bladerf"} and not no_usb:
flags += ["--device", "/dev/bus/usb"]

View File

@ -40,15 +40,19 @@ class RemoteTransmitter:
try:
if radio_str in ("pluto", "plutosdr"):
from ria_toolkit_oss.sdr.pluto import Pluto
self._sdr = Pluto(identifier)
elif radio_str in ("usrp",):
from ria_toolkit_oss.sdr.usrp import USRP
self._sdr = USRP(identifier)
elif radio_str in ("hackrf", "hackrf_one"):
from ria_toolkit_oss.sdr.hackrf import HackRF
self._sdr = HackRF(identifier)
elif radio_str in ("bladerf", "blade"):
from ria_toolkit_oss.sdr.blade import Blade
self._sdr = Blade(identifier)
else:
raise ValueError(f"Unknown SDR type: {radio_str!r}")
@ -77,6 +81,7 @@ class RemoteTransmitter:
if self._sdr is None:
raise RuntimeError("Call set_radio() and init_tx() before transmit()")
import time
# Transmit in a loop until duration has elapsed
end = time.monotonic() + duration_s
while time.monotonic() < end:

View File

@ -13,6 +13,11 @@ import json
import logging
import threading
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import paramiko
import zmq
logger = logging.getLogger(__name__)
@ -158,16 +163,21 @@ class RemoteTransmitterController:
"""
logger.info(
"init_tx: fc=%.3f MHz, fs=%.3f MHz, gain=%.1f dB, ch=%d",
center_frequency / 1e6, sample_rate / 1e6, gain, channel,
center_frequency / 1e6,
sample_rate / 1e6,
gain,
channel,
)
self._send(
{
"function_name": "init_tx",
"center_frequency": center_frequency,
"sample_rate": sample_rate,
"gain": gain,
"channel": channel,
"gain_mode": gain_mode,
}
)
self._send({
"function_name": "init_tx",
"center_frequency": center_frequency,
"sample_rate": sample_rate,
"gain": gain,
"channel": channel,
"gain_mode": gain_mode,
})
def transmit_async(self, duration_s: float) -> None:
"""Start a timed CW transmission in a background thread.

View File

@ -15,8 +15,13 @@ __all__ = [
]
from .mock import MockSDR
from .sdr import SDR, SDRError, SdrDisconnectedError, SDRParameterError, translate_disconnect # noqa: F401
from .sdr import ( # noqa: F401
SDR,
SdrDisconnectedError,
SDRError,
SDRParameterError,
translate_disconnect,
)
_DRIVER_CANDIDATES: tuple[tuple[str, str, str], ...] = (
("mock", "ria_toolkit_oss.sdr.mock", "MockSDR"),

View File

@ -8,7 +8,12 @@ import adi
import numpy as np
from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.sdr.sdr import SDR, SDRError, SDRParameterError, translate_disconnect
from ria_toolkit_oss.sdr.sdr import (
SDR,
SDRError,
SDRParameterError,
translate_disconnect,
)
class Pluto(SDR):

View File

@ -583,7 +583,7 @@ _DISCONNECT_MARKERS = (
"i/o error",
"input/output error",
"errno 19", # ENODEV
"errno 5", # EIO
"errno 5", # EIO
)

View File

@ -26,9 +26,11 @@ class _FakeResp:
def _run_register(argv: list[str], cfg_path) -> int:
fake_resp = _FakeResp({"agent_id": "agent-1", "token": "tok-abc"})
with patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False), \
patch("urllib.request.urlopen", return_value=fake_resp), \
patch.object(sys, "argv", ["ria-agent", *argv]):
with (
patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False),
patch("urllib.request.urlopen", return_value=fake_resp),
patch.object(sys, "argv", ["ria-agent", *argv]),
):
try:
agent_cli.main()
except SystemExit as exc:
@ -96,9 +98,11 @@ def test_stream_allow_tx_does_not_persist(tmp_path):
captured["cfg"] = cfg
return None
with patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False), \
patch("ria_toolkit_oss.agent.streamer.run_streamer", new=_fake_run_streamer), \
patch.object(sys, "argv", ["ria-agent", "stream", "--allow-tx"]):
with (
patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False),
patch("ria_toolkit_oss.agent.streamer.run_streamer", new=_fake_run_streamer),
patch.object(sys, "argv", ["ria-agent", "stream", "--allow-tx"]),
):
try:
agent_cli.main()
except SystemExit:

View File

@ -70,9 +70,7 @@ def test_server_start_stream_stop_cycle_over_real_ws():
reconnect_pause=0.05,
)
streamer = Streamer(ws=client, sdr_factory=lambda d, i: MockSDR(buffer_size=32, seed=0))
task = asyncio.create_task(
client.run(on_message=streamer.on_message, heartbeat=streamer.build_heartbeat)
)
task = asyncio.create_task(client.run(on_message=streamer.on_message, heartbeat=streamer.build_heartbeat))
await asyncio.wait_for(ready.wait(), timeout=3.0)
await asyncio.wait_for(stopped.wait(), timeout=3.0)
client.stop()

View File

@ -77,10 +77,7 @@ def test_server_tx_start_binary_stop_cycle_over_real_ws():
msg = await asyncio.wait_for(ws.recv(), timeout=2.0)
if isinstance(msg, str):
control_frames.append(json.loads(msg))
if any(
f.get("type") == "tx_status" and f.get("state") == "transmitting"
for f in control_frames
):
if any(f.get("type") == "tx_status" and f.get("state") == "transmitting" for f in control_frames):
break
await ws.send(json.dumps({"type": "tx_stop", "app_id": "tx-app"}))

View File

@ -30,7 +30,6 @@ from ria_toolkit_oss.agent.config import AgentConfig
from ria_toolkit_oss.agent.streamer import Streamer
from ria_toolkit_oss.sdr.mock import MockSDR
_STRESS_S = float(os.environ.get("RIA_LOCK_STRESS_S", "2.0"))
@ -156,18 +155,21 @@ def test_full_duplex_stays_healthy_over_stress_window():
s = Streamer(ws=ws, sdr_factory=lambda d, i: sdr, cfg=AgentConfig(tx_enabled=True))
await s.on_message(
{"type": "start", "app_id": "app-1",
"radio_config": {"device": "mock", "buffer_size": BUF}}
{"type": "start", "app_id": "app-1", "radio_config": {"device": "mock", "buffer_size": BUF}}
)
await s.on_message(
{"type": "tx_start", "app_id": "app-1",
"radio_config": {
"device": "mock", "buffer_size": BUF,
"tx_sample_rate": 1_000_000,
"tx_center_frequency": 2.45e9,
"tx_gain": -20,
"underrun_policy": "zero",
}}
{
"type": "tx_start",
"app_id": "app-1",
"radio_config": {
"device": "mock",
"buffer_size": BUF,
"tx_sample_rate": 1_000_000,
"tx_center_frequency": 2.45e9,
"tx_gain": -20,
"underrun_policy": "zero",
},
}
)
marker = np.arange(BUF, dtype=np.complex64) + 1
@ -180,12 +182,10 @@ def test_full_duplex_stays_healthy_over_stress_window():
# which routes through the same setters the stress test above
# verifies.
await s.on_message(
{"type": "tx_configure", "app_id": "app-1",
"radio_config": {"tx_sample_rate": 1_000_000 + i}}
{"type": "tx_configure", "app_id": "app-1", "radio_config": {"tx_sample_rate": 1_000_000 + i}}
)
await s.on_message(
{"type": "configure", "app_id": "app-1",
"radio_config": {"sample_rate": 2_000_000 + i}}
{"type": "configure", "app_id": "app-1", "radio_config": {"sample_rate": 2_000_000 + i}}
)
i += 1
await asyncio.sleep(0.005)
@ -197,8 +197,7 @@ def test_full_duplex_stays_healthy_over_stress_window():
ws, s = asyncio.run(scenario())
# No error frame leaked out.
errors = [m for m in ws.json_sent
if m.get("type") in ("error", "tx_status") and m.get("state") == "error"]
errors = [m for m in ws.json_sent if m.get("type") in ("error", "tx_status") and m.get("state") == "error"]
assert errors == [], f"Unexpected error frames: {errors}"
# RX produced IQ frames and TX's callback ran — heartbeat-level contention
# check: both setter paths were hit at least once during configure dispatch.

View File

@ -121,9 +121,7 @@ def test_start_without_device_emits_error():
def test_configure_queues_update():
async def scenario():
streamer = Streamer(ws=FakeWs(), sdr_factory=_factory)
await streamer.on_message(
{"type": "configure", "app_id": "x", "radio_config": {"center_frequency": 915e6}}
)
await streamer.on_message({"type": "configure", "app_id": "x", "radio_config": {"center_frequency": 915e6}})
# Before start(), pending config lives on the standalone dict exposed via the _pending_config shim.
return streamer._pending_config

View File

@ -143,10 +143,7 @@ def test_rejects_duplicate_tx_session():
return ws
ws = asyncio.run(scenario())
errors = [
m for m in ws.json_sent
if m.get("type") == "tx_status" and m.get("state") == "error"
]
errors = [m for m in ws.json_sent if m.get("type") == "tx_status" and m.get("state") == "error"]
assert any("already active" in e.get("message", "") for e in errors)

View File

@ -70,10 +70,7 @@ def test_underrun_pause_stops_session_and_emits_status():
# Do not push any buffers. The callback underruns on first tick and
# the watchdog should emit "underrun" and tear down.
for _ in range(100):
if any(
m.get("type") == "tx_status" and m.get("state") == "underrun"
for m in ws.json_sent
):
if any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent):
break
await asyncio.sleep(0.01)
for _ in range(50):
@ -103,9 +100,7 @@ def test_underrun_zero_keeps_session_alive():
ws, still_alive = asyncio.run(scenario())
# No underrun status emitted (policy absorbs it silently).
assert not any(
m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent
)
assert not any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent)
assert still_alive
# All produced buffers are zero (no real data was pushed).
assert sdr.tx_produced, "expected at least one TX callback invocation"
@ -129,9 +124,7 @@ def test_underrun_repeat_replays_last_buffer():
ws, sdr = asyncio.run(scenario())
# No underrun status emitted.
assert not any(
m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent
)
assert not any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent)
# At least two buffers equal to the marker — the real one and ≥1 repeat.
matching = [b for b in sdr.tx_produced if np.array_equal(b, marker)]
assert len(matching) >= 2, f"expected ≥2 buffers matching marker, got {len(matching)}"

View File

@ -142,9 +142,7 @@ def test_malformed_control_frame_does_not_crash():
async def on_msg(m):
handled.append(m)
task = asyncio.create_task(
client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"})
)
task = asyncio.create_task(client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"}))
for _ in range(50):
if handled:
break

View File

@ -102,9 +102,7 @@ def test_binary_frame_dropped_when_no_handler():
async def on_msg(m):
messages.append(m)
task = asyncio.create_task(
client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"})
)
task = asyncio.create_task(client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"}))
for _ in range(50):
if messages:
break

View File

@ -12,7 +12,6 @@ import pytest
from ria_toolkit_oss.remote_control.remote_transmitter import RemoteTransmitter
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@ -241,34 +240,40 @@ class TestRunFunction:
def test_init_tx_without_radio_returns_failure(self):
tx = RemoteTransmitter()
resp = tx.run_function({
"function_name": "init_tx",
"center_frequency": 2.4e9,
"sample_rate": 20e6,
"gain": 0,
})
resp = tx.run_function(
{
"function_name": "init_tx",
"center_frequency": 2.4e9,
"sample_rate": 20e6,
"gain": 0,
}
)
assert resp["status"] is False
assert resp["error_message"]
def test_init_tx_with_radio_success(self):
tx = self._tx_with_mock_sdr()
resp = tx.run_function({
"function_name": "init_tx",
"center_frequency": 2.4e9,
"sample_rate": 20e6,
"gain": 30,
})
resp = tx.run_function(
{
"function_name": "init_tx",
"center_frequency": 2.4e9,
"sample_rate": 20e6,
"gain": 30,
}
)
assert resp["status"] is True
def test_transmit_runs_for_short_duration(self):
tx = self._tx_with_mock_sdr()
tx._sdr.init_tx = MagicMock()
resp = tx.run_function({
"function_name": "init_tx",
"center_frequency": 2.4e9,
"sample_rate": 20e6,
"gain": 0,
})
resp = tx.run_function(
{
"function_name": "init_tx",
"center_frequency": 2.4e9,
"sample_rate": 20e6,
"gain": 0,
}
)
resp = tx.run_function({"function_name": "transmit", "duration_s": 0.02})
assert resp["status"] is True

View File

@ -7,8 +7,6 @@ sys.modules so they run regardless of whether the packages are installed.
from __future__ import annotations
import json
import sys
import threading
import time
from types import ModuleType
from unittest.mock import MagicMock, patch
@ -199,15 +197,11 @@ class TestErrorHandling:
def test_missing_paramiko_raises_runtime_error(self):
"""If paramiko is absent, connecting gives a clear RuntimeError."""
import importlib
import ria_toolkit_oss.remote_control.remote_transmitter_controller as mod
with patch.dict("sys.modules", {"paramiko": None}):
with pytest.raises((RuntimeError, ImportError)):
mod.RemoteTransmitterController(
host="h", ssh_user="u", ssh_key_path="/k"
)
mod.RemoteTransmitterController(host="h", ssh_user="u", ssh_key_path="/k")
# ---------------------------------------------------------------------------

View File

@ -2,7 +2,7 @@
from __future__ import annotations
from unittest.mock import MagicMock, call, patch
from unittest.mock import MagicMock, patch
import pytest
@ -12,7 +12,6 @@ from ria_toolkit_oss.orchestration.campaign import (
TransmitterConfig,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
@ -179,9 +178,7 @@ class TestInitRemoteTxControllers:
}
]
executor = _make_executor(d)
with patch(
"ria_toolkit_oss.remote_control.RemoteTransmitterController"
) as mock_cls:
with patch("ria_toolkit_oss.remote_control.RemoteTransmitterController") as mock_cls:
executor._init_remote_tx_controllers()
mock_cls.assert_not_called()
assert executor._remote_tx_controllers == {}
@ -264,7 +261,7 @@ class TestStartTransmitterSdrRemote:
tx = executor.config.transmitters[0]
step = CaptureStep(duration=5.0, label="nochan")
executor._start_transmitter(tx, step)
_, kwargs = mock_ctrl_kwarg = ctrl.init_tx.call_args
_, kwargs = ctrl.init_tx.call_args
assert kwargs["channel"] == 0
def test_missing_controller_raises(self):
@ -381,7 +378,11 @@ class TestRunWithSdrRemote:
),
patch.object(executor, "_close_sdr"),
patch.object(executor, "_close_remote_tx_controllers"),
patch.object(executor, "_execute_step", return_value=MagicMock(error=None, qa=MagicMock(flagged=False, snr_db=20.0, duration_s=10.0))),
patch.object(
executor,
"_execute_step",
return_value=MagicMock(error=None, qa=MagicMock(flagged=False, snr_db=20.0, duration_s=10.0)),
),
):
executor.run()
@ -401,6 +402,7 @@ class TestTransmitBufferAndTimeout:
def _executor_with_ctrl(self):
from ria_toolkit_oss.orchestration.executor import CampaignExecutor
cfg = CampaignConfig.from_dict(_FULL_CAMPAIGN_DICT)
executor = CampaignExecutor(cfg)
ctrl = MagicMock()