# Agent TX Streaming — Cross-Repo Plan
**Repos:** `ria-toolkit-oss`, `ria-hub`, Screens frontend

**Status:** Proposal / pre-implementation

**Prerequisites:** The RX-streaming work from [screens_agent_handoff.md](./screens_agent_handoff.md) and [screens_agent_streamer_plan.md](./screens_agent_streamer_plan.md) has landed (agent WS protocol, `AgentDataSource`, `/screens/agents/register`, `/screens/agent/ws`).
## Goal
Let a Screens app running on the hub drive a **remote agent's Pluto** (or other TX-capable SDR) to transmit, streaming IQ buffers end-to-end from an operator like `plutoTXoperator` into the agent's `sdr.tx()` path. This is the mirror image of what `AgentDataSource` already does for RX.
## Non-goals (v1)
- Multi-tenant radio sharing (one app owns the radio at a time per agent).
- Bulk/upload-once TX — superseded by streaming per request.
- Arbitrary waveform generation in the agent. The agent is dumb pipe + hardware control; signal generation stays on the hub.
## Key design decisions
| # | Decision | Value |
|---|---|---|
| D1 | **Delivery mode** | **Streaming**. Hub pushes binary IQ buffers continuously over the existing WS; agent's `_stream_tx` callback pulls them from an in-agent queue. |
| D2 | **Full-duplex** | **Yes.** A single `app_id` may own both an RX session and a TX session on the same agent concurrently. Same physical SDR handle serves both (Pluto is FDD-capable; `init_rx` and `init_tx` are independent on one `adi.Pluto` instance). |
| D3 | **Safety caps** | **Agent-enforced.** `~/.ria/agent.json` holds `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, optional `tx_allowed_freq_ranges: [[low,high], …]`. Agent rejects `tx_start` frames that violate any of these, independent of what the hub sends. |
| D4 | **Buffer format** | Interleaved float32 IQ, range `[-1, 1]` — same as RX. Format-validated by `ria_toolkit_oss.sdr.sdr._verify_sample_format`. |
| D5 | **Protocol evolution** | Keep existing RX messages (`start`/`stop`/`configure`) unchanged for back-compat. Add parallel `tx_start`/`tx_stop`/`tx_configure`. Heartbeat grows to advertise capabilities. |
| D6 | **Underrun policy** | Default `pause`: if the TX queue empties, agent calls `pause_tx()` and emits `tx_status: underrun`. Hub must recover by sending a fresh `tx_start` + buffers. Configurable per session via `radio_config.underrun_policy ∈ {"pause", "zero", "repeat"}`. |
| D7 | **Backpressure** | Rely on TCP/WS backpressure. Agent caps inbound TX queue at 8 buffers; `await ws.send` on the hub side slows when the agent doesn't drain. No application-level flow control in v1. |
| D8 | **Session identity** | `app_id` identifies a Screens app. Each app has at most one RX session and one TX session per agent. Binary direction disambiguates: agent → hub binary = RX IQ; hub → agent binary = TX IQ. |
## Protocol specification
Additions only. Existing RX messages from [screens_agent_handoff.md §Phase 4](./screens_agent_handoff.md) are unchanged.
### Hub → agent (JSON)
```jsonc
// Arm the TX side. Agent calls init_tx, starts the stream_tx thread with an empty queue.
// After this, hub sends binary TX buffers on the same WS.
{
  "type": "tx_start",
  "app_id": "app-abc",
  "radio_config": {
    "device": "pluto",
    "identifier": "ip:192.168.3.1",
    "tx_sample_rate": 1000000,
    "tx_center_frequency": 2450000000,
    "tx_gain": -20,               // dB, negative = attenuation on Pluto
    "tx_bandwidth": 1000000,      // optional
    "buffer_size": 1024,
    "underrun_policy": "pause"    // "pause" | "zero" | "repeat"
  }
}
// Apply parameter changes at the next buffer boundary.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }
// Stop TX, drain queue, pause_tx, release TX side (RX may continue if a separate RX session is live).
{ "type": "tx_stop", "app_id": "app-abc" }
```
### Hub → agent (binary)
Raw interleaved float32 IQ in `[-1, 1]`. One WS frame = one buffer = `buffer_size` complex samples = `buffer_size * 2 * 4` bytes. Delivered only between `tx_start` and `tx_stop`. Binary frames arriving outside that window are discarded and logged at WARN.
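The frame layout above can be sketched with the standard library alone. This is an illustration, not the toolkit's implementation: `pack_iq`, `unpack_iq`, and `expected_frame_bytes` are hypothetical names, little-endian byte order is an assumption (the spec does not state one), and rejecting a wrong-sized frame is one possible choice (the TX callback in A2 pads short buffers instead).

```python
import struct

BYTES_PER_COMPLEX_SAMPLE = 2 * 4  # I and Q, each a 4-byte float32


def expected_frame_bytes(buffer_size: int) -> int:
    """Size in bytes of one TX frame holding `buffer_size` complex samples."""
    return buffer_size * BYTES_PER_COMPLEX_SAMPLE


def pack_iq(samples: list) -> bytes:
    """Interleave I/Q as float32 (little-endian is an assumption)."""
    flat = []
    for s in samples:
        flat.extend((s.real, s.imag))
    return struct.pack(f"<{len(flat)}f", *flat)


def unpack_iq(frame: bytes, buffer_size: int) -> list:
    """Reject frames of the wrong size, then de-interleave into complex samples."""
    want = expected_frame_bytes(buffer_size)
    if len(frame) != want:
        raise ValueError(f"expected {want} bytes, got {len(frame)}")
    flat = struct.unpack(f"<{buffer_size * 2}f", frame)
    return [complex(i, q) for i, q in zip(flat[0::2], flat[1::2])]
```

With the `buffer_size` of 1024 used in the `tx_start` example, one frame is 8192 bytes.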
### Agent → hub (JSON)
```jsonc
// Lifecycle events.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed" }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" } // queue empty; TX paused
{ "type": "tx_status", "app_id": "app-abc", "state": "done" }
{ "type": "tx_status", "app_id": "app-abc", "state": "error", "message": "gain -5 exceeds tx_max_gain_db=-15" }
// Reject reasons from agent-enforced caps/interlocks are surfaced via tx_status:error.
```
### Heartbeat extension
Existing `{type: heartbeat, hardware[], status}` grows:
```jsonc
{
  "type": "heartbeat",
  "hardware": ["mock", "pluto"],
  "status": "streaming",            // unchanged semantics
  "capabilities": ["rx", "tx"],     // NEW — derived from tx_enabled + SDR class having init_tx
  "tx_enabled": true,               // NEW — mirrors config flag
  "sessions": {                     // NEW — optional per-session snapshot
    "rx": { "app_id": "app-abc", "state": "streaming" },
    "tx": { "app_id": "app-abc", "state": "transmitting" }
  },
  "app_id": "app-abc"               // kept for back-compat
}
```
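Because the new heartbeat fields are additive, a hub-side reader has to tolerate agents that predate them. A minimal sketch follows; the function names are hypothetical, and treating a heartbeat without `capabilities` as RX-only is an assumption.

```python
def agent_capabilities(heartbeat: dict) -> set:
    """Capabilities advertised by a heartbeat; older agents omit the field."""
    caps = heartbeat.get("capabilities")
    if caps is None:
        return {"rx"}  # assumption: agents without the field are RX-only
    return set(caps)


def accepts_tx(heartbeat: dict) -> bool:
    """True only when the agent both advertises "tx" and reports tx_enabled."""
    return "tx" in agent_capabilities(heartbeat) and heartbeat.get("tx_enabled") is True
```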
---
## Part A — `ria-toolkit-oss` (this repo)
### A1. `agent/ws_client.py`
Currently the WS client drops server → agent binary (`ws_client.py:77-79`). Add a binary handler alongside the JSON one.
```python
from typing import Awaitable, Callable

BinaryHandler = Callable[[bytes], Awaitable[None]]

async def run(
    self,
    on_message: MessageHandler,
    heartbeat: HeartbeatBuilder,
    on_binary: BinaryHandler | None = None,
) -> None:
    ...
    async for raw in self._ws:
        if isinstance(raw, bytes):
            # Server → agent binary is TX IQ; forward it only if a handler is registered.
            if on_binary is not None:
                await on_binary(raw)
            continue
        ...
```
Keep the reconnect, heartbeat, and malformed-frame behavior unchanged.
### A2. `agent/streamer.py` — add TX sessions
Replace the flat `self._sdr` / `self._app_id` / `self._capture_task` state with a session model:
```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class RxSession:
    app_id: str
    sdr: Any
    buffer_size: int
    task: asyncio.Task
    pending_config: dict

@dataclass
class TxSession:
    app_id: str
    sdr: Any
    queue: asyncio.Queue[bytes]   # bounded, maxsize=8
    task: asyncio.Task            # runs _stream_tx in executor
    underrun_policy: str
    pending_config: dict
    bytes_transmitted: int = 0
    started_at: float = 0.0       # for tx_max_duration_s enforcement
```
The streamer holds `self._rx: RxSession | None` and `self._tx: TxSession | None`. SDR instances are cached by `(device, identifier)` — when RX and TX name the same device, both sessions share one handle (matters for Pluto FDD).
**New handlers**:
- `_handle_tx_start(msg)` — check `cfg.tx_enabled`, validate gain/duration/freq against caps, open/resolve SDR, `sdr.init_tx(...)`, start `_tx_loop`, emit `tx_status: armed`.
- `_handle_tx_stop(msg)` — cancel TX task, `sdr.pause_tx()`, drain queue, release SDR if no RX session on it, emit `tx_status: done`.
- `_handle_tx_configure(msg)` — stash into `self._tx.pending_config`, applied at next buffer boundary (same pattern as RX).
- `on_binary(data)` — if `self._tx`: `await self._tx.queue.put(data)` (awaiting here is the backpressure mechanism). Else: log and drop.
**TX loop** (runs in an executor thread via `loop.run_in_executor`, like the RX capture loop):
```python
import queue

import numpy as np

def _tx_callback(self, num_samples: int) -> np.ndarray:
    # Called by sdr._stream_tx on every buffer boundary (runs in the executor thread).
    try:
        raw = self._tx_queue_sync.get(timeout=0.1)
    except queue.Empty:
        return self._underrun_fill(num_samples)  # policy-driven
    # Reinterpret interleaved float32 I/Q pairs as complex64 without copying.
    samples = np.frombuffer(raw, dtype=np.float32).view(np.complex64)
    if len(samples) < num_samples:
        return _pad_zero(samples, num_samples)
    return samples[:num_samples]
```
Use a thread-safe `queue.Queue` for the TX side (the `asyncio.Queue` lives on the event loop; the executor thread reads from a sibling `queue.Queue` fed by a tiny asyncio→threading adapter).
**Underrun fills**:
- `"pause"`: signal the main loop to call `sdr.pause_tx()`, emit `tx_status: underrun`, exit the callback.
- `"zero"`: return `np.zeros(num_samples, dtype=np.complex64)`.
- `"repeat"`: return the last good buffer (cached). If no buffer yet: zeros.
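The three policies can be sketched as a pure function over raw frame bytes, which keeps the example dependency-free. `underrun_fill` is a hypothetical helper. Returning silence for the current buffer under `"pause"` is an assumption, since the plan only says the callback exits once the main loop has been signalled.

```python
from typing import Optional

BYTES_PER_COMPLEX_SAMPLE = 8  # interleaved float32 I and Q


def underrun_fill(policy: str, num_samples: int, last_good: Optional[bytes]) -> tuple:
    """Return (should_pause, fill_bytes) for an empty TX queue."""
    # float32 0.0 is encoded as four zero bytes, so zeroed bytes are silent IQ.
    zeros = bytes(num_samples * BYTES_PER_COMPLEX_SAMPLE)
    if policy == "pause":
        # Assumption: hand back silence for this buffer and ask the main loop to pause.
        return True, zeros
    if policy == "zero":
        return False, zeros
    if policy == "repeat":
        return False, last_good if last_good is not None else zeros
    raise ValueError(f"unknown underrun_policy: {policy!r}")
```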
**Cap enforcement** in `_handle_tx_start` (before opening the SDR):
```python
if not self._cfg.tx_enabled:
    return await self._send_error_tx(app_id, "tx disabled on this agent")
if (cap := self._cfg.tx_max_gain_db) is not None and tx_gain > cap:
    return await self._send_error_tx(app_id, f"gain {tx_gain} exceeds cap {cap}")
if (cap := self._cfg.tx_max_duration_s) is not None:
    # enforced by a watchdog in _tx_loop that calls tx_stop after cap seconds
    ...
# An empty or missing range list means no frequency restriction.
ranges = self._cfg.tx_allowed_freq_ranges
if ranges and not any(lo <= tx_center_frequency <= hi for lo, hi in ranges):
    return await self._send_error_tx(app_id, f"freq {tx_center_frequency} outside allowed ranges")
```
### A3. `agent/config.py`
Extend `AgentConfig`:
```python
@dataclass
class AgentConfig:
    # existing fields…
    tx_enabled: bool = False
    tx_max_gain_db: float | None = None
    tx_max_duration_s: float | None = None
    tx_allowed_freq_ranges: list[tuple[float, float]] | None = None
```
`save()` preserves existing 0600 perms.
### A4. `agent/cli.py`
- `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` — persist the interlock into config.
- `ria-agent stream --allow-tx` — runtime override (sets `cfg.tx_enabled=True` for the life of the process without writing config).
- `ria-agent detect` unchanged.
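The flags listed above could be wired with `argparse` roughly as follows. This is a sketch of the TX-related options only: the real `register` and `stream` subcommands presumably take other arguments, which are omitted here, and `build_parser` is a hypothetical name.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="ria-agent")
    sub = parser.add_subparsers(dest="command", required=True)

    # register: persist the interlock into the config file.
    register = sub.add_parser("register")
    register.add_argument("--allow-tx", action="store_true")
    register.add_argument("--tx-max-gain-db", type=float, default=None)
    register.add_argument("--tx-max-duration", type=float, default=None)

    # stream: runtime-only override, nothing is written back.
    stream = sub.add_parser("stream")
    stream.add_argument("--allow-tx", action="store_true")

    sub.add_parser("detect")
    return parser
```

Negative gain values such as `--tx-max-gain-db -10` parse as numbers because no option in the parser looks like a negative number.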
### A5. `agent/hardware.py`
```python
def heartbeat_payload(status, app_id=None, *, cfg: AgentConfig, sessions: dict | None = None) -> dict:
    caps = ["rx"]
    # The protocol spec also derives "tx" from the SDR class having init_tx;
    # only the config flag is checked here.
    if cfg.tx_enabled:
        caps.append("tx")
    payload = {
        "type": "heartbeat",
        "hardware": available_devices(),
        "status": status,
        "capabilities": caps,
        "tx_enabled": cfg.tx_enabled,
    }
    if app_id:
        payload["app_id"] = app_id
    if sessions:
        payload["sessions"] = sessions
    return payload
```
### A6. SDR layer
- **Audit**: [`sdr/pluto.py`](../src/ria_toolkit_oss/sdr/pluto.py) `tx_recording` + `_stream_tx` paths already use `_tx_lock` (lines 31, 323, and 360). Double-check concurrent-with-RX behavior: the `adi.Pluto` Python object is not thread-safe for arbitrary attribute writes, so all `set_tx_*` / `set_rx_*` must go through the shared `_param_lock` (already present at [`sdr/sdr.py:44`](../src/ria_toolkit_oss/sdr/sdr.py#L44)). Verify that `rx()` in a loop and `_stream_tx` in another thread don't step on each other.
- **MockSDR** already has `init_tx` + `_stream_tx` (`sdr/mock.py:70-100`). No changes needed for mock-based tests.
- **Other TX-capable drivers** (blade, usrp, hackrf): out of scope for v1; leave their `init_tx` as-is.
### A7. Tests (`tests/agent/`)
- `test_streamer_tx.py` — `tx_start` → binary frames → `_stream_tx` callback pulls correct samples → `tx_stop` cleans up.
- `test_tx_safety.py` — cap violations (gain, duration, freq, `tx_enabled=False`) each produce `tx_status: error` and never open the SDR.
- `test_tx_underrun.py` — each policy (`pause`, `zero`, `repeat`) exercised against a fake slow producer.
- `test_full_duplex.py` — one `app_id` sends `start` + `tx_start`; both sessions share one MockSDR; both produce their expected frames; stopping one does not stop the other.
- `test_ws_client_binary.py` — binary frames now reach the binary handler.
- `test_integration_tx.py` — end-to-end against local `websockets` server + MockSDR.
### A8. Docs
- Add a TX section to any existing agent protocol doc (or create `docs/agent_tx_protocol.md`).
- Include a regulatory disclaimer: the operator is responsible for transmissions. The agent is an enabler, not a policy layer beyond the interlocks.
---
## Part B — `ria-hub`
> Paths below are conceptual — confirm against the actual module layout in `ria-hub` before editing. Anchor points reference the RX handoff at [screens_agent_handoff.md §Part B](./screens_agent_handoff.md).
### B1. `AgentTxSink` (new)
Mirror of `AgentDataSource`. Location: `controller/app/modules/screens/data_sinks.py` (or wherever output sinks live in `ria-hub`).
Responsibilities:
- `prepare(radio_config)` — send `tx_start` via Redis pub/sub on `screens:agent:{agent_id}:tx` → WS proxy → agent.
- `write(buffer: np.ndarray | bytes)` — convert to interleaved float32 bytes, send as binary over the WS. Awaits on WS backpressure.
- `configure(partial_radio_config)` — send `tx_configure`.
- `close()` — send `tx_stop`.
- Subscribes to the agent's `tx_status` frames (via the same Redis pub/sub channel used for RX status today) and surfaces state back to the orchestrator. An `error` state aborts the Celery task.
### B2. Refactor `plutoTXoperator`
The existing operator presumably calls `radio.tx(...)` against a directly-attached Pluto. Abstract the "output" into an injectable sink:
```python
class PlutoTxOperator:
    def __init__(self, sink: TxSink, ...):
        self.sink = sink  # AgentTxSink when dataSink.type == "agent", else LocalPlutoTxSink

    def run(self, ...):
        self.sink.prepare(self.radio_config)
        while not stop:
            buf = self._generate_next_buffer()
            self.sink.write(buf)
        self.sink.close()
```
The local path (existing direct-hardware behavior) becomes `LocalPlutoTxSink`, a thin wrapper around the current `radio.tx` calls. No behavior change for existing deployments.
`build_data_sink()` (to match `build_data_source()` from B1/B6) routes on `dataSink.type`.
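The sink abstraction and the routing function might be shaped like the sketch below. `TxSink` as a `Protocol`, the `RecordingSink` test double, and the `"local"` type value are all assumptions made for illustration. The real `AgentTxSink` and `LocalPlutoTxSink` would replace the stand-ins.

```python
from typing import Protocol


class TxSink(Protocol):
    def prepare(self, radio_config: dict) -> None: ...
    def write(self, buffer: bytes) -> None: ...
    def close(self) -> None: ...


class RecordingSink:
    """Test double that records calls instead of touching hardware or a WS."""

    def __init__(self, kind: str) -> None:
        self.kind = kind
        self.calls: list = []

    def prepare(self, radio_config: dict) -> None:
        self.calls.append(("prepare", radio_config))

    def write(self, buffer: bytes) -> None:
        self.calls.append(("write", len(buffer)))

    def close(self) -> None:
        self.calls.append(("close",))


def build_data_sink(data_sink: dict) -> TxSink:
    """Route on dataSink.type; RecordingSink stands in for both real sinks."""
    sink_type = data_sink.get("type")
    if sink_type == "agent":
        return RecordingSink("agent")  # would be AgentTxSink
    if sink_type in (None, "local"):   # "local" is a hypothetical type name
        return RecordingSink("local")  # would be LocalPlutoTxSink
    raise ValueError(f"unsupported dataSink.type: {sink_type!r}")
```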
### B3. Manifest schema
Add `dataSink` alongside `dataSource` in the manifest. New `type: "agent"`:
```json
{
  "dataSource": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "sample_rate": 1000000, "center_frequency": 2450000000, "gain": 40 } },
  "dataSink": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "tx_sample_rate": 1000000, "tx_center_frequency": 2450000000, "tx_gain": -20, "underrun_policy": "pause" } }
}
```
Update Pydantic models + JSON schema validators in `controller/app/modules/screens/graph_derivation.py` (or equivalent). When `dataSource.agent_id == dataSink.agent_id` and both target `pluto` with the same `identifier`, the agent will naturally share one SDR handle — no special-casing needed on the hub side.
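Although the hub needs no special-casing, a small predicate can be handy in tests and UI hints to tell whether a manifest describes the shared-handle full-duplex case. The sketch below is hypothetical: it assumes `identifier` sits at the top level of each block, which the example manifest does not show.

```python
def shares_sdr_handle(manifest: dict) -> bool:
    """True when dataSource and dataSink name the same agent-side device."""
    src = manifest.get("dataSource")
    sink = manifest.get("dataSink")
    if not src or not sink:
        return False
    if src.get("type") != "agent" or sink.get("type") != "agent":
        return False

    def key(block: dict) -> tuple:
        # Assumption: "identifier" is a top-level key of each block.
        return (block.get("agent_id"), block.get("device"), block.get("identifier"))

    return key(src) == key(sink)
```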
### B4. WS endpoint extensions
`/screens/agent/ws` already exists. Add:
- Support for hub → agent **binary frames** (currently binary is agent → hub only). FastAPI's `WebSocket.send_bytes` works directly; just route binary from the Redis pub/sub channel through to the WS.
- New Redis pub/sub channel `screens:agent:{agent_id}:tx` for outbound TX control JSON, plus a separate `screens:agent:{agent_id}:tx_bin` for outbound binary. Two channels keep JSON consumers and raw IQ consumers apart. Redis pub/sub payloads are binary-safe, so a single channel also works as long as the subscribing client is not configured to decode messages as text.
### B5. Celery wiring
When `dataSink.type == "agent"`, the Celery task that runs the TX-containing graph uses `AgentTxSink` instead of a local sink. The operator code (`plutoTXoperator`) is unchanged because the sink abstraction hides the difference.
Full-duplex: a single task with both `dataSource.type == "agent"` and `dataSink.type == "agent"` pointing at the same agent spawns both the RX consumer loop (existing `AgentDataSource.next_chunk` via BLPOP) and the TX producer loop (`AgentTxSink.write`). Both sides are wired up before any capture frames are sent.
### B6. Capability gating
Before any control path sends `tx_start`:
```python
agent = get_agent(agent_id)
if agent.last_heartbeat.age > 60:  # stale
    raise HTTPException(503, "agent not responding")
if "tx" not in agent.last_heartbeat.capabilities:
    raise HTTPException(400, "agent has not opted in to transmission (tx_enabled=false)")
```
Surface clear errors to the Screens UI so the user knows it's an agent config issue, not an app config issue.
### B7. Audit log
New MongoDB collection `agent_tx_audit`:
```
{
  agent_id, app_id, user_id,
  center_frequency_hz, tx_gain_db, duration_s, num_samples,
  started_at, ended_at, terminal_status,   // "done" | "error" | "underrun" | "cancelled"
  error_message?
}
```
Write on every `tx_start`. Update on terminal `tx_status`. Index on `{agent_id, started_at}` for admin-view queries.
### B8. Registration — no change needed
`POST /screens/agents/register` and `~/.ria/agent.json` already cover credential storage. The TX interlock (`tx_enabled`, caps) is written by the *agent operator* via `ria-agent register --allow-tx`; the hub only reads the heartbeat to learn whether an agent will accept TX.
---
## Part C — Screens (Vue 3 frontend)
### C1. App composer
- **Agent picker** (exists from RX work) grows a "TX capable" filter toggle; hides agents whose heartbeat `capabilities` lacks `"tx"`.
- When the graph contains `plutoTXoperator` (or any future TX operator):
- Render a **dataSink** section mirroring dataSource.
- Fields: device, agent_id, identifier, tx_sample_rate, tx_center_frequency, tx_gain, underrun_policy.
- Validation: tx_center_frequency within radio band; tx_gain within agent-advertised max (read from heartbeat when available).
### C2. Run-time UI
- **Consent modal** on "Start" for any app whose manifest contains a `dataSink.type: "agent"`:
> "This app will transmit on **2.450 GHz** at **-20 dB** through agent **lab-pluto-01**. I confirm this transmission is permitted under my local radio regulations."
Required checkbox, cannot be remembered across apps.
- **TX status indicator** in the running-app view: shows `armed` / `transmitting` / `underrun` state from `tx_status` frames. Red banner on `underrun` or `error`.
- **Stop TX button** always visible during transmission; fires `tx_stop` immediately. Separate from "Stop app" (which also stops RX).
### C3. Admin view
Extend the agents list from B8 of the RX handoff:
- Column: **TX**: `enabled` / `disabled` / `in-use by app X`.
- Agent detail page: show `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`, and the last 10 rows from `agent_tx_audit` filtered to this agent.
---
## Rollout order
1. **Part A §A1-A3, A7** — agent-side TX session + binary ingress + safety, all behind `--allow-tx`. Mock-based tests. Shippable standalone; no consumer yet.
2. **Part B §B1-B5** — hub sink + manifest + WS extension + Celery wiring. End-to-end test: Screens app with `plutoTXoperator` + agent sink → real Pluto in the lab → verify carrier on a spectrum analyzer.
3. **Part B §B6-B7** — capability gating + audit log. Blocks general release, not lab use.
4. **Part C §C1** — composer UI for TX apps.
5. **Part C §C2-C3** — consent modal + admin view. Gate for first non-internal user.
Parts A + B can land on parallel branches and meet at step 2's integration test. Part C can start in parallel with B once the manifest shape in B3 is stable.
## Test matrix (integration)
| Scenario | Expected |
|---|---|
| App with RX only, agent connected | RX as today (regression guard) |
| App with TX only, agent `tx_enabled=True` | TX starts, underrun → pause, stop cleans up |
| App with RX + TX same agent, same device | One Pluto handle serves both; independent gains/frequencies |
| App with TX, agent `tx_enabled=False` | Hub rejects at gate with 400; no WS traffic generated |
| App with TX, gain exceeds agent cap | `tx_status: error`; SDR never opened |
| Hub stops sending TX buffers mid-stream | `underrun` emitted after queue drains; agent paused cleanly |
| WS drops during TX | Agent cancels TX task, pauses hardware, reconnects; hub must re-issue `tx_start` |
| Agent process killed during TX | Hardware stops (existing `close()` already handles this; verify `_tx_lock` released) |
## Open questions
- **Waveform source**: is `plutoTXoperator` a real-time generator emitting on a clock, or does it synthesize a fixed recording and loop? If the latter, worth exposing a "bulk + loop" fast-path — hub sends the buffer once, agent loops it via existing `tx_recording`. Same protocol (`tx_start` + one buffer + `loop: true`), much less WS traffic.
- **Multi-app-per-agent**: out of scope for v1 (§Non-goals). When needed: add a session id to binary frames (4-byte prefix: magic + stream_id + reserved), bump a `protocol_version` in the heartbeat.
- **Streaming TX clock drift**: if hub and agent sample clocks drift, the TX queue will periodically underrun, and the zero or repeat fill is audible/visible in the spectrum. Longer term: agent-side resampling or PLL, both expensive. v1: rely on generous queue depth + stable local networks.
- **Other TX-capable SDRs**: HackRF, USRP, bladeRF. The `_CONFIG_ATTR_MAP` in [`agent/streamer.py:169-175`](../src/ria_toolkit_oss/agent/streamer.py#L169-L175) will need per-driver entries when those come online.
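For the multi-app question, the 4-byte binary prefix could be laid out as in the sketch below. The field widths (2-byte magic, 1-byte `stream_id`, 1-byte reserved), the magic value, and the big-endian order are all assumptions, since the plan only fixes the total at 4 bytes.

```python
import struct

MAGIC = 0x5258                    # hypothetical 2-byte magic value
_PREFIX = struct.Struct(">HBB")   # magic, stream_id, reserved


def add_prefix(stream_id: int, payload: bytes) -> bytes:
    """Prepend the 4-byte session prefix to an IQ buffer."""
    return _PREFIX.pack(MAGIC, stream_id, 0) + payload


def split_prefix(frame: bytes) -> tuple:
    """Validate the prefix and return (stream_id, payload)."""
    if len(frame) < _PREFIX.size:
        raise ValueError("frame shorter than prefix")
    magic, stream_id, _reserved = _PREFIX.unpack_from(frame)
    if magic != MAGIC:
        raise ValueError(f"bad magic: {magic:#06x}")
    return stream_id, frame[_PREFIX.size:]
```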
## Regulatory note
Transmission is regulated in every jurisdiction. The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub's consent modal and audit log exist so actions are attributable. None of this is a legal compliance layer — it's a defense-in-depth mechanism.