2026-06-16 11:54:05 -04:00
# Agent TX Streaming — `ria-toolkit-oss` Handoff
**Paired repo:** `ria-hub` (this doc lives here, but it's written for the Claude working in `ria-toolkit-oss`)
**Source of truth for the overall design:** [Agent TX Streaming - Cross-Repo Plan.md](./Agent%20TX%20Streaming%20-%20Cross-Repo%20Plan.md) — read that first.
**Status (ria-hub side):** landed 2026-04-16. Ready to talk to a TX-capable agent.
---
## Your job
Implement Part A of the plan in `ria-toolkit-oss` (§A1–A8). The hub is already speaking the protocol below and waiting for an agent that can:
1. Accept hub → agent binary TX buffers over an existing WebSocket.
2. Enforce the operator-configured TX interlocks (`tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`).
3. Drive `sdr.init_tx` / `_stream_tx` so a real Pluto transmits what the hub sends.
4. Report status back as `tx_status` JSON frames.
Full-duplex with the existing RX session on the same `app_id` must keep working.
---
## What ria-hub does (so you know what's on the other end of the wire)
You don't need to know any of this to do your work — but if you hit a wall, here's the mental model:
| Hub-side concept | What it does |
|---|---|
| `AgentTxSink` (Python, Celery worker) | Mirrors `AgentDataSource`. Publishes `tx_start`/`tx_stop`/`tx_configure` control JSON and raw binary IQ buffers intended for the agent. |
| `/screens/agent/ws` FastAPI endpoint | The WebSocket you already connect to. Now also pumps hub → agent binary TX frames and republishes your `tx_status` JSON upstream to the Celery task. |
| Redis channels (`agent:tx_iq:*`, `agent:events:*`) | Internal to the hub. You will never see them. Everything reaches you as WS frames. |
| Capability gate | Hub refuses to launch a TX app unless it's seen a recent heartbeat from you with `"tx" ∈ capabilities` and `tx_enabled: true`. |
| Audit log (`AgentTxAudit`) | Hub persists who started what transmission at what frequency and gain. Your error messages in `tx_status` end up in that record. |
**Bottom line: from your process, this is still the same WebSocket you've been using. You're just getting new message types and a new class of binary frames going the other direction.**
---
## Protocol contract (the only thing you actually need)
All additions. Existing RX messages (`start`/`stop`/`configure` + agent → hub binary) keep their current semantics — do not touch them.
### Hub → agent
**JSON control frames** (text WS frames):
```jsonc
// Arm TX. Call sdr.init_tx with this radio_config and start _stream_tx.
// After this you'll start receiving binary frames (see below) that go into
// the stream callback.
{
  "type": "tx_start",
  "app_id": "app-abc",
  "radio_config": {
    "device": "pluto",
    "identifier": "ip:192.168.3.1",
    "tx_sample_rate": 1000000,
    "tx_center_frequency": 2450000000,
    "tx_gain": -20,              // dB. Pluto: negative = attenuation.
    "tx_bandwidth": 1000000,     // optional
    "buffer_size": 1024,         // optional; complex samples per buffer
    "underrun_policy": "pause"   // "pause" | "zero" | "repeat"
  }
}
// Stop TX, drain queue, pause_tx. RX session on the same app_id (if any)
// stays alive.
{ "type": "tx_stop", "app_id": "app-abc" }
// Apply parameter changes at the next buffer boundary. Any subset of
// radio_config fields.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }
// Advisory — safe to ignore. Hub publishes this whenever it RPUSHes a new
// binary buffer; it was wired so the WS bridge wakes up promptly. You do
// NOT need to act on it. Consider it a keepalive.
{ "type": "tx_data_available", "app_id": "app-abc" }
```
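A minimal sketch of how the agent might route these four control frames, assuming the text-frame handler receives the raw JSON string. `TxControl`, `RADIO_CONFIG_KEYS`, and `at_buffer_boundary` are hypothetical names, not existing `ria-toolkit-oss` code; comments mark where the real `init_tx` / `_stream_tx` / `pause_tx` calls would go. Filtering `tx_configure` down to known `radio_config` keys is an assumption, not something the hub requires.

```python
import json
import logging
from typing import Any, Dict, Optional

log = logging.getLogger("ria_agent.tx")

# Keys tx_configure may change (the radio_config fields shown in tx_start).
RADIO_CONFIG_KEYS = {
    "device", "identifier", "tx_sample_rate", "tx_center_frequency",
    "tx_gain", "tx_bandwidth", "buffer_size", "underrun_policy",
}


class TxControl:
    """Hypothetical TX control state for one agent (v1: one app at a time)."""

    def __init__(self) -> None:
        self.app_id: Optional[str] = None
        self.radio_config: Optional[Dict[str, Any]] = None
        self.pending: Dict[str, Any] = {}  # tx_configure changes awaiting a buffer boundary

    def handle_text(self, raw: str) -> None:
        msg = json.loads(raw)
        kind = msg.get("type")
        if kind == "tx_start":
            # Real code: cap checks, sdr.init_tx(...), start _stream_tx, emit "armed".
            self.app_id = msg["app_id"]
            self.radio_config = dict(msg["radio_config"])
            self.pending = {}
        elif kind == "tx_stop":
            # Real code: drain the queue, pause_tx(), emit "done"; the RX session stays up.
            self.app_id = None
            self.radio_config = None
            self.pending = {}
        elif kind == "tx_configure":
            if msg.get("app_id") != self.app_id or self.radio_config is None:
                log.warning("ignoring tx_configure for inactive app %r", msg.get("app_id"))
                return
            changes = msg.get("radio_config", {})
            self.pending.update({k: v for k, v in changes.items() if k in RADIO_CONFIG_KEYS})
        elif kind == "tx_data_available":
            pass  # advisory keepalive; nothing to do
        # Other types (the RX start/stop/configure messages) belong to the existing
        # RX handler and are ignored here.

    def at_buffer_boundary(self) -> None:
        """Called by the TX loop between buffers to apply queued tx_configure changes."""
        if self.pending and self.radio_config is not None:
            self.radio_config.update(self.pending)
            self.pending = {}
```

Parking `tx_configure` changes in `pending` is one way to honor "apply at the next buffer boundary": the TX loop calls `at_buffer_boundary()` between buffers, so a gain change never lands mid-buffer.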
**Binary frames** (binary WS frames):
* Raw interleaved `float32` IQ samples in `[-1, 1]`.
* One frame = one buffer.
* Byte length is always `num_complex_samples × 8` (8 bytes per complex sample: two float32s).
* **Only valid between `tx_start` and `tx_stop`.** If you receive a binary frame outside that window, drop it and log WARN — don't crash, don't panic.
Format validator is already in `ria_toolkit_oss.sdr.sdr._verify_sample_format` — reuse it.
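A sketch of the agent-side checks for one binary frame, using only the standard library. It assumes little-endian `float32` on the wire, which the contract above does not spell out, and treats a malformed frame like an out-of-window one: drop it and warn. `decode_tx_frame` is a hypothetical helper; the real agent should still defer to `_verify_sample_format` for the final format check.

```python
import logging
import struct
from typing import List, Optional

log = logging.getLogger("ria_agent.tx")

BYTES_PER_SAMPLE = 8  # two float32 values (I, Q) per complex sample


def decode_tx_frame(data: bytes, armed: bool) -> Optional[List[complex]]:
    """Return the frame's complex samples, or None if the frame must be dropped."""
    if not armed:
        log.warning("dropping %d-byte TX frame outside tx_start/tx_stop", len(data))
        return None
    if not data or len(data) % BYTES_PER_SAMPLE:
        log.warning("dropping TX frame of %d bytes (not a positive multiple of 8)", len(data))
        return None
    floats = struct.unpack(f"<{len(data) // 4}f", data)  # assumes little-endian float32
    if not all(-1.0 <= x <= 1.0 for x in floats):        # also rejects NaN
        log.warning("dropping TX frame with samples outside [-1, 1]")
        return None
    # Interleaved layout on the wire: I0, Q0, I1, Q1, ...
    return [complex(i, q) for i, q in zip(floats[0::2], floats[1::2])]
```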
### Agent → hub
**JSON status frames** (text WS frames). Use the existing `send_json` path:
```jsonc
// Lifecycle — emit on every state transition.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed" }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" }
{ "type": "tx_status", "app_id": "app-abc", "state": "done" }
// Errors — include a human-readable message. Hub surfaces it to the UI
// and writes it into the audit record.
{ "type": "tx_status", "app_id": "app-abc", "state": "error",
"message": "gain -5 exceeds tx_max_gain_db=-15" }
```
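A small helper that keeps outgoing frames inside the state vocabulary listed below, assuming `send_json` accepts a plain `dict`. `tx_status_frame` is hypothetical; refusing to build an `error` frame without a `message` is a convention chosen here so the audit record never gets an empty reason.

```python
from typing import Any, Dict, Optional

TX_STATES = ("armed", "transmitting", "underrun", "done", "error")


def tx_status_frame(app_id: str, state: str, message: Optional[str] = None) -> Dict[str, Any]:
    """Build a tx_status payload for the existing send_json path (hypothetical helper)."""
    if state not in TX_STATES:
        raise ValueError(f"unknown tx_status state {state!r}")
    if state == "error" and not message:
        raise ValueError("error frames need a human-readable message")
    frame: Dict[str, Any] = {"type": "tx_status", "app_id": app_id, "state": state}
    if message:
        frame["message"] = message
    return frame
```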
**States** (hub assumes this vocabulary):
* `armed`: `init_tx` done, callback started, queue empty, nothing transmitting yet.
* `transmitting`: at least one buffer has flowed through the callback.
* `underrun`: queue drained; what you do next depends on `underrun_policy`:
  * `"pause"` → call `pause_tx()`, emit `underrun`, stay paused until the hub sends a fresh `tx_start`.
  * `"zero"` → continue with `np.zeros(...)` fills, still emit `underrun` once so the hub can show the indicator.
  * `"repeat"` → loop the last good buffer, emit `underrun` once.
* `done`: clean stop after `tx_stop`.
* `error`: capability rejection or hardware failure. Include `message`.
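One way to implement the three `underrun_policy` values together with the "emit `underrun` once per drain event" rule, assuming the TX callback pulls one buffer at a time from a `queue.Queue`. `UnderrunFiller` and its `emit` callback are hypothetical. Buffers are plain lists of `complex` to keep the sketch dependency-free; real code would use `np.zeros(buffer_size, dtype=np.complex64)`. Falling back to zeros under `"repeat"` before any good buffer has arrived is an assumption.

```python
import queue
from typing import Callable, List, Optional

Buffer = List[complex]


class UnderrunFiller:
    """Hypothetical buffer source for the TX callback, implementing underrun_policy."""

    def __init__(self, tx_queue: "queue.Queue[Buffer]", policy: str,
                 buffer_size: int, emit: Callable[[str], None]) -> None:
        if policy not in ("pause", "zero", "repeat"):
            raise ValueError(f"unknown underrun_policy {policy!r}")
        self.tx_queue = tx_queue
        self.policy = policy
        self.buffer_size = buffer_size
        self.emit = emit              # sends a tx_status frame with the given state
        self.state = "armed"          # assumes "armed" was already emitted after init_tx
        self.last: Optional[Buffer] = None
        self.paused = False

    def next_buffer(self) -> Optional[Buffer]:
        """Return the next buffer to transmit, or None once TX is paused."""
        if self.paused:
            return None
        try:
            buf = self.tx_queue.get_nowait()
        except queue.Empty:
            return self._fill()
        self.last = buf
        self._set_state("transmitting")   # also covers recovery from an underrun
        return buf

    def _fill(self) -> Optional[Buffer]:
        self._set_state("underrun")        # no-op if already in underrun: once per drain
        if self.policy == "pause":
            self.paused = True             # real code: sdr.pause_tx(); wait for a new tx_start
            return None
        if self.policy == "repeat" and self.last is not None:
            return self.last
        return [0j] * self.buffer_size     # "zero", or "repeat" with nothing to repeat yet

    def _set_state(self, state: str) -> None:
        if state != self.state:
            self.state = state
            self.emit(state)
```

Tracking the current state and emitting only on change gives "once per drain event" for free, and it also produces the `underrun` → `transmitting` transition when the hub catches up.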
**Extended heartbeat** — you are already sending heartbeats. Grow the payload:
```jsonc
{
  "type": "heartbeat",
  "hardware": ["mock", "pluto"],
  "status": "streaming",                         // unchanged semantics
  "capabilities": ["rx", "tx"],                  // NEW: derived from tx_enabled + SDR class having init_tx
  "tx_enabled": true,                            // NEW: mirror of cfg.tx_enabled
  "tx_max_gain_db": -10,                         // NEW: optional, from agent config
  "tx_max_duration_s": 60,                       // NEW: optional
  "tx_allowed_freq_ranges": [[2.4e9, 2.5e9]],    // NEW: optional
  "sessions": {                                  // NEW: optional per-session snapshot
    "rx": { "app_id": "app-abc", "state": "streaming" },
    "tx": { "app_id": "app-abc", "state": "transmitting" }
  },
  "app_id": "app-abc"                            // keep for back-compat
}
```
The hub reads these fields, stores them on `ScreensAgent`, and gates TX launches on them. **If you don't advertise `tx` in `capabilities` and `tx_enabled: true`, the hub will refuse to start any TX app with HTTP 400 — no WS traffic will be generated.**
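A sketch of assembling the extended heartbeat, assuming the TX fields are available as attributes on the agent config. `TxCaps` stands in for the real `AgentConfig` and `build_heartbeat` is hypothetical. `capabilities` is derived as the payload comment says (`tx_enabled` plus an SDR class that has `init_tx`); always advertising `"rx"` and omitting unset caps rather than sending `null` are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class TxCaps:
    """Stand-in for the TX fields of AgentConfig (hypothetical)."""
    tx_enabled: bool = False
    tx_max_gain_db: Optional[float] = None
    tx_max_duration_s: Optional[float] = None
    tx_allowed_freq_ranges: Optional[List[Tuple[float, float]]] = None


def build_heartbeat(cfg: TxCaps, hardware: List[str], status: str,
                    sdr_class: Optional[type], app_id: Optional[str] = None,
                    sessions: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    tx_capable = cfg.tx_enabled and sdr_class is not None and hasattr(sdr_class, "init_tx")
    hb: Dict[str, Any] = {
        "type": "heartbeat",
        "hardware": list(hardware),
        "status": status,
        "capabilities": ["rx", "tx"] if tx_capable else ["rx"],
        "tx_enabled": cfg.tx_enabled,  # mirrors config even if the SDR can't transmit
    }
    if cfg.tx_max_gain_db is not None:
        hb["tx_max_gain_db"] = cfg.tx_max_gain_db
    if cfg.tx_max_duration_s is not None:
        hb["tx_max_duration_s"] = cfg.tx_max_duration_s
    if cfg.tx_allowed_freq_ranges is not None:
        hb["tx_allowed_freq_ranges"] = [[lo, hi] for lo, hi in cfg.tx_allowed_freq_ranges]
    if sessions:
        hb["sessions"] = sessions
    if app_id is not None:
        hb["app_id"] = app_id  # kept for back-compat
    return hb
```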
### Backpressure model (what happens when you can't keep up)
* The hub caps its outbound TX queue at 200 buffers. If it fills, the hub either blocks on `write()` or drops the oldest buffer — both are benign for you.
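* On the agent side, enforce your own cap (plan §A2 suggests 8 buffers). When it's full, have `on_binary` wait for space instead of dropping the frame: while the handler is waiting, the agent isn't reading from the socket, so the hub's `await ws.send` slows down through ordinary TCP/WS backpressure. You don't need an application-level flow-control message.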
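A sketch of that agent-side cap, assuming `ws_client.run()` awaits `on_binary` before reading the next frame (the behavior roadmap row 3 would need for backpressure to propagate). `make_on_binary` and `TX_QUEUE_DEPTH` are hypothetical names. `asyncio.to_thread` (Python 3.9+) runs the blocking `queue.Queue.put` off the event loop, so other tasks on the loop, such as a heartbeat sender running as its own task, keep going while the handler waits.

```python
import asyncio
import queue
from typing import Awaitable, Callable

TX_QUEUE_DEPTH = 8  # plan §A2's suggested agent-side cap


def make_on_binary(tx_queue: "queue.Queue[bytes]") -> Callable[[bytes], Awaitable[None]]:
    """Hypothetical factory for the on_binary handler passed to ws_client.run()."""

    async def on_binary(data: bytes) -> None:
        # queue.Queue.put blocks while the queue is full (maxsize=TX_QUEUE_DEPTH).
        # Running it in a worker thread keeps the event loop free; meanwhile this
        # coroutine has not returned, so (by assumption) the WS client does not
        # read the next frame and TCP/WS backpressure reaches the hub's ws.send.
        await asyncio.to_thread(tx_queue.put, data)

    return on_binary


# Usage: tx_queue = queue.Queue(maxsize=TX_QUEUE_DEPTH); pass make_on_binary(tx_queue)
# to ws_client.run(), and let the executor-thread TX loop call tx_queue.get().
```

One consequence of this design: while `on_binary` is waiting, text frames behind it (including `tx_stop`) are not read either, so the TX loop has to keep draining the queue until the stop arrives.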
---
## Implementation roadmap (mapped to the Cross-Repo Plan)
Work in the order below. Each row is a single PR-sized unit.
| # | Plan ref | Deliverable | Acceptance |
|---|---|---|---|
| 1 | §A3 | `AgentConfig` gains `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. `save()` keeps 0600. | Unit test: round-trip through `~/.ria/agent.json`. |
| 2 | §A4 | `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` persists into config. `ria-agent stream --allow-tx` is a runtime override. | Integration test: `ria-agent register --allow-tx` then `cat ~/.ria/agent.json` shows fields. |
| 3 | §A1 | `ws_client.run()` grows an `on_binary: Callable[[bytes], Awaitable[None]]` parameter. Reconnect + heartbeat + malformed-frame behavior unchanged. | Existing `test_ws_client.py` still passes; new `test_ws_client_binary.py` asserts bytes reach the handler. |
| 4 | §A2 | Replace flat `self._sdr` / `self._app_id` state in `streamer.py` with `RxSession` + `TxSession` dataclasses. SDR instances cached by `(device, identifier)` so RX+TX share one handle on the same device. | Unit test: creating a TxSession on the same device as an active RxSession reuses the same SDR object. |
| 5 | §A2 | `_handle_tx_start`, `_handle_tx_stop`, `_handle_tx_configure` + `on_binary(data)` → `self._tx.queue.put(data)`. TX loop runs `_stream_tx` in an executor thread with a thread-safe `queue.Queue` adapter. | Integration test against MockSDR: tx_start → 10 binary frames → tx_stop produces exactly those samples through the callback. |
| 6 | §A2 | Underrun handling: `"pause"` / `"zero"` / `"repeat"` fills. Emits `tx_status: underrun` exactly once per drain event. | Unit test per policy against a slow producer. |
| 7 | §A2 | Cap enforcement **before** opening the SDR: reject with `tx_status: error` if `tx_enabled=False`, gain exceeds cap, freq outside allowed ranges, or duration cap exceeded (watchdog in TX loop calls `tx_stop` after `tx_max_duration_s`). | Unit test per rejection path; SDR is never opened when rejection fires. |
| 8 | §A5 | Heartbeat grows `capabilities`, `tx_enabled`, optional caps, `sessions`. | Integration test: start agent with `--allow-tx`, connect, verify heartbeat payload. |
| 9 | §A6 | Audit the Pluto driver's `_tx_lock` + `_param_lock` interaction to ensure concurrent RX + TX on the same `adi.Pluto` doesn't race on attribute writes. `MockSDR.init_tx` already exists — no change needed. | Stress test: 30 seconds of concurrent RX + TX on MockSDR with `_param_lock` instrumented for contention. |
| 10 | §A7 | Test matrix per plan: `test_streamer_tx`, `test_tx_safety`, `test_tx_underrun`, `test_full_duplex`, `test_ws_client_binary`, `test_integration_tx`. | All green in CI. |
| 11 | §A8 | Docs: new `docs/agent_tx_protocol.md` OR extended section in existing agent protocol doc. Regulatory disclaimer included. | Lints + renders. |
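Row 4's "one handle per device" requirement can be sketched as a reference-counted cache keyed by `(device, identifier)`. `SdrCache` and its `factory` argument are hypothetical; in the agent the factory would construct the SDR class for that device. Reference counting is an assumption added here so that ending the TX session doesn't tear down a handle the RX session is still using.

```python
import threading
from typing import Callable, Dict, Optional, Tuple

Key = Tuple[str, str]


class SdrCache:
    """Hypothetical per-device SDR registry shared by RxSession and TxSession."""

    def __init__(self, factory: Callable[[str, str], object]) -> None:
        self._factory = factory        # opens an SDR for (device, identifier)
        self._lock = threading.Lock()  # RX and TX may acquire from different threads
        self._handles: Dict[Key, object] = {}
        self._refs: Dict[Key, int] = {}

    def acquire(self, device: str, identifier: str) -> object:
        key = (device, identifier)
        with self._lock:
            if key not in self._handles:
                self._handles[key] = self._factory(device, identifier)
                self._refs[key] = 0
            self._refs[key] += 1
            return self._handles[key]

    def release(self, device: str, identifier: str) -> Optional[object]:
        """Drop one reference; return the handle if this was the last user, so the caller can close it."""
        key = (device, identifier)
        with self._lock:
            self._refs[key] -= 1
            if self._refs[key] > 0:
                return None
            del self._refs[key]
            return self._handles.pop(key)
```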
**Ship order advice:** 1 → 2 → 3 → 4 → (5 || 6) → 7 → 8 → 9 → 10 → 11. Steps 1–3 are strict prerequisites for everything else. Steps 5 and 6 can parallelize. Step 7 can't land without 5.
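A sketch of row 7's pre-open checks, assuming the caps live on a config object shaped like the row 1 fields. `TxLimits`, `check_tx_request`, and `duration_exceeded` are hypothetical. The gain message matches the example `tx_status: error` frame in the protocol section. The frequency check only looks at `tx_center_frequency`; a stricter version could also require the occupied band (center ± `tx_bandwidth`/2) to fit inside an allowed range.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class TxLimits:
    """Hypothetical mirror of the AgentConfig TX fields (roadmap row 1)."""
    tx_enabled: bool = False
    tx_max_gain_db: Optional[float] = None
    tx_max_duration_s: Optional[float] = None
    tx_allowed_freq_ranges: Optional[List[Tuple[float, float]]] = None


def check_tx_request(limits: TxLimits, radio_config: Dict[str, Any]) -> Optional[str]:
    """Return a tx_status error message, or None if the SDR may be opened."""
    if not limits.tx_enabled:
        return "TX is disabled on this agent (tx_enabled=false)"
    gain = radio_config.get("tx_gain")
    cap = limits.tx_max_gain_db
    if cap is not None:
        if gain is None:
            return f"tx_gain missing; cannot check against tx_max_gain_db={cap}"
        if gain > cap:
            return f"gain {gain} exceeds tx_max_gain_db={cap}"
    ranges = limits.tx_allowed_freq_ranges
    if ranges is not None:
        freq = radio_config.get("tx_center_frequency")
        if freq is None or not any(lo <= freq <= hi for lo, hi in ranges):
            return f"frequency {freq} is outside tx_allowed_freq_ranges"
    return None


def duration_exceeded(limits: TxLimits, started_at: float, now: float) -> bool:
    """Watchdog test for the TX loop; when True, run the same path as tx_stop."""
    cap = limits.tx_max_duration_s
    return cap is not None and now - started_at >= cap
```

Running `check_tx_request` before any SDR handle is acquired makes the row 7 acceptance criterion easy to test: the SDR factory can be a mock that fails the test if it is ever called. The same check should presumably also run on the merged result of a `tx_configure`, otherwise a later update could raise gain past the cap.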
---
## Verification loop (how to prove the two sides talk)
Once you've implemented §A1–A7, use this to close the loop with the live hub:
1. On the agent host, run `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` then `ria-agent stream`.
2. Confirm the hub has seen the heartbeat: `curl $HUB/screens/agents/json | jq '.agents[] | select(.agent_id==...) | {tx_enabled, capabilities}'` should show `tx_enabled: true` and `["rx","tx"]`.
3. Create a Screens app whose manifest contains a `dataSink.type == "agent"` pointing at your `agent_id`, or use the composer UI with a `PlutoTXOp` in the graph + the new TX-sink agent picker.
4. `POST /screens/apps/{id}/start` on the hub. You should observe, in order:
   1. `tx_start` JSON on your WS.
   2. Binary frames arriving (if the hub-side operator is actually generating buffers; this may be a no-op for now, since the operator refactor is planned but not done).
   3. Your `tx_status: armed` JSON emitted back.
5. Stop the app. You should receive `tx_stop` and emit `tx_status: done`.
6. Provoke a rejection: set `tx_max_gain_db: -15` in your config, then start a TX app with `tx_gain: -5`. The hub should return `HTTP 400` from `/start` without any WS traffic — capability gate fires first. If you make it past the gate and it's still wrong, emit `tx_status: error` and the hub will surface the message to the UI.
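If you'd rather script step 2 than read `jq` output, the same check is a few lines of Python. It assumes the response shape implied by the `jq` filter (an `agents` array whose entries carry `agent_id`, `tx_enabled`, and `capabilities`) and that the endpoint needs no extra auth headers; `agent_tx_ready` and `fetch_agents` are hypothetical helpers.

```python
import json
import urllib.request
from typing import Any, Dict


def agent_tx_ready(agents_payload: Dict[str, Any], agent_id: str) -> bool:
    """Mirror of the step-2 jq check: tx_enabled is true and "tx" is in capabilities."""
    for agent in agents_payload.get("agents", []):
        if agent.get("agent_id") == agent_id:
            return agent.get("tx_enabled") is True and "tx" in (agent.get("capabilities") or [])
    return False


def fetch_agents(hub_url: str) -> Dict[str, Any]:
    """GET {hub_url}/screens/agents/json; add auth headers if your hub requires them."""
    with urllib.request.urlopen(hub_url.rstrip("/") + "/screens/agents/json") as resp:
        return json.load(resp)
```

Usage: `agent_tx_ready(fetch_agents(os.environ["HUB"]), "<your agent_id>")` should return `True` once the extended heartbeat has been seen.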
**Useful hub-side greps if something is wrong:**
* `grep -r "tx_status" controller/app/modules/screens/` — see how the hub parses your frames.
* `grep -r "tx_enabled" controller/app/modules/screens/` — see what heartbeat fields the hub reads.
* `controller/app/modules/screens/agent_ws.py:200-290` — the WS handler's JSON dispatch.
* `controller/app/modules/screens/data_sinks.py` — what the hub publishes on each control frame.
---
## Open questions (from the original plan that still apply)
Answered since the plan was written:
* **Operator name:** `PlutoTXOp` (PascalCase, stored in the hub's MongoDB `ops` collection via the application packager).
* **Redis channel naming:** kept the `agent:*` prefix on the hub side; you never see this.
* **Status plumbing:** `tx_status` frames get republished on a hub-internal pub/sub and surface to the UI through the existing SSE stream. You just send the frames; the hub does the rest.
Still open (flag when you have a preference):
* **Bulk + loop fast-path.** If the hub's TX operator turns out to be a fixed recording played on loop, we could add a `{ "type": "tx_start", ..., "loop": true }` variant where the hub sends the buffer once and the agent uses the existing `tx_recording` path. Protocol-compatible with the streaming version. Defer until a real use case demands it.
* **Multi-app-per-agent.** Out of scope for v1 (§Non-goals). If/when needed: prefix binary frames with a 4-byte session header and bump a `protocol_version` in the heartbeat.
* **TX clock drift.** Relying on generous queue depth + stable local networks for v1. Longer term may need agent-side resampling.
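For the multi-app question, a 4-byte session header could look like the following. This only illustrates the still-open idea and is not part of v1: the big-endian unsigned session id and the helper names are assumptions. With a header, frame length becomes `4 + num_complex_samples × 8`, so the IQ length check has to run after the header is stripped.

```python
import struct
from typing import Tuple

# Hypothetical v2 framing: 4-byte big-endian session id, then the unchanged IQ payload.
SESSION_HEADER = struct.Struct(">I")


def wrap_frame(session_id: int, iq_payload: bytes) -> bytes:
    """Hub side: prefix an IQ buffer with its session id."""
    return SESSION_HEADER.pack(session_id) + iq_payload


def unwrap_frame(frame: bytes) -> Tuple[int, bytes]:
    """Agent side: split a frame into (session_id, iq_payload)."""
    if len(frame) < SESSION_HEADER.size:
        raise ValueError("frame shorter than the session header")
    (session_id,) = SESSION_HEADER.unpack_from(frame)
    payload = frame[SESSION_HEADER.size:]
    if len(payload) % 8:
        raise ValueError("IQ payload is not a whole number of complex float32 samples")
    return session_id, payload
```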
---
## What lives in `ria-hub` now (reference)
You don't need to read any of this, but if you're curious or need to debug the integration, these are the load-bearing bits on the hub side:
| Path | What |
|---|---|
| `controller/app/modules/screens/data_sinks.py` | `AgentTxSink`, `LocalPlutoTxSink`, `build_data_sink` |
| `controller/app/modules/screens/agent_ws.py` | `_forward_tx_binary`, heartbeat parsing, `tx_status` republish |
| `controller/app/modules/screens/graph_derivation.py` | `_pluto_tx_spec_mapping`, `_SDR_SINK_MAP`, `_derive_data_sink` |
| `controller/app/modules/screens/routes.py` | `_check_agent_tx_capability`, `AgentTxAudit` write, `POST /apps/{id}/sink-agent` |
| `controller/app/modules/screens/models.py` | `ScreensAgent` TX fields, `AgentTxAudit` document |
| `schemas/screens/app_manifest.schema.json` | `dataSink` schema block |
| `web_src/js/components/screens/components/TxConsentModal.vue` | Pre-transmit consent dialog |
| `web_src/js/components/screens/components/SinkPanel.vue` | TX-capable agent picker + live `tx_status` indicator |
| `web_src/js/components/screens/ScreensApp.vue` | Consent gate + `tx_status` forwarding to children |
---
## Regulatory note (keep this in your docs too)
Transmission is regulated in every jurisdiction. The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub shows a consent modal and writes an audit log so actions are attributable. None of this is a legal compliance layer — it's defense-in-depth.