Agent TX Streaming — Cross-Repo Plan
Repos: ria-toolkit-oss, ria-hub, Screens frontend
Status: Proposal / pre-implementation
Prerequisites: The RX-streaming work from screens_agent_handoff.md and screens_agent_streamer_plan.md is landed (agent WS protocol, AgentDataSource, /screens/agents/register, /screens/agent/ws).
Goal
Let a Screens app running on the hub drive a remote agent's Pluto (or other TX-capable SDR) to transmit — streaming IQ buffers end-to-end from an operator like plutoTXoperator into the agent's sdr.tx() path. Mirror image of what AgentDataSource already does for RX.
Non-goals (v1)
- Multi-tenant radio sharing (one app owns the radio at a time per agent).
- Bulk/upload-once TX — superseded by streaming per request.
- Arbitrary waveform generation in the agent. The agent is dumb pipe + hardware control; signal generation stays on the hub.
Key design decisions
| # | Decision | Value |
|---|---|---|
| D1 | Delivery mode | Streaming. Hub pushes binary IQ buffers continuously over the existing WS; agent's _stream_tx callback pulls them from an in-agent queue. |
| D2 | Full-duplex | Yes. A single app_id may own both an RX session and a TX session on the same agent concurrently. Same physical SDR handle serves both (Pluto is FDD-capable; init_rx and init_tx are independent on one adi.Pluto instance). |
| D3 | Safety caps | Agent-enforced. ~/.ria/agent.json holds tx_enabled, tx_max_gain_db, tx_max_duration_s, optional tx_allowed_freq_ranges: [[low,high], …]. Agent rejects tx_start frames that violate any of these, independent of what the hub sends. |
| D4 | Buffer format | Interleaved float32 IQ, range [-1, 1] — same as RX. Format-validated by ria_toolkit_oss.sdr.sdr._verify_sample_format. |
| D5 | Protocol evolution | Keep existing RX messages (start/stop/configure) unchanged for back-compat. Add parallel tx_start/tx_stop/tx_configure. Heartbeat grows to advertise capabilities. |
| D6 | Underrun policy | Default pause: if the TX queue empties, agent calls pause_tx() and emits tx_status: underrun. Hub must recover by sending a fresh tx_start + buffers. Configurable per session via radio_config.underrun_policy ∈ {"pause", "zero", "repeat"}. |
| D7 | Backpressure | Rely on TCP/WS backpressure. Agent caps inbound TX queue at 8 buffers; await ws.send on the hub side slows when the agent doesn't drain. No application-level flow control in v1. |
| D8 | Session identity | app_id identifies a Screens app. Each app has at most one RX session and one TX session per agent. Binary direction disambiguates: agent → hub binary = RX IQ; hub → agent binary = TX IQ. |
Protocol specification
Additions only. Existing RX messages from screens_agent_handoff.md §Phase 4 are unchanged.
Hub → agent (JSON)
// Arm the TX side. Agent calls init_tx, starts the stream_tx thread with an empty queue.
// After this, hub sends binary TX buffers on the same WS.
{
  "type": "tx_start",
  "app_id": "app-abc",
  "radio_config": {
    "device": "pluto",
    "identifier": "ip:192.168.3.1",
    "tx_sample_rate": 1000000,
    "tx_center_frequency": 2450000000,
    "tx_gain": -20,              // dB, negative = attenuation on Pluto
    "tx_bandwidth": 1000000,     // optional
    "buffer_size": 1024,
    "underrun_policy": "pause"   // "pause" | "zero" | "repeat"
  }
}

// Apply parameter changes at the next buffer boundary.
{ "type": "tx_configure", "app_id": "app-abc", "radio_config": { "tx_gain": -25 } }

// Stop TX, drain queue, pause_tx, release TX side (RX may continue if a separate RX session is live).
{ "type": "tx_stop", "app_id": "app-abc" }
Hub → agent (binary)
Raw interleaved float32 IQ in [-1, 1]. One WS frame = one buffer = buffer_size complex samples = buffer_size * 2 * 4 bytes. Delivered only between tx_start and tx_stop. Binary frames arriving outside that window are discarded and logged at WARN.
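The frame layout above can be sketched as a pair of pack/unpack helpers. This is a minimal illustration that uses only the standard library; the function names are hypothetical, and little-endian byte order is an assumption (the plan does not state endianness). A real hub or agent would more likely use numpy for this.

```python
import struct

BYTES_PER_COMPLEX_SAMPLE = 2 * 4  # I and Q, each a 4-byte float32


def encode_iq_frame(samples: list[complex]) -> bytes:
    """Pack complex samples as interleaved float32: I0, Q0, I1, Q1, ..."""
    flat: list[float] = []
    for s in samples:
        if abs(s.real) > 1.0 or abs(s.imag) > 1.0:
            raise ValueError(f"sample {s!r} is outside [-1, 1]")
        flat.extend((s.real, s.imag))
    # "<" = little-endian, an assumption not stated in the plan.
    return struct.pack(f"<{len(flat)}f", *flat)


def decode_iq_frame(frame: bytes, buffer_size: int) -> list[complex]:
    """Unpack one WS frame, enforcing the buffer_size * 2 * 4 byte length."""
    expected = buffer_size * BYTES_PER_COMPLEX_SAMPLE
    if len(frame) != expected:
        raise ValueError(f"expected {expected} bytes, got {len(frame)}")
    flat = struct.unpack(f"<{buffer_size * 2}f", frame)
    return [complex(i, q) for i, q in zip(flat[0::2], flat[1::2])]
```

With buffer_size = 1024, as in the tx_start example, each frame is 1024 * 2 * 4 = 8192 bytes.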
Agent → hub (JSON)
// Lifecycle events.
{ "type": "tx_status", "app_id": "app-abc", "state": "armed" }
{ "type": "tx_status", "app_id": "app-abc", "state": "transmitting" }
{ "type": "tx_status", "app_id": "app-abc", "state": "underrun" } // queue empty; TX paused
{ "type": "tx_status", "app_id": "app-abc", "state": "done" }
{ "type": "tx_status", "app_id": "app-abc", "state": "error", "message": "gain -5 exceeds tx_max_gain_db=-15" }
// Reject reasons from agent-enforced caps/interlocks are surfaced via tx_status:error.
Heartbeat extension
Existing {type: heartbeat, hardware[], status} grows:
{
  "type": "heartbeat",
  "hardware": ["mock", "pluto"],
  "status": "streaming",            // unchanged semantics
  "capabilities": ["rx", "tx"],     // NEW: derived from tx_enabled + SDR class having init_tx
  "tx_enabled": true,               // NEW: mirrors config flag
  "sessions": {                     // NEW: optional per-session snapshot
    "rx": { "app_id": "app-abc", "state": "streaming" },
    "tx": { "app_id": "app-abc", "state": "transmitting" }
  },
  "app_id": "app-abc"               // kept for back-compat
}
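A hub-side reader of this heartbeat has to cope with agents that predate the new fields. The sketch below shows one way to do that. It assumes that a missing capabilities list means RX-only and a missing tx_enabled means false; the plan does not spell out those defaults. HeartbeatView, parse_heartbeat, and can_transmit are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class HeartbeatView:
    hardware: list[str]
    status: str
    capabilities: list[str] = field(default_factory=lambda: ["rx"])
    tx_enabled: bool = False
    sessions: dict = field(default_factory=dict)
    app_id: str | None = None


def parse_heartbeat(msg: dict) -> HeartbeatView:
    """Read a heartbeat frame, defaulting the new fields for older agents."""
    if msg.get("type") != "heartbeat":
        raise ValueError("not a heartbeat frame")
    return HeartbeatView(
        hardware=list(msg.get("hardware", [])),
        status=msg.get("status", "unknown"),
        # Assumption: agents that never send "capabilities" are RX-only.
        capabilities=list(msg.get("capabilities", ["rx"])),
        tx_enabled=bool(msg.get("tx_enabled", False)),
        sessions=dict(msg.get("sessions") or {}),
        app_id=msg.get("app_id"),
    )


def can_transmit(hb: HeartbeatView) -> bool:
    """TX is usable only when the agent both opted in and advertises it."""
    return hb.tx_enabled and "tx" in hb.capabilities
```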
Part A — ria-toolkit-oss (this repo)
A1. agent/ws_client.py
Currently the WS client drops server → agent binary (ws_client.py:77-79). Add a binary handler alongside the JSON one.
from typing import Awaitable, Callable

BinaryHandler = Callable[[bytes], Awaitable[None]]

async def run(
    self,
    on_message: MessageHandler,
    heartbeat: HeartbeatBuilder,
    on_binary: BinaryHandler | None = None,
) -> None:
    ...
    async for raw in self._ws:
        if isinstance(raw, bytes):
            # Server → agent binary is TX IQ; without a handler it is dropped as before.
            if on_binary is not None:
                await on_binary(raw)
            continue
        ...
Keep the reconnect, heartbeat, and malformed-frame behavior unchanged.
A2. agent/streamer.py — add TX sessions
Replace the flat self._sdr / self._app_id / self._capture_task state with a session model:
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class RxSession:
    app_id: str
    sdr: Any
    buffer_size: int
    task: asyncio.Task
    pending_config: dict

@dataclass
class TxSession:
    app_id: str
    sdr: Any
    queue: asyncio.Queue[bytes]   # bounded, maxsize=8
    task: asyncio.Task            # runs _stream_tx in executor
    underrun_policy: str
    pending_config: dict
    bytes_transmitted: int = 0
    started_at: float = 0.0       # for tx_max_duration_s enforcement
The streamer holds self._rx: RxSession | None and self._tx: TxSession | None. SDR instances are cached by (device, identifier) — when RX and TX name the same device, both sessions share one handle (matters for Pluto FDD).
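The handle sharing described above can be sketched as a small reference-counted cache. This is an illustration only: SdrCache and the opener callable are hypothetical, and the real streamer may track ownership differently. It also assumes the SDR object exposes close().

```python
from __future__ import annotations

from typing import Any, Callable


class SdrCache:
    """Hands out one shared SDR handle per (device, identifier) pair."""

    def __init__(self, opener: Callable[[str, str], Any]) -> None:
        self._opener = opener
        self._handles: dict[tuple[str, str], Any] = {}
        self._refs: dict[tuple[str, str], int] = {}

    def acquire(self, device: str, identifier: str) -> Any:
        key = (device, identifier)
        if key not in self._handles:
            # First session on this device: open the hardware once.
            self._handles[key] = self._opener(device, identifier)
            self._refs[key] = 0
        self._refs[key] += 1
        return self._handles[key]

    def release(self, device: str, identifier: str) -> bool:
        """Drop one reference. Returns True if the handle was closed."""
        key = (device, identifier)
        self._refs[key] -= 1
        if self._refs[key] > 0:
            return False  # the other session (RX or TX) still uses it
        handle = self._handles.pop(key)
        del self._refs[key]
        close = getattr(handle, "close", None)
        if callable(close):
            close()
        return True
```

With this shape, stopping TX while RX is live only decrements the count; the hardware is closed when the last session releases it.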
New handlers:
- _handle_tx_start(msg): check cfg.tx_enabled, validate gain/duration/freq against caps, open/resolve SDR, sdr.init_tx(...), start _tx_loop, emit tx_status: armed.
- _handle_tx_stop(msg): cancel TX task, sdr.pause_tx(), drain queue, release SDR if no RX session on it, emit tx_status: done.
- _handle_tx_configure(msg): stash into self._tx.pending_config, applied at next buffer boundary (same pattern as RX).
- on_binary(data): if self._tx: await self._tx.queue.put(data) (awaiting here is the backpressure mechanism). Else: log and drop.
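To make the backpressure claim for on_binary concrete, here is a minimal sketch of a bounded asyncio.Queue stalling its producer once it is full. TxIngress is a hypothetical stand-in for the streamer's ingress path, not the actual class, and the 0.05 s timeout exists only so the demo terminates.

```python
import asyncio

TX_QUEUE_DEPTH = 8  # D7: agent caps the inbound TX queue at 8 buffers


class TxIngress:
    """Hypothetical stand-in for the streamer's binary ingress path."""

    def __init__(self) -> None:
        self.queue = None  # set by arm(), i.e. after tx_start
        self.dropped = 0

    def arm(self) -> None:
        self.queue = asyncio.Queue(maxsize=TX_QUEUE_DEPTH)

    async def on_binary(self, data: bytes) -> None:
        if self.queue is None:
            self.dropped += 1  # the real agent would log at WARN here
            return
        # Suspends while the queue is full. The WS read loop stops pulling
        # frames, so TCP flow control eventually slows the hub's sends.
        await self.queue.put(data)


async def demo() -> tuple:
    ingress = TxIngress()
    await ingress.on_binary(b"early")  # before tx_start: dropped
    ingress.arm()
    for _ in range(TX_QUEUE_DEPTH):
        await ingress.on_binary(bytes(8))
    try:
        # The ninth buffer cannot be queued until something drains.
        await asyncio.wait_for(ingress.on_binary(bytes(8)), timeout=0.05)
        stalled = False
    except asyncio.TimeoutError:
        stalled = True
    return ingress.dropped, ingress.queue.qsize(), stalled
```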
TX loop (runs in an executor thread via loop.run_in_executor, like the RX capture loop):
import queue

import numpy as np

def _tx_callback(num_samples: int) -> np.ndarray:
    # Called by sdr._stream_tx on every buffer boundary.
    try:
        raw = self._tx_queue_sync.get(timeout=0.1)
    except queue.Empty:
        return self._underrun_fill(num_samples)  # policy-driven
    # Reinterpret interleaved float32 (I, Q, I, Q, ...) as complex64 without copying.
    samples = np.frombuffer(raw, dtype=np.float32).view(np.complex64)
    if len(samples) < num_samples:
        return _pad_zero(samples, num_samples)
    return samples[:num_samples]
Use a thread-safe queue.Queue for the TX side (the asyncio.Queue lives on the event loop; the executor thread reads from a sibling queue.Queue fed by a tiny asyncio→threading adapter).
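One possible shape for that adapter is a pump coroutine that moves items from the asyncio.Queue into the queue.Queue, doing the blocking put in a worker thread so the event loop stays responsive. The names pump, consumer, and demo are hypothetical, the None sentinel is an assumption used here for shutdown, and asyncio.to_thread requires Python 3.9+.

```python
import asyncio
import queue
import threading


async def pump(async_q: asyncio.Queue, sync_q: queue.Queue) -> None:
    """Forward items from the event-loop queue to the thread-safe queue."""
    while True:
        item = await async_q.get()
        # sync_q.put may block when the sibling queue is full, so run it
        # in a worker thread instead of on the event loop.
        await asyncio.to_thread(sync_q.put, item)
        if item is None:  # sentinel: forwarded so the consumer stops too
            return


def consumer(sync_q: queue.Queue, out: list) -> None:
    """Stands in for the executor thread that feeds _stream_tx."""
    while True:
        item = sync_q.get()
        if item is None:
            return
        out.append(item)


async def demo() -> list:
    async_q: asyncio.Queue = asyncio.Queue(maxsize=8)
    sync_q: queue.Queue = queue.Queue(maxsize=2)
    received: list = []
    thread = threading.Thread(target=consumer, args=(sync_q, received))
    thread.start()
    pump_task = asyncio.create_task(pump(async_q, sync_q))
    for i in range(5):
        await async_q.put(bytes([i]))
    await async_q.put(None)
    await pump_task
    await asyncio.to_thread(thread.join)
    return received
```

One design point to watch: the sibling queue adds its own depth to the 8-buffer cap from D7, so the two maxsize values together determine how many buffers the agent can hold.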
Underrun fills:
- "pause": signal the main loop to call sdr.pause_tx(), emit tx_status: underrun, exit the callback.
- "zero": return np.zeros(num_samples, dtype=np.complex64).
- "repeat": return the last good buffer (cached). If no buffer yet: zeros.
Cap enforcement in _handle_tx_start (before opening the SDR):
if not self._cfg.tx_enabled:
    return await self._send_error_tx(app_id, "tx disabled on this agent")
if (cap := self._cfg.tx_max_gain_db) is not None and tx_gain > cap:
    return await self._send_error_tx(app_id, f"gain {tx_gain} exceeds cap {cap}")
if (cap := self._cfg.tx_max_duration_s) is not None:
    # enforced by a watchdog in _tx_loop that calls tx_stop after cap seconds
    ...
# An empty or missing range list means no frequency restriction.
ranges = self._cfg.tx_allowed_freq_ranges
if ranges and not any(lo <= tx_center_frequency <= hi for lo, hi in ranges):
    return await self._send_error_tx(app_id, f"freq {tx_center_frequency} outside allowed ranges")
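The same checks can be factored into a pure function, which makes the "never open the SDR" property easy to unit-test without a streamer instance. A minimal sketch under that assumption follows; TxCaps and check_tx_caps are hypothetical names, and the duration cap is left out because the plan enforces it with a watchdog at run time rather than at tx_start.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TxCaps:
    tx_enabled: bool = False
    tx_max_gain_db: float | None = None
    tx_allowed_freq_ranges: list[tuple[float, float]] | None = None


def check_tx_caps(caps: TxCaps, tx_gain: float, tx_center_frequency: float) -> str | None:
    """Return a rejection message, or None when tx_start may proceed."""
    if not caps.tx_enabled:
        return "tx disabled on this agent"
    if caps.tx_max_gain_db is not None and tx_gain > caps.tx_max_gain_db:
        return f"gain {tx_gain} exceeds cap {caps.tx_max_gain_db}"
    ranges = caps.tx_allowed_freq_ranges
    # An empty or missing list means no frequency restriction.
    if ranges and not any(lo <= tx_center_frequency <= hi for lo, hi in ranges):
        return f"freq {tx_center_frequency} outside allowed ranges"
    return None
```

A handler would call this first and emit tx_status: error with the returned message before touching any hardware.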
A3. agent/config.py
Extend AgentConfig:
from dataclasses import dataclass

@dataclass
class AgentConfig:
    # existing fields…
    tx_enabled: bool = False
    tx_max_gain_db: float | None = None
    tx_max_duration_s: float | None = None
    tx_allowed_freq_ranges: list[tuple[float, float]] | None = None
save() preserves existing 0600 perms.
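For reference, one way to write a JSON config with 0600 permissions and read the TX fields back is sketched below. This is not the existing AgentConfig.save(); save_config and load_tx_fields are hypothetical helpers, and the permission handling assumes a POSIX filesystem. Note that JSON has no tuple type, so frequency ranges come back as lists and need converting.

```python
import json
import os
from pathlib import Path


def save_config(path: Path, data: dict) -> None:
    """Write JSON so that only the owner can read or write the file."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        json.dump(data, fh, indent=2)
    # The mode passed to os.open only applies on creation; tighten a
    # pre-existing file that had looser permissions.
    os.chmod(path, 0o600)


def load_tx_fields(path: Path) -> dict:
    """Read the TX interlock fields, applying the defaults from AgentConfig."""
    raw = json.loads(path.read_text())
    ranges = raw.get("tx_allowed_freq_ranges")
    return {
        "tx_enabled": bool(raw.get("tx_enabled", False)),
        "tx_max_gain_db": raw.get("tx_max_gain_db"),
        "tx_max_duration_s": raw.get("tx_max_duration_s"),
        # JSON arrays become lists; restore the (low, high) tuples.
        "tx_allowed_freq_ranges": [tuple(r) for r in ranges] if ranges else None,
    }
```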
A4. agent/cli.py
- ria-agent register --allow-tx --tx-max-gain-db -10 --tx-max-duration 60: persist the interlock into config.
- ria-agent stream --allow-tx: runtime override (sets cfg.tx_enabled=True for the life of the process without writing config).
- ria-agent detect: unchanged.
A5. agent/hardware.py
def heartbeat_payload(status, app_id=None, *, cfg: AgentConfig, sessions: dict | None = None) -> dict:
    caps = ["rx"]
    if cfg.tx_enabled:
        caps.append("tx")
    payload = {
        "type": "heartbeat",
        "hardware": available_devices(),
        "status": status,
        "capabilities": caps,
        "tx_enabled": cfg.tx_enabled,
    }
    if app_id:
        payload["app_id"] = app_id
    if sessions:
        payload["sessions"] = sessions
    return payload
A6. SDR layer
- Audit: sdr/pluto.py tx_recording + _stream_tx paths already use _tx_lock (lines 31, 323, 360). Double-check concurrent-with-RX behavior: the adi.Pluto Python object is not thread-safe for arbitrary attribute writes, so all set_tx_*/set_rx_* must go through the shared _param_lock (already present at sdr/sdr.py:44). Verify rx() in a loop + _stream_tx in another thread don't step on each other.
- MockSDR already has init_tx + _stream_tx (sdr/mock.py:70-100). No changes needed for mock-based tests.
- Other TX-capable drivers (blade, usrp, hackrf): out of scope for v1; leave their init_tx as-is.
A7. Tests (tests/agent/)
- test_streamer_tx.py: tx_start → binary frames → _stream_tx callback pulls correct samples → tx_stop cleans up.
- test_tx_safety.py: cap violations (gain, duration, freq, tx_enabled=False) each produce tx_status: error and never open the SDR.
- test_tx_underrun.py: each policy (pause, zero, repeat) exercised against a fake slow producer.
- test_full_duplex.py: one app_id sends start + tx_start; both sessions share one MockSDR; both produce their expected frames; stopping one does not stop the other.
- test_ws_client_binary.py: binary frames now reach the binary handler.
- test_integration_tx.py: end-to-end against local websockets server + MockSDR.
A8. Docs
- Add a TX section to any existing agent protocol doc (or create docs/agent_tx_protocol.md).
- Include a regulatory disclaimer: the operator is responsible for transmissions. The agent is an enabler, not a policy layer beyond the interlocks.
Part B — ria-hub
Paths below are conceptual; confirm against the actual module layout in ria-hub before editing. Anchor points reference the RX handoff at screens_agent_handoff.md §Part B.
B1. AgentTxSink (new)
Mirror of AgentDataSource. Location: controller/app/modules/screens/data_sinks.py (or wherever output sinks live in ria-hub).
Responsibilities:
- prepare(radio_config): send tx_start via Redis pub/sub on screens:agent:{agent_id}:tx → WS proxy → agent.
- write(buffer: np.ndarray | bytes): convert to interleaved float32 bytes, send as binary over the WS. Awaits on WS backpressure.
- configure(partial_radio_config): send tx_configure.
- close(): send tx_stop.
- Subscribes to the agent's tx_status frames (via the same Redis pub/sub channel used for RX status today) and surfaces state back to the orchestrator. An error state aborts the Celery task.
B2. Refactor plutoTXoperator
The existing operator presumably calls radio.tx(...) against a directly-attached Pluto. Abstract the "output" into an injectable sink:
class PlutoTxOperator:
    def __init__(self, sink: TxSink, ...):
        self.sink = sink  # AgentTxSink when dataSink.type == "agent", else LocalPlutoTxSink

    def run(self, ...):
        self.sink.prepare(self.radio_config)
        try:
            while not stop:
                buf = self._generate_next_buffer()
                self.sink.write(buf)
        finally:
            # Always release the TX side, even if buffer generation raises.
            self.sink.close()
The local path (existing direct-hardware behavior) becomes LocalPlutoTxSink, a thin wrapper around the current radio.tx calls. No behavior change for existing deployments.
build_data_sink() (to match build_data_source() from B1/B6) routes on dataSink.type.
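The sink abstraction and routing described above can be sketched with a typing.Protocol. Everything here is illustrative: RecordingSink is a hypothetical test double standing in for both AgentTxSink and LocalPlutoTxSink, and the synchronous method signatures are an assumption (the real sinks may be async).

```python
from __future__ import annotations

from typing import Iterable, Protocol


class TxSink(Protocol):
    def prepare(self, radio_config: dict) -> None: ...
    def write(self, buffer: bytes) -> None: ...
    def close(self) -> None: ...


class RecordingSink:
    """Hypothetical test double that records the calls it receives."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.events: list[tuple] = []

    def prepare(self, radio_config: dict) -> None:
        self.events.append(("prepare", radio_config))

    def write(self, buffer: bytes) -> None:
        self.events.append(("write", len(buffer)))

    def close(self) -> None:
        self.events.append(("close",))


def build_data_sink(data_sink: dict) -> TxSink:
    """Route on dataSink.type, as build_data_source() does for RX."""
    if data_sink.get("type") == "agent":
        return RecordingSink("agent")  # stand-in for AgentTxSink
    return RecordingSink("local")      # stand-in for LocalPlutoTxSink


def run_operator(sink: TxSink, radio_config: dict, buffers: Iterable[bytes]) -> None:
    sink.prepare(radio_config)
    try:
        for buf in buffers:
            sink.write(buf)
    finally:
        sink.close()  # tx_stop is sent even if generation fails
```

Because the operator only sees TxSink, the same run loop serves both the local and agent paths.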
B3. Manifest schema
Add dataSink alongside dataSource in the manifest. New type: "agent":
{
  "dataSource": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "sample_rate": 1000000, "center_frequency": 2450000000, "gain": 40 } },
  "dataSink": { "type": "agent", "device": "pluto", "agent_id": "agent-abc", "params": { "tx_sample_rate": 1000000, "tx_center_frequency": 2450000000, "tx_gain": -20, "underrun_policy": "pause" } }
}
Update Pydantic models + JSON schema validators in controller/app/modules/screens/graph_derivation.py (or equivalent). When dataSource.agent_id == dataSink.agent_id and both target pluto with the same identifier, the agent will naturally share one SDR handle — no special-casing needed on the hub side.
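As a rough illustration of what the dataSink validator might check, here is a plain-Python stand-in for the Pydantic model. The set of required fields is an assumption inferred from the example manifest, the "pause" default follows D6, and validate_data_sink is a hypothetical name.

```python
UNDERRUN_POLICIES = {"pause", "zero", "repeat"}
# Assumption: these params are mandatory for an agent sink.
REQUIRED_TX_PARAMS = ("tx_sample_rate", "tx_center_frequency", "tx_gain")


def validate_data_sink(sink: dict) -> list:
    """Return a list of human-readable problems; empty means valid."""
    errors = []
    if sink.get("type") != "agent":
        return errors  # only the new agent sink type is checked here
    for key in ("device", "agent_id"):
        if not sink.get(key):
            errors.append(f"dataSink.{key} is required")
    params = sink.get("params") or {}
    for key in REQUIRED_TX_PARAMS:
        if key not in params:
            errors.append(f"dataSink.params.{key} is required")
    policy = params.get("underrun_policy", "pause")  # D6 default
    if policy not in UNDERRUN_POLICIES:
        errors.append(f"dataSink.params.underrun_policy {policy!r} is not valid")
    return errors
```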
B4. WS endpoint extensions
/screens/agent/ws already exists. Add:
- Support for hub → agent binary frames (currently binary is agent → hub only). FastAPI's WebSocket.send_bytes works directly; just route binary from the Redis pub/sub channel through to the WS.
- New Redis pub/sub channel screens:agent:{agent_id}:tx for outbound TX control JSON + a separate screens:agent:{agent_id}:tx_bin for outbound binary. (Two channels keep binary payloads away from subscribers that decode messages as text. Redis pub/sub payloads are themselves binary-safe, so if every subscriber on the channel reads raw bytes, one channel is fine.)
B5. Celery wiring
When dataSink.type == "agent", the Celery task that runs the TX-containing graph uses AgentTxSink instead of a local sink. The operator code (plutoTXoperator) is unchanged because the sink abstraction hides the difference.
Full-duplex: a single task with both dataSource.type == "agent" and dataSink.type == "agent" pointing at the same agent spawns both the RX consumer loop (existing AgentDataSource.next_chunk via BLPOP) and the TX producer loop (AgentTxSink.write). Both sides are wired up before any capture frames are sent.
B6. Capability gating
Before any control path sends tx_start:
agent = get_agent(agent_id)
if agent.last_heartbeat.age > 60:  # stale
    raise HTTPException(503, "agent not responding")
if "tx" not in agent.last_heartbeat.capabilities:
    raise HTTPException(400, "agent has not opted in to transmission (tx_enabled=false)")
Surface clear errors to the Screens UI so the user knows it's an agent config issue, not an app config issue.
B7. Audit log
New MongoDB collection agent_tx_audit:
{
  agent_id, app_id, user_id,
  center_frequency_hz, tx_gain_db, duration_s, num_samples,
  started_at, ended_at, terminal_status,   // "done" | "error" | "underrun" | "cancelled"
  error_message?
}
Write on every tx_start. Update on terminal tx_status. Index on {agent_id, started_at} for admin-view queries.
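The write-then-update lifecycle can be sketched with two pure functions that build the documents. The database calls are omitted on purpose. The function names are hypothetical, and timestamps are epoch floats here only to keep the example small (a real collection would more likely store datetimes).

```python
from __future__ import annotations

TERMINAL_STATUSES = {"done", "error", "underrun", "cancelled"}


def new_audit_record(agent_id: str, app_id: str, user_id: str,
                     radio_config: dict, now: float) -> dict:
    """Document to insert when tx_start is sent."""
    return {
        "agent_id": agent_id,
        "app_id": app_id,
        "user_id": user_id,
        "center_frequency_hz": radio_config["tx_center_frequency"],
        "tx_gain_db": radio_config["tx_gain"],
        "duration_s": None,
        "num_samples": 0,
        "started_at": now,
        "ended_at": None,
        "terminal_status": None,
    }


def finalize_audit_record(record: dict, status: str, num_samples: int,
                          now: float, message: str | None = None) -> dict:
    """Updated copy to store when a terminal tx_status arrives."""
    if status not in TERMINAL_STATUSES:
        raise ValueError(f"{status!r} is not a terminal status")
    updated = dict(record)
    updated.update(
        ended_at=now,
        duration_s=now - record["started_at"],
        num_samples=num_samples,
        terminal_status=status,
    )
    if message is not None:
        updated["error_message"] = message
    return updated
```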
B8. Registration — no change needed
POST /screens/agents/register and ~/.ria/agent.json already cover credential storage. The TX interlock (tx_enabled, caps) is written by the agent operator via ria-agent register --allow-tx; the hub only reads the heartbeat to learn whether an agent will accept TX.
Part C — Screens (Vue 3 frontend)
C1. App composer
- Agent picker (exists from RX work) grows a "TX capable" filter toggle; hides agents whose heartbeat capabilities lack "tx".
- When the graph contains plutoTXoperator (or any future TX operator):
  - Render a dataSink section mirroring dataSource.
  - Fields: device, agent_id, identifier, tx_sample_rate, tx_center_frequency, tx_gain, underrun_policy.
  - Validation: tx_center_frequency within radio band; tx_gain within agent-advertised max (read from heartbeat when available).
C2. Run-time UI
- Consent modal on "Start" for any app whose manifest contains a dataSink.type: "agent":
  "This app will transmit on 2.450 GHz at -20 dB through agent lab-pluto-01. I confirm this transmission is permitted under my local radio regulations." Required checkbox, cannot be remembered across apps.
- TX status indicator in the running-app view: shows armed / transmitting / underrun state from tx_status frames. Red banner on underrun or error.
- Stop TX button always visible during transmission; fires tx_stop immediately. Separate from "Stop app" (which also stops RX).
C3. Admin view
Extend the agents list from B8 of the RX handoff:
- Column: TX: enabled / disabled / in-use by app X.
- Agent detail page: show tx_max_gain_db, tx_max_duration_s, tx_allowed_freq_ranges, and the last 10 rows from agent_tx_audit filtered to this agent.
Rollout order
1. Part A §A1-A3, A7: agent-side TX session + binary ingress + safety, all behind --allow-tx. Mock-based tests. Shippable standalone; no consumer yet.
2. Part B §B1-B5: hub sink + manifest + WS extension + Celery wiring. End-to-end test: Screens app with plutoTXoperator + agent sink → real Pluto in the lab → verify carrier on a spectrum analyzer.
3. Part B §B6-B7: capability gating + audit log. Blocks general release, not lab use.
4. Part C §C1: composer UI for TX apps.
5. Part C §C2-C3: consent modal + admin view. Gate for first non-internal user.
Parts A + B can land on parallel branches and meet at step 2's integration test. Part C can start in parallel with B once the manifest shape in B3 is stable.
Test matrix (integration)
| Scenario | Expected |
|---|---|
| App with RX only, agent connected | RX as today (regression guard) |
| App with TX only, agent tx_enabled=True | TX starts, underrun → pause, stop cleans up |
| App with RX + TX same agent, same device | One Pluto handle serves both; independent gains/frequencies |
| App with TX, agent tx_enabled=False | Hub rejects at gate with 400; no WS traffic generated |
| App with TX, gain exceeds agent cap | tx_status: error; SDR never opened |
| Hub stops sending TX buffers mid-stream | underrun emitted after queue drains; agent paused cleanly |
| WS drops during TX | Agent cancels TX task, pauses hardware, reconnects; hub must re-issue tx_start |
| Agent process killed during TX | Hardware stops (existing close() already handles this; verify _tx_lock released) |
Open questions
- Waveform source: is plutoTXoperator a real-time generator emitting on a clock, or does it synthesize a fixed recording and loop? If the latter, worth exposing a "bulk + loop" fast-path: hub sends the buffer once, agent loops it via existing tx_recording. Same protocol (tx_start + one buffer + loop: true), much less WS traffic.
- Multi-app-per-agent: out of scope for v1 (§Non-goals). When needed: add a session id to binary frames (4-byte prefix: magic + stream_id + reserved), bump a protocol_version in the heartbeat.
- Streaming TX clock drift: if hub and agent sample clocks drift, the zeros or repeated buffers inserted on underrun are audible/visible in the spectrum. Longer term: agent-side resampling or PLL, both expensive. v1: rely on generous queue depth + stable local networks.
- Other TX-capable SDRs: HackRF, USRP, bladeRF. The _CONFIG_ATTR_MAP in agent/streamer.py:169-175 will need per-driver entries when those come online.
Regulatory note
Transmission is regulated in every jurisdiction. The agent-side interlocks (tx_enabled, caps, freq ranges) exist so the operator can configure safe defaults for an agent's physical location. They are not a substitute for licensing or for respecting local regulations. The hub's consent modal and audit log exist so actions are attributable. None of this is a legal compliance layer — it's a defense-in-depth mechanism.