Push Tracker
ria-toolkit-oss/docs/screens_agent_streamer_plan.md

157 lines
8.3 KiB
Markdown
Raw Permalink Normal View History

J
2026-06-16 11:54:05 -04:00
# Screens Agent Streamer — Implementation Plan
**Source doc:** [screens_connection_updates.md](../screens_connection_updates.md)
**Created:** 2026-04-13
**Goal:** Add a thin WebSocket-based IQ streaming agent to `ria-toolkit-oss`, alongside the existing long-poll `NodeAgent`, and wire up the RIA Hub server to consume it.
---
## Architectural Decision
The existing [src/ria_toolkit_oss/agent.py](../src/ria_toolkit_oss/agent.py) (`NodeAgent`) uses HTTP long-polling and runs ONNX inference locally. It stays as-is.
The new **streamer agent** described in `screens_connection_updates.md` is a *different* execution mode — thin, WebSocket-based, server-driven inference. It will be added as a new submodule and exposed as a new CLI subcommand. Both modes coexist; users pick one based on deployment needs.
---
## Part A — ria-toolkit-oss (this repo)
### Phase 1 — SDR foundation
| # | Task | File(s) | Priority |
|---|------|---------|----------|
| 1 | Add `detect_available() -> dict[str, type]` that probes every driver without importing GUI deps | [src/ria_toolkit_oss/sdr/__init__.py](../src/ria_toolkit_oss/sdr/__init__.py) | P0 |
| 2 | Audit each SDR driver for headless cleanliness (no matplotlib/Qt at import time) | `src/ria_toolkit_oss/sdr/{pluto,hackrf,rtlsdr,usrp,blade,thinkrf}.py` | P1 |
| 3 | Raise typed `SdrDisconnectedError` from `radio.rx()` on USB drop instead of crashing | `src/ria_toolkit_oss/sdr/sdr.py` + drivers | P1 |
| 4 | Validate `load_recording` against SigMF captures from each radio type | `tests/io/` | P2 |
### Phase 2 — Agent package restructure (non-breaking)
Promote the existing module to a package and add the streamer next to it:
```
src/ria_toolkit_oss/
├── agent.py # DELETE after move
└── agent/
├── __init__.py # re-export NodeAgent for back-compat
├── legacy_executor.py # former agent.py (NodeAgent, unchanged behavior)
├── streamer.py # NEW — thin IQ streamer loop
├── ws_client.py # NEW — persistent WS client + heartbeat + reconnect
├── hardware.py # NEW — wraps sdr.detect_available()
├── config.py # shared: ~/.ria/agent.json load/save
└── cli.py # unified CLI with subcommands
```
Back-compat requirement: `from ria_toolkit_oss.agent import NodeAgent` must keep working. Keep the `ria-agent` console script entry point; expose both modes via subcommands.
### Phase 3 — Streamer implementation
| # | Task | File | Priority |
|---|------|------|----------|
| 5 | `ws_client.py` — persistent WebSocket, auto-reconnect, heartbeat loop | `src/ria_toolkit_oss/agent/ws_client.py` | P0 |
| 6 | `streamer.py` — main loop: receive `start` → open SDR → `radio.rx()` → send binary IQ → handle `stop`/`configure` | `src/ria_toolkit_oss/agent/streamer.py` | P0 |
| 7 | `hardware.py` — heartbeat payload builder, uses `sdr.detect_available()` | `src/ria_toolkit_oss/agent/hardware.py` | P0 |
| 8 | `config.py``~/.ria/agent.json` read/write, registration token storage | `src/ria_toolkit_oss/agent/config.py` | P1 |
| 9 | CLI subcommands: `ria-agent register`, `detect`, `stream` (new), `run` (legacy long-poll) | `src/ria_toolkit_oss/agent/cli.py` | P0 |
| 10 | Add `websockets` to dependencies | [pyproject.toml](../pyproject.toml) | P0 |
### Phase 4 — WebSocket protocol
Implement exactly the messages from `screens_connection_updates.md` §"WebSocket Protocol":
**Agent → Server (JSON control):**
```json
{"type": "heartbeat", "hardware": ["pluto", "hackrf"], "status": "idle"}
{"type": "status", "status": "streaming", "app_id": "abc"}
{"type": "error", "app_id": "abc", "message": "USB device disconnected"}
```
**Agent → Server (binary data):** raw interleaved float32 IQ bytes per `radio.rx()` call.
**Server → Agent (JSON control):**
```json
{"type": "start", "app_id": "...", "radio_config": {"device": "pluto", ...}}
{"type": "stop", "app_id": "..."}
{"type": "configure", "app_id": "...", "radio_config": {"center_frequency": 915000000}}
```
The agent does **not** interpret manifests, models, or preprocessing — it just applies `radio_config` to `ria_toolkit_oss.sdr.<device>`.
### Phase 5 — Tests
| # | Task | Location |
|---|------|----------|
| 11 | Unit: streamer loop against `sdr.mock` + fake WS | `tests/agent/test_streamer.py` |
| 12 | Unit: `ws_client` reconnect/heartbeat timing | `tests/agent/test_ws_client.py` |
| 13 | Unit: `hardware.detect_available()` and heartbeat payload | `tests/agent/test_hardware.py` |
| 14 | Integration: local `websockets` server → mock SDR → full start/stream/stop cycle | `tests/agent/test_integration.py` |
| 15 | Regression: `NodeAgent` still importable and functional | `tests/agent/test_legacy.py` |
---
## Part B — RIA Hub (hand-off to separate session)
> **Give this section to Claude in the ria-hub repo session.** It depends on Part A shipping first (or at minimum, stubbed protocol).
### Server-side tasks
| # | Task | File / Area | Priority |
|---|------|-------------|----------|
| B1 | Implement `AgentDataSource` (DataSource ABC, reads IQ from WebSocket connection) and register it in `build_data_source()` | `controller/app/modules/screens/data_sources.py` | P0 |
| B2 | Add `"agent"` to `dataSource.type` enum in manifest schema; update Pydantic/JSON schema validators | `controller/app/modules/screens/graph_derivation.py` + manifest schema files | P0 |
| B3 | Add agent WebSocket endpoint `POST /api/agent/ws` — accepts agent connections, auth via registration token, bridges the connection to the Celery task's `AgentDataSource` | `controller/app/modules/agent/routes.py` (new) | P0 |
| B4 | Agent registry: MongoDB collection tracking `agent_id`, hardware list, last heartbeat, online status, registration tokens | `controller/app/modules/agent/models.py` (new) | P1 |
| B5 | Registration endpoint `POST /api/agent/register` returning agent credentials for `~/.ria/agent.json` | `controller/app/modules/agent/routes.py` | P1 |
| B6 | Celery task wiring: when manifest `dataSource.type == "agent"`, look up the connected agent by `agent_id`, forward `radio_config` to it, and feed received IQ chunks into the inference loop via `AgentDataSource.next_chunk()` | `controller/app/modules/screens/tasks.py` | P0 |
| B7 | Device/agent picker UI in Screens app config (Vue 3) | frontend screens panel | P2 |
| B8 | Agents list/status admin view | frontend admin area | P2 |
### Protocol contract (must match Part A exactly)
See `screens_connection_updates.md` §"WebSocket Protocol". Key invariants:
- Binary frames are interleaved float32 IQ, one frame per `radio.rx()` call.
- `radio_config` is derived from the manifest's `dataSource.params` and forwarded verbatim.
- The server sends `configure` for retune-without-stop flow; the agent is responsible for applying it at the next capture boundary.
### Manifest example (new mode)
```json
{
"dataSource": {
"type": "agent",
"device": "pluto",
"agent_id": "agent-abc123",
"params": {
"identifier": "ip:192.168.3.1",
"sample_rate": 1000000,
"center_frequency": 2450000000,
"gain": 40,
"buffer_size": 1024
}
},
"preprocess": "magnitude_phase_window_stats",
"config": {"inference": {"knownDevices": [], "interval": 1}}
}
```
### Pipeline invariant
**No changes to `inference_core.py`, `preprocessors.py`, or `run_onnx_chain_loop()`.** `AgentDataSource` is a drop-in for `SdrDataSource`; everything downstream is unchanged.
---
## Rollout Order
1. Part A Phase 1 (SDR foundation) — safe to land independently.
2. Part A Phases 23 (agent package + streamer) — lands the `ria-agent stream` CLI; no server needed to build/test against a mock WS.
3. Part B B1, B2, B3, B6 — minimum viable server path; lets a real agent connect end-to-end.
4. Part A Phase 5 integration test against a dev RIA Hub instance.
5. Part B B4, B5 (registry + registration) — hardens multi-agent deployments.
6. Part B B7, B8 (UI) — operator-facing polish.
## Open Questions
- Auth model for the WebSocket handshake: bearer token in header, query param, or first-message auth frame?
- Should `configure` apply mid-buffer or only at capture boundaries? (Doc implies boundaries; confirm for retune latency budget.)
- Backpressure policy when the server's inference loop is slower than the agent's `rx()` cadence — drop frames, queue, or pause?