Screens Agent Streamer — Implementation Plan

Source doc: screens_connection_updates.md
Created: 2026-04-13
Goal: Add a thin WebSocket-based IQ streaming agent to ria-toolkit-oss, alongside the existing long-poll NodeAgent, and wire up the RIA Hub server to consume it.


Architectural Decision

The existing src/ria_toolkit_oss/agent.py (NodeAgent) uses HTTP long-polling and runs ONNX inference locally. It stays as-is.

The new streamer agent described in screens_connection_updates.md is a different execution mode — thin, WebSocket-based, server-driven inference. It will be added as a new submodule and exposed as a new CLI subcommand. Both modes coexist; users pick one based on deployment needs.


Part A — ria-toolkit-oss (this repo)

Phase 1 — SDR foundation

| # | Task | File(s) | Priority |
|---|------|---------|----------|
| 1 | Add detect_available() -> dict[str, type] that probes every driver without importing GUI deps | src/ria_toolkit_oss/sdr/__init__.py | P0 |
| 2 | Audit each SDR driver for headless cleanliness (no matplotlib/Qt at import time) | src/ria_toolkit_oss/sdr/{pluto,hackrf,rtlsdr,usrp,blade,thinkrf}.py | P1 |
| 3 | Raise typed SdrDisconnectedError from radio.rx() on USB drop instead of crashing | src/ria_toolkit_oss/sdr/sdr.py + drivers | P1 |
| 4 | Validate load_recording against SigMF captures from each radio type | tests/io/ | P2 |
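
As a rough illustration of task 1, the sketch below probes drivers by importing each one inside a try/except and keeping only those that load. The `candidates` argument and the stdlib stand-ins in the demo are assumptions made so the example runs anywhere; the planned detect_available() takes no arguments and would use the toolkit's own driver list.

```python
import importlib
from typing import Dict, Mapping, Tuple


def detect_available(candidates: Mapping[str, Tuple[str, str]]) -> Dict[str, type]:
    """Return {driver_name: driver_class} for every driver that imports cleanly.

    `candidates` maps a short name to (module_path, class_name). Passing it in
    is an assumption of this sketch; the planned function takes no arguments.
    """
    found: Dict[str, type] = {}
    for name, (module_path, class_name) in candidates.items():
        try:
            module = importlib.import_module(module_path)
            found[name] = getattr(module, class_name)
        except Exception:
            # A missing vendor library can surface as ImportError, OSError, or
            # AttributeError; treat any failure as "driver not available".
            continue
    return found


if __name__ == "__main__":
    # Stdlib stand-ins so the sketch runs without any SDR libraries installed.
    demo = {
        "present": ("json", "JSONDecoder"),
        "absent": ("no_such_vendor_lib", "Radio"),
    }
    print(sorted(detect_available(demo)))
```

Importing lazily inside the loop is what keeps a broken or GUI-dependent driver from taking down the whole probe, which is the property task 2 then audits per driver.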

Phase 2 — Agent package restructure (non-breaking)

Promote the existing module to a package and add the streamer next to it:

src/ria_toolkit_oss/
├── agent.py                    # DELETE after move
└── agent/
    ├── __init__.py             # re-export NodeAgent for back-compat
    ├── legacy_executor.py      # former agent.py (NodeAgent, unchanged behavior)
    ├── streamer.py             # NEW — thin IQ streamer loop
    ├── ws_client.py            # NEW — persistent WS client + heartbeat + reconnect
    ├── hardware.py             # NEW — wraps sdr.detect_available()
    ├── config.py               # shared: ~/.ria/agent.json load/save
    └── cli.py                  # unified CLI with subcommands

Back-compat requirement: from ria_toolkit_oss.agent import NodeAgent must keep working. Keep the ria-agent console script entry point; expose both modes via subcommands.
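
One possible shape for the unified entry point is an argparse parser with one subparser per mode. This is only a sketch: the subcommand names come from this plan, while the `--token` and `--url` flags are hypothetical placeholders, not a settled interface.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for `ria-agent <subcommand>` (flags are illustrative)."""
    parser = argparse.ArgumentParser(prog="ria-agent")
    sub = parser.add_subparsers(dest="command", required=True)

    register = sub.add_parser("register", help="store a registration token")
    register.add_argument("--token", required=True)  # hypothetical flag

    sub.add_parser("detect", help="list SDR hardware visible to this host")

    stream = sub.add_parser("stream", help="new WebSocket IQ streamer mode")
    stream.add_argument("--url", required=True)  # hypothetical flag

    sub.add_parser("run", help="legacy long-poll NodeAgent mode")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(f"selected mode: {args.command}")
```

Keeping `run` as the legacy subcommand lets the existing ria-agent console script keep working while `stream` is added next to it.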

Phase 3 — Streamer implementation

| # | Task | File | Priority |
|---|------|------|----------|
| 5 | ws_client.py: persistent WebSocket, auto-reconnect, heartbeat loop | src/ria_toolkit_oss/agent/ws_client.py | P0 |
| 6 | streamer.py: main loop (receive start → open SDR → radio.rx() → send binary IQ → handle stop/configure) | src/ria_toolkit_oss/agent/streamer.py | P0 |
| 7 | hardware.py: heartbeat payload builder, uses sdr.detect_available() | src/ria_toolkit_oss/agent/hardware.py | P0 |
| 8 | config.py: ~/.ria/agent.json read/write, registration token storage | src/ria_toolkit_oss/agent/config.py | P1 |
| 9 | CLI subcommands: ria-agent register, detect, stream (new), run (legacy long-poll) | src/ria_toolkit_oss/agent/cli.py | P0 |
| 10 | Add websockets to dependencies | pyproject.toml | P0 |
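
The auto-reconnect behavior in task 5 could be built around a capped exponential backoff. The sketch below is transport-agnostic: `session` is a hypothetical coroutine that connects, streams, and raises on a dropped connection, and the delay values are placeholders. A real client would also need to catch the exceptions its WebSocket library raises when a connection closes.

```python
import asyncio
from typing import Awaitable, Callable, Iterator


def backoff_delays(base: float = 1.0, cap: float = 30.0) -> Iterator[float]:
    """Yield base, 2*base, 4*base, ... with each value capped at `cap` seconds."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= 2


async def run_with_reconnect(
    session: Callable[[], Awaitable[None]],
    max_attempts: int,
    base: float = 1.0,
    cap: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Run `session` until it returns cleanly, retrying after connection errors.

    Returns the number of failed attempts that preceded the clean run.
    """
    delays = backoff_delays(base, cap)
    for attempt in range(max_attempts):
        try:
            await session()
            return attempt
        except OSError:
            # ConnectionError and its subclasses derive from OSError.
            await sleep(next(delays))
    raise ConnectionError(f"gave up after {max_attempts} attempts")
```

Injecting `sleep` keeps the timing logic testable without real waits, which is the kind of check tests/agent/test_ws_client.py is meant to cover.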

Phase 4 — WebSocket protocol

Implement exactly the messages from screens_connection_updates.md §"WebSocket Protocol":

Agent → Server (JSON control):

{"type": "heartbeat", "hardware": ["pluto", "hackrf"], "status": "idle"}
{"type": "status",    "status": "streaming", "app_id": "abc"}
{"type": "error",     "app_id": "abc", "message": "USB device disconnected"}

Agent → Server (binary data): raw interleaved float32 IQ bytes per radio.rx() call.
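
To make the binary frame layout concrete, here is a minimal stdlib sketch that packs complex samples as I0, Q0, I1, Q1, ... in float32 and unpacks them again. Little-endian byte order is an assumption; the protocol text above does not state endianness, and the helper names are illustrative.

```python
import struct
from typing import List, Sequence


def encode_iq(samples: Sequence[complex]) -> bytes:
    """Pack samples as interleaved float32 (I, Q, I, Q, ...), little-endian."""
    flat = []
    for sample in samples:
        flat.append(sample.real)  # in-phase component
        flat.append(sample.imag)  # quadrature component
    return struct.pack(f"<{len(flat)}f", *flat)


def decode_iq(frame: bytes) -> List[complex]:
    """Inverse of encode_iq; each sample occupies 8 bytes (two float32)."""
    if len(frame) % 8:
        raise ValueError("frame length must be a multiple of 8 bytes")
    values = struct.unpack(f"<{len(frame) // 4}f", frame)
    return [complex(i, q) for i, q in zip(values[0::2], values[1::2])]


if __name__ == "__main__":
    frame = encode_iq([1 + 2j, -0.5 + 0.25j])
    print(len(frame), decode_iq(frame))
```

If radio.rx() returns a NumPy complex64 array, its raw buffer already has this interleaved layout on little-endian hosts, so the agent would likely send the buffer directly rather than repack it.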

Server → Agent (JSON control):

{"type": "start", "app_id": "...", "radio_config": {"device": "pluto", ...}}
{"type": "stop", "app_id": "..."}
{"type": "configure", "app_id": "...", "radio_config": {"center_frequency": 915000000}}

The agent does not interpret manifests, models, or preprocessing — it just applies radio_config to ria_toolkit_oss.sdr.<device>.
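
A thin agent mostly needs to validate incoming control frames and hand radio_config on untouched. The sketch below parses the three server messages listed above into a small dataclass; the `ControlMessage` type and the validation rules are assumptions for illustration, not part of the source protocol doc.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict

SERVER_MESSAGE_TYPES = {"start", "stop", "configure"}


@dataclass
class ControlMessage:
    type: str
    app_id: str
    radio_config: Dict[str, Any] = field(default_factory=dict)


def parse_control(raw: str) -> ControlMessage:
    """Validate a Server -> Agent JSON frame and return it as a ControlMessage."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("control frame must be a JSON object")
    msg_type = data.get("type")
    if msg_type not in SERVER_MESSAGE_TYPES:
        raise ValueError(f"unknown message type: {msg_type!r}")
    if "app_id" not in data:
        raise ValueError("missing app_id")
    if msg_type in ("start", "configure") and "radio_config" not in data:
        raise ValueError(f"{msg_type} requires radio_config")
    # radio_config is copied as-is; the agent does not interpret its keys here.
    return ControlMessage(msg_type, data["app_id"], dict(data.get("radio_config", {})))


def error_frame(app_id: str, message: str) -> str:
    """Build the Agent -> Server error message shown above."""
    return json.dumps({"type": "error", "app_id": app_id, "message": message})
```

Rejecting unknown types early gives the streamer loop a single place to turn bad input into an error frame instead of failing mid-capture.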

Phase 5 — Tests

| # | Task | Location |
|---|------|----------|
| 11 | Unit: streamer loop against sdr.mock + fake WS | tests/agent/test_streamer.py |
| 12 | Unit: ws_client reconnect/heartbeat timing | tests/agent/test_ws_client.py |
| 13 | Unit: hardware.detect_available() and heartbeat payload | tests/agent/test_hardware.py |
| 14 | Integration: local websockets server → mock SDR → full start/stream/stop cycle | tests/agent/test_integration.py |
| 15 | Regression: NodeAgent still importable and functional | tests/agent/test_legacy.py |

Part B — RIA Hub (hand-off to separate session)

Give this section to Claude in the ria-hub repo session. It depends on Part A shipping first (or at minimum, stubbed protocol).

Server-side tasks

| # | Task | File / Area | Priority |
|---|------|-------------|----------|
| B1 | Implement AgentDataSource (DataSource ABC, reads IQ from WebSocket connection) and register it in build_data_source() | controller/app/modules/screens/data_sources.py | P0 |
| B2 | Add "agent" to dataSource.type enum in manifest schema; update Pydantic/JSON schema validators | controller/app/modules/screens/graph_derivation.py + manifest schema files | P0 |
| B3 | Add agent WebSocket endpoint GET /api/agent/ws (a WebSocket handshake is an HTTP GET upgrade, not a POST): accepts agent connections, authenticates via registration token, and bridges the connection to the Celery task's AgentDataSource | controller/app/modules/agent/routes.py (new) | P0 |
| B4 | Agent registry: MongoDB collection tracking agent_id, hardware list, last heartbeat, online status, registration tokens | controller/app/modules/agent/models.py (new) | P1 |
| B5 | Registration endpoint POST /api/agent/register returning agent credentials for ~/.ria/agent.json | controller/app/modules/agent/routes.py | P1 |
| B6 | Celery task wiring: when manifest dataSource.type == "agent", look up the connected agent by agent_id, forward radio_config to it, and feed received IQ chunks into the inference loop via AgentDataSource.next_chunk() | controller/app/modules/screens/tasks.py | P0 |
| B7 | Device/agent picker UI in Screens app config (Vue 3) | frontend screens panel | P2 |
| B8 | Agents list/status admin view | frontend admin area | P2 |

Protocol contract (must match Part A exactly)

See screens_connection_updates.md §"WebSocket Protocol". Key invariants:

  • Binary frames are interleaved float32 IQ, one frame per radio.rx() call.
  • radio_config is derived from the manifest's dataSource.params and forwarded verbatim.
  • The server sends configure for retune-without-stop flow; the agent is responsible for applying it at the next capture boundary.
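
The third invariant, applying configure at the next capture boundary, can be sketched as a small holder that queues the requested settings and applies them just before the following rx() call. The class name, the attribute-style radio settings, and `FakeRadio` are all assumptions of this example, not the toolkit's API.

```python
import threading
from typing import Any, Dict


class BoundaryConfigurator:
    """Queue radio_config updates and apply them only between rx() calls."""

    def __init__(self, radio: Any) -> None:
        self._radio = radio
        self._pending: Dict[str, Any] = {}
        # The lock matters only if the WS receiver and capture loop run in
        # different threads; it is harmless in a single-threaded loop.
        self._lock = threading.Lock()

    def request(self, radio_config: Dict[str, Any]) -> None:
        """Record a configure message; later values override earlier ones."""
        with self._lock:
            self._pending.update(radio_config)

    def capture(self) -> Any:
        """Apply any pending settings, then perform exactly one rx() call."""
        with self._lock:
            pending, self._pending = self._pending, {}
        for key, value in pending.items():
            setattr(self._radio, key, value)  # assumes settings are attributes
        return self._radio.rx()


class FakeRadio:
    """Hypothetical stand-in whose rx() just reports the tuned frequency."""

    center_frequency = 2_450_000_000

    def rx(self) -> int:
        return self.center_frequency
```

Because settings are swapped out only inside capture(), a buffer that is already being read is never retuned halfway through.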

Manifest example (new mode)

{
  "dataSource": {
    "type": "agent",
    "device": "pluto",
    "agent_id": "agent-abc123",
    "params": {
      "identifier": "ip:192.168.3.1",
      "sample_rate": 1000000,
      "center_frequency": 2450000000,
      "gain": 40,
      "buffer_size": 1024
    }
  },
  "preprocess": "magnitude_phase_window_stats",
  "config": {"inference": {"knownDevices": [], "interval": 1}}
}
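
As a hedged sketch of how the server might turn this manifest into the start message, the function below copies dataSource.params unchanged and adds the device name. Merging `device` into radio_config is an assumption inferred from the start message example in Part A; the function name is hypothetical.

```python
from typing import Any, Dict


def build_start_message(manifest: Dict[str, Any], app_id: str) -> Dict[str, Any]:
    """Derive the Server -> Agent start message from an agent-mode manifest."""
    source = manifest["dataSource"]
    if source.get("type") != "agent":
        raise ValueError("manifest dataSource.type is not 'agent'")
    # params are forwarded verbatim; only the device name is added alongside.
    radio_config = {"device": source["device"], **source["params"]}
    return {"type": "start", "app_id": app_id, "radio_config": radio_config}


if __name__ == "__main__":
    manifest = {
        "dataSource": {
            "type": "agent",
            "device": "pluto",
            "agent_id": "agent-abc123",
            "params": {"sample_rate": 1000000, "center_frequency": 2450000000},
        }
    }
    print(build_start_message(manifest, "abc"))
```

The agent_id is deliberately left out of the message: it selects which connection to send on rather than describing the radio.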

Pipeline invariant

No changes to inference_core.py, preprocessors.py, or run_onnx_chain_loop(). AgentDataSource is a drop-in for SdrDataSource; everything downstream is unchanged.
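
To show what "drop-in" could mean in practice, the sketch below puts a thread-safe queue between the WebSocket handler and a blocking next_chunk(). The `DataSource` base class here is a stand-in with an assumed interface; the real ABC in data_sources.py may differ, as may the chunk type it returns. Little-endian float32 is likewise assumed.

```python
import queue
import struct
from abc import ABC, abstractmethod
from typing import List


class DataSource(ABC):
    """Hypothetical stand-in for the real DataSource ABC."""

    @abstractmethod
    def next_chunk(self) -> List[complex]:
        ...


class AgentDataSource(DataSource):
    """Serve IQ chunks that a WebSocket handler pushes in via feed()."""

    def __init__(self, timeout: float = 5.0) -> None:
        self._frames: "queue.Queue[bytes]" = queue.Queue()
        self._timeout = timeout

    def feed(self, frame: bytes) -> None:
        """Called by the WebSocket side for every binary frame received."""
        self._frames.put(frame)

    def next_chunk(self) -> List[complex]:
        """Block until a frame arrives, then decode interleaved float32 IQ."""
        try:
            frame = self._frames.get(timeout=self._timeout)
        except queue.Empty:
            raise TimeoutError("no IQ frame received from agent") from None
        values = struct.unpack(f"<{len(frame) // 4}f", frame)
        return [complex(i, q) for i, q in zip(values[0::2], values[1::2])]
```

With this shape the inference loop keeps calling next_chunk() as it would on SdrDataSource, and only the producer side changes.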


Rollout Order

  1. Part A Phase 1 (SDR foundation) — safe to land independently.
  2. Part A Phases 2-3 (agent package + streamer), which land the ria-agent stream CLI; no server is needed to build/test against a mock WS.
  3. Part B B1, B2, B3, B6 — minimum viable server path; lets a real agent connect end-to-end.
  4. Part A Phase 5 integration test against a dev RIA Hub instance.
  5. Part B B4, B5 (registry + registration) — hardens multi-agent deployments.
  6. Part B B7, B8 (UI) — operator-facing polish.

Open Questions

  • Auth model for the WebSocket handshake: bearer token in header, query param, or first-message auth frame?
  • Should configure apply mid-buffer or only at capture boundaries? (Doc implies boundaries; confirm for retune latency budget.)
  • Backpressure policy when the server's inference loop is slower than the agent's rx() cadence — drop frames, queue, or pause?
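
For the backpressure question, the "drop frames" option could look like the bounded buffer below, which discards the oldest frame when full and counts the drops. This illustrates only one of the three candidate policies and is not a recommendation; the class name and capacity are hypothetical.

```python
from collections import deque
from typing import Deque, Optional


class DropOldestBuffer:
    """Bounded frame buffer that evicts the oldest frame when full."""

    def __init__(self, max_frames: int) -> None:
        if max_frames < 1:
            raise ValueError("max_frames must be at least 1")
        self._frames: Deque[bytes] = deque(maxlen=max_frames)
        self.dropped = 0  # number of frames evicted so far

    def push(self, frame: bytes) -> None:
        """Add a frame; if the buffer is full, the oldest frame is evicted."""
        if len(self._frames) == self._frames.maxlen:
            self.dropped += 1
        self._frames.append(frame)

    def pop(self) -> Optional[bytes]:
        """Return the oldest buffered frame, or None if the buffer is empty."""
        return self._frames.popleft() if self._frames else None
```

Dropping the oldest frame favors fresh data over completeness. The queue and pause options would instead trade latency or capture continuity for losslessness.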