# Agent TX Streaming: Cross-Repo Plan

**Repos:** `ria-toolkit-oss`, `ria-hub`, Screens frontend

**Status:** Proposal / pre-implementation

**Prerequisites:** The RX-streaming work from [screens_agent_handoff.md](./screens_agent_handoff.md) and [screens_agent_streamer_plan.md](./screens_agent_streamer_plan.md) is landed (agent WS protocol, `AgentDataSource`, `/screens/agents/register`, `/screens/agent/ws`).

## Goal

Let a Screens app running on the hub drive a **remote agent's Pluto** (or other TX-capable SDR) to transmit, streaming IQ buffers end-to-end from an operator like `plutoTXoperator` into the agent's `sdr.tx()` path. Mirror image of what `AgentDataSource` already does for RX.

## Non-goals (v1)

- Multi-tenant radio sharing (one app owns the radio at a time per agent).
- Bulk/upload-once TX: superseded by streaming per request.
- Arbitrary waveform generation in the agent. The agent is dumb pipe + hardware control; signal generation stays on the hub.

## Key design decisions

| # | Decision | Value |
|---|---|---|
| D1 | **Delivery mode** | **Streaming**. Hub pushes binary IQ buffers continuously over the existing WS; agent's `_stream_tx` callback pulls them from an in-agent queue. |
| D2 | **Full-duplex** | **Yes.** A single `app_id` may own both an RX session and a TX session on the same agent concurrently. Same physical SDR handle serves both (Pluto is FDD-capable; `init_rx` and `init_tx` are independent on one `adi.Pluto` instance). |
| D3 | **Safety caps** | **Agent-enforced.** `~/.ria/agent.json` holds `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, optional `tx_allowed_freq_ranges: [[low,high], …]`. Agent rejects `tx_start` frames that violate any of these, independent of what the hub sends. |
| D4 | **Buffer format** | Interleaved float32 IQ, range `[-1, 1]`, same as RX. Format-validated by `ria_toolkit_oss.sdr.sdr._verify_sample_format`. |
| D5 | **Protocol evolution** | Keep existing RX messages (`start`/`stop`/`configure`) unchanged for back-compat. Add parallel `tx_start`/`tx_stop`/`tx_configure`. Heartbeat grows to advertise capabilities. |
| D6 | **Underrun policy** | Default `pause`: if the TX queue empties, agent calls `pause_tx()` and emits `tx_status: underrun`. Hub must recover by sending a fresh `tx_start` + buffers. Configurable per session via `radio_config.underrun_policy ∈ {"pause", "zero", "repeat"}`. |
| D7 | **Backpressure** | Rely on TCP/WS backpressure. Agent caps inbound TX queue at 8 buffers; `await ws.send` on the hub side slows when the agent doesn't drain. No application-level flow control in v1. |
| D8 | **Session identity** | `app_id` identifies a Screens app. Each app has at most one RX session and one TX session per agent. Binary direction disambiguates: agent → hub binary = RX IQ; hub → agent binary = TX IQ. |

## Protocol specification

Additions only. Existing RX messages from [screens_agent_handoff.md §Phase 4](./screens_agent_handoff.md) are unchanged.

### Hub → agent (JSON)

```jsonc
// Arm the TX side. Agent calls init_tx, starts the stream_tx thread with an empty queue.
// After this, hub sends binary TX buffers on the same WS.
{
  "type": "tx_start",
  "app_id": "app-abc",
  "radio_config": {
    "device": "pluto",
    "identifier": "ip:192.168.3.1",
    "tx_sample_rate": 1000000,
    "tx_center_frequency": 2450000000,
    "tx_gain": -20,              // dB, negative = attenuation on Pluto
    "tx_bandwidth": 1000000,     // optional
    "buffer_size": 1024,
    "underrun_policy": "pause"   // "pause" | "zero" | "repeat"
  }
}

// Apply parameter changes at the next buffer boundary.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }

// Stop TX, drain queue, pause_tx, release TX side (RX may continue if a separate RX session is live).
{ "type": "tx_stop", "app_id": "app-abc" }
```

### Hub → agent (binary)

Raw interleaved float32 IQ in `[-1, 1]`.
One WS frame = one buffer = `buffer_size` complex samples = `buffer_size * 2 * 4` bytes (8192 bytes for the `buffer_size` of 1024 in the `tx_start` example). Delivered only between `tx_start` and `tx_stop`. Binary frames arriving outside that window are discarded and logged at WARN.

### Agent → hub (JSON)

```jsonc
// Lifecycle events.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed" }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" }  // queue empty; TX paused
{ "type": "tx_status", "app_id": "app-abc", "state": "done" }
{ "type": "tx_status", "app_id": "app-abc", "state": "error", "message": "gain -5 exceeds tx_max_gain_db=-15" }

// Reject reasons from agent-enforced caps/interlocks are surfaced via tx_status:error.
```

### Heartbeat extension

Existing `{type: heartbeat, hardware[], status}` grows:

```jsonc
{
  "type": "heartbeat",
  "hardware": ["mock", "pluto"],
  "status": "streaming",          // unchanged semantics
  "capabilities": ["rx", "tx"],   // NEW: derived from tx_enabled + SDR class having init_tx
  "tx_enabled": true,             // NEW: mirrors config flag
  "sessions": {                   // NEW: optional per-session snapshot
    "rx": { "app_id": "app-abc", "state": "streaming" },
    "tx": { "app_id": "app-abc", "state": "transmitting" }
  },
  "app_id": "app-abc"             // kept for back-compat
}
```

---

## Part A: `ria-toolkit-oss` (this repo)

### A1. `agent/ws_client.py`

Currently the WS client drops server → agent binary (`ws_client.py:77-79`). Add a binary handler alongside the JSON one.

```python
from typing import Awaitable, Callable

BinaryHandler = Callable[[bytes], Awaitable[None]]

async def run(
    self,
    on_message: MessageHandler,
    heartbeat: HeartbeatBuilder,
    on_binary: BinaryHandler | None = None,
) -> None:
    ...
    async for raw in self._ws:
        if isinstance(raw, bytes):
            # Binary frames carry TX IQ; pass them on only if a handler is registered.
            if on_binary is not None:
                await on_binary(raw)
            continue
        ...
```

Keep the reconnect, heartbeat, and malformed-frame behavior unchanged.

### A2. `agent/streamer.py`: add TX sessions

Replace the flat `self._sdr` / `self._app_id` / `self._capture_task` state with a session model:

```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class RxSession:
    app_id: str
    sdr: Any
    buffer_size: int
    task: asyncio.Task
    pending_config: dict

@dataclass
class TxSession:
    app_id: str
    sdr: Any
    queue: asyncio.Queue[bytes]   # bounded, maxsize=8
    task: asyncio.Task            # runs _stream_tx in executor
    underrun_policy: str
    pending_config: dict
    bytes_transmitted: int = 0
    started_at: float = 0.0       # for tx_max_duration_s enforcement
```

The streamer holds `self._rx: RxSession | None` and `self._tx: TxSession | None`. SDR instances are cached by `(device, identifier)`: when RX and TX name the same device, both sessions share one handle (matters for Pluto FDD).

**New handlers**:

- `_handle_tx_start(msg)`: check `cfg.tx_enabled`, validate gain/duration/freq against caps, open/resolve SDR, `sdr.init_tx(...)`, start `_tx_loop`, emit `tx_status: armed`.
- `_handle_tx_stop(msg)`: cancel TX task, `sdr.pause_tx()`, drain queue, release SDR if no RX session on it, emit `tx_status: done`.
- `_handle_tx_configure(msg)`: stash into `self._tx.pending_config`, applied at next buffer boundary (same pattern as RX).
- `on_binary(data)`: if `self._tx`: `await self._tx.queue.put(data)` (awaiting here is the backpressure mechanism). Else: log and drop.

**TX loop** (runs in an executor thread via `loop.run_in_executor`, like the RX capture loop):

```python
def _tx_callback(num_samples: int) -> np.ndarray:
    # Called by sdr._stream_tx on every buffer boundary.
    try:
        raw = self._tx_queue_sync.get(timeout=0.1)
    except queue.Empty:
        return self._underrun_fill(num_samples)  # policy-driven
    # Reinterpret interleaved float32 (I, Q, I, Q, ...) as complex64 without copying.
    # This raises ValueError if the frame does not hold a whole number of complex samples.
    samples = np.frombuffer(raw, dtype=np.float32).view(np.complex64)
    if len(samples) < num_samples:
        return _pad_zero(samples, num_samples)
    return samples[:num_samples]
```

Use a thread-safe `queue.Queue` for the TX side (the `asyncio.Queue` lives on the event loop; the executor thread reads from a sibling `queue.Queue` fed by a tiny asyncio→threading adapter).

**Underrun fills**:

- `"pause"`: signal the main loop to call `sdr.pause_tx()`, emit `tx_status: underrun`, exit the callback.
- `"zero"`: return `np.zeros(num_samples, dtype=np.complex64)`.
- `"repeat"`: return the last good buffer (cached). If no buffer yet: zeros.

**Cap enforcement** in `_handle_tx_start` (before opening the SDR):

```python
if not self._cfg.tx_enabled:
    return await self._send_error_tx(app_id, "tx disabled on this agent")
if (cap := self._cfg.tx_max_gain_db) is not None and tx_gain > cap:
    return await self._send_error_tx(app_id, f"gain {tx_gain} exceeds tx_max_gain_db={cap}")
if (cap := self._cfg.tx_max_duration_s) is not None:
    # enforced by a watchdog in _tx_loop that calls tx_stop after cap seconds
    ...
# An unset or empty range list means "no frequency restriction".
ranges = self._cfg.tx_allowed_freq_ranges
if ranges and not any(lo <= tx_center_frequency <= hi for lo, hi in ranges):
    return await self._send_error_tx(app_id, f"freq {tx_center_frequency} outside allowed ranges")
```

### A3. `agent/config.py`

Extend `AgentConfig`:

```python
@dataclass
class AgentConfig:
    # existing fields…
    tx_enabled: bool = False
    tx_max_gain_db: float | None = None
    tx_max_duration_s: float | None = None
    tx_allowed_freq_ranges: list[tuple[float, float]] | None = None
```

`save()` preserves existing 0600 perms.

### A4. `agent/cli.py`

- `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60`: persist the interlock into config.
- `ria-agent stream --allow-tx`: runtime override (sets `cfg.tx_enabled=True` for the life of the process without writing config).
- `ria-agent detect` unchanged.

### A5. `agent/hardware.py`

```python
def heartbeat_payload(status, app_id=None, *, cfg: AgentConfig, sessions: dict | None = None) -> dict:
    # As written, "tx" is gated on the config flag only; the SDR-class init_tx
    # check mentioned in the heartbeat spec is not shown here.
    caps = ["rx"]
    if cfg.tx_enabled:
        caps.append("tx")
    payload = {
        "type": "heartbeat",
        "hardware": available_devices(),
        "status": status,
        "capabilities": caps,
        "tx_enabled": cfg.tx_enabled,
    }
    if app_id:
        payload["app_id"] = app_id
    if sessions:
        payload["sessions"] = sessions
    return payload
```

### A6. SDR layer

- **Audit**: [`sdr/pluto.py`](../src/ria_toolkit_oss/sdr/pluto.py) `tx_recording` + `_stream_tx` paths already use `_tx_lock` (lines 31, 323, 360). Double-check concurrent-with-RX behavior: the `adi.Pluto` Python object is not thread-safe for arbitrary attribute writes, so all `set_tx_*` / `set_rx_*` must go through the shared `_param_lock` (already present at [`sdr/sdr.py:44`](../src/ria_toolkit_oss/sdr/sdr.py#L44)). Verify that `rx()` in a loop and `_stream_tx` in another thread don't step on each other.
- **MockSDR** already has `init_tx` + `_stream_tx` (`sdr/mock.py:70-100`). No changes needed for mock-based tests.
- **Other TX-capable drivers** (blade, usrp, hackrf): out of scope for v1; leave their `init_tx` as-is.

### A7. Tests (`tests/agent/`)

- `test_streamer_tx.py`: `tx_start` → binary frames → `_stream_tx` callback pulls correct samples → `tx_stop` cleans up.
- `test_tx_safety.py`: cap violations (gain, duration, freq, `tx_enabled=False`) each produce `tx_status: error` and never open the SDR.
- `test_tx_underrun.py`: each policy (`pause`, `zero`, `repeat`) exercised against a fake slow producer.
- `test_full_duplex.py`: one `app_id` sends `start` + `tx_start`; both sessions share one MockSDR; both produce their expected frames; stopping one does not stop the other.
- `test_ws_client_binary.py`: binary frames now reach the binary handler.
- `test_integration_tx.py`: end-to-end against local `websockets` server + MockSDR.

### A8. Docs

- Add a TX section to any existing agent protocol doc (or create `docs/agent_tx_protocol.md`).
- Include a regulatory disclaimer: the operator is responsible for transmissions. The agent is an enabler, not a policy layer beyond the interlocks.

---

## Part B: `ria-hub`

> Paths below are conceptual; confirm against the actual module layout in `ria-hub` before editing. Anchor points reference the RX handoff at [screens_agent_handoff.md §Part B](./screens_agent_handoff.md).

### B1. `AgentTxSink` (new)

Mirror of `AgentDataSource`. Location: `controller/app/modules/screens/data_sinks.py` (or wherever output sinks live in `ria-hub`).

Responsibilities:

- `prepare(radio_config)`: send `tx_start` via Redis pub/sub on `screens:agent:{agent_id}:tx` → WS proxy → agent.
- `write(buffer: np.ndarray | bytes)`: convert to interleaved float32 bytes, send as binary over the WS. Awaits on WS backpressure.
- `configure(partial_radio_config)`: send `tx_configure`.
- `close()`: send `tx_stop`.
- Subscribes to the agent's `tx_status` frames (via the same Redis pub/sub channel used for RX status today) and surfaces state back to the orchestrator. An `error` state aborts the Celery task.

### B2. Refactor `plutoTXoperator`

The existing operator presumably calls `radio.tx(...)` against a directly-attached Pluto. Abstract the "output" into an injectable sink:

```python
class PlutoTxOperator:
    def __init__(self, sink: "TxSink", radio_config: dict):
        # AgentTxSink when dataSink.type == "agent", else LocalPlutoTxSink
        self.sink = sink
        self.radio_config = radio_config
        # Illustrative stop flag; the plan leaves the stop condition unspecified.
        self._stop = False

    def run(self) -> None:
        self.sink.prepare(self.radio_config)
        try:
            while not self._stop:
                buf = self._generate_next_buffer()
                self.sink.write(buf)
        finally:
            # Send tx_stop even if generation or write raises.
            self.sink.close()
```

The local path (existing direct-hardware behavior) becomes `LocalPlutoTxSink`, a thin wrapper around the current `radio.tx` calls. No behavior change for existing deployments.
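
The sink abstraction above can be exercised without a radio or a hub. The following is a minimal sketch, assuming a hypothetical `TxSink` protocol with the four methods B1 lists and an in-memory `RecordingTxSink` test double. The names `TxSink`, `RecordingTxSink`, `encode_iq`, and `run_operator` are illustrative and are not existing `ria-hub` APIs. It uses only the standard library, where the real sinks would presumably use NumPy, and it assumes little-endian byte order, which this plan does not state.

```python
import struct
from typing import Iterable, Protocol, Sequence


class TxSink(Protocol):
    """Hypothetical sink interface; method names follow B1."""

    def prepare(self, radio_config: dict) -> None: ...
    def write(self, buffer: Sequence[complex]) -> None: ...
    def configure(self, partial_radio_config: dict) -> None: ...
    def close(self) -> None: ...


def encode_iq(samples: Sequence[complex]) -> bytes:
    """Pack complex samples as interleaved float32: I, Q, I, Q, ...

    Little-endian is an assumption of this sketch.
    """
    flat = []
    for s in samples:
        flat.extend((s.real, s.imag))
    return struct.pack(f"<{len(flat)}f", *flat)


class RecordingTxSink:
    """Test double that records what a real sink would send to the agent."""

    def __init__(self) -> None:
        self.events = []  # list of (kind, payload) tuples

    def prepare(self, radio_config: dict) -> None:
        self.events.append(("tx_start", dict(radio_config)))

    def write(self, buffer: Sequence[complex]) -> None:
        self.events.append(("binary", encode_iq(buffer)))

    def configure(self, partial_radio_config: dict) -> None:
        self.events.append(("tx_configure", dict(partial_radio_config)))

    def close(self) -> None:
        self.events.append(("tx_stop", None))


def run_operator(
    sink: TxSink,
    radio_config: dict,
    buffers: Iterable[Sequence[complex]],
) -> None:
    """Drive any sink through prepare -> write* -> close."""
    sink.prepare(radio_config)
    try:
        for buf in buffers:
            sink.write(buf)
    finally:
        # close() maps to tx_stop, so it runs even if a write fails.
        sink.close()


if __name__ == "__main__":
    sink = RecordingTxSink()
    run_operator(sink, {"buffer_size": 4}, [[0j, 1 + 0j, 0.5 - 0.5j, -1j]])
    kinds = [kind for kind, _ in sink.events]
    frame = sink.events[1][1]
    print(kinds, len(frame))  # frame length is buffer_size * 2 * 4 bytes
```

Because the operator depends only on the protocol, the same loop runs against a recording sink in unit tests and against an agent-backed or local sink in production.
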
`build_data_sink()` (to match `build_data_source()` from B1/B6) routes on `dataSink.type`.

### B3. Manifest schema

Add `dataSink` alongside `dataSource` in the manifest. New `type: "agent"`:

```json
{
  "dataSource": {
    "type": "agent",
    "device": "pluto",
    "agent_id": "agent-abc",
    "params": {
      "sample_rate": 1000000,
      "center_frequency": 2450000000,
      "gain": 40
    }
  },
  "dataSink": {
    "type": "agent",
    "device": "pluto",
    "agent_id": "agent-abc",
    "params": {
      "tx_sample_rate": 1000000,
      "tx_center_frequency": 2450000000,
      "tx_gain": -20,
      "underrun_policy": "pause"
    }
  }
}
```

Update Pydantic models + JSON schema validators in `controller/app/modules/screens/graph_derivation.py` (or equivalent).

When `dataSource.agent_id == dataSink.agent_id` and both target `pluto` with the same `identifier`, the agent will naturally share one SDR handle, so no special-casing is needed on the hub side.

### B4. WS endpoint extensions

`/screens/agent/ws` already exists. Add:

- Support for hub → agent **binary frames** (currently binary is agent → hub only). FastAPI's `WebSocket.send_bytes` works directly; just route binary from the Redis pub/sub channel through to the WS.
- New Redis pub/sub channel `screens:agent:{agent_id}:tx` for outbound TX control JSON + a separate `screens:agent:{agent_id}:tx_bin` for outbound binary. (Two channels keep text and binary payloads apart, which matters when the subscribing client decodes every message as text, as redis-py does with `decode_responses=True`. Redis pub/sub payloads are themselves binary-safe, so if the hub's subscriber reads raw bytes, one channel is fine.)

### B5. Celery wiring

When `dataSink.type == "agent"`, the Celery task that runs the TX-containing graph uses `AgentTxSink` instead of a local sink. The operator code (`plutoTXoperator`) needs no changes beyond the B2 refactor because the sink abstraction hides the difference.

Full-duplex: a single task with both `dataSource.type == "agent"` and `dataSink.type == "agent"` pointing at the same agent spawns both the RX consumer loop (existing `AgentDataSource.next_chunk` via BLPOP) and the TX producer loop (`AgentTxSink.write`).
Both sides are wired up before any capture frames are sent.

### B6. Capability gating

Before any control path sends `tx_start`:

```python
from fastapi import HTTPException

agent = get_agent(agent_id)
if agent.last_heartbeat.age > 60:  # stale
    raise HTTPException(503, "agent not responding")
if "tx" not in agent.last_heartbeat.capabilities:
    raise HTTPException(400, "agent has not opted in to transmission (tx_enabled=false)")
```

Surface clear errors to the Screens UI so the user knows it's an agent config issue, not an app config issue.

### B7. Audit log

New MongoDB collection `agent_tx_audit`:

```
{
  agent_id, app_id, user_id,
  center_frequency_hz, tx_gain_db, duration_s, num_samples,
  started_at, ended_at,
  terminal_status,  // "done" | "error" | "underrun" | "cancelled"
  error_message?
}
```

Write on every `tx_start`. Update on terminal `tx_status`. Index on `{agent_id, started_at}` for admin-view queries.

### B8. Registration: no change needed

`POST /screens/agents/register` and `~/.ria/agent.json` already cover credential storage. The TX interlock (`tx_enabled`, caps) is written by the *agent operator* via `ria-agent register --allow-tx`; the hub only reads the heartbeat to learn whether an agent will accept TX.

---

## Part C: Screens (Vue 3 frontend)

### C1. App composer

- **Agent picker** (exists from RX work) grows a "TX capable" filter toggle; hides agents whose heartbeat `capabilities` lacks `"tx"`.
- When the graph contains `plutoTXoperator` (or any future TX operator):
  - Render a **dataSink** section mirroring dataSource.
  - Fields: device, agent_id, identifier, tx_sample_rate, tx_center_frequency, tx_gain, underrun_policy.
  - Validation: tx_center_frequency within radio band; tx_gain within agent-advertised max (read from heartbeat when available).

### C2. Run-time UI

- **Consent modal** on "Start" for any app whose manifest contains a `dataSink.type: "agent"`:

  > "This app will transmit on **2.450 GHz** at **-20 dB** through agent **lab-pluto-01**. I confirm this transmission is permitted under my local radio regulations."

  Required checkbox, cannot be remembered across apps.
- **TX status indicator** in the running-app view: shows `armed` / `transmitting` / `underrun` state from `tx_status` frames. Red banner on `underrun` or `error`.
- **Stop TX button** always visible during transmission; fires `tx_stop` immediately. Separate from "Stop app" (which also stops RX).

### C3. Admin view

Extend the agents list from B8 of the RX handoff:

- Column: **TX**: `enabled` / `disabled` / `in-use by app X`.
- Agent detail page: show `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`, and the last 10 rows from `agent_tx_audit` filtered to this agent.

---

## Rollout order

1. **Part A §A1-A3, A7**: agent-side TX session + binary ingress + safety, all behind `--allow-tx`. Mock-based tests. Shippable standalone; no consumer yet.
2. **Part B §B1-B5**: hub sink + manifest + WS extension + Celery wiring. End-to-end test: Screens app with `plutoTXoperator` + agent sink → real Pluto in the lab → verify carrier on a spectrum analyzer.
3. **Part B §B6-B7**: capability gating + audit log. Blocks general release, not lab use.
4. **Part C §C1**: composer UI for TX apps.
5. **Part C §C2-C3**: consent modal + admin view. Gate for first non-internal user.

Parts A + B can land on parallel branches and meet at step 2's integration test. Part C can start in parallel with B once the manifest shape in B3 is stable.
## Test matrix (integration)

| Scenario | Expected |
|---|---|
| App with RX only, agent connected | RX as today (regression guard) |
| App with TX only, agent `tx_enabled=True` | TX starts, underrun → pause, stop cleans up |
| App with RX + TX same agent, same device | One Pluto handle serves both; independent gains/frequencies |
| App with TX, agent `tx_enabled=False` | Hub rejects at gate with 400; no WS traffic generated |
| App with TX, gain exceeds agent cap | `tx_status: error`; SDR never opened |
| Hub stops sending TX buffers mid-stream | `underrun` emitted after queue drains; agent paused cleanly |
| WS drops during TX | Agent cancels TX task, pauses hardware, reconnects; hub must re-issue `tx_start` |
| Agent process killed during TX | Hardware stops (existing `close()` already handles this; verify `_tx_lock` released) |

## Open questions

- **Waveform source**: is `plutoTXoperator` a real-time generator emitting on a clock, or does it synthesize a fixed recording and loop? If the latter, worth exposing a "bulk + loop" fast-path: hub sends the buffer once, agent loops it via existing `tx_recording`. Same protocol (`tx_start` + one buffer + `loop: true`), much less WS traffic.
- **Multi-app-per-agent**: out of scope for v1 (§Non-goals). When needed: add a session id to binary frames (4-byte prefix: magic + stream_id + reserved), bump a `protocol_version` in the heartbeat.
- **Streaming TX clock drift**: if hub and agent sample clocks drift, repeating zeros on underrun is audible/visible in the spectrum. Longer term: agent-side resampling or PLL, both expensive. v1: rely on generous queue depth + stable local networks.
- **Other TX-capable SDRs**: HackRF, USRP, bladeRF. The `_CONFIG_ATTR_MAP` in [`agent/streamer.py:169-175`](../src/ria_toolkit_oss/agent/streamer.py#L169-L175) will need per-driver entries when those come online.

## Regulatory note

Transmission is regulated in every jurisdiction.
The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub's consent modal and audit log exist so actions are attributable. None of this is a legal compliance layer — it's a defense-in-depth mechanism.
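
The interlocks referred to here are specified in A2 as inline checks inside `_handle_tx_start`. One way to keep them testable without a radio is a pure function that returns a rejection reason or `None`. The sketch below assumes a hypothetical `TxCaps` dataclass mirroring the four `AgentConfig` fields from A3 and a hypothetical `check_tx_request` helper; neither name comes from the codebase. The duration cap is carried but not checked, since A2 enforces it with a watchdog at run time.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class TxCaps:
    """Hypothetical mirror of the TX fields on AgentConfig (A3)."""

    tx_enabled: bool = False
    tx_max_gain_db: Optional[float] = None
    tx_max_duration_s: Optional[float] = None  # enforced by a watchdog, not here
    tx_allowed_freq_ranges: Optional[Tuple[Tuple[float, float], ...]] = None


def check_tx_request(
    caps: TxCaps,
    tx_gain: float,
    tx_center_frequency: float,
) -> Optional[str]:
    """Return a rejection message, or None if the request passes every cap."""
    if not caps.tx_enabled:
        return "tx disabled on this agent"
    if caps.tx_max_gain_db is not None and tx_gain > caps.tx_max_gain_db:
        return f"gain {tx_gain} exceeds tx_max_gain_db={caps.tx_max_gain_db}"
    # An unset or empty range list means "no frequency restriction".
    ranges = caps.tx_allowed_freq_ranges
    if ranges and not any(lo <= tx_center_frequency <= hi for lo, hi in ranges):
        return f"freq {tx_center_frequency} outside allowed ranges"
    return None


if __name__ == "__main__":
    caps = TxCaps(
        tx_enabled=True,
        tx_max_gain_db=-15,
        tx_allowed_freq_ranges=((2.4e9, 2.5e9),),
    )
    print(check_tx_request(caps, -20, 2.45e9))  # passes every cap
    print(check_tx_request(caps, -5, 2.45e9))   # gain above the cap
    print(check_tx_request(caps, -20, 915e6))   # frequency outside the range
```

A handler built this way would forward any non-`None` result as a `tx_status: error` message before the SDR is opened, which is the behavior `test_tx_safety.py` is meant to verify.
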