# Agent TX Streaming - `ria-toolkit-oss` Handoff

**Paired repo:** `ria-hub` (this doc lives here, but it's written for the Claude working in `ria-toolkit-oss`)

**Source of truth for the overall design:** [Agent TX Streaming - Cross-Repo Plan.md](./Agent%20TX%20Streaming%20-%20Cross-Repo%20Plan.md). Read that first.

**Status (ria-hub side):** landed 2026-04-16. Ready to talk to a TX-capable agent.

---

## Your job

Implement Part A of the plan in `ria-toolkit-oss` (§A1–A8). The hub is already speaking the protocol below and waiting for an agent that can:

1. Accept hub → agent binary TX buffers over an existing WebSocket.
2. Enforce the operator-configured TX interlocks (`tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`).
3. Drive `sdr.init_tx` / `_stream_tx` so a real Pluto transmits what the hub sends.
4. Report status back as `tx_status` JSON frames.

Full-duplex with the existing RX session on the same `app_id` must keep working.

---

## What ria-hub does (so you know what's on the other end of the wire)

You don't need to know any of this to do your work, but if you hit a wall, here's the mental model:

| Hub-side concept | What it does |
|---|---|
| `AgentTxSink` (Python, Celery worker) | Mirrors `AgentDataSource`. Publishes `tx_start`/`tx_stop`/`tx_configure` control JSON and raw binary IQ buffers intended for the agent. |
| `/screens/agent/ws` FastAPI endpoint | The WebSocket you already connect to. Now also pumps hub → agent binary TX frames and republishes your `tx_status` JSON upstream to the Celery task. |
| Redis channels (`agent:tx_iq:*`, `agent:events:*`) | Internal to the hub. You will never see them. Everything reaches you as WS frames. |
| Capability gate | Hub refuses to launch a TX app unless it's seen a recent heartbeat from you with `"tx" ∈ capabilities` and `tx_enabled: true`. |
| Audit log (`AgentTxAudit`) | Hub persists who started what transmission at what frequency and gain. Your error messages in `tx_status` end up in that record. |

**Bottom line: from your process, this is still the same WebSocket you've been using. You're just getting new message types and a new class of binary frames going the other direction.**

---

## Protocol contract (the only thing you actually need)

All additions. Existing RX messages (`start`/`stop`/`configure` + agent → hub binary) keep their current semantics; do not touch them.

### Hub → agent

**JSON control frames** (text WS frames):

```jsonc
// Arm TX. Call sdr.init_tx with this radio_config and start _stream_tx.
// After this you'll start receiving binary frames (see below) that go into
// the stream callback.
{
  "type": "tx_start",
  "app_id": "app-abc",
  "radio_config": {
    "device": "pluto",
    "identifier": "ip:192.168.3.1",
    "tx_sample_rate": 1000000,
    "tx_center_frequency": 2450000000,
    "tx_gain": -20,              // dB. Pluto: negative = attenuation.
    "tx_bandwidth": 1000000,     // optional
    "buffer_size": 1024,         // optional; complex samples per buffer
    "underrun_policy": "pause"   // "pause" | "zero" | "repeat"
  }
}

// Stop TX, drain queue, pause_tx. RX session on the same app_id (if any)
// stays alive.
{ "type": "tx_stop", "app_id": "app-abc" }

// Apply parameter changes at the next buffer boundary. Any subset of
// radio_config fields.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }

// Advisory: safe to ignore. Hub publishes this whenever it RPUSHes a new
// binary buffer; it was wired so the WS bridge wakes up promptly. You do
// NOT need to act on it. Consider it a keepalive.
{ "type": "tx_data_available", "app_id": "app-abc" }
```

**Binary frames** (binary WS frames):

* Raw interleaved `float32` IQ samples in `[-1, 1]`.
* One frame = one buffer.
* Byte length is always `num_complex_samples × 8` (8 bytes per complex sample: two float32s).
* **Only valid between `tx_start` and `tx_stop`.** If you receive a binary frame outside that window, drop it and log WARN. Don't crash, don't panic.
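
The binary framing rules above can be sketched as a small decoder. This is a minimal illustration under stated assumptions, not the toolkit's implementation: `decode_tx_frame` is a hypothetical helper, little-endian byte order is an assumption (the contract only says `float32`), and the standard library is used to keep the example self-contained where a real agent would more likely go through NumPy.

```python
import struct

BYTES_PER_COMPLEX_SAMPLE = 8  # two float32 values (I, Q) per complex sample


def decode_tx_frame(data: bytes) -> list[complex]:
    """Decode one binary WS frame into complex IQ samples.

    Hypothetical helper. Assumes little-endian float32, which the
    protocol text does not state explicitly.
    """
    if len(data) % BYTES_PER_COMPLEX_SAMPLE != 0:
        raise ValueError(
            f"frame length {len(data)} is not a multiple of "
            f"{BYTES_PER_COMPLEX_SAMPLE} bytes"
        )
    # Interleaved layout: I0, Q0, I1, Q1, ...
    samples = [complex(i, q) for i, q in struct.iter_unpack("<2f", data)]
    # The contract says every I and Q component lies in [-1, 1].
    for s in samples:
        if abs(s.real) > 1.0 or abs(s.imag) > 1.0:
            raise ValueError(f"sample {s} is outside [-1, 1]")
    return samples


# Build a two-sample frame in the documented layout and decode it.
frame = struct.pack("<4f", 0.5, -0.5, 1.0, 0.0)
decoded = decode_tx_frame(frame)
```

A frame whose length is not a multiple of 8 raises `ValueError` here; whether the agent should drop such a frame with a WARN or report `tx_status: error` is a policy choice this sketch does not settle.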

Format validator is already in `ria_toolkit_oss.sdr.sdr._verify_sample_format`; reuse it.

### Agent → hub

**JSON status frames** (text WS frames). Use the existing `send_json` path:

```jsonc
// Lifecycle: emit on every state transition.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed" }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" }
{ "type": "tx_status", "app_id": "app-abc", "state": "done" }

// Errors: include a human-readable message. Hub surfaces it to the UI
// and writes it into the audit record.
{ "type": "tx_status", "app_id": "app-abc", "state": "error",
  "message": "gain -5 exceeds tx_max_gain_db=-15" }
```

**States** (hub assumes this vocabulary):

* `armed`: `init_tx` done, callback started, queue empty, nothing transmitting yet.
* `transmitting`: at least one buffer has flowed through the callback.
* `underrun`: queue drained; what you do next depends on `underrun_policy`:
  * `"pause"` → call `pause_tx()`, emit `underrun`, stay paused until the hub sends a fresh `tx_start`.
  * `"zero"` → continue with `np.zeros(...)` fills, still emit `underrun` once so the hub can show the indicator.
  * `"repeat"` → loop the last good buffer, emit `underrun` once.
* `done`: clean stop after `tx_stop`.
* `error`: capability rejection or hardware failure. Include `message`.

**Extended heartbeat**: you are already sending heartbeats.
Grow the payload:

```jsonc
{
  "type": "heartbeat",
  "hardware": ["mock", "pluto"],
  "status": "streaming",                        // unchanged semantics
  "capabilities": ["rx", "tx"],                 // NEW: derived from tx_enabled + SDR class having init_tx
  "tx_enabled": true,                           // NEW: mirror of cfg.tx_enabled
  "tx_max_gain_db": -10,                        // NEW: optional, from agent config
  "tx_max_duration_s": 60,                      // NEW: optional
  "tx_allowed_freq_ranges": [[2.4e9, 2.5e9]],   // NEW: optional
  "sessions": {                                 // NEW: optional per-session snapshot
    "rx": { "app_id": "app-abc", "state": "streaming" },
    "tx": { "app_id": "app-abc", "state": "transmitting" }
  },
  "app_id": "app-abc"                           // keep for back-compat
}
```

The hub reads these fields, stores them on `ScreensAgent`, and gates TX launches on them. **If you don't advertise `tx` in `capabilities` and `tx_enabled: true`, the hub will refuse to start any TX app with HTTP 400, and no WS traffic will be generated.**

### Backpressure model (what happens when you can't keep up)

* The hub caps its outbound TX queue at 200 buffers. If it fills, the hub either blocks on `write()` or drops the oldest buffer; both are benign for you.
* On the agent side, enforce your own cap (plan §A2 suggests 8 buffers). When full, `await ws.send` on the hub will slow via TCP/WS backpressure. You don't need an application-level flow-control message.

---

## Implementation roadmap (mapped to the Cross-Repo Plan)

Work in the order below. Each row is a single PR-sized unit.

| # | Plan ref | Deliverable | Acceptance |
|---|---|---|---|
| 1 | §A3 | `AgentConfig` gains `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. `save()` keeps file mode 0600. | Unit test: round-trip through `~/.ria/agent.json`. |
| 2 | §A4 | `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` persists into config. `ria-agent stream --allow-tx` is a runtime override. | Integration test: `ria-agent register --allow-tx` then `cat ~/.ria/agent.json` shows fields. |
| 3 | §A1 | `ws_client.run()` grows an `on_binary: Callable[[bytes], Awaitable[None]]` parameter. Reconnect + heartbeat + malformed-frame behavior unchanged. | Existing `test_ws_client.py` still passes; new `test_ws_client_binary.py` asserts bytes reach the handler. |
| 4 | §A2 | Replace flat `self._sdr` / `self._app_id` state in `streamer.py` with `RxSession` + `TxSession` dataclasses. SDR instances cached by `(device, identifier)` so RX+TX share one handle on the same device. | Unit test: creating a TxSession on the same device as an active RxSession reuses the same SDR object. |
| 5 | §A2 | `_handle_tx_start`, `_handle_tx_stop`, `_handle_tx_configure` + `on_binary(data)` → `self._tx.queue.put(data)`. TX loop runs `_stream_tx` in an executor thread with a thread-safe `queue.Queue` adapter. | Integration test against MockSDR: tx_start → 10 binary frames → tx_stop produces exactly those samples through the callback. |
| 6 | §A2 | Underrun handling: `"pause"` / `"zero"` / `"repeat"` fills. Emits `tx_status: underrun` exactly once per drain event. | Unit test per policy against a slow producer. |
| 7 | §A2 | Cap enforcement **before** opening the SDR: reject with `tx_status: error` if `tx_enabled=False`, gain exceeds cap, freq outside allowed ranges, or duration cap exceeded (watchdog in TX loop calls `tx_stop` after `tx_max_duration_s`). | Unit test per rejection path; SDR is never opened when rejection fires. |
| 8 | §A5 | Heartbeat grows `capabilities`, `tx_enabled`, optional caps, `sessions`. | Integration test: start agent with `--allow-tx`, connect, verify heartbeat payload. |
| 9 | §A6 | Audit the Pluto driver's `_tx_lock` + `_param_lock` interaction to ensure concurrent RX + TX on the same `adi.Pluto` doesn't race on attribute writes. `MockSDR.init_tx` already exists, so no change is needed there. | Stress test: 30 seconds of concurrent RX + TX on MockSDR with `_param_lock` instrumented for contention. |
| 10 | §A7 | Test matrix per plan: `test_streamer_tx`, `test_tx_safety`, `test_tx_underrun`, `test_full_duplex`, `test_ws_client_binary`, `test_integration_tx`. | All green in CI. |
| 11 | §A8 | Docs: new `docs/agent_tx_protocol.md` OR extended section in existing agent protocol doc. Regulatory disclaimer included. | Lints + renders. |

**Ship order advice:** 1 → 2 → 3 → 4 → (5 || 6) → 7 → 8 → 9 → 10 → 11. Steps 1–3 are strict prerequisites for everything else. Steps 5 and 6 can parallelize. Step 7 can't land without 5.

---

## Verification loop (how to prove the two sides talk)

Once you've implemented §A1–A7, use this to close the loop with the live hub:

1. On the agent host, run `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` then `ria-agent stream`.
2. Confirm the hub has seen the heartbeat: `curl $HUB/screens/agents/json | jq '.agents[] | select(.agent_id==...) | {tx_enabled, capabilities}'` should show `tx_enabled: true` and `["rx","tx"]`.
3. Create a Screens app whose manifest contains a `dataSink.type == "agent"` pointing at your `agent_id`, or use the composer UI with a `PlutoTXOp` in the graph + the new TX-sink agent picker.
4. `POST /screens/apps/{id}/start` on the hub. You should observe, in order:
   1. `tx_start` JSON on your WS.
   2. Binary frames arriving (if the hub-side operator is actually generating buffers; this may no-op for now since the operator refactor is planned but not done).
   3. Your `tx_status: armed` JSON emitted back.
5. Stop the app. You should receive `tx_stop` and emit `tx_status: done`.
6. Provoke a rejection: set `tx_max_gain_db: -15` in your config, then start a TX app with `tx_gain: -5`. The hub should return `HTTP 400` from `/start` without any WS traffic, because the capability gate fires first. If you make it past the gate and it's still wrong, emit `tx_status: error` and the hub will surface the message to the UI.
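
The agent-side half of the rejection in step 6 is a pure check that can run before the SDR is opened. A minimal sketch, assuming a hypothetical `TxCaps` dataclass and `check_tx_request` function (neither name comes from the plan); the fields mirror the interlocks named in this doc and the gain message follows the `tx_status` error example in the protocol contract.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TxCaps:
    """Hypothetical container for the operator-configured TX interlocks."""
    tx_enabled: bool = False
    tx_max_gain_db: Optional[float] = None
    # List of [lo_hz, hi_hz] pairs; an empty list means "no restriction" here.
    tx_allowed_freq_ranges: list = field(default_factory=list)


def check_tx_request(caps: TxCaps, radio_config: dict) -> Optional[str]:
    """Return an error message for tx_status, or None if the request passes."""
    if not caps.tx_enabled:
        return "tx is disabled on this agent (tx_enabled=false)"

    gain = radio_config.get("tx_gain")
    if caps.tx_max_gain_db is not None and gain is not None:
        if gain > caps.tx_max_gain_db:
            return f"gain {gain} exceeds tx_max_gain_db={caps.tx_max_gain_db}"

    freq = radio_config.get("tx_center_frequency")
    if caps.tx_allowed_freq_ranges and freq is not None:
        if not any(lo <= freq <= hi for lo, hi in caps.tx_allowed_freq_ranges):
            return f"frequency {freq} is outside tx_allowed_freq_ranges"

    return None


caps = TxCaps(tx_enabled=True, tx_max_gain_db=-15,
              tx_allowed_freq_ranges=[[2.4e9, 2.5e9]])
rejected = check_tx_request(caps, {"tx_gain": -5, "tx_center_frequency": 2_450_000_000})
allowed = check_tx_request(caps, {"tx_gain": -20, "tx_center_frequency": 2_450_000_000})
```

The sketch only checks the center frequency and treats a missing `tx_gain` as acceptable; both are simplifications, and the duration cap is left out because the roadmap handles it with a watchdog inside the TX loop rather than at request time.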

**Useful hub-side greps if something is wrong:**

* `grep -r "tx_status" controller/app/modules/screens/`: see how the hub parses your frames.
* `grep -r "tx_enabled" controller/app/modules/screens/`: see what heartbeat fields the hub reads.
* `controller/app/modules/screens/agent_ws.py:200-290`: the WS handler's JSON dispatch.
* `controller/app/modules/screens/data_sinks.py`: what the hub publishes on each control frame.

---

## Open questions (from the original plan that still apply)

Answered since the plan was written:

* ✅ **Operator name:** `PlutoTXOp` (PascalCase, stored in the hub's MongoDB `ops` collection via the application packager).
* ✅ **Redis channel naming:** kept `agent:*` prefix on the hub side; you never see this.
* ✅ **Status plumbing:** `tx_status` frames get republished on a hub-internal pub/sub and surface to the UI through the existing SSE stream. You just send the frames; the hub does the rest.

Still open (flag when you have a preference):

* **Bulk + loop fast-path.** If the hub's TX operator turns out to be a fixed recording played on loop, we could add a `{ "type": "tx_start", ..., "loop": true }` variant where the hub sends the buffer once and the agent uses the existing `tx_recording` path. Protocol-compatible with the streaming version. Defer until a real use case demands it.
* **Multi-app-per-agent.** Out of scope for v1 (§Non-goals). If/when needed: prefix binary frames with a 4-byte session header and bump a `protocol_version` in the heartbeat.
* **TX clock drift.** Relying on generous queue depth + stable local networks for v1. Longer term may need agent-side resampling.

---

## What lives in `ria-hub` now (reference)

You don't need to read any of this, but if you're curious or need to debug the integration, these are the load-bearing bits on the hub side:

| Path | What |
|---|---|
| `controller/app/modules/screens/data_sinks.py` | `AgentTxSink`, `LocalPlutoTxSink`, `build_data_sink` |
| `controller/app/modules/screens/agent_ws.py` | `_forward_tx_binary`, heartbeat parsing, `tx_status` republish |
| `controller/app/modules/screens/graph_derivation.py` | `_pluto_tx_spec_mapping`, `_SDR_SINK_MAP`, `_derive_data_sink` |
| `controller/app/modules/screens/routes.py` | `_check_agent_tx_capability`, `AgentTxAudit` write, `POST /apps/{id}/sink-agent` |
| `controller/app/modules/screens/models.py` | `ScreensAgent` TX fields, `AgentTxAudit` document |
| `schemas/screens/app_manifest.schema.json` | `dataSink` schema block |
| `web_src/js/components/screens/components/TxConsentModal.vue` | Pre-transmit consent dialog |
| `web_src/js/components/screens/components/SinkPanel.vue` | TX-capable agent picker + live `tx_status` indicator |
| `web_src/js/components/screens/ScreensApp.vue` | Consent gate + `tx_status` forwarding to children |

---

## Regulatory note (keep this in your docs too)

Transmission is regulated in every jurisdiction. The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub shows a consent modal and writes an audit log so actions are attributable. None of this is a legal compliance layer; it's defense-in-depth.