Screens Agent Streamer — Hand-off

Branch: screens-connection in ria-toolkit-oss
Status: Part A complete (tests passing, 25/25). Part B is pending in the ria-hub repo.
Related docs: screens_agent_streamer_plan.md, ../screens_connection_updates.md


What's done (Part A — this repo)

Phase 1 — SDR foundation

  • Added ria_toolkit_oss.sdr.detect_available() -> dict[str, type] that probes every driver module and returns the map of importable driver classes. Importability is used as a proxy for "user has installed this driver's optional dep"; it does not probe for physical hardware.
  • Added SdrDisconnectedError (subclass of SDRError) plus a translate_disconnect(exc) helper in sdr/sdr.py. Pattern-matches USB/device-drop exceptions (ENODEV/EIO, "no such device", "broken pipe", etc.) and converts them so the streamer can distinguish a real hardware failure from a transient error.
  • Audited every driver under sdr/ for GUI imports — none found. All drivers are headless-clean at import time.
  • Pluto.rx(num_samples) now wraps self.radio.rx() with translate_disconnect. The same one-liner pattern can be applied to other drivers (hackrf/rtlsdr/usrp/blade/thinkrf) when they get wired to the streamer — deferred until each is needed.
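The disconnect translation described above can be sketched roughly as follows. This is a minimal stand-in, not the code in sdr/sdr.py: the errno set and phrase list are assumptions taken from the examples named in the bullet, the SDRError class is redefined locally so the block runs on its own, and guarded_rx is a hypothetical helper that only illustrates the wrapping pattern.

```python
import errno


class SDRError(Exception):
    """Local stand-in for the toolkit's base SDR exception."""


class SdrDisconnectedError(SDRError):
    """Raised when the underlying device appears to have gone away."""


# Assumed patterns, based on the examples in the text; the real list may differ.
_DISCONNECT_ERRNOS = {errno.ENODEV, errno.EIO}
_DISCONNECT_PHRASES = ("no such device", "broken pipe")


def translate_disconnect(exc):
    """Return an SdrDisconnectedError if exc looks like a device drop, else exc itself."""
    if isinstance(exc, SdrDisconnectedError):
        return exc
    code = getattr(exc, "errno", None)
    text = str(exc).lower()
    if code in _DISCONNECT_ERRNOS or any(p in text for p in _DISCONNECT_PHRASES):
        translated = SdrDisconnectedError(f"SDR disconnected: {exc}")
        translated.__cause__ = exc  # keep the original error for debugging
        return translated
    return exc


def guarded_rx(read):
    """Hypothetical wrapper: call a zero-argument read function and translate device drops."""
    try:
        return read()
    except Exception as exc:
        translated = translate_disconnect(exc)
        if translated is exc:
            raise  # transient or unrelated error: propagate unchanged
        raise translated from exc
```

Returning the original exception unchanged for non-matching errors is what lets a caller tell a hardware drop from a transient failure with a single isinstance check.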

Not done (Phase 1 Task 4): SigMF recording validation across radio types — needs real captures from each device, out of scope without hardware.

Phase 2 — Agent package restructure

Moved the former src/ria_toolkit_oss/agent.py into a package:

src/ria_toolkit_oss/agent/
├── __init__.py          # re-exports NodeAgent + main for back-compat
├── legacy_executor.py   # former agent.py, unchanged behavior
├── streamer.py          # new WebSocket IQ streamer
├── ws_client.py         # persistent WS client + heartbeat + reconnect
├── hardware.py          # wraps sdr.detect_available() for heartbeat payloads
├── config.py            # ~/.ria/agent.json load/save (0600 perms)
└── cli.py               # unified CLI (run / stream / detect / register)

Back-compat preserved: from ria_toolkit_oss.agent import NodeAgent still works, and bare ria-agent ... with no subcommand (or with legacy flags like --hub) falls through to the original long-poll executor.
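The fall-through behaviour can be pictured with a small dispatcher. This is only a sketch under assumptions: the dispatch function, the handlers mapping, and the legacy_main callable are hypothetical names, and the real cli.py may parse arguments differently.

```python
from typing import Callable, Sequence

Handler = Callable[[list], int]


def dispatch(argv: Sequence[str], handlers: dict, legacy_main: Handler) -> int:
    """Route argv to a subcommand handler, or fall through to the legacy entry point."""
    args = list(argv)
    if args and args[0] in handlers:
        # Known subcommand: hand the remaining arguments to its handler.
        return handlers[args[0]](args[1:])
    # No subcommand, or a legacy flag such as --hub comes first: keep the old behaviour.
    return legacy_main(args)
```

With handlers keyed by run, stream, detect, and register, an invocation such as `ria-agent --hub ...` never matches a key and so reaches the legacy executor with its arguments untouched.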

Phase 3 — Streamer implementation

  • ws_client.WsClient: async run() loop, JSON heartbeats on a timer, auto-reconnect on drop, bearer-token auth header on connect.
  • streamer.Streamer — handles start / stop / configure messages, opens the SDR via an injectable sdr_factory, runs capture in an executor thread, and ships raw interleaved float32 IQ bytes per rx() call.
  • hardware.heartbeat_payload(status, app_id) returns {type, hardware[], status}.
  • config.AgentConfig — dataclass round-tripped through JSON, unknown keys preserved in an extra dict.
  • CLI subcommands: run (legacy), stream, detect, register.
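As an illustration of the config round trip with preserved unknown keys, here is a minimal sketch. The field names url, agent_id, and token are assumptions for the example (only the extra dict and the 0600 permissions come from the text), and the load/save helpers are hypothetical.

```python
import json
import os
from dataclasses import dataclass, field, fields
from pathlib import Path
from typing import Any


@dataclass
class AgentConfig:
    # Illustrative fields; the real dataclass may define different ones.
    url: str = ""
    agent_id: str = ""
    token: str = ""
    extra: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict) -> "AgentConfig":
        known = {f.name for f in fields(cls)} - {"extra"}
        kwargs = {k: v for k, v in data.items() if k in known}
        extra = {k: v for k, v in data.items() if k not in known}
        return cls(**kwargs, extra=extra)

    def to_dict(self) -> dict:
        out: dict = {f.name: getattr(self, f.name) for f in fields(self) if f.name != "extra"}
        out.update(self.extra)  # unknown keys are written back unchanged
        return out


def load(path: Path) -> AgentConfig:
    """Load the config, falling back to defaults when the file does not exist."""
    try:
        return AgentConfig.from_dict(json.loads(path.read_text()))
    except FileNotFoundError:
        return AgentConfig()


def save(cfg: AgentConfig, path: Path) -> None:
    """Write the config as JSON, readable and writable by the owner only."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        json.dump(cfg.to_dict(), fh, indent=2)
    os.chmod(path, 0o600)  # also tighten a file that already existed
```

Keeping unknown keys in extra means an older agent can rewrite a file produced by a newer one without silently dropping settings it does not understand.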

Phase 4 — Protocol

Matches screens_connection_updates.md §"WebSocket Protocol" exactly:

Direction      | Message
A → S (JSON)   | {type: heartbeat, hardware[], status}
A → S (JSON)   | {type: status, status, app_id}
A → S (JSON)   | {type: error, app_id, message}
A → S (binary) | raw interleaved float32 IQ, one frame per rx()
S → A (JSON)   | {type: start, app_id, radio_config}
S → A (JSON)   | {type: stop, app_id}
S → A (JSON)   | {type: configure, app_id, radio_config}, applied at next boundary

Auth decision: bearer token in Authorization header on the initial handshake. (Open questions in the plan around first-frame auth / mid-buffer configure / backpressure remain open.)
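To make the agent-to-server frames concrete, the sketch below builds the three JSON messages and packs one binary IQ frame using only the standard library. Two things are assumptions rather than facts from the protocol doc: little-endian byte order for the float32 values, and hardware entries being plain strings. The function names are illustrative.

```python
import json
import struct
from typing import Iterable, List


def heartbeat(hardware: List[str], status: str) -> str:
    return json.dumps({"type": "heartbeat", "hardware": hardware, "status": status})


def status_msg(status: str, app_id: str) -> str:
    return json.dumps({"type": "status", "status": status, "app_id": app_id})


def error_msg(app_id: str, message: str) -> str:
    return json.dumps({"type": "error", "app_id": app_id, "message": message})


def encode_iq(samples: Iterable[complex]) -> bytes:
    """Pack samples as I0, Q0, I1, Q1, ... float32 values (little-endian assumed)."""
    flat: List[float] = []
    for s in samples:
        flat.extend((s.real, s.imag))
    return struct.pack(f"<{len(flat)}f", *flat)


def decode_iq(frame: bytes) -> List[complex]:
    """Inverse of encode_iq; rejects frames that are not whole float32 IQ pairs."""
    if len(frame) % 8:
        raise ValueError("frame length is not a multiple of 8 bytes (one float32 I/Q pair)")
    flat = struct.unpack(f"<{len(frame) // 4}f", frame)
    return [complex(i, q) for i, q in zip(flat[0::2], flat[1::2])]
```

Each complex sample costs 8 bytes on the wire, so a buffer_size of 1024 samples would yield an 8192-byte binary frame per rx() call.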

Phase 5 — Tests

25 tests, all green under poetry run pytest tests/agent/:

  • test_hardware.py: detect_available, heartbeat payload shape
  • test_config.py — round-trip, missing-file fallback, extra-key preservation
  • test_streamer.py — start/stream/stop against MockSDR + fake WS, error frames, configure queueing
  • test_disconnect.py: translate_disconnect patterns; streamer reports an "SDR disconnected:" error and closes the SDR
  • test_ws_client.py — real local websockets server: heartbeat on connect, auto-reconnect after server drop, malformed-frame resilience
  • test_integration.py — end-to-end heartbeat → start → 3 binary IQ frames → stop
  • test_legacy.py — regression: NodeAgent still importable

Dependency / build changes

  • Added websockets (>=12.0,<14.0) to [tool.poetry.group.agent.dependencies].
  • Repointed the ria-agent console script from ria_toolkit_oss.agent:main to ria_toolkit_oss.agent.cli:main (the unified CLI, which still calls through to the legacy main for back-compat).

Files changed

M  pyproject.toml
M  poetry.lock
M  src/ria_toolkit_oss/sdr/__init__.py
M  src/ria_toolkit_oss/sdr/sdr.py
M  src/ria_toolkit_oss/sdr/pluto.py
R  src/ria_toolkit_oss/agent.py -> src/ria_toolkit_oss/agent/legacy_executor.py
A  src/ria_toolkit_oss/agent/{__init__,cli,config,hardware,streamer,ws_client}.py
A  tests/agent/{__init__,test_config,test_disconnect,test_hardware,
               test_integration,test_legacy,test_streamer,test_ws_client}.py
A  docs/screens_agent_streamer_plan.md
A  docs/screens_agent_handoff.md  (this file)

What's left (Part B — ria-hub repo)

Tasks are listed in priority order. B1, B2, B3, and B6 make up the MVP; the remaining tasks are hardening.

Server-side (MVP)

#  | Task | File / area
B1 | Implement AgentDataSource (DataSource ABC, reads IQ from WS connection) and register in build_data_source() | controller/app/modules/screens/data_sources.py
B2 | Add "agent" to dataSource.type enum in manifest schema; update Pydantic / JSON schema validators | controller/app/modules/screens/graph_derivation.py + schema files
B3 | Agent WebSocket endpoint at /api/agent/ws (GET with Upgrade; a WebSocket handshake cannot be a POST): accepts agent connections, authenticates on bearer token, bridges the connection to the Celery task's AgentDataSource | new controller/app/modules/agent/routes.py
B6 | Celery wiring: when dataSource.type == "agent", look up the connected agent by agent_id, forward radio_config as a start message, and feed received IQ bytes into the inference loop via AgentDataSource.next_chunk() | controller/app/modules/screens/tasks.py
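One possible shape for B1 is a queue-backed source that the WebSocket bridge feeds and the inference loop drains. This is a sketch under heavy assumptions: the DataSource ABC below is a local stand-in (the hub's real interface may differ), feed and close are hypothetical method names, and the in-process queue ignores the fact that the WebSocket endpoint and the Celery task may live in different processes, in which case some other transport would be needed.

```python
import queue
from abc import ABC, abstractmethod
from typing import Optional


class DataSource(ABC):
    """Local stand-in for the hub's DataSource ABC."""

    @abstractmethod
    def next_chunk(self) -> Optional[bytes]:
        ...


class AgentDataSource(DataSource):
    """Buffers binary IQ frames received from an agent connection."""

    def __init__(self, max_frames: int = 64, timeout: float = 5.0):
        self._frames = queue.Queue(maxsize=max_frames)
        self._timeout = timeout

    def feed(self, frame: bytes) -> None:
        """Called by the WS bridge for each binary frame; blocks briefly when full."""
        self._frames.put(frame, timeout=self._timeout)

    def close(self) -> None:
        """Signal end of stream to the consumer."""
        self._frames.put(None, timeout=self._timeout)

    def next_chunk(self) -> Optional[bytes]:
        """Return the next frame, None at end of stream, or raise if the agent stalls."""
        try:
            return self._frames.get(timeout=self._timeout)
        except queue.Empty:
            raise TimeoutError("no IQ frame received from agent") from None
```

Returning None at end of stream and raising on a stall keeps the two cases distinguishable for whatever loop calls next_chunk().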

Server-side (hardening)

#  | Task
B4 | Agent registry (MongoDB collection): agent_id, hardware[], last_heartbeat, online, registration tokens
B5 | POST /api/agent/register returning {agent_id, token} for ~/.ria/agent.json

Frontend

#  | Task
B7 | Device / agent picker in the Screens app config (Vue 3)
B8 | Agents list / status admin view

Protocol contract (keep in lockstep with Part A)

  • Binary frames are interleaved float32 IQ, one frame per radio.rx() call.
  • radio_config is forwarded verbatim from manifest dataSource.params. Minimum keys the agent handles: device, identifier, sample_rate, center_frequency, gain, buffer_size.
  • configure messages from the server apply at the next capture boundary (current agent implementation).
  • Agent authenticates with bearer token in Authorization header on the handshake.
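The "apply at the next capture boundary" rule can be illustrated with a small loop that holds at most one pending config and swaps it in between rx() calls. The CaptureLoop class and its callables are hypothetical, and letting a later configure overwrite an earlier unapplied one is an assumption, not something the contract states.

```python
import threading
from typing import Callable, Optional


class CaptureLoop:
    """Applies queued radio_config changes only between rx() calls."""

    def __init__(self,
                 apply_config: Callable[[dict], None],
                 rx: Callable[[], bytes],
                 send: Callable[[bytes], None]):
        self._apply_config = apply_config
        self._rx = rx
        self._send = send
        self._pending: Optional[dict] = None
        self._lock = threading.Lock()

    def configure(self, radio_config: dict) -> None:
        """Queue a config; a later call replaces one that has not been applied yet."""
        with self._lock:
            self._pending = dict(radio_config)

    def step(self) -> None:
        """One capture boundary: apply any pending config, then rx() and send one frame."""
        with self._lock:
            pending, self._pending = self._pending, None
        if pending is not None:
            self._apply_config(pending)
        self._send(self._rx())
```

Because the swap happens before rx(), every frame is captured under exactly one configuration, which is what makes "one frame per rx() call" easy to reason about on the server side.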

Still-open protocol questions (pin down before B3 lands)

  • Auth frame as fallback when proxies strip Authorization?
  • Mid-buffer configure application for tighter retune latency?
  • Backpressure policy when the server's inference loop is slower than the agent's rx() cadence — drop, queue, or pause the agent via a pause control frame?
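For the backpressure question, the "drop" option could look like a bounded buffer that discards the oldest frame when the consumer falls behind. This is a sketch of one candidate policy only, not a decision; DropOldestBuffer and its dropped counter are hypothetical.

```python
from collections import deque
from typing import Optional


class DropOldestBuffer:
    """Bounded frame buffer that discards the oldest frame when full."""

    def __init__(self, max_frames: int):
        if max_frames < 1:
            raise ValueError("max_frames must be at least 1")
        self._frames = deque(maxlen=max_frames)
        self.dropped = 0  # number of frames discarded so far

    def push(self, frame: bytes) -> None:
        if len(self._frames) == self._frames.maxlen:
            self.dropped += 1  # deque evicts the oldest entry on append
        self._frames.append(frame)

    def pop(self) -> Optional[bytes]:
        """Return the oldest buffered frame, or None when empty."""
        return self._frames.popleft() if self._frames else None
```

Dropping the oldest frame favours fresh data for live inference at the cost of gaps in the sample stream; the queue and pause options trade that the other way.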

Manifest example (new agent mode)

{
  "dataSource": {
    "type": "agent",
    "device": "pluto",
    "agent_id": "agent-abc123",
    "params": {
      "identifier": "ip:192.168.3.1",
      "sample_rate": 1000000,
      "center_frequency": 2450000000,
      "gain": 40,
      "buffer_size": 1024
    }
  },
  "preprocess": "magnitude_phase_window_stats",
  "config": {"inference": {"knownDevices": [], "interval": 1}}
}
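A server-side helper that turns such a manifest into a start message might look like the sketch below. It is hypothetical: build_start_message is not an existing function, and copying the top-level device into radio_config is an assumption made because the example manifest places device beside params while the contract lists it among the keys the agent handles.

```python
MIN_KEYS = ("device", "identifier", "sample_rate",
            "center_frequency", "gain", "buffer_size")


def build_start_message(manifest: dict, app_id: str) -> dict:
    """Build a {type: start, app_id, radio_config} message from a manifest."""
    source = manifest["dataSource"]
    if source.get("type") != "agent":
        raise ValueError("dataSource.type is not 'agent'")
    # Forward params as-is (copied so the manifest is not mutated).
    radio_config = dict(source.get("params", {}))
    # Assumption: fill in device from the top level unless params already sets it.
    if "device" in source:
        radio_config.setdefault("device", source["device"])
    missing = [k for k in MIN_KEYS if k not in radio_config]
    if missing:
        raise ValueError(f"radio_config is missing keys: {missing}")
    return {"type": "start", "app_id": app_id, "radio_config": radio_config}
```

Checking the minimum keys before sending lets the server reject a bad manifest up front instead of waiting for an error frame from the agent.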

Pipeline invariant

No changes to inference_core.py, preprocessors.py, or run_onnx_chain_loop(). AgentDataSource is a drop-in replacement for SdrDataSource; everything downstream is unchanged.


Local test setup

1. Branch layout

  • ria-toolkit-oss — branch screens-connection (this work).
  • ria-hub — whatever branch you'll be doing Part B on (create a matching screens-connection branch there to keep the hand-off clean).

2. Install the toolkit branch into your local RIA Hub

The hub declares ria-toolkit-oss as a git dep in its pyproject.toml. Point it at this branch:

# ria-hub/pyproject.toml
ria-toolkit-oss = { git = "https://riahub.ai/qoherent/ria-toolkit-oss.git", branch = "screens-connection" }

Or for a fully local dev loop (edits visible without reinstall):

# from ria-hub repo
poetry run pip install -e /home/qrf/ria-toolkit-oss
# websockets lives in the agent dependency group, so install it explicitly for server-side use
poetry run pip install "websockets>=12.0,<14.0"

3. Commit this branch and push (optional, for CI / shared dev)

cd /home/qrf/ria-toolkit-oss
git add -A
git commit -m "Add WebSocket IQ streamer agent (Part A)"
git push -u origin screens-connection

4. Try the agent locally against a mock server

You can exercise the whole Part A stack without RIA Hub by running the integration test:

cd /home/qrf/ria-toolkit-oss
poetry run pytest tests/agent/test_integration.py -v

Or manually, once Part B lands:

# on the user machine (mock hardware is fine)
ria-agent register --url http://localhost:8000 --token <tok>
ria-agent detect          # lists: mock, pluto, ... whatever's importable
ria-agent stream --url ws://localhost:8000/api/agent/ws

5. End-to-end integration test (after Part B MVP)

  1. Start RIA Hub (FastAPI + Celery + Mongo + Redis) with the Part B branch.
  2. Run ria-agent stream on your laptop.
  3. Create a Screens app with dataSource.type: "agent" and device: "mock".
  4. Start the app; confirm the agent logs "streaming" and the SSE stream shows inference metrics flowing.