From 5035f0654a05da3ab756ad0c9764b10a38ae93f0 Mon Sep 17 00:00:00 2001 From: jonny Date: Thu, 16 Apr 2026 15:38:35 -0400 Subject: [PATCH] tx_race_condtion_fix --- src/ria_toolkit_oss/agent/streamer.py | 26 +++++++++++++++++--------- tests/agent/test_full_duplex.py | 1 + tests/agent/test_streamer_tx.py | 9 ++++++++- tests/agent/test_tx_safety.py | 9 ++++++++- tests/agent/test_tx_underrun.py | 1 + 5 files changed, 35 insertions(+), 11 deletions(-) diff --git a/src/ria_toolkit_oss/agent/streamer.py b/src/ria_toolkit_oss/agent/streamer.py index 6cf73e6..51f1dce 100644 --- a/src/ria_toolkit_oss/agent/streamer.py +++ b/src/ria_toolkit_oss/agent/streamer.py @@ -396,15 +396,23 @@ class Streamer: try: sdr, device_key = self._registry.acquire(device, identifier) _apply_sdr_config(sdr, radio_config) - # Only call init_tx when the hub supplied the three required - # parameters. Drivers that gate _stream_tx on _tx_initialized - # (e.g. Pluto) need this; drivers that don't (e.g. Mock) tolerate - # its absence. - init_args = { - k: radio_config.get(f"tx_{k}") - for k in ("sample_rate", "center_frequency", "gain") - } - if hasattr(sdr, "init_tx") and all(v is not None for v in init_args.values()): + # init_tx is mandatory for any driver that exposes it: drivers + # that gate _stream_tx on _tx_initialized (Pluto, HackRF, USRP, + # …) crash with a confusing "TX was not initialized" error 2 s + # later in the executor thread if we skip it. Treat the three + # required keys as a hard contract — a missing one is a hub-side + # manifest bug and we want it surfaced immediately, not papered + # over with stale radio state. + if hasattr(sdr, "init_tx"): + init_args = { + k: radio_config.get(f"tx_{k}") + for k in ("sample_rate", "center_frequency", "gain") + } + missing = [f"tx_{k}" for k, v in init_args.items() if v is None] + if missing: + raise ValueError( + f"tx_start missing required radio_config keys: {missing}" + ) sdr.init_tx( sample_rate=init_args["sample_rate"], center_frequency=init_args["center_frequency"], diff --git a/tests/agent/test_full_duplex.py b/tests/agent/test_full_duplex.py index 6ad2f62..05de3c1 100644 --- a/tests/agent/test_full_duplex.py +++ b/tests/agent/test_full_duplex.py @@ -75,6 +75,7 @@ def test_rx_and_tx_share_one_sdr_instance(): "radio_config": { "device": "mock", "buffer_size": 16, + "tx_sample_rate": 1_000_000, "tx_gain": -20, "tx_center_frequency": 2.45e9, "underrun_policy": "zero", diff --git a/tests/agent/test_streamer_tx.py b/tests/agent/test_streamer_tx.py index 6cb2bb4..ea1ba5b 100644 --- a/tests/agent/test_streamer_tx.py +++ b/tests/agent/test_streamer_tx.py @@ -120,7 +120,14 @@ def test_tx_stop_releases_sdr(): { "type": "tx_start", "app_id": "a", - "radio_config": {"device": "mock", "buffer_size": 8, "underrun_policy": "zero"}, + "radio_config": { + "device": "mock", + "buffer_size": 8, + "tx_sample_rate": 1_000_000, + "tx_center_frequency": 2.45e9, + "tx_gain": -20, + "underrun_policy": "zero", + }, } ) await asyncio.sleep(0.03) diff --git a/tests/agent/test_tx_safety.py b/tests/agent/test_tx_safety.py index 5307917..2de2939 100644 --- a/tests/agent/test_tx_safety.py +++ b/tests/agent/test_tx_safety.py @@ -27,7 +27,14 @@ def _last_tx_status(ws): def _tx_start(app_id="a", **radio): - rc = {"device": "mock", "buffer_size": 16, "underrun_policy": "zero"} + rc = { + "device": "mock", + "buffer_size": 16, + "tx_sample_rate": 1_000_000, + "tx_center_frequency": 2.45e9, + "tx_gain": -20, + "underrun_policy": "zero", + } rc.update(radio) return {"type": "tx_start", "app_id": app_id, "radio_config": rc} diff --git a/tests/agent/test_tx_underrun.py b/tests/agent/test_tx_underrun.py index e95feec..8fbe020 100644 --- a/tests/agent/test_tx_underrun.py +++ b/tests/agent/test_tx_underrun.py @@ -52,6 +52,7 @@ def _start_cfg(policy: str, buf: int = 8) -> dict: "radio_config": { "device": "mock", "buffer_size": buf, + "tx_sample_rate": 1_000_000, "tx_gain": -20, "tx_center_frequency": 2.45e9, "underrun_policy": policy,