Agent TX Streaming — Cross-Repo Plan

Repos: ria-toolkit-oss, ria-hub, Screens frontend
Status: Proposal / pre-implementation
Prerequisites: The RX-streaming work from screens_agent_handoff.md and screens_agent_streamer_plan.md is landed (agent WS protocol, AgentDataSource, /screens/agents/register, /screens/agent/ws).

Goal

Let a Screens app running on the hub drive a remote agent's Pluto (or other TX-capable SDR) to transmit — streaming IQ buffers end-to-end from an operator like plutoTXoperator into the agent's sdr.tx() path. Mirror image of what AgentDataSource already does for RX.

Non-goals (v1)

  • Multi-tenant radio sharing (one app owns the radio at a time per agent).
  • Bulk/upload-once TX — superseded by streaming per request.
  • Arbitrary waveform generation in the agent. The agent is dumb pipe + hardware control; signal generation stays on the hub.

Key design decisions

| # | Decision | Value |
|---|---|---|
| D1 | Delivery mode | Streaming. Hub pushes binary IQ buffers continuously over the existing WS; agent's _stream_tx callback pulls them from an in-agent queue. |
| D2 | Full-duplex | Yes. A single app_id may own both an RX session and a TX session on the same agent concurrently. Same physical SDR handle serves both (Pluto is FDD-capable; init_rx and init_tx are independent on one adi.Pluto instance). |
| D3 | Safety caps | Agent-enforced. ~/.ria/agent.json holds tx_enabled, tx_max_gain_db, tx_max_duration_s, optional tx_allowed_freq_ranges: [[low,high], …]. Agent rejects tx_start frames that violate any of these, independent of what the hub sends. |
| D4 | Buffer format | Interleaved float32 IQ, range [-1, 1], same as RX. Format-validated by ria_toolkit_oss.sdr.sdr._verify_sample_format. |
| D5 | Protocol evolution | Keep existing RX messages (start/stop/configure) unchanged for back-compat. Add parallel tx_start/tx_stop/tx_configure. Heartbeat grows to advertise capabilities. |
| D6 | Underrun policy | Default pause: if the TX queue empties, agent calls pause_tx() and emits tx_status: underrun. Hub must recover by sending a fresh tx_start + buffers. Configurable per session via radio_config.underrun_policy ∈ {"pause", "zero", "repeat"}. |
| D7 | Backpressure | Rely on TCP/WS backpressure. Agent caps inbound TX queue at 8 buffers; await ws.send on the hub side slows when the agent doesn't drain. No application-level flow control in v1. |
| D8 | Session identity | app_id identifies a Screens app. Each app has at most one RX session and one TX session per agent. Binary direction disambiguates: agent → hub binary = RX IQ; hub → agent binary = TX IQ. |

Protocol specification

Additions only. Existing RX messages from screens_agent_handoff.md §Phase 4 are unchanged.

Hub → agent (JSON)

// Arm the TX side. Agent calls init_tx, starts the stream_tx thread with an empty queue.
// After this, hub sends binary TX buffers on the same WS.
{
  "type": "tx_start",
  "app_id": "app-abc",
  "radio_config": {
    "device": "pluto",
    "identifier": "ip:192.168.3.1",
    "tx_sample_rate": 1000000,
    "tx_center_frequency": 2450000000,
    "tx_gain": -20,             // dB, negative = attenuation on Pluto
    "tx_bandwidth": 1000000,    // optional
    "buffer_size": 1024,
    "underrun_policy": "pause"  // "pause" | "zero" | "repeat"
  }
}

// Apply parameter changes at the next buffer boundary.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }

// Stop TX, drain queue, pause_tx, release TX side (RX may continue if a separate RX session is live).
{ "type": "tx_stop", "app_id": "app-abc" }

Hub → agent (binary)

Raw interleaved float32 IQ in [-1, 1]. One WS frame = one buffer = buffer_size complex samples = buffer_size * 2 * 4 bytes. Delivered only between tx_start and tx_stop. Binary frames arriving outside that window are discarded and logged at WARN.
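
To make the frame layout above concrete, here is a minimal standard-library sketch that packs complex samples into one interleaved float32 frame and checks the size arithmetic. pack_iq_frame and unpack_iq_frame are hypothetical helper names, not part of the toolkit, and little-endian byte order is an assumption, since the plan does not state one.

```python
import struct

BYTES_PER_COMPLEX_SAMPLE = 2 * 4  # I and Q, each a 4-byte float32


def pack_iq_frame(samples: list, buffer_size: int) -> bytes:
    """Pack complex samples as interleaved float32 I/Q (hypothetical helper).

    Assumes little-endian byte order; the plan does not specify one.
    """
    if len(samples) != buffer_size:
        raise ValueError(f"expected {buffer_size} samples, got {len(samples)}")
    flat = []
    for s in samples:
        if abs(s.real) > 1.0 or abs(s.imag) > 1.0:
            raise ValueError("I/Q components must lie in [-1, 1]")
        flat.extend((s.real, s.imag))
    return struct.pack(f"<{len(flat)}f", *flat)


def unpack_iq_frame(frame: bytes) -> list:
    """Inverse of pack_iq_frame: bytes back to a list of complex samples."""
    if len(frame) % BYTES_PER_COMPLEX_SAMPLE:
        raise ValueError("frame length is not a whole number of I/Q pairs")
    flat = struct.unpack(f"<{len(frame) // 4}f", frame)
    return [complex(i, q) for i, q in zip(flat[0::2], flat[1::2])]


# buffer_size = 1024 as in the tx_start example above.
frame = pack_iq_frame([0.5 + 0.25j] * 1024, buffer_size=1024)
```

With buffer_size = 1024, each WS frame is 1024 * 2 * 4 = 8192 bytes.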

Agent → hub (JSON)

// Lifecycle events.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed"        }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun"     }  // queue empty; TX paused
{ "type": "tx_status", "app_id": "app-abc", "state": "done"         }
{ "type": "tx_status", "app_id": "app-abc", "state": "error", "message": "gain -5 exceeds tx_max_gain_db=-15" }

// Reject reasons from agent-enforced caps/interlocks are surfaced via tx_status:error.

Heartbeat extension

Existing {type: heartbeat, hardware[], status} grows:

{
  "type": "heartbeat",
  "hardware": ["mock", "pluto"],
  "status": "streaming",                 // unchanged semantics
  "capabilities": ["rx", "tx"],          // NEW — derived from tx_enabled + SDR class having init_tx
  "tx_enabled": true,                    // NEW — mirrors config flag
  "sessions": {                          // NEW — optional per-session snapshot
    "rx": { "app_id": "app-abc", "state": "streaming" },
    "tx": { "app_id": "app-abc", "state": "transmitting" }
  },
  "app_id": "app-abc"                    // kept for back-compat
}

Part A — ria-toolkit-oss (this repo)

A1. agent/ws_client.py

Currently the WS client drops server → agent binary (ws_client.py:77-79). Add a binary handler alongside the JSON one.

BinaryHandler = Callable[[bytes], Awaitable[None]]

async def run(
    self,
    on_message: MessageHandler,
    heartbeat: HeartbeatBuilder,
    on_binary: BinaryHandler | None = None,
) -> None:
    ...
    async for raw in self._ws:
        if isinstance(raw, bytes):
            if on_binary is not None:
                await on_binary(raw)
            continue
        ...

Keep the reconnect, heartbeat, and malformed-frame behavior unchanged.

A2. agent/streamer.py — add TX sessions

Replace the flat self._sdr / self._app_id / self._capture_task state with a session model:

@dataclass
class RxSession:
    app_id: str
    sdr: Any
    buffer_size: int
    task: asyncio.Task
    pending_config: dict

@dataclass
class TxSession:
    app_id: str
    sdr: Any
    queue: asyncio.Queue[bytes]        # bounded, maxsize=8
    task: asyncio.Task                  # runs _stream_tx in executor
    underrun_policy: str
    pending_config: dict
    bytes_transmitted: int = 0
    started_at: float = 0.0             # for tx_max_duration_s enforcement

The streamer holds self._rx: RxSession | None and self._tx: TxSession | None. SDR instances are cached by (device, identifier) — when RX and TX name the same device, both sessions share one handle (matters for Pluto FDD).
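
One way to get the shared-handle behavior is a small reference-counted cache keyed by (device, identifier). The sketch below is an assumption about how that could look, not the streamer's actual code; SdrCache and its opener argument are hypothetical names, and it is not thread-safe as written.

```python
from typing import Any, Callable


class SdrCache:
    """Reference-counted SDR handles keyed by (device, identifier)."""

    def __init__(self, opener: Callable[[str, str], Any]) -> None:
        self._opener = opener  # hypothetical factory that opens a driver
        self._handles = {}     # (device, identifier) -> handle
        self._refs = {}        # (device, identifier) -> session count

    def acquire(self, device: str, identifier: str) -> Any:
        key = (device, identifier)
        if key not in self._handles:
            self._handles[key] = self._opener(device, identifier)
            self._refs[key] = 0
        self._refs[key] += 1
        return self._handles[key]

    def release(self, device: str, identifier: str) -> bool:
        """Drop one reference; return True when the handle was closed."""
        key = (device, identifier)
        self._refs[key] -= 1
        if self._refs[key] > 0:
            return False  # the other session (RX or TX) still uses it
        handle = self._handles.pop(key)
        del self._refs[key]
        close = getattr(handle, "close", None)
        if callable(close):
            close()
        return True


opened = []
cache = SdrCache(lambda dev, ident: opened.append((dev, ident)) or object())
rx_handle = cache.acquire("pluto", "ip:192.168.3.1")
tx_handle = cache.acquire("pluto", "ip:192.168.3.1")
first_release_closed = cache.release("pluto", "ip:192.168.3.1")
second_release_closed = cache.release("pluto", "ip:192.168.3.1")
```

Here stopping one session leaves the handle open for the other; only the last release closes it.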

New handlers:

  • _handle_tx_start(msg) — check cfg.tx_enabled, validate gain/duration/freq against caps, open/resolve SDR, sdr.init_tx(...), start _tx_loop, emit tx_status: armed.
  • _handle_tx_stop(msg) — cancel TX task, sdr.pause_tx(), drain queue, release SDR if no RX session on it, emit tx_status: done.
  • _handle_tx_configure(msg) — stash into self._tx.pending_config, applied at next buffer boundary (same pattern as RX).
  • on_binary(data) — if self._tx: await self._tx.queue.put(data) (awaiting here is the backpressure mechanism). Else: log and drop.

TX loop (runs in an executor thread via loop.run_in_executor, like the RX capture loop):

def _tx_callback(num_samples: int) -> np.ndarray:
    # Called by sdr._stream_tx on every buffer boundary.
    try:
        raw = self._tx_queue_sync.get(timeout=0.1)
    except queue.Empty:
        return self._underrun_fill(num_samples)   # policy-driven
    samples = np.frombuffer(raw, dtype=np.float32).view(np.complex64)
    if len(samples) < num_samples:
        return _pad_zero(samples, num_samples)
    return samples[:num_samples]

Use a thread-safe queue.Queue for the TX side (the asyncio.Queue lives on the event loop; the executor thread reads from a sibling queue.Queue fed by a tiny asyncio→threading adapter).
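
A minimal sketch of such an adapter, under the assumption that a None sentinel marks the end of the session (the plan does not define one). pump_to_thread_queue is a hypothetical name. The blocking dst.put runs in the default executor, so a full thread-side queue stalls the pump rather than the event loop, and the bounded asyncio.Queue behind it then fills up and applies the backpressure described in D7.

```python
import asyncio
import queue

_STOP = None  # sentinel meaning "session ended" (an assumption of this sketch)


async def pump_to_thread_queue(src: "asyncio.Queue", dst: "queue.Queue") -> None:
    """Forward items from an asyncio.Queue to a thread-safe queue.Queue."""
    loop = asyncio.get_running_loop()
    while True:
        item = await src.get()
        # dst.put blocks while dst is full; running it in the default
        # executor keeps the event loop free to serve heartbeats and RX.
        await loop.run_in_executor(None, dst.put, item)
        if item is _STOP:
            return


async def _demo() -> list:
    src = asyncio.Queue(maxsize=8)
    dst = queue.Queue(maxsize=8)
    for frame in (b"frame-0", b"frame-1", _STOP):
        await src.put(frame)
    await pump_to_thread_queue(src, dst)
    return [dst.get_nowait() for _ in range(3)]


forwarded = asyncio.run(_demo())
```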

Underrun fills:

  • "pause": signal the main loop to call sdr.pause_tx(), emit tx_status: underrun, exit the callback.
  • "zero": return np.zeros(num_samples, dtype=np.complex64).
  • "repeat": return the last good buffer (cached). If no buffer yet: zeros.
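
The three policies can be sketched at the byte level, before the frame is converted to samples. This is an illustrative helper, not the agent's code: UnderrunFiller is a hypothetical name, and returning None to mean "pause" is a convention chosen for this sketch. It relies on the fact that an all-zero byte string decodes to float32 zeros.

```python
from typing import Optional

BYTES_PER_COMPLEX_SAMPLE = 8  # interleaved float32 I + Q


class UnderrunFiller:
    """Chooses what to transmit when the TX queue is empty."""

    def __init__(self, policy: str) -> None:
        if policy not in ("pause", "zero", "repeat"):
            raise ValueError(f"unknown underrun_policy: {policy!r}")
        self.policy = policy
        self._last_good: Optional[bytes] = None

    def remember(self, frame: bytes) -> None:
        """Record the most recent frame that arrived from the hub."""
        self._last_good = frame

    def fill(self, num_samples: int) -> Optional[bytes]:
        """Return fill bytes, or None to tell the caller to pause TX."""
        if self.policy == "pause":
            return None
        silence = bytes(num_samples * BYTES_PER_COMPLEX_SAMPLE)
        if self.policy == "zero" or self._last_good is None:
            return silence  # "repeat" with no buffer yet falls back to zeros
        return self._last_good


repeat = UnderrunFiller("repeat")
before_first_frame = repeat.fill(2)
repeat.remember(b"\x01" * 16)
after_first_frame = repeat.fill(2)
```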

Cap enforcement in _handle_tx_start (before opening the SDR):

if not self._cfg.tx_enabled:
    return await self._send_error_tx(app_id, "tx disabled on this agent")
if (cap := self._cfg.tx_max_gain_db) is not None and tx_gain > cap:
    return await self._send_error_tx(app_id, f"gain {tx_gain} exceeds cap {cap}")
if (cap := self._cfg.tx_max_duration_s) is not None:
    # enforced by a watchdog in _tx_loop that calls tx_stop after cap seconds
    ...
ranges = self._cfg.tx_allowed_freq_ranges
if ranges and not any(lo <= tx_center_frequency <= hi for lo, hi in ranges):
    return await self._send_error_tx(app_id, f"freq {tx_center_frequency} outside allowed ranges")
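
Pulling the gain and frequency checks into a pure function makes them easy to unit-test without an SDR or a WebSocket. The sketch below is one possible shape under that assumption; TxCaps and check_tx_caps are hypothetical names, and the duration cap is left out because it is enforced by the watchdog at run time rather than at tx_start.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class TxCaps:
    """Stand-in for the TX fields of AgentConfig (hypothetical)."""
    tx_enabled: bool = False
    tx_max_gain_db: Optional[float] = None
    tx_allowed_freq_ranges: Optional[Sequence] = None  # [(low_hz, high_hz), ...]


def check_tx_caps(caps: TxCaps, tx_gain: float, tx_center_frequency: float) -> Optional[str]:
    """Return a rejection message, or None if the request is allowed."""
    if not caps.tx_enabled:
        return "tx disabled on this agent"
    if caps.tx_max_gain_db is not None and tx_gain > caps.tx_max_gain_db:
        return f"gain {tx_gain} exceeds tx_max_gain_db={caps.tx_max_gain_db}"
    ranges = caps.tx_allowed_freq_ranges
    # An unset or empty range list means "no frequency restriction".
    if ranges and not any(lo <= tx_center_frequency <= hi for lo, hi in ranges):
        return f"freq {tx_center_frequency} outside allowed ranges"
    return None


caps = TxCaps(tx_enabled=True, tx_max_gain_db=-15,
              tx_allowed_freq_ranges=[(2_400_000_000, 2_483_500_000)])
too_loud = check_tx_caps(caps, tx_gain=-5, tx_center_frequency=2_450_000_000)
allowed = check_tx_caps(caps, tx_gain=-20, tx_center_frequency=2_450_000_000)
off_band = check_tx_caps(caps, tx_gain=-20, tx_center_frequency=915_000_000)
```

Because Pluto TX gain is expressed as attenuation, -5 dB is louder than a -15 dB cap and is rejected.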

A3. agent/config.py

Extend AgentConfig:

@dataclass
class AgentConfig:
    # existing fields…
    tx_enabled: bool = False
    tx_max_gain_db: float | None = None
    tx_max_duration_s: float | None = None
    tx_allowed_freq_ranges: list[tuple[float, float]] | None = None

save() preserves existing 0600 perms.
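
A sketch of how the new fields could round-trip through JSON while keeping the file at 0600. It models only the TX fields; TxInterlock, save_interlock, and load_interlock are hypothetical names, and the real save() may work differently (for instance, it must also keep the stored credentials, which this sketch does not). POSIX permissions are assumed.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass, fields
from pathlib import Path
from typing import Optional


@dataclass
class TxInterlock:
    """Only the TX fields; the real AgentConfig has more (not shown)."""
    tx_enabled: bool = False
    tx_max_gain_db: Optional[float] = None
    tx_max_duration_s: Optional[float] = None
    tx_allowed_freq_ranges: Optional[list] = None


def save_interlock(cfg: TxInterlock, path: Path) -> None:
    # Create with 0600 so a brand-new file is never briefly world-readable.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        json.dump(asdict(cfg), fh, indent=2)
    os.chmod(path, 0o600)  # also tighten a file that already existed


def load_interlock(path: Path) -> TxInterlock:
    raw = json.loads(path.read_text())
    known = {f.name for f in fields(TxInterlock)}
    # Ignore unrelated keys; missing TX keys fall back to the safe defaults.
    data = {k: v for k, v in raw.items() if k in known}
    ranges = data.get("tx_allowed_freq_ranges")
    if ranges is not None:
        # JSON has no tuples, so [[low, high], ...] comes back as lists.
        data["tx_allowed_freq_ranges"] = [(float(lo), float(hi)) for lo, hi in ranges]
    return TxInterlock(**data)


with tempfile.TemporaryDirectory() as tmp:
    cfg_path = Path(tmp) / "agent.json"
    save_interlock(TxInterlock(True, -10.0, 60.0, [(2.4e9, 2.4835e9)]), cfg_path)
    mode = cfg_path.stat().st_mode & 0o777
    loaded = load_interlock(cfg_path)
```

A config file written before this change has none of the TX keys, so it loads with tx_enabled=False.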

A4. agent/cli.py

  • ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60 — persist the interlock into config.
  • ria-agent stream --allow-tx — runtime override (sets cfg.tx_enabled=True for the life of the process without writing config).
  • ria-agent detect unchanged.
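
For illustration, the flags above could be declared with argparse as follows. This assumes the CLI is argparse-based, which the plan does not say; build_parser is a hypothetical name and the existing register/stream arguments are omitted. Note that argparse accepts a negative value such as -10 after --tx-max-gain-db as long as the parser defines no option that itself looks like a negative number.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Declare only the TX-related flags (existing arguments omitted)."""
    parser = argparse.ArgumentParser(prog="ria-agent")
    sub = parser.add_subparsers(dest="command", required=True)

    register = sub.add_parser("register")
    register.add_argument("--allow-tx", action="store_true")
    register.add_argument("--tx-max-gain-db", type=float, default=None)
    register.add_argument("--tx-max-duration", type=float, default=None)

    stream = sub.add_parser("stream")
    # Runtime override only: the caller must not write this back to config.
    stream.add_argument("--allow-tx", action="store_true")
    return parser


args = build_parser().parse_args(
    ["register", "--allow-tx", "--tx-max-gain-db", "-10", "--tx-max-duration", "60"]
)
stream_args = build_parser().parse_args(["stream"])
```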

A5. agent/hardware.py

def heartbeat_payload(status, app_id=None, *, cfg: AgentConfig, sessions: dict | None = None) -> dict:
    caps = ["rx"]
    if cfg.tx_enabled:
        caps.append("tx")
    payload = {
        "type": "heartbeat",
        "hardware": available_devices(),
        "status": status,
        "capabilities": caps,
        "tx_enabled": cfg.tx_enabled,
    }
    if app_id:
        payload["app_id"] = app_id
    if sessions:
        payload["sessions"] = sessions
    return payload

A6. SDR layer

  • Audit: sdr/pluto.py tx_recording + _stream_tx paths already use _tx_lock (lines 31, 323, 360). Double-check concurrent-with-RX behavior: the adi.Pluto Python object is not thread-safe for arbitrary attribute writes, so all set_tx_* / set_rx_* must go through the shared _param_lock (already present at sdr/sdr.py:44). Verify that rx() in a loop and _stream_tx in another thread don't step on each other.
  • MockSDR already has init_tx + _stream_tx (sdr/mock.py:70-100). No changes needed for mock-based tests.
  • Other TX-capable drivers (blade, usrp, hackrf): out of scope for v1; leave their init_tx as-is.

A7. Tests (tests/agent/)

  • test_streamer_tx.py — tx_start → binary frames → _stream_tx callback pulls correct samples → tx_stop cleans up.
  • test_tx_safety.py — cap violations (gain, duration, freq, tx_enabled=False) each produce tx_status: error and never open the SDR.
  • test_tx_underrun.py — each policy (pause, zero, repeat) exercised against a fake slow producer.
  • test_full_duplex.py — one app_id sends start + tx_start; both sessions share one MockSDR; both produce their expected frames; stopping one does not stop the other.
  • test_ws_client_binary.py — binary frames now reach the binary handler.
  • test_integration_tx.py — end-to-end against local websockets server + MockSDR.

A8. Docs

  • Add a TX section to any existing agent protocol doc (or create docs/agent_tx_protocol.md).
  • Include a regulatory disclaimer: the operator is responsible for transmissions. The agent is an enabler, not a policy layer beyond the interlocks.

Part B — ria-hub

Paths below are conceptual — confirm against the actual module layout in ria-hub before editing. Anchor points reference the RX handoff at screens_agent_handoff.md §Part B.

B1. AgentTxSink (new)

Mirror of AgentDataSource. Location: controller/app/modules/screens/data_sinks.py (or wherever output sinks live in ria-hub).

Responsibilities:

  • prepare(radio_config) — send tx_start via Redis pub/sub on screens:agent:{agent_id}:tx → WS proxy → agent.
  • write(buffer: np.ndarray | bytes) — convert to interleaved float32 bytes, send as binary over the WS. Awaits on WS backpressure.
  • configure(partial_radio_config) — send tx_configure.
  • close() — send tx_stop.
  • Subscribes to the agent's tx_status frames (via the same Redis pub/sub channel used for RX status today) and surfaces state back to the orchestrator. An error state aborts the Celery task.

B2. Refactor plutoTXoperator

The existing operator presumably calls radio.tx(...) against a directly-attached Pluto. Abstract the "output" into an injectable sink:

class PlutoTxOperator:
    def __init__(self, sink: TxSink, ...):
        self.sink = sink  # AgentTxSink when dataSink.type == "agent", else LocalPlutoTxSink

    def run(self, ...):
        self.sink.prepare(self.radio_config)
        try:
            while not stop:
                buf = self._generate_next_buffer()
                self.sink.write(buf)
        finally:
            # Always send tx_stop, even if buffer generation or a write raises.
            self.sink.close()

The local path (existing direct-hardware behavior) becomes LocalPlutoTxSink, a thin wrapper around the current radio.tx calls. No behavior change for existing deployments.

build_data_sink() (to match build_data_source() from B1/B6) routes on dataSink.type.
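
The sink abstraction and the routing can be sketched with a typing.Protocol and a test double. This is a rough shape under stated assumptions, not ria-hub's code: the synchronous method signatures, RecordingTxSink, run_tx, and the factory arguments to build_data_sink are all hypothetical.

```python
from typing import Callable, Iterable, Optional, Protocol


class TxSink(Protocol):
    def prepare(self, radio_config: dict) -> None: ...
    def write(self, buffer: bytes) -> None: ...
    def configure(self, partial_radio_config: dict) -> None: ...
    def close(self) -> None: ...


class RecordingTxSink:
    """Test double: records calls instead of touching hardware or a WS."""

    def __init__(self) -> None:
        self.calls = []

    def prepare(self, radio_config: dict) -> None:
        self.calls.append("prepare")

    def write(self, buffer: bytes) -> None:
        self.calls.append("write")

    def configure(self, partial_radio_config: dict) -> None:
        self.calls.append("configure")

    def close(self) -> None:
        self.calls.append("close")


def build_data_sink(
    data_sink: Optional[dict],
    agent_factory: Callable[[dict], TxSink],
    local_factory: Callable[[Optional[dict]], TxSink],
) -> TxSink:
    """Route on dataSink.type: "agent" goes remote, anything else stays local."""
    if data_sink and data_sink.get("type") == "agent":
        return agent_factory(data_sink)
    return local_factory(data_sink)


def run_tx(sink: TxSink, radio_config: dict, buffers: Iterable[bytes]) -> None:
    sink.prepare(radio_config)
    try:
        for buf in buffers:
            sink.write(buf)
    finally:
        sink.close()  # tx_stop is sent even if a write fails


agent_sink, local_sink = RecordingTxSink(), RecordingTxSink()
chosen = build_data_sink({"type": "agent", "agent_id": "agent-abc"},
                         lambda spec: agent_sink, lambda spec: local_sink)
run_tx(chosen, {"tx_gain": -20}, [b"\x00" * 8, b"\x00" * 8])
```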

B3. Manifest schema

Add dataSink alongside dataSource in the manifest. New type: "agent":

{
  "dataSource": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "sample_rate": 1000000, "center_frequency": 2450000000, "gain": 40 } },
  "dataSink":   { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "tx_sample_rate": 1000000, "tx_center_frequency": 2450000000, "tx_gain": -20, "underrun_policy": "pause" } }
}

Update Pydantic models + JSON schema validators in controller/app/modules/screens/graph_derivation.py (or equivalent). When dataSource.agent_id == dataSink.agent_id and both target pluto with the same identifier, the agent will naturally share one SDR handle — no special-casing needed on the hub side.

B4. WS endpoint extensions

/screens/agent/ws already exists. Add:

  • Support for hub → agent binary frames (currently binary is agent → hub only). FastAPI's WebSocket.send_bytes works directly; just route binary from the Redis pub/sub channel through to the WS.
  • New Redis pub/sub channel screens:agent:{agent_id}:tx for outbound TX control JSON + a separate screens:agent:{agent_id}:tx_bin for outbound binary. (Two channels because a Redis client configured to decode payloads as text, e.g. redis-py with decode_responses=True, will fail on raw IQ bytes. Redis pub/sub payloads themselves are binary-safe, so if the hub's subscriber returns raw bytes, one channel is fine.)

B5. Celery wiring

When dataSink.type == "agent", the Celery task that runs the TX-containing graph uses AgentTxSink instead of a local sink. The operator code (plutoTXoperator) needs no changes beyond the B2 refactor, because the sink abstraction hides the difference.

Full-duplex: a single task with both dataSource.type == "agent" and dataSink.type == "agent" pointing at the same agent spawns both the RX consumer loop (existing AgentDataSource.next_chunk via BLPOP) and the TX producer loop (AgentTxSink.write). Both sides are wired up before any capture frames are sent.

B6. Capability gating

Before any control path sends tx_start:

agent = get_agent(agent_id)
if agent.last_heartbeat.age > 60:       # stale
    raise HTTPException(503, "agent not responding")
if "tx" not in agent.last_heartbeat.capabilities:
    raise HTTPException(400, "agent has not opted in to transmission (tx_enabled=false)")

Surface clear errors to the Screens UI so the user knows it's an agent config issue, not an app config issue.
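
The gate can be written as a framework-independent function so it is testable without FastAPI. In this sketch TxGateError, Heartbeat, check_tx_gate, and gate_status are hypothetical names, and treating a heartbeat with no capabilities field (an older agent) as RX-only is an assumption.

```python
from dataclasses import dataclass, field
from typing import Optional


class TxGateError(Exception):
    """Carries an HTTP-style status so the API layer can map it directly."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


@dataclass
class Heartbeat:
    received_at: float  # seconds, same clock as `now` below
    # Older agents send no capabilities; assume they are RX-only.
    capabilities: list = field(default_factory=lambda: ["rx"])


def check_tx_gate(hb: Optional[Heartbeat], now: float, max_age_s: float = 60.0) -> None:
    if hb is None or now - hb.received_at > max_age_s:
        raise TxGateError(503, "agent not responding")
    if "tx" not in hb.capabilities:
        raise TxGateError(400, "agent has not opted in to transmission (tx_enabled=false)")


def gate_status(hb: Optional[Heartbeat], now: float) -> Optional[int]:
    """Return the rejection status code, or None when TX may proceed."""
    try:
        check_tx_gate(hb, now)
    except TxGateError as exc:
        return exc.status_code
    return None


fresh_tx = gate_status(Heartbeat(100.0, ["rx", "tx"]), now=120.0)
stale = gate_status(Heartbeat(0.0, ["rx", "tx"]), now=100.0)
rx_only = gate_status(Heartbeat(100.0), now=120.0)
```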

B7. Audit log

New MongoDB collection agent_tx_audit:

{
  agent_id, app_id, user_id,
  center_frequency_hz, tx_gain_db, duration_s, num_samples,
  started_at, ended_at, terminal_status,   // "done" | "error" | "underrun" | "cancelled"
  error_message?
}

Write on every tx_start. Update on terminal tx_status. Index on {agent_id, started_at} for admin-view queries.
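
The write-then-update lifecycle can be sketched with plain dicts, independent of any MongoDB driver. new_audit_record and finalize_audit_record are hypothetical helpers, and representing timestamps as float seconds is an assumption; the field names follow the schema above.

```python
from typing import Optional

TERMINAL_STATUSES = {"done", "error", "underrun", "cancelled"}


def new_audit_record(agent_id: str, app_id: str, user_id: str,
                     radio_config: dict, now: float) -> dict:
    """Build the document written when tx_start is sent."""
    return {
        "agent_id": agent_id,
        "app_id": app_id,
        "user_id": user_id,
        "center_frequency_hz": radio_config["tx_center_frequency"],
        "tx_gain_db": radio_config["tx_gain"],
        "started_at": now,
        "ended_at": None,
        "duration_s": None,
        "num_samples": 0,
        "terminal_status": None,
    }


def finalize_audit_record(record: dict, terminal_status: str, now: float,
                          num_samples: int,
                          error_message: Optional[str] = None) -> dict:
    """Return a copy of the record updated for a terminal tx_status."""
    if terminal_status not in TERMINAL_STATUSES:
        raise ValueError(f"not a terminal status: {terminal_status!r}")
    final = dict(record)
    final.update(
        ended_at=now,
        duration_s=now - record["started_at"],
        num_samples=num_samples,
        terminal_status=terminal_status,
    )
    if error_message is not None:
        final["error_message"] = error_message
    return final


started = new_audit_record(
    "agent-abc", "app-abc", "user-1",
    {"tx_center_frequency": 2_450_000_000, "tx_gain": -20}, now=100.0,
)
finished = finalize_audit_record(started, "done", now=160.0, num_samples=60_000_000)
```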

B8. Registration — no change needed

POST /screens/agents/register and ~/.ria/agent.json already cover credential storage. The TX interlock (tx_enabled, caps) is written by the agent operator via ria-agent register --allow-tx; the hub only reads the heartbeat to learn whether an agent will accept TX.


Part C — Screens (Vue 3 frontend)

C1. App composer

  • Agent picker (exists from RX work) grows a "TX capable" filter toggle; hides agents whose heartbeat capabilities lacks "tx".
  • When the graph contains plutoTXoperator (or any future TX operator):
    • Render a dataSink section mirroring dataSource.
    • Fields: device, agent_id, identifier, tx_sample_rate, tx_center_frequency, tx_gain, underrun_policy.
    • Validation: tx_center_frequency within radio band; tx_gain within agent-advertised max (read from heartbeat when available).

C2. Run-time UI

  • Consent modal on "Start" for any app whose manifest contains a dataSink.type: "agent":

    "This app will transmit on 2.450 GHz at -20 dB through agent lab-pluto-01. I confirm this transmission is permitted under my local radio regulations." Required checkbox, cannot be remembered across apps.

  • TX status indicator in the running-app view: shows armed / transmitting / underrun state from tx_status frames. Red banner on underrun or error.
  • Stop TX button always visible during transmission; fires tx_stop immediately. Separate from "Stop app" (which also stops RX).

C3. Admin view

Extend the agents list from B8 of the RX handoff:

  • Column: TX: enabled / disabled / in-use by app X.
  • Agent detail page: show tx_max_gain_db, tx_max_duration_s, tx_allowed_freq_ranges, and the last 10 rows from agent_tx_audit filtered to this agent.

Rollout order

  1. Part A §A1-A3, A7 — agent-side TX session + binary ingress + safety, all behind --allow-tx. Mock-based tests. Shippable standalone; no consumer yet.
  2. Part B §B1-B5 — hub sink + manifest + WS extension + Celery wiring. End-to-end test: Screens app with plutoTXoperator + agent sink → real Pluto in the lab → verify carrier on a spectrum analyzer.
  3. Part B §B6-B7 — capability gating + audit log. Blocks general release, not lab use.
  4. Part C §C1 — composer UI for TX apps.
  5. Part C §C2-C3 — consent modal + admin view. Gate for first non-internal user.

Parts A + B can land on parallel branches and meet at step 2's integration test. Part C can start in parallel with B once the manifest shape in B3 is stable.

Test matrix (integration)

| Scenario | Expected |
|---|---|
| App with RX only, agent connected | RX as today (regression guard) |
| App with TX only, agent tx_enabled=True | TX starts, underrun → pause, stop cleans up |
| App with RX + TX same agent, same device | One Pluto handle serves both; independent gains/frequencies |
| App with TX, agent tx_enabled=False | Hub rejects at gate with 400; no WS traffic generated |
| App with TX, gain exceeds agent cap | tx_status: error; SDR never opened |
| Hub stops sending TX buffers mid-stream | underrun emitted after queue drains; agent paused cleanly |
| WS drops during TX | Agent cancels TX task, pauses hardware, reconnects; hub must re-issue tx_start |
| Agent process killed during TX | Hardware stops (existing close() already handles this; verify _tx_lock released) |

Open questions

  • Waveform source: is plutoTXoperator a real-time generator emitting on a clock, or does it synthesize a fixed recording and loop? If the latter, worth exposing a "bulk + loop" fast-path — hub sends the buffer once, agent loops it via existing tx_recording. Same protocol (tx_start + one buffer + loop: true), much less WS traffic.
  • Multi-app-per-agent: out of scope for v1 (§Non-goals). When needed: add a session id to binary frames (4-byte prefix: magic + stream_id + reserved), bump a protocol_version in the heartbeat.
  • Streaming TX clock drift: if hub and agent sample clocks drift, repeating zeros on underrun is audible/visible in the spectrum. Longer term: agent-side resampling or PLL, both expensive. v1: rely on generous queue depth + stable local networks.
  • Other TX-capable SDRs: HackRF, USRP, bladeRF. The _CONFIG_ATTR_MAP in agent/streamer.py:169-175 will need per-driver entries when those come online.

Regulatory note

Transmission is regulated in every jurisdiction. The agent-side interlocks (tx_enabled, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub's consent modal and audit log exist so actions are attributable. None of this is a legal compliance layer — it's a defense-in-depth mechanism.