# Screens Agent Streamer — Hand-off

**Branch:** `screens-connection` in `ria-toolkit-oss`
**Status:** Part A complete (tests passing, 25/25). Part B is pending in the `ria-hub` repo.
**Related docs:** [screens_agent_streamer_plan.md](./screens_agent_streamer_plan.md), [../screens_connection_updates.md](../screens_connection_updates.md)

---

## What's done (Part A — this repo)

### Phase 1 — SDR foundation

- Added `ria_toolkit_oss.sdr.detect_available() -> dict[str, type]` that probes
  every driver module and returns the map of importable driver classes.
  Importability is used as a proxy for "user has installed this driver's
  optional dep"; it does **not** probe for physical hardware.
- Added `SdrDisconnectedError` (subclass of `SDRError`) plus a
  `translate_disconnect(exc)` helper in [sdr/sdr.py](../src/ria_toolkit_oss/sdr/sdr.py).
  Pattern-matches USB/device-drop exceptions (`ENODEV`/`EIO`, "no such device",
  "broken pipe", etc.) and converts them so the streamer can distinguish a
  real hardware failure from a transient error.
- Audited every driver under `sdr/` for GUI imports — none found. All drivers
  are headless-clean at import time.
- `Pluto.rx(num_samples)` now wraps `self.radio.rx()` with
  `translate_disconnect`. The same one-liner pattern can be applied to other
  drivers (hackrf/rtlsdr/usrp/blade/thinkrf) when they get wired to the
  streamer — deferred until each is needed.

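The importability probe in the first bullet could look roughly like the sketch below. It is an illustration under stated assumptions, not the code in `sdr/__init__.py`: the candidate table is hypothetical and uses a stdlib module as a stand-in so the example runs without any SDR driver installed.

```python
import importlib

# Hypothetical name -> (module path, class name) table. The real driver list
# lives in the sdr package; "json" is a stdlib stand-in that always imports.
_CANDIDATES = {
    "json_driver": ("json", "JSONDecoder"),
    "missing_driver": ("not_a_real_sdr_module", "Radio"),
}


def detect_available(candidates=_CANDIDATES) -> dict[str, type]:
    """Return {name: class} for every driver whose module imports cleanly.

    This only checks that the optional dependency is installed; it never
    touches hardware.
    """
    found: dict[str, type] = {}
    for name, (module_path, class_name) in candidates.items():
        try:
            module = importlib.import_module(module_path)
        except ImportError:
            continue  # optional dependency not installed
        cls = getattr(module, class_name, None)
        if isinstance(cls, type):
            found[name] = cls
    return found
```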
**Not done (Phase 1 Task 4):** SigMF recording validation across radio types.
This needs real captures from each device, so it is out of scope without hardware.

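The disconnect translation described in the Phase 1 list can be sketched as follows. This is a minimal illustration, assuming `translate_disconnect` returns the converted exception rather than raising it; the exception classes here are local stand-ins, and `rx_with_translation` is a hypothetical helper showing the wrapping pattern, not the actual `Pluto.rx` code.

```python
import errno


class SDRError(Exception):
    """Stand-in for the toolkit's base SDR exception."""


class SdrDisconnectedError(SDRError):
    """Signals that the device has dropped off the bus."""


_DISCONNECT_ERRNOS = {errno.ENODEV, errno.EIO}
_DISCONNECT_PHRASES = ("no such device", "broken pipe")


def translate_disconnect(exc: BaseException) -> BaseException:
    """Return SdrDisconnectedError if exc looks like a device drop, else exc."""
    if isinstance(exc, SdrDisconnectedError):
        return exc
    if isinstance(exc, OSError) and exc.errno in _DISCONNECT_ERRNOS:
        return SdrDisconnectedError(str(exc))
    message = str(exc).lower()
    if any(phrase in message for phrase in _DISCONNECT_PHRASES):
        return SdrDisconnectedError(str(exc))
    return exc


def rx_with_translation(rx_call, num_samples):
    """Hypothetical wrapper: run one rx() call and translate device drops."""
    try:
        return rx_call(num_samples)
    except Exception as exc:
        translated = translate_disconnect(exc)
        if translated is exc:
            raise  # not a disconnect, re-raise unchanged
        raise translated from exc
```

With this shape, the streamer only needs one `except SdrDisconnectedError` branch to report the failure and close the SDR, while every other exception keeps its original type.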
### Phase 2 — Agent package restructure

Moved the former `src/ria_toolkit_oss/agent.py` into a package:

```
src/ria_toolkit_oss/agent/
├── __init__.py          # re-exports NodeAgent + main for back-compat
├── legacy_executor.py   # former agent.py, unchanged behavior
├── streamer.py          # new WebSocket IQ streamer
├── ws_client.py         # persistent WS client + heartbeat + reconnect
├── hardware.py          # wraps sdr.detect_available() for heartbeat payloads
├── config.py            # ~/.ria/agent.json load/save (0600 perms)
└── cli.py               # unified CLI (run / stream / detect / register)
```

Back-compat preserved: `from ria_toolkit_oss.agent import NodeAgent` still
works, and bare `ria-agent ...` with no subcommand (or with legacy flags
like `--hub`) falls through to the original long-poll executor.

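The fall-through behavior could be dispatched along these lines. This is a sketch rather than the contents of `cli.py`: `legacy_main` is a stand-in for the original entry point, and the parser only declares the `--url` and `--token` flags that appear in the commands later in this document.

```python
import argparse

SUBCOMMANDS = {"run", "stream", "detect", "register"}


def legacy_main(argv):
    """Stand-in for the original long-poll executor entry point."""
    return ("legacy", list(argv))


def main(argv):
    # No subcommand, or a legacy flag such as --hub in first position:
    # hand the untouched argument list to the old CLI.
    if not argv or argv[0] not in SUBCOMMANDS:
        return legacy_main(argv)
    # `run` is the explicit spelling of the legacy executor.
    if argv[0] == "run":
        return legacy_main(argv[1:])

    parser = argparse.ArgumentParser(prog="ria-agent")
    sub = parser.add_subparsers(dest="command", required=True)
    stream = sub.add_parser("stream")
    stream.add_argument("--url")
    sub.add_parser("detect")
    register = sub.add_parser("register")
    register.add_argument("--url")
    register.add_argument("--token")

    args = parser.parse_args(argv)
    return (args.command, vars(args))
```

Checking the first argument before building the parser keeps legacy invocations from ever reaching `argparse`, so old flags cannot be rejected as unknown options.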
### Phase 3 — Streamer implementation

- `ws_client.WsClient` — `async run()` loop, JSON heartbeats on a timer,
  auto-reconnect on drop, bearer-token auth header on connect.
- `streamer.Streamer` — handles `start` / `stop` / `configure` messages,
  opens the SDR via an injectable `sdr_factory`, runs capture in an executor
  thread, and ships raw interleaved float32 IQ bytes per `rx()` call.
- `hardware.heartbeat_payload(status, app_id)` — `{type, hardware[], status}`.
- `config.AgentConfig` — dataclass round-tripped through JSON, unknown keys
  preserved in an `extra` dict.
- CLI subcommands: `run` (legacy), `stream`, `detect`, `register`.

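As an illustration of the config round-trip in the list above, here is a minimal sketch. Only the `extra` dict, the JSON round-trip, the missing-file fallback, and the 0600 permissions come from this hand-off; the other field names (`url`, `agent_id`, `token`) and the `load` / `save` method names are assumptions.

```python
import json
import os
from dataclasses import dataclass, field, fields
from pathlib import Path


@dataclass
class AgentConfig:
    # Illustrative fields; the real dataclass may declare different ones.
    url: str = ""
    agent_id: str = ""
    token: str = ""
    extra: dict = field(default_factory=dict)

    @classmethod
    def load(cls, path: Path) -> "AgentConfig":
        if not path.exists():
            return cls()  # missing-file fallback: all defaults
        raw = json.loads(path.read_text())
        known = {f.name for f in fields(cls)} - {"extra"}
        cfg = cls(**{k: v for k, v in raw.items() if k in known})
        # Keys this version does not know about survive the round-trip.
        cfg.extra = {k: v for k, v in raw.items() if k not in known}
        return cfg

    def save(self, path: Path) -> None:
        data = {f.name: getattr(self, f.name)
                for f in fields(self) if f.name != "extra"}
        data.update(self.extra)  # unknown keys go back out at the top level
        path.parent.mkdir(parents=True, exist_ok=True)
        # Create the file with 0600 so the token is not world-readable.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as handle:
            json.dump(data, handle, indent=2)
        os.chmod(path, 0o600)  # also tighten a file that already existed
```

Keeping unknown keys at the top level of the JSON file means a newer agent can add settings without an older agent deleting them on its next save.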
### Phase 4 — Protocol

Matches `screens_connection_updates.md` §"WebSocket Protocol" exactly
(A = agent, S = server):

| Direction | Message |
|-----------|---------|
| A → S (JSON) | `{type: heartbeat, hardware[], status}` |
| A → S (JSON) | `{type: status, status, app_id}` |
| A → S (JSON) | `{type: error, app_id, message}` |
| A → S (binary) | raw interleaved float32 IQ, one frame per `rx()` |
| S → A (JSON) | `{type: start, app_id, radio_config}` |
| S → A (JSON) | `{type: stop, app_id}` |
| S → A (JSON) | `{type: configure, app_id, radio_config}` (applied at the next capture boundary) |

**Auth decision:** bearer token in `Authorization` header on the initial
handshake. (Open questions in the plan around first-frame auth / mid-buffer
`configure` / backpressure remain open.)

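To make the two frame kinds concrete, the sketch below builds a heartbeat and packs a binary IQ frame with the standard library only. Several details are assumptions, since the table does not pin them down: little-endian byte order, `hardware` as a list of driver names, and the `"idle"` status value. The function names are hypothetical as well.

```python
import json
import struct


def heartbeat_message(hardware: list[str], status: str) -> str:
    """JSON text frame: {type: heartbeat, hardware[], status}."""
    return json.dumps({"type": "heartbeat", "hardware": hardware, "status": status})


def encode_iq_frame(samples) -> bytes:
    """Pack complex samples as I0, Q0, I1, Q1, ... float32 values."""
    flat = []
    for sample in samples:
        flat.extend((sample.real, sample.imag))
    # "<" selects little-endian; the byte order is an assumption here.
    return struct.pack(f"<{len(flat)}f", *flat)


def decode_iq_frame(frame: bytes) -> list[complex]:
    """Inverse of encode_iq_frame; each sample occupies 8 bytes."""
    if len(frame) % 8:
        raise ValueError("frame length must be a multiple of 8 bytes")
    values = struct.unpack(f"<{len(frame) // 4}f", frame)
    return [complex(i, q) for i, q in zip(values[0::2], values[1::2])]
```

A frame carrying `buffer_size` samples is therefore `8 * buffer_size` bytes long, which gives the server a cheap sanity check on each binary message.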
### Phase 5 — Tests

25 tests, all green under `poetry run pytest tests/agent/`:

- `test_hardware.py` — `detect_available`, heartbeat payload shape
- `test_config.py` — round-trip, missing-file fallback, extra-key preservation
- `test_streamer.py` — start/stream/stop against `MockSDR` + fake WS, error
  frames, configure queueing
- `test_disconnect.py` — `translate_disconnect` patterns + streamer reports
  `SDR disconnected:` and closes the SDR
- `test_ws_client.py` — real local `websockets` server: heartbeat on connect,
  auto-reconnect after server drop, malformed-frame resilience
- `test_integration.py` — end-to-end heartbeat → start → 3 binary IQ frames → stop
- `test_legacy.py` — regression: `NodeAgent` still importable

### Dependency / build changes

- Added `websockets (>=12.0,<14.0)` to `[tool.poetry.group.agent.dependencies]`.
- Repointed the `ria-agent` console script from `ria_toolkit_oss.agent:main`
  to `ria_toolkit_oss.agent.cli:main` (the unified CLI, which still calls
  through to the legacy `main` for back-compat).

### Files changed

```
M pyproject.toml
M poetry.lock
M src/ria_toolkit_oss/sdr/__init__.py
M src/ria_toolkit_oss/sdr/sdr.py
M src/ria_toolkit_oss/sdr/pluto.py
R src/ria_toolkit_oss/agent.py -> src/ria_toolkit_oss/agent/legacy_executor.py
A src/ria_toolkit_oss/agent/{__init__,cli,config,hardware,streamer,ws_client}.py
A tests/agent/{__init__,test_config,test_disconnect,test_hardware,
               test_integration,test_legacy,test_streamer,test_ws_client}.py
A docs/screens_agent_streamer_plan.md
A docs/screens_agent_handoff.md (this file)
```

---

## What's left (Part B — `ria-hub` repo)

The tasks below are in priority order. B1, B2, B3, and B6 make up the MVP;
everything else is hardening.

### Server-side (MVP)

| # | Task | File / area |
|---|------|-------------|
| B1 | Implement `AgentDataSource` (DataSource ABC, reads IQ from WS connection) and register in `build_data_source()` | `controller/app/modules/screens/data_sources.py` |
| B2 | Add `"agent"` to `dataSource.type` enum in manifest schema; update Pydantic / JSON schema validators | `controller/app/modules/screens/graph_derivation.py` + schema files |
| B3 | Agent WebSocket endpoint at `/api/agent/ws` (a WebSocket handshake is an HTTP `GET` with an `Upgrade` header, not a `POST`): accepts agent connections, authenticates the bearer token, and bridges the connection to the Celery task's `AgentDataSource` | new `controller/app/modules/agent/routes.py` |
| B6 | Celery wiring: when `dataSource.type == "agent"`, look up the connected agent by `agent_id`, forward `radio_config` as a `start` message, and feed received IQ bytes into the inference loop via `AgentDataSource.next_chunk()` | `controller/app/modules/screens/tasks.py` |

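For orientation on B1, a queue-backed `AgentDataSource` might take roughly this shape. Everything here is a sketch under assumptions: the `DataSource` ABC below is a local stand-in for the hub's real interface, `feed` is a hypothetical method for the WebSocket handler to call, the byte order is assumed little-endian, and the real class would more likely return a NumPy array than a list.

```python
import queue
import struct
from abc import ABC, abstractmethod


class DataSource(ABC):
    """Stand-in for the hub's DataSource ABC; the real interface may differ."""

    @abstractmethod
    def next_chunk(self):
        ...


class AgentDataSource(DataSource):
    def __init__(self, max_frames: int = 32, timeout: float = 5.0):
        # Bounded queue between the WebSocket handler and the inference loop.
        self._frames: queue.Queue = queue.Queue(maxsize=max_frames)
        self._timeout = timeout

    def feed(self, frame: bytes) -> None:
        """Called for every binary frame received from the agent.

        Raises queue.Full if the consumer falls behind for `timeout` seconds.
        """
        self._frames.put(frame, timeout=self._timeout)

    def next_chunk(self) -> list[complex]:
        """Block until one frame arrives, then decode interleaved float32 IQ.

        Raises queue.Empty if no frame arrives within `timeout` seconds.
        """
        frame = self._frames.get(timeout=self._timeout)
        values = struct.unpack(f"<{len(frame) // 4}f", frame)
        return list(map(complex, values[0::2], values[1::2]))
```

A thread-safe `queue.Queue` is used because the receiving side and the consuming task may run in different threads; if both end up in one event loop, an `asyncio.Queue` would be the natural substitute.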
### Server-side (hardening)

| # | Task |
|---|------|
| B4 | Agent registry — MongoDB collection: `agent_id`, `hardware[]`, `last_heartbeat`, `online`, registration tokens |
| B5 | `POST /api/agent/register` returning `{agent_id, token}` for `~/.ria/agent.json` |

### Frontend

| # | Task |
|---|------|
| B7 | Device / agent picker in the Screens app config (Vue 3) |
| B8 | Agents list / status admin view |

### Protocol contract (keep in lockstep with Part A)

- Binary frames are interleaved float32 IQ, one frame per `radio.rx()` call.
- `radio_config` is forwarded verbatim from manifest `dataSource.params`.
  Minimum keys the agent handles: `device`, `identifier`, `sample_rate`,
  `center_frequency`, `gain`, `buffer_size`.
- `configure` messages from the server apply at the next capture boundary
  (current agent implementation).
- Agent authenticates with bearer token in `Authorization` header on the
  handshake.

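If the hub decides to check `radio_config` before sending `start`, a small validator along these lines would do. Note the assumption: the contract above lists these as the minimum keys the agent handles, not as required keys, so treating all six as mandatory is a choice made only for this sketch. Both function names are hypothetical.

```python
# Keys named in the protocol contract. Treating them all as required is an
# assumption of this sketch, not something the contract states.
RADIO_CONFIG_KEYS = (
    "device",
    "identifier",
    "sample_rate",
    "center_frequency",
    "gain",
    "buffer_size",
)


def missing_radio_config_keys(radio_config: dict) -> list[str]:
    """Return the contract keys absent from radio_config, in contract order."""
    return [key for key in RADIO_CONFIG_KEYS if key not in radio_config]


def error_frame_for(app_id: str, radio_config: dict):
    """Build a {type: error, app_id, message} payload, or None if complete."""
    missing = missing_radio_config_keys(radio_config)
    if not missing:
        return None
    return {
        "type": "error",
        "app_id": app_id,
        "message": "radio_config missing: " + ", ".join(missing),
    }
```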
### Still-open protocol questions (pin down before B3 lands)

- Auth frame as fallback when proxies strip `Authorization`?
- Mid-buffer `configure` application for tighter retune latency?
- Backpressure policy when the server's inference loop is slower than the
  agent's `rx()` cadence — drop, queue, or pause the agent via a `pause`
  control frame?

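To make the backpressure options easier to compare, here is what the "drop" variant could look like on the server side. This is only one of the three candidates listed above and is not a decision; the class name and the `dropped` counter are hypothetical.

```python
from collections import deque


class DropOldestBuffer:
    """Bounded frame buffer that discards the oldest frame when full."""

    def __init__(self, max_frames: int):
        self._frames = deque(maxlen=max_frames)
        self.dropped = 0  # exposed so the drop rate can be logged or reported

    def push(self, frame: bytes) -> None:
        if len(self._frames) == self._frames.maxlen:
            self.dropped += 1  # the append below evicts the oldest frame
        self._frames.append(frame)

    def pop(self):
        """Return the oldest buffered frame, or None when empty."""
        return self._frames.popleft() if self._frames else None
```

Dropping the oldest frame keeps inference close to real time at the cost of gaps in the IQ stream; the "queue" option trades that for growing latency, and a `pause` control frame would move the decision to the agent.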
### Manifest example (new `agent` mode)

```json
{
  "dataSource": {
    "type": "agent",
    "device": "pluto",
    "agent_id": "agent-abc123",
    "params": {
      "identifier": "ip:192.168.3.1",
      "sample_rate": 1000000,
      "center_frequency": 2450000000,
      "gain": 40,
      "buffer_size": 1024
    }
  },
  "preprocess": "magnitude_phase_window_stats",
  "config": {"inference": {"knownDevices": [], "interval": 1}}
}
```

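The B6 step of turning such a manifest into a `start` message could be sketched as below. One point is an assumption worth flagging: in the example, `device` sits next to `params` rather than inside it, while the contract lists `device` among the keys the agent handles, so this sketch copies it into `radio_config`. The function name is hypothetical.

```python
import json


def build_start_message(manifest: dict, app_id: str) -> str:
    """Build the {type: start, app_id, radio_config} JSON frame."""
    source = manifest["dataSource"]
    if source.get("type") != "agent":
        raise ValueError("manifest does not use an agent data source")
    # Forward params as-is (copied so the manifest is not mutated).
    radio_config = dict(source.get("params", {}))
    # Assumption: `device` lives beside `params` in the manifest, so it is
    # merged in here without overriding a value already present in params.
    if "device" in source:
        radio_config.setdefault("device", source["device"])
    return json.dumps(
        {"type": "start", "app_id": app_id, "radio_config": radio_config}
    )
```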
### Pipeline invariant

**No changes to `inference_core.py`, `preprocessors.py`, or
`run_onnx_chain_loop()`.** `AgentDataSource` is a drop-in replacement for
`SdrDataSource`; everything downstream is unchanged.

---

## Local test setup

### 1. Branch layout

- `ria-toolkit-oss` — branch **`screens-connection`** (this work).
- `ria-hub` — whatever branch you'll be doing Part B on (create a matching
  `screens-connection` branch there to keep the hand-off clean).

### 2. Install the toolkit branch into your local RIA Hub

The hub declares `ria-toolkit-oss` as a git dep in its `pyproject.toml`.
Point it at this branch:

```toml
# ria-hub/pyproject.toml
ria-toolkit-oss = { git = "https://riahub.ai/qoherent/ria-toolkit-oss.git", branch = "screens-connection" }
```

Or for a fully local dev loop (edits visible without reinstall):

```bash
# from ria-hub repo
poetry run pip install -e /home/qrf/ria-toolkit-oss
# plus websockets (it lives in the toolkit's `agent` dependency group, which
# pip does not install) so it is available server-side too
poetry run pip install "websockets>=12.0,<14.0"
```

### 3. Commit this branch and push (optional, for CI / shared dev)

```bash
cd /home/qrf/ria-toolkit-oss
git add -A
git commit -m "Add WebSocket IQ streamer agent (Part A)"
git push -u origin screens-connection
```

### 4. Try the agent locally against a mock server

You can exercise the whole Part A stack without RIA Hub by running the
integration test:

```bash
cd /home/qrf/ria-toolkit-oss
poetry run pytest tests/agent/test_integration.py -v
```

Or manually, once Part B lands:

```bash
# on the user machine (mock hardware is fine)
ria-agent register --url https://localhost:8000 --token <tok>
ria-agent detect          # lists: mock, pluto, ... whatever's importable
ria-agent stream --url ws://localhost:8000/api/agent/ws
```

### 5. End-to-end integration test (after Part B MVP)

1. Start RIA Hub (FastAPI + Celery + Mongo + Redis) with the Part B branch.
2. Run `ria-agent stream` on your laptop.
3. Create a Screens app with `dataSource.type: "agent"` and `device: "mock"`.
4. Start the app; confirm the agent logs "streaming" and the SSE stream
   shows inference metrics flowing.