diff --git a/Agent TX Streaming Handoff.md b/Agent TX Streaming Handoff.md new file mode 100644 index 0000000..3a4c368 --- /dev/null +++ b/Agent TX Streaming Handoff.md @@ -0,0 +1,223 @@ + +# Agent TX Streaming — `ria-toolkit-oss` Handoff + +**Paired repo:** `ria-hub` (this doc lives here, but it's written for the Claude working in `ria-toolkit-oss`) +**Source of truth for the overall design:** [Agent TX Streaming - Cross-Repo Plan.md](./Agent%20TX%20Streaming%20-%20Cross-Repo%20Plan.md) — read that first. +**Status (ria-hub side):** landed 2026-04-16. Ready to talk to a TX-capable agent. + +--- + +## Your job + +Implement Part A of the plan in `ria-toolkit-oss` (§A1–A8). The hub is already speaking the protocol below and waiting for an agent that can: + +1. Accept hub → agent binary TX buffers over an existing WebSocket. +2. Enforce the operator-configured TX interlocks (`tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`). +3. Drive `sdr.init_tx` / `_stream_tx` so a real Pluto transmits what the hub sends. +4. Report status back as `tx_status` JSON frames. + +Full-duplex with the existing RX session on the same `app_id` must keep working. + +--- + +## What ria-hub does (so you know what's on the other end of the wire) + +You don't need to know any of this to do your work — but if you hit a wall, here's the mental model: + +| Hub-side concept | What it does | +|---|---| +| `AgentTxSink` (Python, Celery worker) | Mirrors `AgentDataSource`. Publishes `tx_start`/`tx_stop`/`tx_configure` control JSON and raw binary IQ buffers intended for the agent. | +| `/screens/agent/ws` FastAPI endpoint | The WebSocket you already connect to. Now also pumps hub → agent binary TX frames and republishes your `tx_status` JSON upstream to the Celery task. | +| Redis channels (`agent:tx_iq:*`, `agent:events:*`) | Internal to the hub. You will never see them. Everything reaches you as WS frames. | +| Capability gate | Hub refuses to launch a TX app unless it's seen a recent heartbeat from you with `"tx" ∈ capabilities` and `tx_enabled: true`. | +| Audit log (`AgentTxAudit`) | Hub persists who started what transmission at what frequency and gain. Your error messages in `tx_status` end up in that record. | + +**Bottom line: from your process, this is still the same WebSocket you've been using. You're just getting new message types and a new class of binary frames going the other direction.** + +--- + +## Protocol contract (the only thing you actually need) + +All additions. Existing RX messages (`start`/`stop`/`configure` + agent → hub binary) keep their current semantics — do not touch them. + +### Hub → agent + +**JSON control frames** (text WS frames): + +```jsonc +// Arm TX. Call sdr.init_tx with this radio_config and start _stream_tx. +// After this you'll start receiving binary frames (see below) that go into +// the stream callback. +{ + "type": "tx_start", + "app_id": "app-abc", + "radio_config": { + "device": "pluto", + "identifier": "ip:192.168.3.1", + "tx_sample_rate": 1000000, + "tx_center_frequency": 2450000000, + "tx_gain": -20, // dB. Pluto: negative = attenuation. + "tx_bandwidth": 1000000, // optional + "buffer_size": 1024, // optional; complex samples per buffer + "underrun_policy": "pause" // "pause" | "zero" | "repeat" + } +} + +// Stop TX, drain queue, pause_tx. RX session on the same app_id (if any) +// stays alive. +{ "type": "tx_stop", "app_id": "app-abc" } + +// Apply parameter changes at the next buffer boundary. Any subset of +// radio_config fields. +{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } } + +// Advisory — safe to ignore. Hub publishes this whenever it RPUSHes a new +// binary buffer; it was wired so the WS bridge wakes up promptly. You do +// NOT need to act on it. Consider it a keepalive. +{ "type": "tx_data_available", "app_id": "app-abc" } +``` + +**Binary frames** (binary WS frames): + +* Raw interleaved `float32` IQ samples in `[-1, 1]`. +* One frame = one buffer. +* Byte length is always `num_complex_samples × 8` (8 bytes per complex sample: two float32s). +* **Only valid between `tx_start` and `tx_stop`.** If you receive a binary frame outside that window, drop it and log WARN — don't crash, don't panic. + +Format validator is already in `ria_toolkit_oss.sdr.sdr._verify_sample_format` — reuse it. + +### Agent → hub + +**JSON status frames** (text WS frames). Use the existing `send_json` path: + +```jsonc +// Lifecycle — emit on every state transition. +{ "type": "tx_status", "app_id": "app-abc", "state": "armed" } +{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" } +{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" } +{ "type": "tx_status", "app_id": "app-abc", "state": "done" } + +// Errors — include a human-readable message. Hub surfaces it to the UI +// and writes it into the audit record. +{ "type": "tx_status", "app_id": "app-abc", "state": "error", + "message": "gain -5 exceeds tx_max_gain_db=-15" } +``` + +**States** (hub assumes this vocabulary): +* `armed` — `init_tx` done, callback started, queue empty, nothing transmitting yet. +* `transmitting` — at least one buffer has flowed through the callback. +* `underrun` — queue drained; what you do next depends on `underrun_policy`: + * `"pause"` → call `pause_tx()`, emit `underrun`, stay paused until the hub sends a fresh `tx_start`. + * `"zero"` → continue with `np.zeros(...)` fills, still emit `underrun` once so the hub can show the indicator. + * `"repeat"` → loop the last good buffer, emit `underrun` once. +* `done` — clean stop after `tx_stop`. +* `error` — capability rejection or hardware failure. Include `message`. + +**Extended heartbeat** — you are already sending heartbeats. Grow the payload: + +```jsonc +{ + "type": "heartbeat", + "hardware": ["mock", "pluto"], + "status": "streaming", // unchanged semantics + "capabilities": ["rx", "tx"], // NEW — derived from tx_enabled + SDR class having init_tx + "tx_enabled": true, // NEW — mirror of cfg.tx_enabled + "tx_max_gain_db": -10, // NEW — optional, from agent config + "tx_max_duration_s": 60, // NEW — optional + "tx_allowed_freq_ranges": [[2.4e9, 2.5e9]], // NEW — optional + "sessions": { // NEW — optional per-session snapshot + "rx": { "app_id": "app-abc", "state": "streaming" }, + "tx": { "app_id": "app-abc", "state": "transmitting" } + }, + "app_id": "app-abc" // keep for back-compat +} +``` + +The hub reads these fields, stores them on `ScreensAgent`, and gates TX launches on them. **If you don't advertise `tx` in `capabilities` and `tx_enabled: true`, the hub will refuse to start any TX app with HTTP 400 — no WS traffic will be generated.** + +### Backpressure model (what happens when you can't keep up) + +* The hub caps its outbound TX queue at 200 buffers. If it fills, the hub either blocks on `write()` or drops the oldest buffer — both are benign for you. +* On the agent side, enforce your own cap (plan §A2 suggests 8 buffers). When full, `await ws.send` on the hub will slow via TCP/WS backpressure. You don't need an application-level flow-control message. + +--- + +## Implementation roadmap (mapped to the Cross-Repo Plan) + +Work in the order below. Each row is a single PR-sized unit. + +| # | Plan ref | Deliverable | Acceptance | +|---|---|---|---| +| 1 | §A3 | `AgentConfig` gains `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. `save()` keeps 0600. | Unit test: round-trip through `~/.ria/agent.json`. | +| 2 | §A4 | `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` persists into config. `ria-agent stream --allow-tx` is a runtime override. | Integration test: `ria-agent register --allow-tx` then `cat ~/.ria/agent.json` shows fields. | +| 3 | §A1 | `ws_client.run()` grows a `on_binary: Callable[[bytes], Awaitable[None]]` parameter. Reconnect + heartbeat + malformed-frame behavior unchanged. | Existing `test_ws_client.py` still passes; new `test_ws_client_binary.py` asserts bytes reach the handler. | +| 4 | §A2 | Replace flat `self._sdr` / `self._app_id` state in `streamer.py` with `RxSession` + `TxSession` dataclasses. SDR instances cached by `(device, identifier)` so RX+TX share one handle on the same device. | Unit test: creating a TxSession on the same device as an active RxSession reuses the same SDR object. | +| 5 | §A2 | `_handle_tx_start`, `_handle_tx_stop`, `_handle_tx_configure` + `on_binary(data)` → `self._tx.queue.put(data)`. TX loop runs `_stream_tx` in an executor thread with a thread-safe `queue.Queue` adapter. | Integration test against MockSDR: tx_start → 10 binary frames → tx_stop produces exactly those samples through the callback. | +| 6 | §A2 | Underrun handling: `"pause"` / `"zero"` / `"repeat"` fills. Emits `tx_status: underrun` exactly once per drain event. | Unit test per policy against a slow producer. | +| 7 | §A2 | Cap enforcement **before** opening the SDR: reject with `tx_status: error` if `tx_enabled=False`, gain exceeds cap, freq outside allowed ranges, or duration cap exceeded (watchdog in TX loop calls `tx_stop` after `tx_max_duration_s`). | Unit test per rejection path; SDR is never opened when rejection fires. | +| 8 | §A5 | Heartbeat grows `capabilities`, `tx_enabled`, optional caps, `sessions`. | Integration test: start agent with `--allow-tx`, connect, verify heartbeat payload. | +| 9 | §A6 | Audit the Pluto driver's `_tx_lock` + `_param_lock` interaction to ensure concurrent RX + TX on the same `adi.Pluto` doesn't race on attribute writes. `MockSDR.init_tx` already exists — no change needed. | Stress test: 30 seconds of concurrent RX + TX on MockSDR with `_param_lock` instrumented for contention. | +| 10 | §A7 | Test matrix per plan: `test_streamer_tx`, `test_tx_safety`, `test_tx_underrun`, `test_full_duplex`, `test_ws_client_binary`, `test_integration_tx`. | All green in CI. | +| 11 | §A8 | Docs: new `docs/agent_tx_protocol.md` OR extended section in existing agent protocol doc. Regulatory disclaimer included. | Lints + renders. | + +**Ship order advice:** 1 → 2 → 3 → 4 → (5 || 6) → 7 → 8 → 9 → 10 → 11. Steps 1–3 are strict prerequisites for everything else. Steps 5 and 6 can parallelize. Step 7 can't land without 5. + +--- + +## Verification loop (how to prove the two sides talk) + +Once you've implemented §A1–A7, use this to close the loop with the live hub: + +1. On the agent host, run `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` then `ria-agent stream`. +2. Confirm the hub has seen the heartbeat: `curl $HUB/screens/agents/json | jq '.agents[] | select(.agent_id==...) | {tx_enabled, capabilities}'` should show `tx_enabled: true` and `["rx","tx"]`. +3. Create a Screens app whose manifest contains a `dataSink.type == "agent"` pointing at your `agent_id`, or use the composer UI with a `PlutoTXOp` in the graph + the new TX-sink agent picker. +4. `POST /screens/apps/{id}/start` on the hub. You should observe, in order: + 1. `tx_start` JSON on your WS. + 2. Binary frames arriving (if the hub-side operator is actually generating buffers — may no-op for now since the operator refactor is planned but not done). + 3. Your `tx_status: armed` JSON emitted back. +5. Stop the app. You should receive `tx_stop` and emit `tx_status: done`. +6. Provoke a rejection: set `tx_max_gain_db: -15` in your config, then start a TX app with `tx_gain: -5`. The hub should return `HTTP 400` from `/start` without any WS traffic — capability gate fires first. If you make it past the gate and it's still wrong, emit `tx_status: error` and the hub will surface the message to the UI. + +**Useful hub-side greps if something is wrong:** +* `grep -r "tx_status" controller/app/modules/screens/` — see how the hub parses your frames. +* `grep -r "tx_enabled" controller/app/modules/screens/` — see what heartbeat fields the hub reads. +* `controller/app/modules/screens/agent_ws.py:200-290` — the WS handler's JSON dispatch. +* `controller/app/modules/screens/data_sinks.py` — what the hub publishes on each control frame. + +--- + +## Open questions (from the original plan that still apply) + +Answered since the plan was written: +* ✅ **Operator name:** `PlutoTXOp` (PascalCase, stored in the hub's MongoDB `ops` collection via the application packager). +* ✅ **Redis channel naming:** kept `agent:*` prefix on the hub side — you never see this. +* ✅ **Status plumbing:** `tx_status` frames get republished on a hub-internal pub/sub and surface to the UI through the existing SSE stream. You just send the frames; the hub does the rest. + +Still open (flag when you have a preference): +* **Bulk + loop fast-path.** If the hub's TX operator turns out to be a fixed recording played on loop, we could add a `{ "type": "tx_start", ..., "loop": true }` variant where the hub sends the buffer once and the agent uses the existing `tx_recording` path. Protocol-compatible with the streaming version. Defer until a real use case demands it. +* **Multi-app-per-agent.** Out of scope for v1 (§Non-goals). If/when needed: prefix binary frames with a 4-byte session header and bump a `protocol_version` in the heartbeat. +* **TX clock drift.** Relying on generous queue depth + stable local networks for v1. Longer term may need agent-side resampling. + +--- + +## What lives in `ria-hub` now (reference) + +You don't need to read any of this, but if you're curious or need to debug the integration, these are the load-bearing bits on the hub side: + +| Path | What | +|---|---| +| `controller/app/modules/screens/data_sinks.py` | `AgentTxSink`, `LocalPlutoTxSink`, `build_data_sink` | +| `controller/app/modules/screens/agent_ws.py` | `_forward_tx_binary`, heartbeat parsing, `tx_status` republish | +| `controller/app/modules/screens/graph_derivation.py` | `_pluto_tx_spec_mapping`, `_SDR_SINK_MAP`, `_derive_data_sink` | +| `controller/app/modules/screens/routes.py` | `_check_agent_tx_capability`, `AgentTxAudit` write, `POST /apps/{id}/sink-agent` | +| `controller/app/modules/screens/models.py` | `ScreensAgent` TX fields, `AgentTxAudit` document | +| `schemas/screens/app_manifest.schema.json` | `dataSink` schema block | +| `web_src/js/components/screens/components/TxConsentModal.vue` | Pre-transmit consent dialog | +| `web_src/js/components/screens/components/SinkPanel.vue` | TX-capable agent picker + live `tx_status` indicator | +| `web_src/js/components/screens/ScreensApp.vue` | Consent gate + `tx_status` forwarding to children | + +--- + +## Regulatory note (keep this in your docs too) + +Transmission is regulated in every jurisdiction. The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub shows a consent modal and writes an audit log so actions are attributable. None of this is a legal compliance layer — it's defense-in-depth. \ No newline at end of file diff --git a/docs/agent_tx_implementation_plan.md b/docs/agent_tx_implementation_plan.md new file mode 100644 index 0000000..7458a8a --- /dev/null +++ b/docs/agent_tx_implementation_plan.md @@ -0,0 +1,568 @@ +# Agent TX Streaming — Implementation Plan (`ria-toolkit-oss`) + +**Scope:** Part A of [agent_tx_plan.md](./agent_tx_plan.md). This repo only. +**Goal:** Make the agent accept hub-originated TX control + binary IQ, stream it to the SDR in full duplex with RX, and enforce agent-local safety caps. +**Acceptance:** `pytest tests/agent/` green; `ria-agent stream --allow-tx` accepts a `tx_start` against MockSDR and round-trips binary frames to `_stream_tx`. + +Each phase below lands independently. After every phase the existing agent tests must still pass (no regressions), and the phase's own new tests must be green. + +--- + +## Preconditions + +- `--allow-tx` is opt-in at CLI level. Default config has `tx_enabled=False`; the agent will reject all TX control frames from the hub. +- Pluto FDD: one `adi.Pluto` instance serves both RX and TX. We share the SDR between sessions keyed by `(device, identifier)`. +- Known pre-existing bug: [`sdr/pluto.py:151`](../src/ria_toolkit_oss/sdr/pluto.py#L151) sets `_rx_initialized = False` inside `init_tx`. Our streamer's RX path (`sdr.rx(n)`) does not read this flag, so FDD still works. Leave the bug for a separate follow-up; do not refactor Pluto in this plan. + +## Glossary + +- **Session** = a `(app_id, direction)` pair held by the agent: one `RxSession` or one `TxSession`. +- **Direction** = `"rx"` (agent → hub binary) or `"tx"` (hub → agent binary). +- **Shared SDR** = when the same `(device, identifier)` is referenced by an RX and TX session concurrently; both sessions hold the same driver instance. + +--- + +## Phase 1 — WS binary ingress + +**Why first:** protocol-plumbing only. No behavior change for existing RX, but unblocks every later phase. + +### Touches + +- `src/ria_toolkit_oss/agent/ws_client.py` — add optional `on_binary` callback to `WsClient.run`. +- `tests/agent/test_ws_client.py` — add a "server sends binary, handler receives" case. + +### Shape + +```python +# ws_client.py +BinaryHandler = Callable[[bytes], Awaitable[None]] + +async def run( + self, + on_message: MessageHandler, + heartbeat: HeartbeatBuilder, + on_binary: BinaryHandler | None = None, # NEW, default preserves old behavior +) -> None: + ... + async for raw in self._ws: + if isinstance(raw, bytes): + if on_binary is not None: + try: + await on_binary(raw) + except Exception: + logger.exception("on_binary handler raised; dropping frame") + else: + logger.debug("Discarding unexpected %d-byte binary frame", len(raw)) + continue + # ... existing JSON dispatch unchanged +``` + +### Acceptance + +- New test: local `websockets` server pushes a binary frame after JSON handshake → handler sees exact bytes. +- Existing `test_ws_client.py` cases still pass with `on_binary=None`. + +--- + +## Phase 2 — Config + CLI TX opt-in + +**Why second:** small, isolated, and gives the rest of the phases a real `AgentConfig.tx_enabled` / caps to read. + +### Touches + +- `src/ria_toolkit_oss/agent/config.py` +- `src/ria_toolkit_oss/agent/cli.py` +- `tests/agent/test_config.py` +- new `tests/agent/test_cli_tx.py` + +### config.py + +Extend the dataclass and preserve backward-compat for old JSON files (the existing `extra` trick already handles unknown keys, but we want these fields promoted to first-class): + +```python +@dataclass +class AgentConfig: + hub_url: str = "" + agent_id: str = "" + token: str = "" + name: str = "" + insecure: bool = False + api_key: str = "" + # NEW — TX interlocks + tx_enabled: bool = False + tx_max_gain_db: float | None = None + tx_max_duration_s: float | None = None + tx_allowed_freq_ranges: list[list[float]] | None = None # JSON-friendly list-of-lists + extra: dict = field(default_factory=dict) +``` + +Update `load()` to pull the new fields and `save()` to emit them. Preserve the `0o600` chmod behavior. + +### cli.py + +Two entry points need flags: + +``` +ria-agent register --hub ... --api-key ... + [--allow-tx] + [--tx-max-gain-db VALUE] + [--tx-max-duration-s VALUE] + [--tx-freq-range LO HI] # repeatable: --tx-freq-range 2.4e9 2.5e9 --tx-freq-range 5.7e9 5.8e9 +``` + +``` +ria-agent stream + [--allow-tx] # runtime override: sets cfg.tx_enabled for this process only +``` + +In `_cmd_register`: after successful server registration, populate `cfg.tx_enabled=bool(args.allow_tx)` and caps from argparse before `_config.save(cfg)`. + +In `_cmd_stream`: `if args.allow_tx: cfg.tx_enabled = True` (before passing `cfg` to the streamer — which requires plumbing `cfg` in, see Phase 3). + +### Acceptance + +- `test_config.py` round-trip: new fields serialize → deserialize cleanly; missing fields in old JSON default correctly. +- `test_cli_tx.py`: `register --allow-tx --tx-max-gain-db -10` writes expected JSON; `stream --allow-tx` sets runtime flag without touching disk. + +--- + +## Phase 3 — Streamer refactor: session model (RX behavior preserved) + +**Why third:** the TX work needs a session-based state machine. Doing the refactor before wiring TX keeps the diff reviewable and keeps the RX regression surface contained. + +**Goal:** replace the flat state (`self._sdr`, `self._app_id`, `self._capture_task`, `self._pending_config`, `self._status`) with explicit session objects and an SDR registry, without changing any observable RX behavior. + +### Touches + +- `src/ria_toolkit_oss/agent/streamer.py` (bulk of work) +- `src/ria_toolkit_oss/agent/hardware.py` (heartbeat grows `capabilities` + optional `sessions` snapshot) +- `src/ria_toolkit_oss/agent/cli.py` (plumb `cfg` into the streamer) +- `tests/agent/test_streamer.py` + `test_hardware.py` — update for new heartbeat shape; keep all RX assertions. + +### Data model + +```python +from dataclasses import dataclass, field + +@dataclass +class RxSession: + app_id: str + sdr: Any + device_key: tuple[str, str | None] # (device, identifier) + buffer_size: int + task: asyncio.Task + pending_config: dict = field(default_factory=dict) + +@dataclass +class TxSession: + app_id: str + sdr: Any + device_key: tuple[str, str | None] + buffer_size: int + queue: queue.Queue # thread-safe; bytes -> np.complex64 buffers + stop_event: threading.Event + task: asyncio.Task # wraps run_in_executor(sdr._stream_tx, ...) + underrun_policy: str = "pause" + pending_config: dict = field(default_factory=dict) + last_buffer: np.ndarray | None = None # for "repeat" policy + started_at: float = 0.0 + max_duration_s: float | None = None +``` + +### SDR registry (ref-counted) + +```python +class _SdrRegistry: + def __init__(self, factory): + self._factory = factory # (device, identifier) -> SDR + self._instances: dict[tuple[str, str|None], tuple[Any, int]] = {} + self._lock = threading.Lock() + + def acquire(self, device: str, identifier: str | None): + key = (device, identifier) + with self._lock: + if key in self._instances: + sdr, rc = self._instances[key] + self._instances[key] = (sdr, rc + 1) + return sdr, key + sdr = self._factory(device, identifier) + self._instances[key] = (sdr, 1) + return sdr, key + + def release(self, key: tuple[str, str|None]) -> bool: + with self._lock: + sdr, rc = self._instances[key] + if rc <= 1: + del self._instances[key] + return True # caller should close() + self._instances[key] = (sdr, rc - 1) + return False +``` + +### Streamer state + +```python +class Streamer: + def __init__(self, ws, cfg: AgentConfig, sdr_factory=None): + self.ws = ws + self._cfg = cfg + self._registry = _SdrRegistry(sdr_factory or _default_sdr_factory) + self._rx: RxSession | None = None + self._tx: TxSession | None = None +``` + +### Message dispatch + +```python +async def on_message(self, msg: dict) -> None: + t = msg.get("type") + handlers = { + "start": self._handle_rx_start, + "stop": self._handle_rx_stop, + "configure": self._handle_rx_configure, + # TX handlers stubbed here in Phase 3, implemented in Phase 4 + "tx_start": self._handle_tx_start, + "tx_stop": self._handle_tx_stop, + "tx_configure": self._handle_tx_configure, + } + handler = handlers.get(t) + if handler is None: + logger.warning("Unknown server message type: %r", t) + return + await handler(msg) +``` + +Rename internals: `_handle_start → _handle_rx_start`, `_handle_stop → _handle_rx_stop`, etc. Behavior unchanged — just reading/writing `self._rx` in place of the old flat attributes, and going through the registry for acquire/release. + +### Heartbeat + +```python +# streamer.py +def build_heartbeat(self) -> dict: + status = "streaming" if (self._rx or self._tx) else "idle" + sessions: dict = {} + if self._rx: sessions["rx"] = {"app_id": self._rx.app_id, "state": "streaming"} + if self._tx: sessions["tx"] = {"app_id": self._tx.app_id, "state": self._tx_state()} + return heartbeat_payload( + status=status, + app_id=(self._rx or self._tx).app_id if (self._rx or self._tx) else None, + cfg=self._cfg, + sessions=sessions or None, + ) +``` + +Update `hardware.heartbeat_payload` to take `cfg` (for `capabilities`/`tx_enabled`) and optional `sessions`. Keep unknown-arg compatibility — existing tests can pass `cfg=AgentConfig()` to get the old shape minus the new fields. + +### Phase 3 acceptance + +- All existing `test_streamer.py` / `test_integration.py` / `test_hardware.py` cases pass, with the heartbeat additions asserted in `test_hardware.py` (capabilities = `["rx"]` when `tx_enabled=False`). +- New test: two `start` messages in sequence with same `(device, identifier)` both succeed without recreating the SDR (registry hit). (This is a Phase 3 bonus — confirms the registry works before TX consumes it.) +- New test: `tx_start` with `tx_enabled=False` returns `tx_status: error` (handler stubs can do just this much in Phase 3, full implementation lands in Phase 4). + +--- + +## Phase 4 — TX implementation + +**Why fourth:** now that binary arrives, config exists, and sessions exist, wire up real TX. + +### Touches + +- `src/ria_toolkit_oss/agent/streamer.py` +- Potentially a small helper module `src/ria_toolkit_oss/agent/_tx_loop.py` if streamer.py gets unwieldy. +- `tests/agent/test_streamer_tx.py`, `test_tx_safety.py`, `test_tx_underrun.py`, `test_full_duplex.py` + +### Binary ingress + +```python +async def on_binary(self, data: bytes) -> None: + if self._tx is None: + logger.debug("Dropping %d-byte binary frame: no TX session", len(data)) + return + try: + self._tx.queue.put(data, timeout=2.0) # backpressure: block if full + except queue.Full: + logger.warning("TX queue stalled; dropping frame (agent side)") +``` + +Wire this in via `ws.run(..., on_binary=self.on_binary)` — change `run_streamer()`'s `ws.run` call accordingly. + +### `_handle_tx_start` + +```python +async def _handle_tx_start(self, msg: dict) -> None: + app_id = msg.get("app_id") or "" + cfg_radio = dict(msg.get("radio_config") or {}) + + # 1) interlocks + if not self._cfg.tx_enabled: + return await self._send_tx_error(app_id, "tx disabled on this agent") + gain = cfg_radio.get("tx_gain") + if self._cfg.tx_max_gain_db is not None and gain is not None and float(gain) > self._cfg.tx_max_gain_db: + return await self._send_tx_error(app_id, f"tx_gain {gain} exceeds cap {self._cfg.tx_max_gain_db}") + freq = cfg_radio.get("tx_center_frequency") + if self._cfg.tx_allowed_freq_ranges and freq is not None: + if not any(lo <= float(freq) <= hi for lo, hi in self._cfg.tx_allowed_freq_ranges): + return await self._send_tx_error(app_id, f"tx_center_frequency {freq} outside allowed ranges") + + if self._tx is not None: + return await self._send_tx_error(app_id, "tx already active on this agent") + + # 2) device + device = cfg_radio.pop("device", None) + identifier = cfg_radio.pop("identifier", None) + buffer_size = int(cfg_radio.pop("buffer_size", 1024)) + underrun_policy = cfg_radio.pop("underrun_policy", "pause") + if not device: + return await self._send_tx_error(app_id, "tx_start missing radio_config.device") + + try: + sdr, device_key = self._registry.acquire(device, identifier) + _apply_sdr_config(sdr, cfg_radio) # sets tx_* attributes via alias map + # explicit init_tx if the driver supports it + if hasattr(sdr, "init_tx"): + sdr.init_tx( + sample_rate=cfg_radio.get("tx_sample_rate"), + center_frequency=cfg_radio.get("tx_center_frequency"), + gain=cfg_radio.get("tx_gain"), + channel=cfg_radio.get("tx_channel", 0), + gain_mode=cfg_radio.get("tx_gain_mode", "manual"), + ) + except Exception as exc: + self._registry.release(device_key) + logger.exception("Failed to init TX on %r", device) + return await self._send_tx_error(app_id, f"tx init failed: {exc}") + + # 3) build session + launch loop + self._tx = TxSession( + app_id=app_id, + sdr=sdr, + device_key=device_key, + buffer_size=buffer_size, + queue=queue.Queue(maxsize=8), + stop_event=threading.Event(), + task=None, # filled below + underrun_policy=underrun_policy, + max_duration_s=self._cfg.tx_max_duration_s, + started_at=time.monotonic(), + ) + loop = asyncio.get_running_loop() + self._tx.task = loop.run_in_executor(None, self._tx_executor_body) + + await self._send_tx_status(app_id, "armed") + # streamer transitions to "transmitting" on the first buffer consumed in the thread; + # schedule a tiny watchdog that emits that status when queue count rises. +``` + +### TX executor body + +Runs in a worker thread. Blocks in the SDR's `_stream_tx` driven by our callback that pulls from the queue. + +```python +def _tx_executor_body(self) -> None: + sdr = self._tx.sdr + try: + sdr._stream_tx(self._tx_callback) + except Exception: + logger.exception("TX stream crashed") + # surface via asyncio side + asyncio.run_coroutine_threadsafe( + self._send_tx_status(self._tx.app_id, "error", "stream crashed"), + asyncio.get_event_loop(), + ) + +def _tx_callback(self, num_samples): + tx = self._tx + if tx is None or tx.stop_event.is_set(): + sdr = tx.sdr if tx else None + if sdr is not None: + sdr.pause_tx() + return _silence(num_samples) + + # duration watchdog + if tx.max_duration_s is not None and (time.monotonic() - tx.started_at) > tx.max_duration_s: + tx.stop_event.set() + tx.sdr.pause_tx() + _schedule(self._send_tx_status(tx.app_id, "done", "max duration reached")) + return _silence(num_samples) + + try: + raw = tx.queue.get(timeout=0.1) + except queue.Empty: + return self._underrun_fill(tx, num_samples) + + samples = np.frombuffer(raw, dtype=np.float32) + # interleaved float32 -> complex64 + if samples.size % 2 != 0 or samples.size // 2 != num_samples: + # malformed / wrong-sized frame; underrun this cycle + logger.warning("TX frame size mismatch: got %d floats, expected %d", samples.size, num_samples * 2) + return self._underrun_fill(tx, num_samples) + complex_samples = samples.reshape(-1, 2).view(np.complex64).reshape(-1) + tx.last_buffer = complex_samples + return complex_samples +``` + +Helper `_underrun_fill`: + +```python +def _underrun_fill(self, tx: TxSession, num_samples: int): + if tx.underrun_policy == "zero": + return np.zeros(num_samples, dtype=np.complex64) + if tx.underrun_policy == "repeat" and tx.last_buffer is not None: + return tx.last_buffer[:num_samples] if tx.last_buffer.size >= num_samples \ + else np.concatenate([tx.last_buffer, + np.zeros(num_samples - tx.last_buffer.size, dtype=np.complex64)]) + # "pause" (default) + tx.stop_event.set() + tx.sdr.pause_tx() + _schedule(self._send_tx_status(tx.app_id, "underrun")) + return np.zeros(num_samples, dtype=np.complex64) +``` + +`_schedule()` is a tiny wrapper around `asyncio.run_coroutine_threadsafe` that resolves the loop once at streamer construction. + +### `_handle_tx_stop` + +```python +async def _handle_tx_stop(self, msg: dict) -> None: + tx = self._tx + if tx is None: + return + tx.stop_event.set() + tx.sdr.pause_tx() + # drain the queue so the executor thread wakes + try: + while True: + tx.queue.get_nowait() + except queue.Empty: + pass + # wait up to ~1s for the executor thread to finish + if tx.task is not None: + try: + await asyncio.wait_for(asyncio.wrap_future(tx.task), timeout=1.0) + except asyncio.TimeoutError: + logger.warning("TX executor did not exit within 1s after stop") + + # release SDR reference + should_close = self._registry.release(tx.device_key) + if should_close: + try: + tx.sdr.close() + except Exception: + logger.exception("Error closing SDR on tx_stop") + + self._tx = None + await self._send_tx_status(msg.get("app_id") or "", "done") +``` + +### `_handle_tx_configure` + +```python +async def _handle_tx_configure(self, msg: dict) -> None: + if self._tx is None: + return + self._tx.pending_config.update(msg.get("radio_config") or {}) +``` + +Consume `pending_config` at the top of `_tx_callback` before pulling from the queue (same pattern as RX's `_capture_loop`), using `_apply_sdr_config` with tx aliases. + +### `_apply_sdr_config` — extend alias map + +```python +_CONFIG_ATTR_MAP = { + # existing RX aliases... + "sample_rate": ("sample_rate", "rx_sample_rate"), + "center_frequency": ("center_freq", "rx_center_frequency"), + "gain": ("gain", "rx_gain"), + "bandwidth": ("bandwidth", "rx_bandwidth"), + # NEW TX aliases + "tx_sample_rate": ("tx_sample_rate",), + "tx_center_frequency": ("tx_center_frequency", "tx_lo"), + "tx_gain": ("tx_gain",), + "tx_bandwidth": ("tx_bandwidth",), +} +``` + +Pluto has `set_tx_sample_rate`, `set_tx_center_frequency`, `set_tx_gain` — those are called by `init_tx` using the attribute values, so setting attributes via `_apply_sdr_config` + calling `init_tx` is sufficient. + +### Phase 4 acceptance + +- `test_streamer_tx.py`: full happy path — `tx_start` against MockSDR → 3 binary frames → verify `_stream_tx` callback received them in order → `tx_stop` → session cleared, SDR closed. +- `test_tx_safety.py`: one test per cap — `tx_enabled=False`, gain cap, freq range, duplicate session. Each produces a `tx_status: error` JSON; registry shows zero outstanding acquires. +- `test_tx_underrun.py`: three tests — `pause` (session ends, `underrun` emitted), `zero` (callback returns zeros, no status change), `repeat` (callback returns last buffer). +- `test_full_duplex.py`: against MockSDR, send `start` + `tx_start` with same `(device=mock, identifier=None)` → registry ref-count = 2 → both sessions stream independently → stop one, other still runs → stop second, SDR closed. + +--- + +## Phase 5 — Integration + docs + +**Touches:** + +- `tests/agent/test_integration_tx.py` — end-to-end with a real local `websockets` server + MockSDR. Mirror `test_integration.py`'s shape: register → heartbeat with `tx_enabled=True` → tx_start → 3 binary frames → tx_stop. +- `docs/agent_tx_protocol.md` — short, user-facing: message types, binary format, heartbeat additions, interlock config, CLI examples. Link from [screens_agent_handoff.md](./screens_agent_handoff.md). +- `README.md` (if it mentions agent subcommands) — add `--allow-tx` usage. + +**Real-Pluto smoke test** (manual, not in CI): + +1. `ria-agent register --hub http://hub:3005 --api-key KEY --allow-tx --tx-max-gain-db -10 --tx-freq-range 2.4e9 2.5e9` +2. `ria-agent stream` +3. From a Python REPL with `websockets`, open the hub WS on the agent's behalf (bypass hub during dev), send a `tx_start` + binary frames of a 1kHz tone → confirm carrier on a spectrum analyzer at the configured frequency. + +--- + +## File-by-file summary + +| File | Phase | Change | +|---|---|---| +| `src/ria_toolkit_oss/agent/ws_client.py` | 1 | Add `on_binary` callback. | +| `src/ria_toolkit_oss/agent/config.py` | 2 | Add `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`. | +| `src/ria_toolkit_oss/agent/cli.py` | 2, 4 | `--allow-tx` + cap flags on `register`; `--allow-tx` on `stream`; plumb `cfg` into `Streamer`. | +| `src/ria_toolkit_oss/agent/hardware.py` | 3 | `heartbeat_payload(cfg, sessions)` with `capabilities`, `tx_enabled`. | +| `src/ria_toolkit_oss/agent/streamer.py` | 3, 4 | Session refactor, SDR registry, TX dispatch, TX loop, underrun fills, `_apply_sdr_config` TX aliases. | +| `src/ria_toolkit_oss/agent/_tx_loop.py` | 4 (opt) | Extracted TX callback helpers if streamer.py > ~400 lines. | +| `tests/agent/test_ws_client.py` | 1 | Binary-frame case. | +| `tests/agent/test_config.py` | 2 | Round-trip new fields. | +| `tests/agent/test_cli_tx.py` | 2 | New — `--allow-tx` flag handling. | +| `tests/agent/test_hardware.py` | 3 | Heartbeat `capabilities` + `sessions`. | +| `tests/agent/test_streamer.py` | 3 | Refactor for session model; RX assertions unchanged. | +| `tests/agent/test_streamer_tx.py` | 4 | New — TX happy path. | +| `tests/agent/test_tx_safety.py` | 4 | New — cap enforcement. | +| `tests/agent/test_tx_underrun.py` | 4 | New — pause/zero/repeat policies. | +| `tests/agent/test_full_duplex.py` | 4 | New — shared SDR ref count. | +| `tests/agent/test_integration_tx.py` | 5 | New — real `websockets` server E2E. | +| `docs/agent_tx_protocol.md` | 5 | New — operator-facing protocol doc. | + +--- + +## Implementation gotchas (do not skip) + +1. **Asyncio ↔ thread bridge.** The SDR's `_stream_tx` is synchronous and runs in an executor thread. Its callback must not `await`. Use `queue.Queue` (thread-safe) for inbound buffers and `asyncio.run_coroutine_threadsafe(coro, loop)` to emit `tx_status` from inside the thread. Resolve `loop` once at streamer construction; don't call `get_event_loop()` from the thread. + +2. **`sdr.pause_tx()` from inside the callback.** Pluto's `_stream_tx` loop condition is `while self._enable_tx is True`. Calling `pause_tx()` inside the callback sets `_enable_tx = False` so the NEXT iteration exits. That's fine — it may emit one trailing zero-filled buffer. Document this; don't try to exit mid-callback. + +3. **Queue drain on stop.** When `_handle_tx_stop` sets `stop_event` and pauses TX, the executor thread may still be blocked in `queue.get(timeout=0.1)`. Draining the queue does not unblock a timed get. Rely on the 100ms timeout; the thread exits on the next iteration. Don't try to clever-inject a poison pill. + +4. **Interleaved float32 → complex64 conversion.** `np.frombuffer(buf, dtype=np.float32).view(np.complex64)` is zero-copy and correct when `buf.size` is a multiple of 8 bytes. Validate size first; mismatched size = underrun for that cycle, don't crash the thread. + +5. **MockSDR's `_stream_tx`** ([sdr/mock.py:96-100](../src/ria_toolkit_oss/sdr/mock.py#L96-L100)) calls `callback(self.rx_buffer_size)` — it passes a *size*, not samples. The TX callback contract is "I am given `num_samples`, I return that many complex64 samples." `test_streamer_tx` must respect this: the test's `sdr.tx_buffer_size` (if used) doesn't affect what the callback receives from mock. Simplest path: set `MockSDR.rx_buffer_size = buffer_size` in the test harness before `_stream_tx` is invoked, so the TX callback receives the right size. + +6. **`init_tx` on MockSDR vs Pluto.** MockSDR's `init_tx` [sets attributes and flips `_tx_initialized = True`](../src/ria_toolkit_oss/sdr/mock.py#L70-L81). Pluto's does the same plus `_rx_initialized = False` (the FDD bug). For full-duplex tests we currently target MockSDR only — Pluto FDD will work because our RX path ignores `_rx_initialized`, but the real-Pluto smoke test is the only validation. Call that out in the PR description. + +7. **Don't block the event loop.** `asyncio.wait_for(asyncio.wrap_future(tx.task), timeout=1.0)` in `_handle_tx_stop` is non-blocking from the loop's perspective — the 1s cap prevents a misbehaving driver from stalling heartbeat/RX. + +8. **Heartbeat during TX.** The existing heartbeat loop runs on a 30s timer. Sessions snapshot is cheap; no locking needed if we read `self._rx`/`self._tx` references atomically (Python ref swap is GIL-safe for single field reads). + +--- + +## Rollout + +1. Open a single PR per phase (1 → 2 → 3 → 4 → 5), each green on its own. +2. Phase 3 is the riskiest diff (RX refactor). Get a second reviewer if possible; the regression surface is all of current RX behavior. +3. After Phase 4 merges, `ria-agent stream --allow-tx` is a usable toy — you can hand-drive it from a Python REPL with `websockets` to validate against real hardware before the hub side is ready. +4. Phase 5 closes the loop and ships the user-facing docs. + +## Out of scope (explicit) + +- **Multi-app-per-agent** — one RX + one TX per agent in v1. Adding session IDs to binary frames is a v2 protocol bump. +- **Other TX drivers** (HackRF, USRP, bladeRF) — wiring `_CONFIG_ATTR_MAP` entries and verifying `_stream_tx` behavior per-driver. Tackle when the hub has an operator that targets them. +- **Resampling / clock drift** — agent treats the hub-supplied samples as authoritative. Drift manifests as underruns; the underrun policy is the only mitigation. +- **Fixing Pluto's `init_tx` `_rx_initialized = False` reset** — pre-existing, not triggered by our RX path, left for a separate cleanup. diff --git a/docs/agent_tx_plan.md b/docs/agent_tx_plan.md new file mode 100644 index 0000000..616b470 --- /dev/null +++ b/docs/agent_tx_plan.md @@ -0,0 +1,420 @@ +# Agent TX Streaming — Cross-Repo Plan + +**Repos:** `ria-toolkit-oss`, `ria-hub`, Screens frontend +**Status:** Proposal / pre-implementation +**Prerequisites:** The RX-streaming work from [screens_agent_handoff.md](./screens_agent_handoff.md) and [screens_agent_streamer_plan.md](./screens_agent_streamer_plan.md) is landed (agent WS protocol, `AgentDataSource`, `/screens/agents/register`, `/screens/agent/ws`). + +## Goal + +Let a Screens app running on the hub drive a **remote agent's Pluto** (or other TX-capable SDR) to transmit — streaming IQ buffers end-to-end from an operator like `plutoTXoperator` into the agent's `sdr.tx()` path. Mirror image of what `AgentDataSource` already does for RX. + +## Non-goals (v1) + +- Multi-tenant radio sharing (one app owns the radio at a time per agent). +- Bulk/upload-once TX — superseded by streaming per request. +- Arbitrary waveform generation in the agent. The agent is dumb pipe + hardware control; signal generation stays on the hub. + +## Key design decisions + +| # | Decision | Value | +|---|---|---| +| D1 | **Delivery mode** | **Streaming**. Hub pushes binary IQ buffers continuously over the existing WS; agent's `_stream_tx` callback pulls them from an in-agent queue. | +| D2 | **Full-duplex** | **Yes.** A single `app_id` may own both an RX session and a TX session on the same agent concurrently. Same physical SDR handle serves both (Pluto is FDD-capable; `init_rx` and `init_tx` are independent on one `adi.Pluto` instance). | +| D3 | **Safety caps** | **Agent-enforced.** `~/.ria/agent.json` holds `tx_enabled`, `tx_max_gain_db`, `tx_max_duration_s`, optional `tx_allowed_freq_ranges: [[low,high], …]`. Agent rejects `tx_start` frames that violate any of these, independent of what the hub sends. | +| D4 | **Buffer format** | Interleaved float32 IQ, range `[-1, 1]` — same as RX. Format-validated by `ria_toolkit_oss.sdr.sdr._verify_sample_format`. | +| D5 | **Protocol evolution** | Keep existing RX messages (`start`/`stop`/`configure`) unchanged for back-compat. Add parallel `tx_start`/`tx_stop`/`tx_configure`. Heartbeat grows to advertise capabilities. | +| D6 | **Underrun policy** | Default `pause`: if the TX queue empties, agent calls `pause_tx()` and emits `tx_status: underrun`. Hub must recover by sending a fresh `tx_start` + buffers. Configurable per session via `radio_config.underrun_policy ∈ {"pause", "zero", "repeat"}`. | +| D7 | **Backpressure** | Rely on TCP/WS backpressure. Agent caps inbound TX queue at 8 buffers; `await ws.send` on the hub side slows when the agent doesn't drain. No application-level flow control in v1. | +| D8 | **Session identity** | `app_id` identifies a Screens app. Each app has at most one RX session and one TX session per agent. Binary direction disambiguates: agent → hub binary = RX IQ; hub → agent binary = TX IQ. | + +## Protocol specification + +Additions only. Existing RX messages from [screens_agent_handoff.md §Phase 4](./screens_agent_handoff.md) are unchanged. + +### Hub → agent (JSON) + +```jsonc +// Arm the TX side. Agent calls init_tx, starts the stream_tx thread with an empty queue. +// After this, hub sends binary TX buffers on the same WS. +{ + "type": "tx_start", + "app_id": "app-abc", + "radio_config": { + "device": "pluto", + "identifier": "ip:192.168.3.1", + "tx_sample_rate": 1000000, + "tx_center_frequency": 2450000000, + "tx_gain": -20, // dB, negative = attenuation on Pluto + "tx_bandwidth": 1000000, // optional + "buffer_size": 1024, + "underrun_policy": "pause" // "pause" | "zero" | "repeat" + } +} + +// Apply parameter changes at the next buffer boundary. +{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } } + +// Stop TX, drain queue, pause_tx, release TX side (RX may continue if a separate RX session is live). +{ "type": "tx_stop", "app_id": "app-abc" } +``` + +### Hub → agent (binary) + +Raw interleaved float32 IQ in `[-1, 1]`. One WS frame = one buffer = `buffer_size` complex samples = `buffer_size * 2 * 4` bytes. Delivered only between `tx_start` and `tx_stop`. Binary frames arriving outside that window are discarded and logged at WARN. + +### Agent → hub (JSON) + +```jsonc +// Lifecycle events. +{ "type": "tx_status", "app_id": "app-abc", "state": "armed" } +{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" } +{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" } // queue empty; TX paused +{ "type": "tx_status", "app_id": "app-abc", "state": "done" } +{ "type": "tx_status", "app_id": "app-abc", "state": "error", "message": "gain -5 exceeds tx_max_gain_db=-15" } + +// Reject reasons from agent-enforced caps/interlocks are surfaced via tx_status:error. +``` + +### Heartbeat extension + +Existing `{type: heartbeat, hardware[], status}` grows: + +```jsonc +{ + "type": "heartbeat", + "hardware": ["mock", "pluto"], + "status": "streaming", // unchanged semantics + "capabilities": ["rx", "tx"], // NEW — derived from tx_enabled + SDR class having init_tx + "tx_enabled": true, // NEW — mirrors config flag + "sessions": { // NEW — optional per-session snapshot + "rx": { "app_id": "app-abc", "state": "streaming" }, + "tx": { "app_id": "app-abc", "state": "transmitting" } + }, + "app_id": "app-abc" // kept for back-compat +} +``` + +--- + +## Part A — `ria-toolkit-oss` (this repo) + +### A1. `agent/ws_client.py` + +Currently the WS client drops server → agent binary (`ws_client.py:77-79`). Add a binary handler alongside the JSON one. + +```python +BinaryHandler = Callable[[bytes], Awaitable[None]] + +async def run( + self, + on_message: MessageHandler, + heartbeat: HeartbeatBuilder, + on_binary: BinaryHandler | None = None, +) -> None: + ... + async for raw in self._ws: + if isinstance(raw, bytes): + if on_binary is not None: + await on_binary(raw) + continue + ... +``` + +Keep the reconnect, heartbeat, and malformed-frame behavior unchanged. + +### A2. `agent/streamer.py` — add TX sessions + +Replace the flat `self._sdr` / `self._app_id` / `self._capture_task` state with a session model: + +```python +@dataclass +class RxSession: + app_id: str + sdr: Any + buffer_size: int + task: asyncio.Task + pending_config: dict + +@dataclass +class TxSession: + app_id: str + sdr: Any + queue: asyncio.Queue[bytes] # bounded, maxsize=8 + task: asyncio.Task # runs _stream_tx in executor + underrun_policy: str + pending_config: dict + bytes_transmitted: int = 0 + started_at: float = 0.0 # for tx_max_duration_s enforcement +``` + +The streamer holds `self._rx: RxSession | None` and `self._tx: TxSession | None`. SDR instances are cached by `(device, identifier)` — when RX and TX name the same device, both sessions share one handle (matters for Pluto FDD). + +**New handlers**: + +- `_handle_tx_start(msg)` — check `cfg.tx_enabled`, validate gain/duration/freq against caps, open/resolve SDR, `sdr.init_tx(...)`, start `_tx_loop`, emit `tx_status: armed`. +- `_handle_tx_stop(msg)` — cancel TX task, `sdr.pause_tx()`, drain queue, release SDR if no RX session on it, emit `tx_status: done`. +- `_handle_tx_configure(msg)` — stash into `self._tx.pending_config`, applied at next buffer boundary (same pattern as RX). +- `on_binary(data)` — if `self._tx`: `await self._tx.queue.put(data)` (awaiting here is the backpressure mechanism). Else: log and drop. + +**TX loop** (runs in an executor thread via `loop.run_in_executor`, like the RX capture loop): + +```python +def _tx_callback(num_samples: int) -> np.ndarray: + # Called by sdr._stream_tx on every buffer boundary. + try: + raw = self._tx_queue_sync.get(timeout=0.1) + except queue.Empty: + return self._underrun_fill(num_samples) # policy-driven + samples = np.frombuffer(raw, dtype=np.float32).view(np.complex64) + if len(samples) < num_samples: + return _pad_zero(samples, num_samples) + return samples[:num_samples] +``` + +Use a thread-safe `queue.Queue` for the TX side (the `asyncio.Queue` lives on the event loop; the executor thread reads from a sibling `queue.Queue` fed by a tiny asyncio→threading adapter). + +**Underrun fills**: +- `"pause"`: signal the main loop to call `sdr.pause_tx()`, emit `tx_status: underrun`, exit the callback. +- `"zero"`: return `np.zeros(num_samples, dtype=np.complex64)`. +- `"repeat"`: return the last good buffer (cached). If no buffer yet: zeros. + +**Cap enforcement** in `_handle_tx_start` (before opening the SDR): + +```python +if not self._cfg.tx_enabled: + return await self._send_error_tx(app_id, "tx disabled on this agent") +if (cap := self._cfg.tx_max_gain_db) is not None and tx_gain > cap: + return await self._send_error_tx(app_id, f"gain {tx_gain} exceeds cap {cap}") +if (cap := self._cfg.tx_max_duration_s) is not None: + # enforced by a watchdog in _tx_loop that calls tx_stop after cap seconds + ... +for (lo, hi) in self._cfg.tx_allowed_freq_ranges or []: + if lo <= tx_center_frequency <= hi: + break +else: + if self._cfg.tx_allowed_freq_ranges: + return await self._send_error_tx(app_id, f"freq {tx_center_frequency} outside allowed ranges") +``` + +### A3. `agent/config.py` + +Extend `AgentConfig`: + +```python +@dataclass +class AgentConfig: + # existing fields… + tx_enabled: bool = False + tx_max_gain_db: float | None = None + tx_max_duration_s: float | None = None + tx_allowed_freq_ranges: list[tuple[float, float]] | None = None +``` + +`save()` preserves existing 0600 perms. + +### A4. `agent/cli.py` + +- `ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60` — persist the interlock into config. +- `ria-agent stream --allow-tx` — runtime override (sets `cfg.tx_enabled=True` for the life of the process without writing config). +- `ria-agent detect` unchanged. + +### A5. `agent/hardware.py` + +```python +def heartbeat_payload(status, app_id=None, *, cfg: AgentConfig, sessions: dict | None = None) -> dict: + caps = ["rx"] + if cfg.tx_enabled: + caps.append("tx") + payload = { + "type": "heartbeat", + "hardware": available_devices(), + "status": status, + "capabilities": caps, + "tx_enabled": cfg.tx_enabled, + } + if app_id: + payload["app_id"] = app_id + if sessions: + payload["sessions"] = sessions + return payload +``` + +### A6. SDR layer + +- **Audit**: [`sdr/pluto.py`](../src/ria_toolkit_oss/sdr/pluto.py) `tx_recording` + `_stream_tx` paths already use `_tx_lock` (line 31, 323, 360). Double-check concurrent-with-RX behavior: the `adi.Pluto` Python object is not thread-safe for arbitrary attribute writes, so all `set_tx_*` / `set_rx_*` must go through the shared `_param_lock` (already present at [`sdr/sdr.py:44`](../src/ria_toolkit_oss/sdr/sdr.py#L44)). Verify `rx()` in a loop + `_stream_tx` in another thread don't step on each other. +- **MockSDR** already has `init_tx` + `_stream_tx` (`sdr/mock.py:70-100`). No changes needed for mock-based tests. +- **Other TX-capable drivers** (blade, usrp, hackrf): out of scope for v1; leave their `init_tx` as-is. + +### A7. Tests (`tests/agent/`) + +- `test_streamer_tx.py` — `tx_start` → binary frames → `_stream_tx` callback pulls correct samples → `tx_stop` cleans up. +- `test_tx_safety.py` — cap violations (gain, duration, freq, `tx_enabled=False`) each produce `tx_status: error` and never open the SDR. +- `test_tx_underrun.py` — each policy (`pause`, `zero`, `repeat`) exercised against a fake slow producer. +- `test_full_duplex.py` — one `app_id` sends `start` + `tx_start`; both sessions share one MockSDR; both produce their expected frames; stopping one does not stop the other. +- `test_ws_client_binary.py` — binary frames now reach the binary handler. +- `test_integration_tx.py` — end-to-end against local `websockets` server + MockSDR. + +### A8. Docs + +- Add a TX section to any existing agent protocol doc (or create `docs/agent_tx_protocol.md`). +- Include a regulatory disclaimer: the operator is responsible for transmissions. The agent is an enabler, not a policy layer beyond the interlocks. + +--- + +## Part B — `ria-hub` + +> Paths below are conceptual — confirm against the actual module layout in `ria-hub` before editing. Anchor points reference the RX handoff at [screens_agent_handoff.md §Part B](./screens_agent_handoff.md). + +### B1. `AgentTxSink` (new) + +Mirror of `AgentDataSource`. Location: `controller/app/modules/screens/data_sinks.py` (or wherever output sinks live in `ria-hub`). + +Responsibilities: +- `prepare(radio_config)` — send `tx_start` via Redis pub/sub on `screens:agent:{agent_id}:tx` → WS proxy → agent. +- `write(buffer: np.ndarray | bytes)` — convert to interleaved float32 bytes, send as binary over the WS. Awaits on WS backpressure. +- `configure(partial_radio_config)` — send `tx_configure`. +- `close()` — send `tx_stop`. +- Subscribes to the agent's `tx_status` frames (via the same Redis pub/sub channel used for RX status today) and surfaces state back to the orchestrator. An `error` state aborts the Celery task. + +### B2. Refactor `plutoTXoperator` + +The existing operator presumably calls `radio.tx(...)` against a directly-attached Pluto. Abstract the "output" into an injectable sink: + +```python +class PlutoTxOperator: + def __init__(self, sink: TxSink, ...): + self.sink = sink # AgentTxSink when dataSink.type == "agent", else LocalPlutoTxSink + + def run(self, ...): + self.sink.prepare(self.radio_config) + while not stop: + buf = self._generate_next_buffer() + self.sink.write(buf) + self.sink.close() +``` + +The local path (existing direct-hardware behavior) becomes `LocalPlutoTxSink`, a thin wrapper around the current `radio.tx` calls. No behavior change for existing deployments. + +`build_data_sink()` (to match `build_data_source()` from B1/B6) routes on `dataSink.type`. + +### B3. Manifest schema + +Add `dataSink` alongside `dataSource` in the manifest. New `type: "agent"`: + +```json +{ + "dataSource": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "sample_rate": 1000000, "center_frequency": 2450000000, "gain": 40 } }, + "dataSink": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "tx_sample_rate": 1000000, "tx_center_frequency": 2450000000, "tx_gain": -20, "underrun_policy": "pause" } } +} +``` + +Update Pydantic models + JSON schema validators in `controller/app/modules/screens/graph_derivation.py` (or equivalent). When `dataSource.agent_id == dataSink.agent_id` and both target `pluto` with the same `identifier`, the agent will naturally share one SDR handle — no special-casing needed on the hub side. + +### B4. WS endpoint extensions + +`/screens/agent/ws` already exists. Add: + +- Support for hub → agent **binary frames** (currently binary is agent → hub only). FastAPI's `WebSocket.send_bytes` works directly; just route binary from the Redis pub/sub channel through to the WS. +- New Redis pub/sub channel `screens:agent:{agent_id}:tx` for outbound TX control JSON + a separate `screens:agent:{agent_id}:tx_bin` for outbound binary. (Two channels because many Redis brokers don't love mixing binary into text-keyed channels; if your deployment uses Redis 6+ with `SUBSCRIBE` that handles bytes, one channel is fine.) + +### B5. Celery wiring + +When `dataSink.type == "agent"`, the Celery task that runs the TX-containing graph uses `AgentTxSink` instead of a local sink. The operator code (`plutoTXoperator`) is unchanged because the sink abstraction hides the difference. + +Full-duplex: a single task with both `dataSource.type == "agent"` and `dataSink.type == "agent"` pointing at the same agent spawns both the RX consumer loop (existing `AgentDataSource.next_chunk` via BLPOP) and the TX producer loop (`AgentTxSink.write`). Both sides are wired up before any capture frames are sent. + +### B6. Capability gating + +Before any control path sends `tx_start`: + +```python +agent = get_agent(agent_id) +if agent.last_heartbeat.age > 60: # stale + raise HTTPException(503, "agent not responding") +if "tx" not in agent.last_heartbeat.capabilities: + raise HTTPException(400, "agent has not opted in to transmission (tx_enabled=false)") +``` + +Surface clear errors to the Screens UI so the user knows it's an agent config issue, not an app config issue. + +### B7. Audit log + +New MongoDB collection `agent_tx_audit`: + +``` +{ + agent_id, app_id, user_id, + center_frequency_hz, tx_gain_db, duration_s, num_samples, + started_at, ended_at, terminal_status, // "done" | "error" | "underrun" | "cancelled" + error_message? +} +``` + +Write on every `tx_start`. Update on terminal `tx_status`. Index on `{agent_id, started_at}` for admin-view queries. + +### B8. Registration — no change needed + +`POST /screens/agents/register` and `~/.ria/agent.json` already cover credential storage. The TX interlock (`tx_enabled`, caps) is written by the *agent operator* via `ria-agent register --allow-tx`; the hub only reads the heartbeat to learn whether an agent will accept TX. + +--- + +## Part C — Screens (Vue 3 frontend) + +### C1. App composer + +- **Agent picker** (exists from RX work) grows a "TX capable" filter toggle; hides agents whose heartbeat `capabilities` lacks `"tx"`. +- When the graph contains `plutoTXoperator` (or any future TX operator): + - Render a **dataSink** section mirroring dataSource. + - Fields: device, agent_id, identifier, tx_sample_rate, tx_center_frequency, tx_gain, underrun_policy. + - Validation: tx_center_frequency within radio band; tx_gain within agent-advertised max (read from heartbeat when available). + +### C2. Run-time UI + +- **Consent modal** on "Start" for any app whose manifest contains a `dataSink.type: "agent"`: + > "This app will transmit on **2.450 GHz** at **-20 dB** through agent **lab-pluto-01**. I confirm this transmission is permitted under my local radio regulations." + Required checkbox, cannot be remembered across apps. +- **TX status indicator** in the running-app view: shows `armed` / `transmitting` / `underrun` state from `tx_status` frames. Red banner on `underrun` or `error`. +- **Stop TX button** always visible during transmission; fires `tx_stop` immediately. Separate from "Stop app" (which also stops RX). + +### C3. Admin view + +Extend the agents list from B8 of the RX handoff: + +- Column: **TX**: `enabled` / `disabled` / `in-use by app X`. +- Agent detail page: show `tx_max_gain_db`, `tx_max_duration_s`, `tx_allowed_freq_ranges`, and the last 10 rows from `agent_tx_audit` filtered to this agent. + +--- + +## Rollout order + +1. **Part A §A1-A3, A7** — agent-side TX session + binary ingress + safety, all behind `--allow-tx`. Mock-based tests. Shippable standalone; no consumer yet. +2. **Part B §B1-B5** — hub sink + manifest + WS extension + Celery wiring. End-to-end test: Screens app with `plutoTXoperator` + agent sink → real Pluto in the lab → verify carrier on a spectrum analyzer. +3. **Part B §B6-B7** — capability gating + audit log. Blocks general release, not lab use. +4. **Part C §C1** — composer UI for TX apps. +5. **Part C §C2-C3** — consent modal + admin view. Gate for first non-internal user. + +Parts A + B can land on parallel branches and meet at step 2's integration test. Part C can start in parallel with B once the manifest shape in B3 is stable. + +## Test matrix (integration) + +| Scenario | Expected | +|---|---| +| App with RX only, agent connected | RX as today (regression guard) | +| App with TX only, agent `tx_enabled=True` | TX starts, underrun → pause, stop cleans up | +| App with RX + TX same agent, same device | One Pluto handle serves both; independent gains/frequencies | +| App with TX, agent `tx_enabled=False` | Hub rejects at gate with 400; no WS traffic generated | +| App with TX, gain exceeds agent cap | `tx_status: error`; SDR never opened | +| Hub stops sending TX buffers mid-stream | `underrun` emitted after queue drains; agent paused cleanly | +| WS drops during TX | Agent cancels TX task, pauses hardware, reconnects; hub must re-issue `tx_start` | +| Agent process killed during TX | Hardware stops (existing `close()` already handles this; verify `_tx_lock` released) | + +## Open questions + +- **Waveform source**: is `plutoTXoperator` a real-time generator emitting on a clock, or does it synthesize a fixed recording and loop? If the latter, worth exposing a "bulk + loop" fast-path — hub sends the buffer once, agent loops it via existing `tx_recording`. Same protocol (`tx_start` + one buffer + `loop: true`), much less WS traffic. +- **Multi-app-per-agent**: out of scope for v1 (§Non-goals). When needed: add a session id to binary frames (4-byte prefix: magic + stream_id + reserved), bump a `protocol_version` in the heartbeat. +- **Streaming TX clock drift**: if hub and agent sample clocks drift, repeating zeros on underrun is audible/visible in the spectrum. Longer term: agent-side resampling or PLL, both expensive. v1: rely on generous queue depth + stable local networks. +- **Other TX-capable SDRs**: HackRF, USRP, bladeRF. The `_CONFIG_ATTR_MAP` in [`agent/streamer.py:169-175`](../src/ria_toolkit_oss/agent/streamer.py#L169-L175) will need per-driver entries when those come online. + +## Regulatory note + +Transmission is regulated in every jurisdiction. The agent-side interlocks (`tx_enabled`, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub's consent modal and audit log exist so actions are attributable. None of this is a legal compliance layer — it's a defense-in-depth mechanism. diff --git a/docs/agent_tx_protocol.md b/docs/agent_tx_protocol.md new file mode 100644 index 0000000..a785d4e --- /dev/null +++ b/docs/agent_tx_protocol.md @@ -0,0 +1,185 @@ +# Agent TX Protocol + +Operator-facing reference for the TX streaming extensions to the agent +WebSocket protocol. Implementation plan: [agent_tx_implementation_plan.md](./agent_tx_implementation_plan.md). +Cross-repo design: [agent_tx_plan.md](./agent_tx_plan.md). + +> **Regulatory note.** Transmission is regulated in every jurisdiction. The +> agent-side interlocks documented below let you configure safe defaults +> for your deployment. They do not replace licensing or responsibility +> for your own emissions. The RIA Hub's consent modal and audit log make +> actions attributable — they are not a legal-compliance layer. + +## Opt-in + +TX is **disabled by default**. The hub cannot make the agent transmit unless +the operator has explicitly opted in on the agent host. + +Two equivalent opt-in paths: + +```bash +# Persist to ~/.ria/agent.json so the agent always allows TX. +ria-agent register --hub http://HUB:3005 --api-key KEY \ + --allow-tx \ + --tx-max-gain-db -10 \ + --tx-max-duration-s 60 \ + --tx-freq-range 2.4e9 2.5e9 \ + --tx-freq-range 5.7e9 5.8e9 +``` + +```bash +# Runtime-only override (does not touch disk). +ria-agent stream --allow-tx +``` + +Caps: + +| Flag | Config key | Effect | +|---|---|---| +| `--tx-max-gain-db VALUE` | `tx_max_gain_db` | Reject any `tx_start` whose `tx_gain > VALUE` | +| `--tx-max-duration-s VALUE` | `tx_max_duration_s` | Auto-stop any TX session after `VALUE` seconds (watchdog in the TX loop) | +| `--tx-freq-range LO HI` (repeatable) | `tx_allowed_freq_ranges` | Reject any `tx_start` whose `tx_center_frequency` falls outside all configured ranges | + +The agent enforces each cap **before** opening the SDR. A violating +`tx_start` produces a `tx_status: error` frame and never touches hardware. + +## Heartbeat advertisement + +Every heartbeat now includes: + +```jsonc +{ + "type": "heartbeat", + "hardware": ["mock", "pluto"], + "status": "streaming", + "capabilities": ["rx", "tx"], // "tx" present only when tx_enabled=True + "tx_enabled": true, + "sessions": { // omitted when no session is live + "rx": { "app_id": "app-1", "state": "streaming" }, + "tx": { "app_id": "app-1", "state": "transmitting" } + } +} +``` + +Hubs should read `capabilities` to decide whether to surface TX operators +against this agent in the Screens app composer. + +## Control messages + +### Hub → agent (JSON) + +```jsonc +// Arm the TX side. Agent validates interlocks, opens/resolves the SDR, +// and transitions into "armed". The next binary frames are consumed as +// TX IQ buffers. +{ + "type": "tx_start", + "app_id": "app-1", + "radio_config": { + "device": "pluto", + "identifier": "ip:192.168.3.1", + "tx_sample_rate": 1000000, + "tx_center_frequency": 2450000000, + "tx_gain": -20, // dB; Pluto uses negative attenuation + "tx_bandwidth": 1000000, // optional + "buffer_size": 1024, + "underrun_policy": "pause" // "pause" (default) | "zero" | "repeat" + } +} + +// Update parameters at the next buffer boundary. No re-arm needed. +{ "type": "tx_configure", "app_id": "app-1", + "radio_config": { "tx_gain": -25 } } + +// Stop TX, drain the inbound queue, pause_tx, release the SDR (if no RX +// session is still using it). A new tx_start can follow immediately. +{ "type": "tx_stop", "app_id": "app-1" } +``` + +### Hub → agent (binary) + +- Raw interleaved float32 IQ, normalised to `[-1, 1]`. +- One WebSocket frame = one buffer = `buffer_size` complex samples = + `buffer_size * 2 * 4` bytes. +- Accepted only while a TX session is live. Frames outside that window + are logged and dropped. +- Malformed frames (odd float count, wrong size) trigger one underrun + cycle but do not crash the stream. + +### Agent → hub (JSON) + +```jsonc +{ "type": "tx_status", "app_id": "app-1", "state": "armed" } +{ "type": "tx_status", "app_id": "app-1", "state": "transmitting" } +{ "type": "tx_status", "app_id": "app-1", "state": "underrun" } +{ "type": "tx_status", "app_id": "app-1", "state": "done" } +{ "type": "tx_status", "app_id": "app-1", "state": "error", + "message": "tx_gain -5 exceeds cap -15.0" } +``` + +Transitions: + +``` + tx_start tx_stop + —————————————————▶ armed ▶ transmitting ——————————▶ done + │ │ + │ │ queue empties + policy="pause" + │ ▼ + │ underrun ▶ done (auto-teardown) + │ + └─ interlock / init failure ▶ error (no session) +``` + +## Underrun policies + +When the inbound TX queue is empty at a buffer boundary: + +| Policy | Behavior | +|---|---| +| `pause` *(default)* | Callback returns silence, calls `pause_tx()`, flips the session into `underrun`. Watchdog emits `tx_status: underrun` + `tx_status: done` and tears down. Hub must re-issue `tx_start` to resume. | +| `zero` | Callback returns a zero-filled buffer. Session stays alive; no status change. Carrier continues with dead air. | +| `repeat` | Callback returns the most recently transmitted buffer. If no buffer has arrived yet, falls back to zero for that cycle. | + +Choose `pause` for correctness-sensitive workloads (any data modulation +where zero-fill or repeat corrupts the stream). Choose `zero` or `repeat` +for continuous-carrier use cases where brief stalls are acceptable. + +## Concurrent RX + TX + +A single `app_id` may hold both an RX session (`start`/`stop`) and a TX +session (`tx_start`/`tx_stop`) on the same agent at the same time. When +both reference the same `(device, identifier)`, the agent shares a single +driver instance between the two sessions (ref-counted release on stop). + +Multi-app sharing of one SDR is not supported in v1. A second `tx_start` +with a different `app_id` while another TX session is live produces +`tx_status: error "tx already active on this agent"`. + +## Buffer format recap + +- **Direction** is the only framing: hub → agent binary means TX, + agent → hub binary means RX. +- **Layout**: `[I0, Q0, I1, Q1, …]` as little-endian float32. +- **Size**: `buffer_size * 2 * 4` bytes. Mismatched sizes are treated as + a single-cycle underrun (malformed frame). +- **Range**: samples must lie in `[-1, 1]`. Out-of-range values are + transmitted as-is; the SDR driver may clip. + +## Configuration reference + +`~/.ria/agent.json` is written by `ria-agent register` and read by +`ria-agent stream`. Minimum schema with TX: + +```json +{ + "hub_url": "https://hub.example.com", + "agent_id": "agent-abc123", + "token": "rha_...", + "tx_enabled": true, + "tx_max_gain_db": -10.0, + "tx_max_duration_s": 60, + "tx_allowed_freq_ranges": [[2.4e9, 2.5e9], [5.7e9, 5.8e9]] +} +``` + +File permissions are enforced to `0600` by `save()`. diff --git a/docs/per_user_registration_keys_plan.md b/docs/per_user_registration_keys_plan.md new file mode 100644 index 0000000..663bbcc --- /dev/null +++ b/docs/per_user_registration_keys_plan.md @@ -0,0 +1,232 @@ +--- +name: Per-user agent registration keys +description: Replace the shared [wac] API_KEY with per-user registration keys issued from the RIA Agents page on RIA Hub. +type: plan +--- + +# Per-user agent registration keys — plan + +**Status:** design only; nothing implemented. +**Owner (toolkit side):** `ria-toolkit-oss` +**Owner (hub side):** `ria-hub` / `controller` +**Related:** [screens_agent_handoff.md](./screens_agent_handoff.md), [agent_tx_protocol.md](./agent_tx_protocol.md) + +--- + +## Context (current state) + +Today, `ria-agent register` calls `POST {hub_url}/screens/agents/register` with +an `X-API-Key` header ([cli.py:41-64](../src/ria_toolkit_oss/agent/cli.py#L41-L64)). +The hub validates that header against a single shared secret — `[wac] API_KEY` +in the hub's `app.ini` ([legacy_executor.py:821-822](../src/ria_toolkit_oss/agent/legacy_executor.py#L821-L822)). +The hub responds with `{agent_id, token}`; the agent persists both to +`~/.ria/agent.json` and uses `token` as the bearer on the WS handshake +afterwards. + +Consequences of the shared secret: + +- Every agent operator holds the same key → no per-user attribution in logs. +- Revoking one operator forces a rotation across every deployed agent. +- Key-in-CLI-history leaks escalate to the whole fleet. +- Nothing ties a registered agent to a human in the hub's user table. + +## Goal + +A user signs into `riahub.ai`, opens an **RIA Agents** page, mints a key, and +uses it once with `ria-agent register`. The resulting agent is owned by that +user; the key can be revoked without affecting anyone else's agents. + +The agent-side `token` returned by `/screens/agents/register` keeps its current +role (bearer for the WS handshake). Only the *registration* credential +changes. + +--- + +## User flow + +1. User signs into `https://riahub.ai`. +2. User navigates to **Settings → RIA Agents** (or a top-level `/agents` + page — see open question O1). +3. User clicks **Generate registration key**. A modal shows the key **once**, + with copy-to-clipboard. Only a prefix + hash is stored server-side. +4. User runs, on the agent host: + ``` + ria-agent register --hub https://riahub.ai --api-key ria_reg_<...> + ``` +5. Hub validates the key, creates an agent row owned by the user, marks the + key as `consumed` (one-shot) or bumps `last_used_at` (multi-use — see O2), + and returns `{agent_id, token}` exactly as today. +6. The agent list on the same page shows the new agent's `name`, `hardware[]`, + `last_heartbeat`, and **Revoke** / **Rename** actions. + +--- + +## Scope split + +### Toolkit (`ria-toolkit-oss`) + +The CLI already sends `X-API-Key`, so no protocol change is required. Two +small quality-of-life changes: + +| # | Change | File | +|---|--------|------| +| T1 | Update `--api-key` help text and [cli.py:8 docstring](../src/ria_toolkit_oss/agent/cli.py#L8) to say "personal registration key from the RIA Agents page" rather than "Hub API key". | [agent/cli.py](../src/ria_toolkit_oss/agent/cli.py) | +| T2 | On registration failure, if the response body is JSON with a `reason` field (`invalid_key` / `expired` / `already_consumed` / `revoked`), surface it verbatim instead of the raw `HTTPError`. Makes user-facing errors actionable. | [agent/cli.py:56-61](../src/ria_toolkit_oss/agent/cli.py#L56-L61) | + +No change to `config.py`, `ws_client.py`, or the streamer — the `token` +returned by register is still what authenticates the WS connection. + +### Hub (`ria-hub` / `controller`) + +Paths below are inferred from [screens_agent_handoff.md](./screens_agent_handoff.md) +(`controller/app/modules/...`). Hub team should sanity-check before starting. + +#### Prior art — check RIA Conductor first + +The RIA Conductor feature is believed to already implement similar key +generation (likely for authenticating conductors to the hub). **Before +building anything in this section, read the Conductor key code** and decide +whether to: + +- **Reuse** it as-is (shared `registration_keys` table, `kind` column + discriminating `conductor` vs. `agent`) — preferred if the shapes line up. +- **Extract** the hashing / minting / revoke primitives into a shared + `registration_keys` module that both features depend on. +- **Fork** a parallel `agent_registration_keys` table — only if the + Conductor model is materially different (e.g. per-org scoping, different + lifetime rules) and forcing a merge would distort one or both features. + +Whichever path is chosen should be decided up front and noted on the PR, so +we don't end up with two near-identical key subsystems by accident. The +security notes below (argon2id, one-time reveal, rate limits, audit logging) +apply regardless of which path is taken — confirm Conductor already does +these; if not, the fix belongs in the shared code, not this feature. + +#### Data model + +New collection (Mongo) or table (if Postgres is used for users): + +``` +registration_keys + _id + user_id # FK to hub users + name # user-supplied label, e.g. "lab laptop" + key_prefix # first 8 chars of the plaintext, for UI display + key_hash # argon2id or bcrypt of the full plaintext + created_at + expires_at # optional; null = no expiry + consumed_at # null until first successful registration (if one-shot) + revoked_at # null unless explicitly revoked + last_used_at # updated on every successful use (if multi-use) +``` + +Augment the existing agents collection with `owner_user_id` (FK) and +`registered_via_key_id` (FK to `registration_keys._id`). + +Decide O2 before building: one-shot vs. reusable. Recommendation: one-shot by +default with an optional "reusable for N days" toggle, since one-shot is the +lower-blast-radius default and matches how GitHub/Gitea deploy keys behave. + +#### Endpoints + +| # | Endpoint | Notes | +|---|----------|-------| +| H1 | `POST /api/v1/user/registration-keys` | Auth: session cookie. Body: `{name, expires_in_days?, reusable?}`. Returns plaintext key **once**. | +| H2 | `GET /api/v1/user/registration-keys` | Auth: session cookie. Lists the caller's keys (prefix + metadata, never plaintext). | +| H3 | `DELETE /api/v1/user/registration-keys/{id}` | Auth: session cookie. Revokes. | +| H4 | `POST /screens/agents/register` (existing) | Change auth: look up `X-API-Key` by hash instead of string-compare against `[wac] API_KEY`. Reject if revoked / expired / consumed. Set `owner_user_id` on the new agent row. | +| H5 | `GET /api/v1/user/agents` | Auth: session cookie. Lists the caller's agents for the UI. | +| H6 | `DELETE /api/v1/user/agents/{id}` | Auth: session cookie. De-registers and closes any live WS. | + +H4 is the only backwards-incompatible change. See the migration section for +how to ship it without breaking existing deployments. + +#### Frontend + +New page — **Settings → RIA Agents** — two panels: + +- **Registration keys:** table (name, prefix, created, expires, last used, + revoke button) + "Generate" button that opens the one-time-reveal modal. +- **Agents:** table (name, hardware, status, last heartbeat, rename, revoke). + +Matches the existing Gitea-style Settings sidebar if RIA Hub is Gitea-based +(O3). + +--- + +## Migration from the shared `[wac] API_KEY` + +The shared key is likely in use on every existing deployment. To avoid a +flag day: + +1. **Dual-accept window.** H4 accepts *either* a per-user key (lookup by + hash) *or* the legacy `[wac] API_KEY` string. When the legacy key is used, + the resulting agent has `owner_user_id = null` and a warning is logged. +2. **Admin UI surfaces "unowned" agents** so an admin can re-assign them or + ask owners to re-register. +3. **Deprecation window of one release**, then H4 rejects the legacy key and + the `[wac] API_KEY` config is removed from `app.ini`. + +No toolkit-side migration needed — existing `~/.ria/agent.json` files already +store the post-registration `token`, which keeps working regardless of how +registration itself was authenticated. + +--- + +## Security notes + +- Store `key_hash` with a password hash (argon2id), not a fast hash. The key + is a secret-equivalent: treat it like a password. +- Plaintext key format: `ria_reg_`. Prefix makes + the purpose obvious in leaked logs and lets scanners (trufflehog etc.) + recognize it. +- One-time reveal in the UI — never persist or re-display the plaintext. +- Rate-limit H4 per source IP and per `key_prefix` to blunt brute-force on + leaked prefixes. Lock a key out after N failed attempts in M minutes. +- Log every H4 call (success + failure, with key prefix and source IP) + to the audit trail. + +--- + +## Open questions + +- **O1.** Where does the page live? A top-level `/agents` route is + discoverable; `/user/settings/agents` matches Gitea's existing IA. Pick + before F7 (frontend task). +- **O2.** One-shot vs. reusable keys (default and whether both are offered). + Recommendation above; needs product sign-off. +- **O3.** Is RIA Hub's web UI really a Gitea fork? URL patterns + (`/qoherent/-/packages/...`, `.git` clones) suggest yes, but the "Settings" + integration plan depends on confirming this. If it isn't, F7 is a standalone + page instead. +- **O4.** Does the agent bearer `token` need per-user scoping too, or is + ownership-at-registration enough? Today the token is opaque and not tied + to a user in the WS handler. Probably fine to defer until after per-user + keys ship. +- **O5.** Should admins be able to mint keys on behalf of other users (for + onboarding)? If yes, H1 needs an admin-scoped variant. +- **O6.** Conductor reuse decision — reuse / extract / fork. Must be answered + before any hub-side code lands. See "Prior art" above. + +--- + +## Out of scope + +- SSO / OIDC for agent-to-hub auth (current `token` bearer is kept as-is). +- Per-agent capability scoping beyond what `--allow-tx` already does at + registration time. +- Fleet provisioning (N agents from one key); covered instead by "reusable" + flag in O2 if that's the chosen default. + +--- + +## MVP cut + +If the hub team wants the smallest shippable slice: + +- H1, H2, H3, H4 (with dual-accept), H5. +- Frontend: registration-keys panel only; reuse the existing agents admin + view if one already exists. +- T1 toolkit copy-change. + +Defer H6, rename flows, T2, and audit logging to a follow-up. diff --git a/docs/ria_app_hub_handoff.md b/docs/ria_app_hub_handoff.md new file mode 100644 index 0000000..5aa55c1 --- /dev/null +++ b/docs/ria_app_hub_handoff.md @@ -0,0 +1,104 @@ +# `ria-app` Hub-Side Handoff + +**Repo:** `ria-hub` +**Goal:** Make containerized apps built by Application Composer self-describing so the new `ria-app` CLI in `ria-toolkit-oss` can auto-configure GPU/USB/network flags at `docker run` time. No user copy-paste of flags. + +--- + +## Context — what exists today + +In `ria-toolkit-oss` (branch `screens-connection`) there is now a `ria-app` CLI: + +```bash +ria-app configure --registry registry.riahub.ai --namespace qoherent +ria-app pull [:tag] +ria-app run [:tag] [--config config.yaml] +ria-app list +ria-app logs [-f] +ria-app stop +``` + +`ria-app run` inspects OCI image labels and auto-adds runtime flags: + +| Label | Value (example) | Effect | +|---|---|---| +| `ria.profile` | `native-x86`, `nvidia-x86`, `holoscan` | `nvidia`/`holoscan`/`cuda` → adds `--gpus all` | +| `ria.hardware` | comma list: `pluto,usrp,rtlsdr,hackrf,bladerf,thinkrf` | USB-attached SDRs → `--device /dev/bus/usb`; networked SDRs → `--net host` | +| `ria.app` | `` | Used by `ria-app list` to filter images | +| `ria.version` | `` | Informational | + +If the labels are missing, `ria-app run` still works but can't auto-configure — the user has to pass `--docker-args ...` themselves. So the value here is entirely in getting CI to stamp the labels. + +--- + +## What to change in `ria-hub` + +### 1. Stamp OCI labels on every built image + +In the Application Composer build flow (follow the path from `application_composer.go:172` `ComposerBuildTrigger` → generated `.riahub/workflows/*.yml` → `sample-build-tools` `full_generator.py` → `Dockerfile` emission), add `LABEL` instructions to the generated Dockerfile. The values should be computed from the app JSON the user submitted, not hard-coded: + +```dockerfile +LABEL ria.app="${APP_NAME}" +LABEL ria.profile="${PROFILE}" # native-x86 | nvidia-x86 | holoscan | ... +LABEL ria.hardware="${HARDWARE_CSV}" # e.g. "pluto,usrp" (empty string if none) +LABEL ria.version="${GIT_SHA}" +LABEL ria.operators="${OPERATORS_CSV}" # optional, nice for debugging +``` + +`HARDWARE_CSV` derivation: walk the operator graph in the submitted app JSON and collect the set of hardware backends that any operator requires. The mapping from operator → hardware tag should live next to the existing `operator_generator.py` apt-dep resolution (that code already knows, per operator, whether it needs `libuhd-dev`, `libad9361-dev`, `libhackrf-dev`, `librtlsdr-dev`, etc.). Reuse that table — just emit the short tag (`usrp`, `pluto`, `hackrf`, `rtlsdr`) alongside the apt package name. + +Allowed hardware tags (must match what `ria-app` recognizes): + +- `pluto`, `rtlsdr`, `hackrf`, `bladerf` → USB +- `usrp`, `thinkrf` → network +- (extend here when new SDR backends are added) + +If an operator needs both (e.g. Pluto over USB *and* its iio network endpoint), list it once — `ria-app` already applies both USB and host-net when `pluto` appears. + +### 2. Prefer `LABEL` over `ARG`-only + +The CI job likely already passes things like `APP_NAME` and `GIT_SHA` as build args. Those args disappear after build unless promoted to `LABEL`. Make sure each of the five labels above ends up in the final image layer (verify with `docker image inspect --format '{{json .Config.Labels}}' `). + +### 3. Push with both `:` and `:latest` tags + +`ria-app` defaults to `:latest` when the user omits a tag. If CI only pushes immutable SHA tags today, also push `:latest` on main-branch builds so `ria-app run my-classifier` Just Works. + +### 4. (Optional but recommended) App index endpoint + +Add `GET /apps` to the hub API returning something like: + +```json +[ + { + "name": "my-classifier", + "image": "registry.riahub.ai/qoherent/my-classifier:latest", + "profile": "nvidia-x86", + "hardware": ["pluto"], + "updated_at": "2026-04-14T10:00:00Z" + } +] +``` + +This lets `ria-app list --remote` show available apps without the user knowing image names. Not required for MVP — skip if it adds scope. + +### 5. (Optional) Ship a default `config.yaml` inside the image at a known path + +`ria-app run --config ` mounts the user's config to `/config/config.yaml` and sets `RIA_CONFIG=/config/config.yaml`. The runtime already falls back to an embedded config per your handoff notes, so this just needs to keep working — no change unless you want to standardize the embedded path. + +--- + +## Acceptance checklist + +- [ ] A Composer-built image for a native-x86 app with a Pluto operator has labels: `ria.profile=native-x86`, `ria.hardware=pluto`, `ria.app=`, `ria.version=`. +- [ ] A Composer-built image for an nvidia-x86 app has `ria.profile=nvidia-x86`. +- [ ] `docker image inspect --format '{{json .Config.Labels}}' ` shows all five labels. +- [ ] `:latest` tag is pushed for main-branch builds. +- [ ] Running `ria-app run ` on a user's machine starts the container with the right `--gpus` / `--device` / `--net` flags without the user passing anything beyond the app name. + +--- + +## Out of scope + +- Anything on the `ria-toolkit-oss` side — the CLI is already implemented on branch `screens-connection`. +- Changes to the generated C++ code, CMakeLists, or runtime config lookup. +- Artifact downloads — we're distributing via the container registry only. diff --git a/docs/screens_agent_handoff.md b/docs/screens_agent_handoff.md new file mode 100644 index 0000000..8a5e58b --- /dev/null +++ b/docs/screens_agent_handoff.md @@ -0,0 +1,257 @@ +# Screens Agent Streamer — Hand-off + +**Branch:** `screens-connection` in `ria-toolkit-oss` +**Status:** Part A complete (tests passing, 25/25). Part B is pending in the `ria-hub` repo. +**Related docs:** [screens_agent_streamer_plan.md](./screens_agent_streamer_plan.md), [../screens_connection_updates.md](../screens_connection_updates.md) + +--- + +## What's done (Part A — this repo) + +### Phase 1 — SDR foundation + +- Added `ria_toolkit_oss.sdr.detect_available() -> dict[str, type]` that probes + every driver module and returns the map of importable driver classes. + Importability is used as a proxy for "user has installed this driver's + optional dep"; it does **not** probe for physical hardware. +- Added `SdrDisconnectedError` (subclass of `SDRError`) plus a + `translate_disconnect(exc)` helper in [sdr/sdr.py](../src/ria_toolkit_oss/sdr/sdr.py). + Pattern-matches USB/device-drop exceptions (`ENODEV`/`EIO`, "no such device", + "broken pipe", etc.) and converts them so the streamer can distinguish a + real hardware failure from a transient error. +- Audited every driver under `sdr/` for GUI imports — none found. All drivers + are headless-clean at import time. +- `Pluto.rx(num_samples)` now wraps `self.radio.rx()` with + `translate_disconnect`. The same one-liner pattern can be applied to other + drivers (hackrf/rtlsdr/usrp/blade/thinkrf) when they get wired to the + streamer — deferred until each is needed. + +**Not done (Phase 1 Task 4):** SigMF recording validation across radio types — +needs real captures from each device, out of scope without hardware. + +### Phase 2 — Agent package restructure + +Moved the former `src/ria_toolkit_oss/agent.py` into a package: + +``` +src/ria_toolkit_oss/agent/ +├── __init__.py # re-exports NodeAgent + main for back-compat +├── legacy_executor.py # former agent.py, unchanged behavior +├── streamer.py # new WebSocket IQ streamer +├── ws_client.py # persistent WS client + heartbeat + reconnect +├── hardware.py # wraps sdr.detect_available() for heartbeat payloads +├── config.py # ~/.ria/agent.json load/save (0600 perms) +└── cli.py # unified CLI (run / stream / detect / register) +``` + +Back-compat preserved: `from ria_toolkit_oss.agent import NodeAgent` still +works, and bare `ria-agent ...` with no subcommand (or with legacy flags +like `--hub`) falls through to the original long-poll executor. + +### Phase 3 — Streamer implementation + +- `ws_client.WsClient` — `async run()` loop, JSON heartbeats on a timer, + auto-reconnect on drop, bearer-token auth header on connect. +- `streamer.Streamer` — handles `start` / `stop` / `configure` messages, + opens the SDR via an injectable `sdr_factory`, runs capture in an executor + thread, and ships raw interleaved float32 IQ bytes per `rx()` call. +- `hardware.heartbeat_payload(status, app_id)` — `{type, hardware[], status}`. +- `config.AgentConfig` — dataclass round-tripped through JSON, unknown keys + preserved in an `extra` dict. +- CLI subcommands: `run` (legacy), `stream`, `detect`, `register`. + +### Phase 4 — Protocol + +Matches `screens_connection_updates.md` §"WebSocket Protocol" exactly: + +| Direction | Message | +|-----------|---------| +| A → S (JSON) | `{type: heartbeat, hardware[], status}` | +| A → S (JSON) | `{type: status, status, app_id}` | +| A → S (JSON) | `{type: error, app_id, message}` | +| A → S (binary) | raw interleaved float32 IQ, one frame per `rx()` | +| S → A (JSON) | `{type: start, app_id, radio_config}` | +| S → A (JSON) | `{type: stop, app_id}` | +| S → A (JSON) | `{type: configure, app_id, radio_config}` — applied at next boundary | + +**Auth decision:** bearer token in `Authorization` header on the initial +handshake. (Open questions in the plan around first-frame auth / mid-buffer +`configure` / backpressure remain open.) + +### Phase 5 — Tests + +25 tests, all green under `poetry run pytest tests/agent/`: + +- `test_hardware.py` — `detect_available`, heartbeat payload shape +- `test_config.py` — round-trip, missing-file fallback, extra-key preservation +- `test_streamer.py` — start/stream/stop against `MockSDR` + fake WS, error + frames, configure queueing +- `test_disconnect.py` — `translate_disconnect` patterns + streamer reports + `SDR disconnected:` and closes the SDR +- `test_ws_client.py` — real local `websockets` server: heartbeat on connect, + auto-reconnect after server drop, malformed-frame resilience +- `test_integration.py` — end-to-end heartbeat → start → 3 binary IQ frames → stop +- `test_legacy.py` — regression: `NodeAgent` still importable + +### Dependency / build changes + +- Added `websockets (>=12.0,<14.0)` to `[tool.poetry.group.agent.dependencies]`. +- Repointed the `ria-agent` console script from `ria_toolkit_oss.agent:main` + to `ria_toolkit_oss.agent.cli:main` (the unified CLI, which still calls + through to the legacy `main` for back-compat). + +### Files changed + +``` +M pyproject.toml +M poetry.lock +M src/ria_toolkit_oss/sdr/__init__.py +M src/ria_toolkit_oss/sdr/sdr.py +M src/ria_toolkit_oss/sdr/pluto.py +R src/ria_toolkit_oss/agent.py -> src/ria_toolkit_oss/agent/legacy_executor.py +A src/ria_toolkit_oss/agent/{__init__,cli,config,hardware,streamer,ws_client}.py +A tests/agent/{__init__,test_config,test_disconnect,test_hardware, + test_integration,test_legacy,test_streamer,test_ws_client}.py +A docs/screens_agent_streamer_plan.md +A docs/screens_agent_handoff.md (this file) +``` + +--- + +## What's left (Part B — `ria-hub` repo) + +In priority order. B1 + B2 + B3 + B6 are the MVP; everything else hardens. + +### Server-side (MVP) + +| # | Task | File / area | +|---|------|-------------| +| B1 | Implement `AgentDataSource` (DataSource ABC, reads IQ from WS connection) and register in `build_data_source()` | `controller/app/modules/screens/data_sources.py` | +| B2 | Add `"agent"` to `dataSource.type` enum in manifest schema; update Pydantic / JSON schema validators | `controller/app/modules/screens/graph_derivation.py` + schema files | +| B3 | Agent WebSocket endpoint `POST /api/agent/ws` (or `GET` with Upgrade) — accepts agent connections, auths on bearer token, bridges the connection to the Celery task's `AgentDataSource` | new `controller/app/modules/agent/routes.py` | +| B6 | Celery wiring: when `dataSource.type == "agent"`, look up the connected agent by `agent_id`, forward `radio_config` as a `start` message, and feed received IQ bytes into the inference loop via `AgentDataSource.next_chunk()` | `controller/app/modules/screens/tasks.py` | + +### Server-side (hardening) + +| # | Task | +|---|------| +| B4 | Agent registry — MongoDB collection: `agent_id`, `hardware[]`, `last_heartbeat`, `online`, registration tokens | +| B5 | `POST /api/agent/register` returning `{agent_id, token}` for `~/.ria/agent.json` | + +### Frontend + +| # | Task | +|---|------| +| B7 | Device / agent picker in the Screens app config (Vue 3) | +| B8 | Agents list / status admin view | + +### Protocol contract (keep in lockstep with Part A) + +- Binary frames are interleaved float32 IQ, one frame per `radio.rx()` call. +- `radio_config` is forwarded verbatim from manifest `dataSource.params`. + Minimum keys the agent handles: `device`, `identifier`, `sample_rate`, + `center_frequency`, `gain`, `buffer_size`. +- `configure` messages from the server apply at the next capture boundary + (current agent implementation). +- Agent authenticates with bearer token in `Authorization` header on the + handshake. + +### Still-open protocol questions (pin down before B3 lands) + +- Auth frame as fallback when proxies strip `Authorization`? +- Mid-buffer `configure` application for tighter retune latency? +- Backpressure policy when the server's inference loop is slower than the + agent's `rx()` cadence — drop, queue, or pause the agent via a `pause` + control frame? + +### Manifest example (new `agent` mode) + +```json +{ + "dataSource": { + "type": "agent", + "device": "pluto", + "agent_id": "agent-abc123", + "params": { + "identifier": "ip:192.168.3.1", + "sample_rate": 1000000, + "center_frequency": 2450000000, + "gain": 40, + "buffer_size": 1024 + } + }, + "preprocess": "magnitude_phase_window_stats", + "config": {"inference": {"knownDevices": [], "interval": 1}} +} +``` + +### Pipeline invariant + +**No changes to `inference_core.py`, `preprocessors.py`, or +`run_onnx_chain_loop()`.** `AgentDataSource` is a drop-in replacement for +`SdrDataSource`; everything downstream is unchanged. + +--- + +## Local test setup + +### 1. Branch layout + +- `ria-toolkit-oss` — branch **`screens-connection`** (this work). +- `ria-hub` — whatever branch you'll be doing Part B on (create a matching + `screens-connection` branch there to keep the hand-off clean). + +### 2. Install the toolkit branch into your local RIA Hub + +The hub declares `ria-toolkit-oss` as a git dep in its `pyproject.toml`. +Point it at this branch: + +```toml +# ria-hub/pyproject.toml +ria-toolkit-oss = { git = "https://riahub.ai/qoherent/ria-toolkit-oss.git", branch = "screens-connection" } +``` + +Or for a fully local dev loop (edits visible without reinstall): + +```bash +# from ria-hub repo +poetry run pip install -e /home/qrf/ria-toolkit-oss +# plus the agent extra so websockets is available server-side too +poetry run pip install websockets +``` + +### 3. Commit this branch and push (optional, for CI / shared dev) + +```bash +cd /home/qrf/ria-toolkit-oss +git add -A +git commit -m "Add WebSocket IQ streamer agent (Part A)" +git push -u origin screens-connection +``` + +### 4. Try the agent locally against a mock server + +You can exercise the whole Part A stack without RIA Hub by running the +integration test: + +```bash +cd /home/qrf/ria-toolkit-oss +poetry run pytest tests/agent/test_integration.py -v +``` + +Or manually, once Part B lands: + +```bash +# on the user machine (mock hardware is fine) +ria-agent register --url https://localhost:8000 --token +ria-agent detect # lists: mock, pluto, ... whatever's importable +ria-agent stream --url ws://localhost:8000/api/agent/ws +``` + +### 5. End-to-end integration test (after Part B MVP) + +1. Start RIA Hub (FastAPI + Celery + Mongo + Redis) with the Part B branch. +2. Run `ria-agent stream` on your laptop. +3. Create a Screens app with `dataSource.type: "agent"` and `device: "mock"`. +4. Start the app; confirm the agent logs "streaming" and the SSE stream + shows inference metrics flowing. diff --git a/docs/screens_agent_streamer_plan.md b/docs/screens_agent_streamer_plan.md new file mode 100644 index 0000000..c8d4c39 --- /dev/null +++ b/docs/screens_agent_streamer_plan.md @@ -0,0 +1,156 @@ +# Screens Agent Streamer — Implementation Plan + +**Source doc:** [screens_connection_updates.md](../screens_connection_updates.md) +**Created:** 2026-04-13 +**Goal:** Add a thin WebSocket-based IQ streaming agent to `ria-toolkit-oss`, alongside the existing long-poll `NodeAgent`, and wire up the RIA Hub server to consume it. + +--- + +## Architectural Decision + +The existing [src/ria_toolkit_oss/agent.py](../src/ria_toolkit_oss/agent.py) (`NodeAgent`) uses HTTP long-polling and runs ONNX inference locally. It stays as-is. + +The new **streamer agent** described in `screens_connection_updates.md` is a *different* execution mode — thin, WebSocket-based, server-driven inference. It will be added as a new submodule and exposed as a new CLI subcommand. Both modes coexist; users pick one based on deployment needs. + +--- + +## Part A — ria-toolkit-oss (this repo) + +### Phase 1 — SDR foundation + +| # | Task | File(s) | Priority | +|---|------|---------|----------| +| 1 | Add `detect_available() -> dict[str, type]` that probes every driver without importing GUI deps | [src/ria_toolkit_oss/sdr/__init__.py](../src/ria_toolkit_oss/sdr/__init__.py) | P0 | +| 2 | Audit each SDR driver for headless cleanliness (no matplotlib/Qt at import time) | `src/ria_toolkit_oss/sdr/{pluto,hackrf,rtlsdr,usrp,blade,thinkrf}.py` | P1 | +| 3 | Raise typed `SdrDisconnectedError` from `radio.rx()` on USB drop instead of crashing | `src/ria_toolkit_oss/sdr/sdr.py` + drivers | P1 | +| 4 | Validate `load_recording` against SigMF captures from each radio type | `tests/io/` | P2 | + +### Phase 2 — Agent package restructure (non-breaking) + +Promote the existing module to a package and add the streamer next to it: + +``` +src/ria_toolkit_oss/ +├── agent.py # DELETE after move +└── agent/ + ├── __init__.py # re-export NodeAgent for back-compat + ├── legacy_executor.py # former agent.py (NodeAgent, unchanged behavior) + ├── streamer.py # NEW — thin IQ streamer loop + ├── ws_client.py # NEW — persistent WS client + heartbeat + reconnect + ├── hardware.py # NEW — wraps sdr.detect_available() + ├── config.py # shared: ~/.ria/agent.json load/save + └── cli.py # unified CLI with subcommands +``` + +Back-compat requirement: `from ria_toolkit_oss.agent import NodeAgent` must keep working. Keep the `ria-agent` console script entry point; expose both modes via subcommands. + +### Phase 3 — Streamer implementation + +| # | Task | File | Priority | +|---|------|------|----------| +| 5 | `ws_client.py` — persistent WebSocket, auto-reconnect, heartbeat loop | `src/ria_toolkit_oss/agent/ws_client.py` | P0 | +| 6 | `streamer.py` — main loop: receive `start` → open SDR → `radio.rx()` → send binary IQ → handle `stop`/`configure` | `src/ria_toolkit_oss/agent/streamer.py` | P0 | +| 7 | `hardware.py` — heartbeat payload builder, uses `sdr.detect_available()` | `src/ria_toolkit_oss/agent/hardware.py` | P0 | +| 8 | `config.py` — `~/.ria/agent.json` read/write, registration token storage | `src/ria_toolkit_oss/agent/config.py` | P1 | +| 9 | CLI subcommands: `ria-agent register`, `detect`, `stream` (new), `run` (legacy long-poll) | `src/ria_toolkit_oss/agent/cli.py` | P0 | +| 10 | Add `websockets` to dependencies | [pyproject.toml](../pyproject.toml) | P0 | + +### Phase 4 — WebSocket protocol + +Implement exactly the messages from `screens_connection_updates.md` §"WebSocket Protocol": + +**Agent → Server (JSON control):** +```json +{"type": "heartbeat", "hardware": ["pluto", "hackrf"], "status": "idle"} +{"type": "status", "status": "streaming", "app_id": "abc"} +{"type": "error", "app_id": "abc", "message": "USB device disconnected"} +``` + +**Agent → Server (binary data):** raw interleaved float32 IQ bytes per `radio.rx()` call. + +**Server → Agent (JSON control):** +```json +{"type": "start", "app_id": "...", "radio_config": {"device": "pluto", ...}} +{"type": "stop", "app_id": "..."} +{"type": "configure", "app_id": "...", "radio_config": {"center_frequency": 915000000}} +``` + +The agent does **not** interpret manifests, models, or preprocessing — it just applies `radio_config` to `ria_toolkit_oss.sdr.`. + +### Phase 5 — Tests + +| # | Task | Location | +|---|------|----------| +| 11 | Unit: streamer loop against `sdr.mock` + fake WS | `tests/agent/test_streamer.py` | +| 12 | Unit: `ws_client` reconnect/heartbeat timing | `tests/agent/test_ws_client.py` | +| 13 | Unit: `hardware.detect_available()` and heartbeat payload | `tests/agent/test_hardware.py` | +| 14 | Integration: local `websockets` server → mock SDR → full start/stream/stop cycle | `tests/agent/test_integration.py` | +| 15 | Regression: `NodeAgent` still importable and functional | `tests/agent/test_legacy.py` | + +--- + +## Part B — RIA Hub (hand-off to separate session) + +> **Give this section to Claude in the ria-hub repo session.** It depends on Part A shipping first (or at minimum, stubbed protocol). + +### Server-side tasks + +| # | Task | File / Area | Priority | +|---|------|-------------|----------| +| B1 | Implement `AgentDataSource` (DataSource ABC, reads IQ from WebSocket connection) and register it in `build_data_source()` | `controller/app/modules/screens/data_sources.py` | P0 | +| B2 | Add `"agent"` to `dataSource.type` enum in manifest schema; update Pydantic/JSON schema validators | `controller/app/modules/screens/graph_derivation.py` + manifest schema files | P0 | +| B3 | Add agent WebSocket endpoint `POST /api/agent/ws` — accepts agent connections, auth via registration token, bridges the connection to the Celery task's `AgentDataSource` | `controller/app/modules/agent/routes.py` (new) | P0 | +| B4 | Agent registry: MongoDB collection tracking `agent_id`, hardware list, last heartbeat, online status, registration tokens | `controller/app/modules/agent/models.py` (new) | P1 | +| B5 | Registration endpoint `POST /api/agent/register` returning agent credentials for `~/.ria/agent.json` | `controller/app/modules/agent/routes.py` | P1 | +| B6 | Celery task wiring: when manifest `dataSource.type == "agent"`, look up the connected agent by `agent_id`, forward `radio_config` to it, and feed received IQ chunks into the inference loop via `AgentDataSource.next_chunk()` | `controller/app/modules/screens/tasks.py` | P0 | +| B7 | Device/agent picker UI in Screens app config (Vue 3) | frontend screens panel | P2 | +| B8 | Agents list/status admin view | frontend admin area | P2 | + +### Protocol contract (must match Part A exactly) + +See `screens_connection_updates.md` §"WebSocket Protocol". Key invariants: +- Binary frames are interleaved float32 IQ, one frame per `radio.rx()` call. +- `radio_config` is derived from the manifest's `dataSource.params` and forwarded verbatim. +- The server sends `configure` for retune-without-stop flow; the agent is responsible for applying it at the next capture boundary. + +### Manifest example (new mode) + +```json +{ + "dataSource": { + "type": "agent", + "device": "pluto", + "agent_id": "agent-abc123", + "params": { + "identifier": "ip:192.168.3.1", + "sample_rate": 1000000, + "center_frequency": 2450000000, + "gain": 40, + "buffer_size": 1024 + } + }, + "preprocess": "magnitude_phase_window_stats", + "config": {"inference": {"knownDevices": [], "interval": 1}} +} +``` + +### Pipeline invariant + +**No changes to `inference_core.py`, `preprocessors.py`, or `run_onnx_chain_loop()`.** `AgentDataSource` is a drop-in for `SdrDataSource`; everything downstream is unchanged. + +--- + +## Rollout Order + +1. Part A Phase 1 (SDR foundation) — safe to land independently. +2. Part A Phases 2–3 (agent package + streamer) — lands the `ria-agent stream` CLI; no server needed to build/test against a mock WS. +3. Part B B1, B2, B3, B6 — minimum viable server path; lets a real agent connect end-to-end. +4. Part A Phase 5 integration test against a dev RIA Hub instance. +5. Part B B4, B5 (registry + registration) — hardens multi-agent deployments. +6. Part B B7, B8 (UI) — operator-facing polish. + +## Open Questions + +- Auth model for the WebSocket handshake: bearer token in header, query param, or first-message auth frame? +- Should `configure` apply mid-buffer or only at capture boundaries? (Doc implies boundaries; confirm for retune latency budget.) +- Backpressure policy when the server's inference loop is slower than the agent's `rx()` cadence — drop frames, queue, or pause? diff --git a/docs/spectrogram_dashboard_op_bug.md b/docs/spectrogram_dashboard_op_bug.md new file mode 100644 index 0000000..dfb9b5a --- /dev/null +++ b/docs/spectrogram_dashboard_op_bug.md @@ -0,0 +1,123 @@ +# Bug: `SpectrogramDashboardOp` destructor calls `std::terminate` + +## Summary + +`SpectrogramDashboardOp` spawns an HTTP server thread during setup but its destructor +does not `join()` or `detach()` it. Per the C++ standard, destroying a joinable +`std::thread` calls `std::terminate()` — so **any** shutdown path kills the app: +init failure, Ctrl-C, or normal exit at end of `main`. + +## Evidence + +Built app (`new_dashboard`) crashes on shutdown with this backtrace: + +``` +#3 __GI_raise +#4 __GI_abort +#5 libstdc++ (std::terminate handler) +#6 libstdc++ +#7 std::terminate() +#8 std::thread::~thread() +#9 ria::ops::SpectrogramDashboardOp::~SpectrogramDashboardOp() +#10 __gnu_cxx::new_allocator::destroy(...) +... +#23 ria::Pipeline::~Pipeline() +#24 main +``` + +The stack shows the failure is entirely inside the op's own destructor — not +downstream of any flow / port-wiring issue. The op's startup message +`HTTP server started on port 8080` prints just before the crash, confirming the +server thread is running and joinable when destruction begins. + +## Reproduction + +1. Build any RIA app that includes `SpectrogramDashboardOp`. +2. Run the container; it crashes with `terminate called without an active exception` + regardless of whether other operators succeed or fail. + +## Root cause + +Standard C++ invariant: + +> If a `std::thread` object is destroyed while still `joinable()`, the destructor +> calls `std::terminate()`. +> — [cppreference.com/w/cpp/thread/thread/~thread](https://en.cppreference.com/w/cpp/thread/thread/~thread) + +The destructor needs to (a) signal the server to stop, (b) wait for the thread +to exit, and (c) join it before the `std::thread` member is destroyed. + +## Fix + +In `SpectrogramDashboardOp`: + +```cpp +SpectrogramDashboardOp::~SpectrogramDashboardOp() { + // 1. Tell the HTTP server / websocket server to stop accepting + // and to return from its serve loop. Exact call depends on the + // HTTP library in use: + // - cpp-httplib: server_.stop(); + // - Boost.Beast: acceptor_.close(); io_context_.stop(); + // - custom: shutdown_flag_.store(true); close(listen_fd_); + if (server_) { + server_->stop(); + } + + // 2. Join the thread if it was ever started. + if (http_thread_.joinable()) { + http_thread_.join(); + } +} +``` + +If multiple threads are owned (e.g. separate WebSocket broadcaster, update-rate +timer), join **each** of them. + +## Related checks + +While fixing this op, audit any other operator in the same repo that owns a +thread: + +```bash +grep -rn "std::thread " src/ +``` + +For each match, confirm the owning class's destructor does: + +```cpp +if (thread_.joinable()) thread_.join(); +``` + +plus whatever shutdown signal is needed to make the thread actually return. + +## Acceptance + +- `SpectrogramDashboardOp` destructor joins all spawned threads. +- A RIA app containing this op exits cleanly on `Ctrl-C` with no + `terminate called without an active exception` message. +- Forcing an init failure (e.g. a bad `websocket_port`) produces a readable + exception message instead of `SIGABRT`. + +--- + +## Prompt to paste into Claude Code (in the op's repo) + +> `SpectrogramDashboardOp` has a latent bug: its destructor lets a joinable +> `std::thread` (the HTTP server thread that prints "HTTP server started on +> port 8080") go out of scope, which per the C++ standard calls +> `std::terminate()`. This makes any built RIA app containing this op crash on +> every shutdown path — init failure, normal exit, and Ctrl-C — with the +> unhelpful message `terminate called without an active exception`. Stack trace +> at the point of abort goes through `std::thread::~thread()` → +> `SpectrogramDashboardOp::~SpectrogramDashboardOp()`. +> +> Fix the destructor: (a) signal the HTTP server to stop (e.g. `server_->stop()` +> for cpp-httplib, or close the listening socket + set a shutdown flag), then +> (b) `if (http_thread_.joinable()) http_thread_.join();`. Apply the same pattern +> to any other `std::thread` members the op owns (WebSocket broadcaster, rate +> timer, etc.). Then grep for other `std::thread` members in this repo and audit +> their owners' destructors for the same bug. +> +> Acceptance: the op's destructor joins every thread it starts; a test that +> constructs and immediately destroys the op exits cleanly; Ctrl-C on a running +> app produces no `terminate` message. diff --git a/screens_connection_updates.md b/screens_connection_updates.md new file mode 100644 index 0000000..14a5e93 --- /dev/null +++ b/screens_connection_updates.md @@ -0,0 +1,170 @@ +# Agent CLI Simplification — Handoff to ria-toolkit-oss + +**Repo:** `ria-toolkit-oss`, branch `screens-connection` +**Goal:** Reduce agent setup from 3+ commands to 2 simple ones. + +--- + +## Current UX (painful) + +```bash +# Step 1: Register via curl against FastAPI directly +curl -X POST http://hub:8005/screens/agents/register \ + -H 'X-API-Key: supersecretapikey' \ + -d '{"name": "my-agent"}' +# → {"agent_id": "agent-55cf3c5b8137f6f3", "token": "45Hbt..."} + +# Step 2: Manually save credentials +ria-agent register \ + --url http://hub:8005 \ + --token 45HbtlpVDX7_XTF47biDcLcyiVmM51icEZVJ7J_UrEE \ + --agent-id agent-55cf3c5b8137f6f3 + +# Step 3: Stream (with manual URL construction) +ria-agent stream \ + --url "ws://hub:8005/screens/agent/ws?agent_id=agent-55cf3c5b8137f6f3" \ + --token 45HbtlpVDX7_XTF47biDcLcyiVmM51icEZVJ7J_UrEE +``` + +Problems: +- User must know the FastAPI port (8005), not just the hub URL (3005) +- `register` subcommand only saves locally — doesn't call the server +- `_derive_ws_url` builds `/api/agent/ws/{agent_id}` but server endpoint is `/screens/agent/ws?agent_id=...` +- User must copy-paste agent_id and token between commands + +--- + +## Target UX + +```bash +# One-time setup: register with the hub (hits server, saves config) +ria-agent register --hub http://whitehorse:3005 --api-key supersecretapikey --name lab-pluto + +# Stream (reads config, connects automatically) +ria-agent stream +``` + +That's it. Two commands, no copy-pasting. + +--- + +## Changes needed in ria-toolkit-oss + +### 1. `cli.py` — Make `register` call the server + +Current `_cmd_register` just saves to `~/.ria/agent.json`. It should: + +1. POST to `{hub_url}/screens/agents/register` with `X-API-Key` header +2. Receive `{agent_id, token}` from the server +3. Save everything to `~/.ria/agent.json` + +```python +def _cmd_register(args: argparse.Namespace) -> int: + import urllib.request + import json as _json + + hub_url = args.hub.rstrip("/") + api_key = args.api_key + + # Call the server to register + url = f"{hub_url}/screens/agents/register" + body = _json.dumps({"name": args.name or ""}).encode() + req = urllib.request.Request( + url, + data=body, + headers={ + "Content-Type": "application/json", + "X-API-Key": api_key, + }, + ) + try: + with urllib.request.urlopen(req) as resp: + data = _json.loads(resp.read()) + except Exception as e: + print(f"error: registration failed: {e}", file=sys.stderr) + return 1 + + agent_id = data["agent_id"] + token = data["token"] + + # Save to config + cfg = _config.load() + cfg.hub_url = hub_url + cfg.agent_id = agent_id + cfg.token = token + if args.name: + cfg.name = args.name + cfg.insecure = bool(args.insecure) + path = _config.save(cfg) + + print(f"Registered agent: {agent_id}") + print(f"Credentials saved to {path}") + return 0 +``` + +Update the argparse for `register`: +```python +p_reg = sub.add_parser("register", help="Register agent with RIA Hub and save credentials") +p_reg.add_argument("--hub", required=True, help="RIA Hub URL (e.g. http://whitehorse:3005)") +p_reg.add_argument("--api-key", required=True, help="Hub API key for authentication") +p_reg.add_argument("--name", default=None, help="Human-friendly agent name") +p_reg.add_argument("--insecure", action="store_true", help="Skip TLS verification") +``` + +Remove `--url`, `--token`, `--agent-id` from register — those are now server-generated. + +### 2. `cli.py` — Fix `_derive_ws_url` + +Current (wrong): +```python +suffix = f"/api/agent/ws/{agent_id}" if agent_id else "/api/agent/ws" +``` + +Should be: +```python +suffix = f"/screens/agent/ws?agent_id={agent_id}" if agent_id else "/screens/agent/ws" +``` + +### 3. `cli.py` — Make `stream` zero-arg by default + +Current `_cmd_stream` already loads config and derives the URL — it just needs the URL fix above. After that, bare `ria-agent stream` works if `register` was run first. + +### 4. `config.py` — Add `api_key` field (optional) + +Add `api_key: str = ""` to `AgentConfig` so the hub API key can be persisted for re-registration or other API calls. Not strictly required but useful. + +--- + +## Changes already done in ria-hub (Part B) + +The server side is ready: + +- `POST /screens/agents/register` — accepts `{"name": "..."}` with `X-API-Key` header, returns `{"agent_id": "...", "token": "..."}` +- `GET /screens/agent/ws?agent_id=...` — WebSocket endpoint, authenticates via `Authorization: Bearer {token}` header +- Agent token is hashed (SHA-256) and stored in MongoDB; lookup happens on WS connect + +The Go proxy for `/screens/agents/register` through port 3005 still needs to be added (currently agents must hit FastAPI port 8005 directly). That's a ria-hub task, not ria-toolkit-oss. + +--- + +## Summary of file changes + +| File | Change | +|------|--------| +| `src/ria_toolkit_oss/agent/cli.py` | `register` calls server API, new flags `--hub`/`--api-key`; fix `_derive_ws_url` path | +| `src/ria_toolkit_oss/agent/config.py` | Optional: add `api_key` field to `AgentConfig` | +| `tests/agent/test_cli.py` | Update register tests for new server-calling behavior | + +--- + +## Validated E2E flow (what works today) + +We tested the full pipeline on whitehorse with a real Pluto SDR: + +1. Agent connects via WebSocket with bearer token auth ✅ +2. Server sends `start` with `radio_config` via Redis pub/sub → agent ✅ +3. Agent opens Pluto, streams interleaved float32 IQ via binary WS frames ✅ +4. FastAPI pushes frames to Redis list, Celery worker's `AgentDataSource.next_chunk()` BLPOP reads them ✅ +5. Inference loop runs on live agent data identically to direct SDR mode ✅ + +The only manual friction is the multi-step registration and URL construction — which these CLI changes eliminate.