diff --git a/scripts/pluto_tx_smoke.py b/scripts/pluto_tx_smoke.py index 64adbb9..97913ec 100755 --- a/scripts/pluto_tx_smoke.py +++ b/scripts/pluto_tx_smoke.py @@ -66,8 +66,9 @@ class LoggingFakeWs: pass -def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, - phase_offset: float = 0.0) -> tuple[bytes, float]: +def _make_iq_frame( + buffer_size: int, tone_hz: float, sample_rate: float, phase_offset: float = 0.0 +) -> tuple[bytes, float]: """Return ``(interleaved_float32_bytes, next_phase)`` for a sine tone. Emitting one continuous phase-coherent tone requires threading the phase @@ -93,7 +94,9 @@ def _make_pluto_factory(identifier: str | None): if device != "pluto": raise ValueError(f"this script only drives pluto; got device={device!r}") from ria_toolkit_oss.sdr.pluto import Pluto + return Pluto(identifier=identifier) + return factory @@ -130,13 +133,14 @@ async def _run(args: argparse.Namespace) -> int: # Abort if tx_start was rejected by an interlock (no session → nothing to do). if streamer._tx is None: - print("tx_start rejected — see [tx_status] line above for the reason.", - file=sys.stderr) + print("tx_start rejected — see [tx_status] line above for the reason.", file=sys.stderr) return 2 - print(f"Transmitting at {args.frequency/1e6:.3f} MHz with " - f"{args.tone/1e3:.1f} kHz baseband tone at gain {args.gain} dB. " - f"{'Running for ' + str(args.duration) + 's' if args.duration > 0 else 'Run until Ctrl-C'}.") + print( + f"Transmitting at {args.frequency/1e6:.3f} MHz with " + f"{args.tone/1e3:.1f} kHz baseband tone at gain {args.gain} dB. " + f"{'Running for ' + str(args.duration) + 's' if args.duration > 0 else 'Run until Ctrl-C'}." + ) # Arrange a clean shutdown on Ctrl-C. stop = asyncio.Event() @@ -157,12 +161,11 @@ async def _run(args: argparse.Namespace) -> int: # topped up. The queue's own backpressure keeps us from spinning. produce_interval = buffer_dt * 0.5 try: + async def producer(): nonlocal phase while not stop.is_set(): - frame, phase = _make_iq_frame( - args.buffer_size, args.tone, args.sample_rate, phase - ) + frame, phase = _make_iq_frame(args.buffer_size, args.tone, args.sample_rate, phase) await streamer.on_binary(frame) await asyncio.sleep(produce_interval) @@ -193,20 +196,17 @@ def main() -> int: p = argparse.ArgumentParser( description="End-to-end TX smoke test: agent → Pluto continuous tone.", ) - p.add_argument("--identifier", default=None, - help="Pluto IP/hostname (default: auto-discover pluto.local)") - p.add_argument("--frequency", type=float, default=3_410_000_000.0, - help="TX LO in Hz (default 2.45 GHz)") - p.add_argument("--gain", type=float, default=-0.0, - help="TX gain in dB; Pluto range [-89, 0] (default -30)") - p.add_argument("--sample-rate", type=float, default=1_000_000.0, - help="Baseband sample rate (default 1 Msps)") - p.add_argument("--tone", type=float, default=100_000.0, - help="Baseband tone offset in Hz; 0 = DC (default 100 kHz)") - p.add_argument("--buffer-size", type=int, default=4096, - help="Complex samples per frame (default 4096)") - p.add_argument("--duration", type=float, default=60.0, - help="Seconds to transmit; 0 = run until Ctrl-C (default 30)") + p.add_argument("--identifier", default=None, help="Pluto IP/hostname (default: auto-discover pluto.local)") + p.add_argument("--frequency", type=float, default=3_410_000_000.0, help="TX LO in Hz (default 2.45 GHz)") + p.add_argument("--gain", type=float, default=-0.0, help="TX gain in dB; Pluto range [-89, 0] (default -30)") + p.add_argument("--sample-rate", type=float, default=1_000_000.0, help="Baseband sample rate (default 1 Msps)") + p.add_argument( + "--tone", type=float, default=100_000.0, help="Baseband tone offset in Hz; 0 = DC (default 100 kHz)" + ) + p.add_argument("--buffer-size", type=int, default=4096, help="Complex samples per frame (default 4096)") + p.add_argument( + "--duration", type=float, default=60.0, help="Seconds to transmit; 0 = run until Ctrl-C (default 30)" + ) p.add_argument("--log-level", default="INFO") args = p.parse_args() diff --git a/scripts/pluto_tx_ws_smoke.py b/scripts/pluto_tx_ws_smoke.py index d4c8344..f828e0c 100755 --- a/scripts/pluto_tx_ws_smoke.py +++ b/scripts/pluto_tx_ws_smoke.py @@ -41,8 +41,7 @@ from ria_toolkit_oss.agent.streamer import Streamer from ria_toolkit_oss.agent.ws_client import WsClient -def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, - phase_offset: float) -> tuple[bytes, float]: +def _make_iq_frame(buffer_size: int, tone_hz: float, sample_rate: float, phase_offset: float) -> tuple[bytes, float]: n = np.arange(buffer_size, dtype=np.float64) phase = 2.0 * np.pi * tone_hz / sample_rate * n + phase_offset amp = 0.7 @@ -59,7 +58,9 @@ def _make_pluto_factory(identifier: str | None): if device != "pluto": raise ValueError(f"this script only drives pluto; got device={device!r}") from ria_toolkit_oss.sdr.pluto import Pluto + return Pluto(identifier=identifier) + return factory @@ -73,27 +74,29 @@ async def _mock_hub_handler(ws, args, stop: asyncio.Event): payload = json.loads(first) if payload.get("type") == "heartbeat": caps = payload.get("capabilities") - print(f"[mock-hub] agent heartbeat: capabilities={caps} " - f"tx_enabled={payload.get('tx_enabled')}") + print(f"[mock-hub] agent heartbeat: capabilities={caps} " f"tx_enabled={payload.get('tx_enabled')}") except asyncio.TimeoutError: print("[mock-hub] warning: no heartbeat received in first 2s") # Arm the agent's TX path. - await ws.send(json.dumps({ - "type": "tx_start", - "app_id": "ws-smoke", - "radio_config": { - "device": "pluto", - "identifier": args.identifier, - "tx_sample_rate": int(args.sample_rate), - "tx_center_frequency": int(args.frequency), - "tx_gain": int(args.gain), - "buffer_size": int(args.buffer_size), - "underrun_policy": "repeat", - }, - })) - print(f"[mock-hub] sent tx_start at {args.frequency/1e6:.3f} MHz, " - f"gain={args.gain} dB") + await ws.send( + json.dumps( + { + "type": "tx_start", + "app_id": "ws-smoke", + "radio_config": { + "device": "pluto", + "identifier": args.identifier, + "tx_sample_rate": int(args.sample_rate), + "tx_center_frequency": int(args.frequency), + "tx_gain": int(args.gain), + "buffer_size": int(args.buffer_size), + "underrun_policy": "repeat", + }, + } + ) + ) + print(f"[mock-hub] sent tx_start at {args.frequency/1e6:.3f} MHz, " f"gain={args.gain} dB") # Producer: push IQ frames at a steady clip. Use a concurrent receiver so # tx_status frames show up in real time rather than being queued behind @@ -112,15 +115,11 @@ async def _mock_hub_handler(ws, args, stop: asyncio.Event): recv_task = asyncio.create_task(receiver()) try: - deadline = None if args.duration <= 0 else ( - asyncio.get_event_loop().time() + args.duration - ) + deadline = None if args.duration <= 0 else (asyncio.get_event_loop().time() + args.duration) while not stop.is_set(): if deadline is not None and asyncio.get_event_loop().time() >= deadline: break - frame, phase = _make_iq_frame( - args.buffer_size, args.tone, args.sample_rate, phase - ) + frame, phase = _make_iq_frame(args.buffer_size, args.tone, args.sample_rate, phase) try: await ws.send(frame) except websockets.ConnectionClosed: @@ -204,20 +203,15 @@ def main() -> int: p = argparse.ArgumentParser( description="Full-stack TX smoke: localhost mock-hub → WS → agent → Pluto.", ) - p.add_argument("--identifier", default=None, - help="Pluto IP/hostname (default: auto-discover pluto.local)") - p.add_argument("--frequency", type=float, default=2_450_000_000.0, - help="TX LO in Hz (default 2.45 GHz)") - p.add_argument("--gain", type=float, default=0.0, - help="TX gain in dB; Pluto range [-89, 0] (default 0)") - p.add_argument("--sample-rate", type=float, default=1_000_000.0, - help="Baseband sample rate (default 1 Msps)") - p.add_argument("--tone", type=float, default=100_000.0, - help="Baseband tone offset in Hz (default 100 kHz)") - p.add_argument("--buffer-size", type=int, default=4096, - help="Complex samples per frame (default 4096)") - p.add_argument("--duration", type=float, default=30.0, - help="Seconds to transmit; 0 = run until Ctrl-C (default 30)") + p.add_argument("--identifier", default=None, help="Pluto IP/hostname (default: auto-discover pluto.local)") + p.add_argument("--frequency", type=float, default=2_450_000_000.0, help="TX LO in Hz (default 2.45 GHz)") + p.add_argument("--gain", type=float, default=0.0, help="TX gain in dB; Pluto range [-89, 0] (default 0)") + p.add_argument("--sample-rate", type=float, default=1_000_000.0, help="Baseband sample rate (default 1 Msps)") + p.add_argument("--tone", type=float, default=100_000.0, help="Baseband tone offset in Hz (default 100 kHz)") + p.add_argument("--buffer-size", type=int, default=4096, help="Complex samples per frame (default 4096)") + p.add_argument( + "--duration", type=float, default=30.0, help="Seconds to transmit; 0 = run until Ctrl-C (default 30)" + ) p.add_argument("--log-level", default="INFO") args = p.parse_args() diff --git a/src/ria_toolkit_oss/agent/cli.py b/src/ria_toolkit_oss/agent/cli.py index dec8420..83a7769 100644 --- a/src/ria_toolkit_oss/agent/cli.py +++ b/src/ria_toolkit_oss/agent/cli.py @@ -118,9 +118,9 @@ def _derive_ws_url(hub_url: str, agent_id: str) -> str: return "" base = hub_url.rstrip("/") if base.startswith("https://"): - base = "wss://" + base[len("https://"):] + base = "wss://" + base[len("https://") :] elif base.startswith("http://"): - base = "ws://" + base[len("http://"):] + base = "ws://" + base[len("http://") :] suffix = f"/screens/agent/ws?agent_id={agent_id}" if agent_id else "/screens/agent/ws" return base + suffix diff --git a/src/ria_toolkit_oss/agent/config.py b/src/ria_toolkit_oss/agent/config.py index 431094a..37d20c8 100644 --- a/src/ria_toolkit_oss/agent/config.py +++ b/src/ria_toolkit_oss/agent/config.py @@ -22,6 +22,7 @@ import os from dataclasses import asdict, dataclass, field from pathlib import Path + def _resolve_default_path() -> Path: return Path(os.environ.get("RIA_AGENT_CONFIG", str(Path.home() / ".ria" / "agent.json"))) diff --git a/src/ria_toolkit_oss/agent/hardware.py b/src/ria_toolkit_oss/agent/hardware.py index d585e8f..98b4683 100644 --- a/src/ria_toolkit_oss/agent/hardware.py +++ b/src/ria_toolkit_oss/agent/hardware.py @@ -46,9 +46,7 @@ def heartbeat_payload( if c.tx_max_duration_s is not None: payload["tx_max_duration_s"] = float(c.tx_max_duration_s) if c.tx_allowed_freq_ranges: - payload["tx_allowed_freq_ranges"] = [ - [float(lo), float(hi)] for lo, hi in c.tx_allowed_freq_ranges - ] + payload["tx_allowed_freq_ranges"] = [[float(lo), float(hi)] for lo, hi in c.tx_allowed_freq_ranges] if app_id: payload["app_id"] = app_id if sessions: diff --git a/src/ria_toolkit_oss/agent/streamer.py b/src/ria_toolkit_oss/agent/streamer.py index 51f1dce..6f727ff 100644 --- a/src/ria_toolkit_oss/agent/streamer.py +++ b/src/ria_toolkit_oss/agent/streamer.py @@ -270,9 +270,7 @@ class Streamer: ) self._rx = session await self._send_status("streaming", app_id) - session.task = asyncio.create_task( - self._capture_loop(session), name="ria-streamer-capture" - ) + session.task = asyncio.create_task(self._capture_loop(session), name="ria-streamer-capture") async def _handle_rx_stop(self, msg: dict) -> None: session = self._rx @@ -310,9 +308,7 @@ class Streamer: logger.warning("Applying configure failed: %s", exc) try: - samples = await loop.run_in_executor( - None, session.sdr.rx, session.buffer_size - ) + samples = await loop.run_in_executor(None, session.sdr.rx, session.buffer_size) except Exception as exc: from ria_toolkit_oss.sdr import SdrDisconnectedError @@ -342,7 +338,7 @@ class Streamer: # ================================================================== # TX - async def _handle_tx_start(self, msg: dict) -> None: + async def _handle_tx_start(self, msg: dict) -> None: # noqa: C901 app_id = msg.get("app_id") or "" radio_config = dict(msg.get("radio_config") or {}) @@ -383,9 +379,7 @@ class Streamer: buffer_size = int(radio_config.pop("buffer_size", _DEFAULT_BUFFER_SIZE)) underrun_policy = str(radio_config.pop("underrun_policy", "pause")) if underrun_policy not in ("pause", "zero", "repeat"): - await self._send_tx_status( - app_id, "error", f"invalid underrun_policy {underrun_policy!r}" - ) + await self._send_tx_status(app_id, "error", f"invalid underrun_policy {underrun_policy!r}") return if not device: await self._send_tx_status(app_id, "error", "tx_start missing radio_config.device") @@ -404,15 +398,10 @@ class Streamer: # manifest bug and we want it surfaced immediately, not papered # over with stale radio state. if hasattr(sdr, "init_tx"): - init_args = { - k: radio_config.get(f"tx_{k}") - for k in ("sample_rate", "center_frequency", "gain") - } + init_args = {k: radio_config.get(f"tx_{k}") for k in ("sample_rate", "center_frequency", "gain")} missing = [f"tx_{k}" for k, v in init_args.items() if v is None] if missing: - raise ValueError( - f"tx_start missing required radio_config keys: {missing}" - ) + raise ValueError(f"tx_start missing required radio_config keys: {missing}") sdr.init_tx( sample_rate=init_args["sample_rate"], center_frequency=init_args["center_frequency"], @@ -498,9 +487,8 @@ class Streamer: return _silence(n) # Max-duration watchdog. - if ( - session.max_duration_s is not None - and (time.monotonic() - session.started_at) >= float(session.max_duration_s) + if session.max_duration_s is not None and (time.monotonic() - session.started_at) >= float( + session.max_duration_s ): session.stop_event.set() try: @@ -528,7 +516,7 @@ class Streamer: if arr.size < 2 or arr.size % 2 != 0: logger.warning("Malformed TX frame: %d floats (must be non-zero even count)", arr.size) return self._underrun_fill(session, n) - samples = (arr[0::2].astype(np.complex64) + 1j * arr[1::2].astype(np.complex64)) + samples = arr[0::2].astype(np.complex64) + 1j * arr[1::2].astype(np.complex64) if samples.size < n: out = np.zeros(n, dtype=np.complex64) out[: samples.size] = samples @@ -747,6 +735,7 @@ def _default_sdr_factory(device: str, identifier: str | None): # --------------------------------------------------------------------------- # Top-level entry + async def run_streamer(ws_url: str, token: str, *, cfg: AgentConfig | None = None) -> None: """Connect to *ws_url* and run the streamer loop until cancelled.""" ws = WsClient(ws_url, token) diff --git a/src/ria_toolkit_oss/app/cli.py b/src/ria_toolkit_oss/app/cli.py index 9bfb479..7a2b7c7 100644 --- a/src/ria_toolkit_oss/app/cli.py +++ b/src/ria_toolkit_oss/app/cli.py @@ -37,7 +37,7 @@ def _engine(cfg: _config.AppConfig, sudo_override: bool = False) -> list[str]: for exe in ("docker", "podman"): if shutil.which(exe): use_sudo = sudo_override or cfg.sudo - return (["sudo", exe] if use_sudo else [exe]) + return ["sudo", exe] if use_sudo else [exe] print("error: neither 'docker' nor 'podman' found on PATH", file=sys.stderr) sys.exit(2) @@ -96,7 +96,9 @@ def _hardware_flags(labels: dict, no_gpu: bool, no_usb: bool, no_host_net: bool) if _gpu_available(): flags += ["--gpus", "all"] else: - notes.append("image wants GPU but no NVIDIA runtime detected — skipping --gpus (use --force-gpu to override)") + notes.append( + "image wants GPU but no NVIDIA runtime detected — skipping --gpus (use --force-gpu to override)" + ) if hw_items & {"pluto", "rtlsdr", "hackrf", "bladerf"} and not no_usb: flags += ["--device", "/dev/bus/usb"] diff --git a/src/ria_toolkit_oss/sdr/__init__.py b/src/ria_toolkit_oss/sdr/__init__.py index 4b327a2..a712be6 100644 --- a/src/ria_toolkit_oss/sdr/__init__.py +++ b/src/ria_toolkit_oss/sdr/__init__.py @@ -15,8 +15,13 @@ __all__ = [ ] from .mock import MockSDR -from .sdr import SDR, SDRError, SdrDisconnectedError, SDRParameterError, translate_disconnect # noqa: F401 - +from .sdr import ( # noqa: F401 + SDR, + SdrDisconnectedError, + SDRError, + SDRParameterError, + translate_disconnect, +) _DRIVER_CANDIDATES: tuple[tuple[str, str, str], ...] = ( ("mock", "ria_toolkit_oss.sdr.mock", "MockSDR"), diff --git a/src/ria_toolkit_oss/sdr/pluto.py b/src/ria_toolkit_oss/sdr/pluto.py index 88243b1..c78d36f 100644 --- a/src/ria_toolkit_oss/sdr/pluto.py +++ b/src/ria_toolkit_oss/sdr/pluto.py @@ -8,7 +8,12 @@ import adi import numpy as np from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.sdr.sdr import SDR, SDRError, SDRParameterError, translate_disconnect +from ria_toolkit_oss.sdr.sdr import ( + SDR, + SDRError, + SDRParameterError, + translate_disconnect, +) class Pluto(SDR): diff --git a/src/ria_toolkit_oss/sdr/sdr.py b/src/ria_toolkit_oss/sdr/sdr.py index aba68d5..443f8fa 100644 --- a/src/ria_toolkit_oss/sdr/sdr.py +++ b/src/ria_toolkit_oss/sdr/sdr.py @@ -583,7 +583,7 @@ _DISCONNECT_MARKERS = ( "i/o error", "input/output error", "errno 19", # ENODEV - "errno 5", # EIO + "errno 5", # EIO ) diff --git a/tests/agent/test_cli_tx.py b/tests/agent/test_cli_tx.py index 1543d4c..da66e91 100644 --- a/tests/agent/test_cli_tx.py +++ b/tests/agent/test_cli_tx.py @@ -26,9 +26,11 @@ class _FakeResp: def _run_register(argv: list[str], cfg_path) -> int: fake_resp = _FakeResp({"agent_id": "agent-1", "token": "tok-abc"}) - with patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False), \ - patch("urllib.request.urlopen", return_value=fake_resp), \ - patch.object(sys, "argv", ["ria-agent", *argv]): + with ( + patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False), + patch("urllib.request.urlopen", return_value=fake_resp), + patch.object(sys, "argv", ["ria-agent", *argv]), + ): try: agent_cli.main() except SystemExit as exc: @@ -96,9 +98,11 @@ def test_stream_allow_tx_does_not_persist(tmp_path): captured["cfg"] = cfg return None - with patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False), \ - patch("ria_toolkit_oss.agent.streamer.run_streamer", new=_fake_run_streamer), \ - patch.object(sys, "argv", ["ria-agent", "stream", "--allow-tx"]): + with ( + patch.dict("os.environ", {"RIA_AGENT_CONFIG": str(cfg_path)}, clear=False), + patch("ria_toolkit_oss.agent.streamer.run_streamer", new=_fake_run_streamer), + patch.object(sys, "argv", ["ria-agent", "stream", "--allow-tx"]), + ): try: agent_cli.main() except SystemExit: diff --git a/tests/agent/test_integration.py b/tests/agent/test_integration.py index 168e7a6..01eb9ec 100644 --- a/tests/agent/test_integration.py +++ b/tests/agent/test_integration.py @@ -70,9 +70,7 @@ def test_server_start_stream_stop_cycle_over_real_ws(): reconnect_pause=0.05, ) streamer = Streamer(ws=client, sdr_factory=lambda d, i: MockSDR(buffer_size=32, seed=0)) - task = asyncio.create_task( - client.run(on_message=streamer.on_message, heartbeat=streamer.build_heartbeat) - ) + task = asyncio.create_task(client.run(on_message=streamer.on_message, heartbeat=streamer.build_heartbeat)) await asyncio.wait_for(ready.wait(), timeout=3.0) await asyncio.wait_for(stopped.wait(), timeout=3.0) client.stop() diff --git a/tests/agent/test_integration_tx.py b/tests/agent/test_integration_tx.py index 4fc13af..e5239d7 100644 --- a/tests/agent/test_integration_tx.py +++ b/tests/agent/test_integration_tx.py @@ -77,10 +77,7 @@ def test_server_tx_start_binary_stop_cycle_over_real_ws(): msg = await asyncio.wait_for(ws.recv(), timeout=2.0) if isinstance(msg, str): control_frames.append(json.loads(msg)) - if any( - f.get("type") == "tx_status" and f.get("state") == "transmitting" - for f in control_frames - ): + if any(f.get("type") == "tx_status" and f.get("state") == "transmitting" for f in control_frames): break await ws.send(json.dumps({"type": "tx_stop", "app_id": "tx-app"})) diff --git a/tests/agent/test_param_lock_contention.py b/tests/agent/test_param_lock_contention.py index e3d84fc..e70229e 100644 --- a/tests/agent/test_param_lock_contention.py +++ b/tests/agent/test_param_lock_contention.py @@ -30,7 +30,6 @@ from ria_toolkit_oss.agent.config import AgentConfig from ria_toolkit_oss.agent.streamer import Streamer from ria_toolkit_oss.sdr.mock import MockSDR - _STRESS_S = float(os.environ.get("RIA_LOCK_STRESS_S", "2.0")) @@ -156,18 +155,21 @@ def test_full_duplex_stays_healthy_over_stress_window(): s = Streamer(ws=ws, sdr_factory=lambda d, i: sdr, cfg=AgentConfig(tx_enabled=True)) await s.on_message( - {"type": "start", "app_id": "app-1", - "radio_config": {"device": "mock", "buffer_size": BUF}} + {"type": "start", "app_id": "app-1", "radio_config": {"device": "mock", "buffer_size": BUF}} ) await s.on_message( - {"type": "tx_start", "app_id": "app-1", - "radio_config": { - "device": "mock", "buffer_size": BUF, - "tx_sample_rate": 1_000_000, - "tx_center_frequency": 2.45e9, - "tx_gain": -20, - "underrun_policy": "zero", - }} + { + "type": "tx_start", + "app_id": "app-1", + "radio_config": { + "device": "mock", + "buffer_size": BUF, + "tx_sample_rate": 1_000_000, + "tx_center_frequency": 2.45e9, + "tx_gain": -20, + "underrun_policy": "zero", + }, + } ) marker = np.arange(BUF, dtype=np.complex64) + 1 @@ -180,12 +182,10 @@ def test_full_duplex_stays_healthy_over_stress_window(): # which routes through the same setters the stress test above # verifies. await s.on_message( - {"type": "tx_configure", "app_id": "app-1", - "radio_config": {"tx_sample_rate": 1_000_000 + i}} + {"type": "tx_configure", "app_id": "app-1", "radio_config": {"tx_sample_rate": 1_000_000 + i}} ) await s.on_message( - {"type": "configure", "app_id": "app-1", - "radio_config": {"sample_rate": 2_000_000 + i}} + {"type": "configure", "app_id": "app-1", "radio_config": {"sample_rate": 2_000_000 + i}} ) i += 1 await asyncio.sleep(0.005) @@ -197,8 +197,7 @@ def test_full_duplex_stays_healthy_over_stress_window(): ws, s = asyncio.run(scenario()) # No error frame leaked out. - errors = [m for m in ws.json_sent - if m.get("type") in ("error", "tx_status") and m.get("state") == "error"] + errors = [m for m in ws.json_sent if m.get("type") in ("error", "tx_status") and m.get("state") == "error"] assert errors == [], f"Unexpected error frames: {errors}" # RX produced IQ frames and TX's callback ran — heartbeat-level contention # check: both setter paths were hit at least once during configure dispatch. diff --git a/tests/agent/test_streamer.py b/tests/agent/test_streamer.py index da2956c..44f98e0 100644 --- a/tests/agent/test_streamer.py +++ b/tests/agent/test_streamer.py @@ -121,9 +121,7 @@ def test_start_without_device_emits_error(): def test_configure_queues_update(): async def scenario(): streamer = Streamer(ws=FakeWs(), sdr_factory=_factory) - await streamer.on_message( - {"type": "configure", "app_id": "x", "radio_config": {"center_frequency": 915e6}} - ) + await streamer.on_message({"type": "configure", "app_id": "x", "radio_config": {"center_frequency": 915e6}}) # Before start(), pending config lives on the standalone dict exposed via the _pending_config shim. return streamer._pending_config diff --git a/tests/agent/test_tx_safety.py b/tests/agent/test_tx_safety.py index 2de2939..385835f 100644 --- a/tests/agent/test_tx_safety.py +++ b/tests/agent/test_tx_safety.py @@ -143,10 +143,7 @@ def test_rejects_duplicate_tx_session(): return ws ws = asyncio.run(scenario()) - errors = [ - m for m in ws.json_sent - if m.get("type") == "tx_status" and m.get("state") == "error" - ] + errors = [m for m in ws.json_sent if m.get("type") == "tx_status" and m.get("state") == "error"] assert any("already active" in e.get("message", "") for e in errors) diff --git a/tests/agent/test_tx_underrun.py b/tests/agent/test_tx_underrun.py index 8fbe020..95e4277 100644 --- a/tests/agent/test_tx_underrun.py +++ b/tests/agent/test_tx_underrun.py @@ -70,10 +70,7 @@ def test_underrun_pause_stops_session_and_emits_status(): # Do not push any buffers. The callback underruns on first tick and # the watchdog should emit "underrun" and tear down. for _ in range(100): - if any( - m.get("type") == "tx_status" and m.get("state") == "underrun" - for m in ws.json_sent - ): + if any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent): break await asyncio.sleep(0.01) for _ in range(50): @@ -103,9 +100,7 @@ def test_underrun_zero_keeps_session_alive(): ws, still_alive = asyncio.run(scenario()) # No underrun status emitted (policy absorbs it silently). - assert not any( - m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent - ) + assert not any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent) assert still_alive # All produced buffers are zero (no real data was pushed). assert sdr.tx_produced, "expected at least one TX callback invocation" @@ -129,9 +124,7 @@ def test_underrun_repeat_replays_last_buffer(): ws, sdr = asyncio.run(scenario()) # No underrun status emitted. - assert not any( - m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent - ) + assert not any(m.get("type") == "tx_status" and m.get("state") == "underrun" for m in ws.json_sent) # At least two buffers equal to the marker — the real one and ≥1 repeat. matching = [b for b in sdr.tx_produced if np.array_equal(b, marker)] assert len(matching) >= 2, f"expected ≥2 buffers matching marker, got {len(matching)}" diff --git a/tests/agent/test_ws_client.py b/tests/agent/test_ws_client.py index c113b64..7717d5f 100644 --- a/tests/agent/test_ws_client.py +++ b/tests/agent/test_ws_client.py @@ -142,9 +142,7 @@ def test_malformed_control_frame_does_not_crash(): async def on_msg(m): handled.append(m) - task = asyncio.create_task( - client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"}) - ) + task = asyncio.create_task(client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"})) for _ in range(50): if handled: break diff --git a/tests/agent/test_ws_client_binary.py b/tests/agent/test_ws_client_binary.py index 4d9ddc1..70bd97c 100644 --- a/tests/agent/test_ws_client_binary.py +++ b/tests/agent/test_ws_client_binary.py @@ -102,9 +102,7 @@ def test_binary_frame_dropped_when_no_handler(): async def on_msg(m): messages.append(m) - task = asyncio.create_task( - client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"}) - ) + task = asyncio.create_task(client.run(on_message=on_msg, heartbeat=lambda: {"type": "heartbeat"})) for _ in range(50): if messages: break