feat(agent): agent-owned device discovery, identifiers, and udev install

Make `ria-agent stream` work with any SDR the agent has drivers for, with
no per-device config in the hub:

- heartbeat advertises rich `hardware` entries {device, identifier, label,
  connected} via hardware.detect_devices(); USRP is enumerated into concrete
  instances (uhd_find_devices), others advertise driver-only entries. The
  identifier is chosen to round-trip through parse_ident (None=auto-select or
  name=...), so a device address is never a bare value.
- ship udev rules (Pluto/RTL-SDR/HackRF/USRP B2x0/bladeRF) + `ria-agent
  install-udev` so USB radios open without sudo — one privileged step, all
  inside the toolkit.
- streamer surfaces a "run: sudo ria-agent install-udev" hint on USB
  permission errors instead of the cryptic UHD message.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
J jrhughes003 2026-06-05 10:03:14 -04:00
parent d38276a533
commit bf64604bcf
8 changed files with 365 additions and 5 deletions

View File

@ -78,6 +78,7 @@ packages = [
] ]
include = [ include = [
"**/*.so", # Required for Nuitkaification "**/*.so", # Required for Nuitkaification
"src/ria_toolkit_oss/agent/udev/*.rules", # Shipped SDR udev rules (ria-agent install-udev)
] ]
[build-system] [build-system]

View File

@ -205,6 +205,71 @@ def _cmd_stream(args: argparse.Namespace) -> int:
return 0 return 0
_UDEV_RULES_NAME = "90-ria-sdr.rules"
def _cmd_install_udev(args: argparse.Namespace) -> int:
"""Install the bundled SDR udev rules so USB radios open without sudo.
This is the one OS-level step needed for USB SDRs (B2x0 / RTL-SDR / HackRF /
bladeRF). It ships inside ria-toolkit-oss no separate tool to install but
writing to ``/etc/udev/rules.d`` and reloading udev requires root, so run it
once with sudo. Network radios (Pluto/USRP over IP) need nothing here.
"""
import os
import shutil
import subprocess
from importlib.resources import files
try:
src = files("ria_toolkit_oss.agent").joinpath("udev", _UDEV_RULES_NAME)
rules_text = src.read_text()
except Exception as e:
print(f"error: bundled udev rules not found: {e}", file=sys.stderr)
return 1
dest_dir = args.dest
dest = os.path.join(dest_dir, _UDEV_RULES_NAME)
if os.geteuid() != 0:
print(
"error: installing udev rules requires root.\n"
f" run once: sudo {os.path.basename(sys.argv[0])} install-udev",
file=sys.stderr,
)
return 1
try:
os.makedirs(dest_dir, exist_ok=True)
with open(dest, "w") as f:
f.write(rules_text)
print(f"Installed udev rules -> {dest}")
except OSError as e:
print(f"error: failed to write {dest}: {e}", file=sys.stderr)
return 1
if not args.no_reload and shutil.which("udevadm"):
for cmd in (["udevadm", "control", "--reload-rules"], ["udevadm", "trigger"]):
try:
subprocess.run(cmd, check=True)
except Exception as e:
print(f"warning: '{' '.join(cmd)}' failed: {e}", file=sys.stderr)
# Add the invoking (pre-sudo) user to the access group so group-based rules
# apply even without a local logind session.
target_user = os.environ.get("SUDO_USER") or ""
if target_user and shutil.which("usermod"):
try:
subprocess.run(["usermod", "-aG", args.group, target_user], check=True)
print(f"Added user '{target_user}' to group '{args.group}'.")
print(f"Log out and back in (or run 'newgrp {args.group}') for the group to take effect.")
except Exception as e:
print(f"warning: could not add '{target_user}' to '{args.group}': {e}", file=sys.stderr)
print("Done. Unplug and replug your USB SDR, then run `ria-agent stream`.")
return 0
def _derive_ws_url(hub_url: str, agent_id: str) -> str: def _derive_ws_url(hub_url: str, agent_id: str) -> str:
if not hub_url: if not hub_url:
return "" return ""
@ -231,6 +296,26 @@ def main() -> None:
sub.add_parser("run", help="Legacy long-poll agent (NodeAgent)") sub.add_parser("run", help="Legacy long-poll agent (NodeAgent)")
sub.add_parser("detect", help="List available SDR drivers") sub.add_parser("detect", help="List available SDR drivers")
p_udev = sub.add_parser(
"install-udev",
help="Install SDR udev rules so USB radios open without sudo (run once, with sudo)",
)
p_udev.add_argument(
"--dest",
default="/etc/udev/rules.d",
help="Directory to install the rules file into (default: /etc/udev/rules.d)",
)
p_udev.add_argument(
"--group",
default="plugdev",
help="Group granted device access; the invoking user is added to it (default: plugdev)",
)
p_udev.add_argument(
"--no-reload",
action="store_true",
help="Skip 'udevadm control --reload-rules' / 'udevadm trigger'",
)
p_reg = sub.add_parser("register", help="Register agent with RIA Hub and save credentials") p_reg = sub.add_parser("register", help="Register agent with RIA Hub and save credentials")
p_reg.add_argument( p_reg.add_argument(
"--hub", "--hub",
@ -308,6 +393,8 @@ def main() -> None:
return return
if args.command == "detect": if args.command == "detect":
sys.exit(_cmd_detect(args)) sys.exit(_cmd_detect(args))
if args.command == "install-udev":
sys.exit(_cmd_install_udev(args))
if args.command == "register": if args.command == "register":
sys.exit(_cmd_register(args)) sys.exit(_cmd_register(args))
if args.command == "stream": if args.command == "stream":

View File

@ -1,17 +1,164 @@
"""Hardware detection and heartbeat payload construction for the streamer.""" """Hardware detection and heartbeat payload construction for the streamer.
The heartbeat advertises a ``hardware`` list the hub uses to populate the
radio-device picker. Each entry is a dict::
{"device": "usrp", "identifier": "name=MyB210",
"label": "Ettus USRP B210 (35D7CAD)", "connected": True}
- ``device`` driver/device-type name (``"usrp"``, ``"pluto"``, ).
- ``identifier`` the exact addressing string this driver wants, or ``None``
to let the driver auto-select the sole device of its type.
The hub forwards this verbatim in ``radio_config`` so the
identifier is always agent-owned never derived from the
composer graph (which is what used to leak a Pluto IP into a
USRP open). It must round-trip through
``ria_toolkit_oss_cli.common.parse_ident``: a bare value is
read as an IP address, so non-network devices use ``None`` or
``name=<value>``.
- ``label`` human-friendly text for the hub dropdown.
- ``connected`` ``True`` when the device was physically enumerated,
``False`` when only the driver is importable (no hardware
probed/found), ``None`` when presence is unknown.
The hub tolerates plain string entries from older agents (see
``_agent_device_names`` / ``hwName``), so this richer shape is backward
compatible.
"""
from __future__ import annotations from __future__ import annotations
import logging
import time
from ria_toolkit_oss.sdr import detect_available from ria_toolkit_oss.sdr import detect_available
from .config import AgentConfig from .config import AgentConfig
logger = logging.getLogger(__name__)
# Human-friendly names for the hub dropdown, keyed by device-type name.
DEVICE_LABELS: dict[str, str] = {
"usrp": "Ettus USRP (UHD)",
"pluto": "ADALM-Pluto",
"rtlsdr": "RTL-SDR",
"hackrf": "HackRF One",
"blade": "BladeRF",
"thinkrf": "ThinkRF (RTSA)",
"mock": "Mock SDR (synthetic)",
}
# Enumeration can shell out (e.g. ``uhd_find_devices``), so cache results for a
# short window rather than re-probing on every ~30s heartbeat. Hot-plug shows up
# within one TTL.
_PROBE_TTL_S = 60.0
_probe_cache: tuple[float, list[dict]] | None = None
def available_devices() -> list[str]: def available_devices() -> list[str]:
"""Return a sorted list of device names whose driver modules import cleanly.""" """Return a sorted list of device names whose driver modules import cleanly."""
return sorted(detect_available().keys()) return sorted(detect_available().keys())
def _label_for(device: str, suffix: str = "") -> str:
base = DEVICE_LABELS.get(device, device)
return f"{base} ({suffix})" if suffix else base
def _enumerate_usrp() -> list[dict] | None:
"""Probe for connected USRPs via ``uhd_find_devices``.
Returns a list of concrete device entries (``connected=True``), an empty
list when UHD ran but found nothing, or ``None`` when probing is not
possible (UHD/driver unavailable) so the caller can fall back to a
driver-only entry.
"""
try:
from ria_toolkit_oss.sdr.usrp import _parse_uhd_find_devices
except Exception:
return None
try:
found = _parse_uhd_find_devices() or []
except Exception as exc:
logger.debug("USRP enumeration failed: %s", exc)
return None
entries: list[dict] = []
for dev in found:
serial = dev.get("serial") or ""
name = dev.get("name") or ""
product = dev.get("product") or dev.get("type") or "USRP"
# parse_ident only round-trips IP (bare) or ``name=`` — UHD has no
# serial= addressing here, so prefer name; otherwise auto-select (None).
identifier = f"name={name}" if name else None
suffix = serial or name or product
entries.append(
{
"device": "usrp",
"identifier": identifier,
"label": _label_for("usrp", suffix),
"connected": True,
}
)
return entries
# Device types we can cheaply enumerate into concrete instances. Anything not
# listed is advertised as a single driver-only entry (presence unknown).
_PROBERS = {
"usrp": _enumerate_usrp,
}
def _detect_devices_uncached() -> list[dict]:
out: list[dict] = []
for device in available_devices():
prober = _PROBERS.get(device)
if prober is not None:
probed = prober()
if probed: # one or more concrete instances found
out.extend(probed)
continue
if probed == []: # prober ran but found no hardware
out.append(
{
"device": device,
"identifier": None,
"label": _label_for(device),
"connected": False,
}
)
continue
# probed is None — couldn't probe; fall through to unknown entry.
out.append(
{
"device": device,
"identifier": None,
"label": _label_for(device),
"connected": None,
}
)
return out
def detect_devices(*, use_cache: bool = True) -> list[dict]:
"""Return enriched ``hardware`` entries for the heartbeat.
Results are cached for ``_PROBE_TTL_S`` seconds because enumeration may shell
out to hardware tools. Pass ``use_cache=False`` to force a fresh probe.
"""
global _probe_cache
now = time.monotonic()
if use_cache and _probe_cache is not None:
ts, cached = _probe_cache
if now - ts < _PROBE_TTL_S:
return cached
devices = _detect_devices_uncached()
_probe_cache = (now, devices)
return devices
def heartbeat_payload( def heartbeat_payload(
status: str = "idle", status: str = "idle",
app_id: str | None = None, app_id: str | None = None,
@ -32,7 +179,7 @@ def heartbeat_payload(
payload: dict = { payload: dict = {
"type": "heartbeat", "type": "heartbeat",
"hardware": available_devices(), "hardware": detect_devices(),
"status": status, "status": status,
"capabilities": capabilities, "capabilities": capabilities,
"tx_enabled": bool(c.tx_enabled), "tx_enabled": bool(c.tx_enabled),

View File

@ -254,7 +254,7 @@ class Streamer:
_apply_sdr_config(sdr, radio_config) _apply_sdr_config(sdr, radio_config)
except Exception as exc: except Exception as exc:
logger.exception("Failed to open SDR %r", device) logger.exception("Failed to open SDR %r", device)
await self._send_error(app_id, f"SDR init failed: {exc}") await self._send_error(app_id, f"SDR init failed: {_friendly_sdr_error(device, exc)}")
return return
# Inherit any pending config that was queued before start. # Inherit any pending config that was queued before start.
@ -417,7 +417,7 @@ class Streamer:
except Exception: except Exception:
pass pass
logger.exception("Failed to init TX on %r", device) logger.exception("Failed to init TX on %r", device)
await self._send_tx_status(app_id, "error", f"tx init failed: {exc}") await self._send_tx_status(app_id, "error", f"tx init failed: {_friendly_sdr_error(device, exc)}")
return return
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
@ -732,6 +732,25 @@ def _default_sdr_factory(device: str, identifier: str | None):
return get_sdr_device(device, ident=identifier) return get_sdr_device(device, ident=identifier)
def _friendly_sdr_error(device: str, exc: Exception) -> str:
"""Add an actionable hint when an SDR open fails on USB permissions.
UHD/libusb surface this as 'insufficient permissions' / EACCES, which is
cryptic to operators. Point them at the one-time fix that ships with the
toolkit instead of leaving them to discover udev rules on their own.
"""
text = str(exc).lower()
permission_markers = ("insufficient permissions", "permission denied", "eacces", "access denied")
is_perm = isinstance(exc, PermissionError) or any(m in text for m in permission_markers)
if is_perm:
return (
f"{exc}\n"
f"USB permission denied opening '{device}'. Run this once, then replug the device:\n"
f" sudo ria-agent install-udev"
)
return str(exc)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Top-level entry # Top-level entry

View File

@ -0,0 +1,34 @@
# RIA Toolkit SDR udev rules
#
# Grants non-root access to the USB SDRs ria-agent can drive, so `ria-agent
# stream` can open them without sudo. Installed by `ria-agent install-udev`.
#
# Access is granted two ways for portability:
# - GROUP="plugdev", MODE="0660" — classic group-based access.
# - TAG+="uaccess" — systemd-logind grants the active local
# session user access dynamically.
# A user in `plugdev` (or logged in locally) can open the device after replug.
# ADALM-Pluto (Analog Devices)
SUBSYSTEM=="usb", ATTRS{idVendor}=="0456", ATTRS{idProduct}=="b673", MODE="0660", GROUP="plugdev", TAG+="uaccess"
# RTL-SDR (Realtek RTL2832U)
SUBSYSTEM=="usb", ATTRS{idVendor}=="0bda", ATTRS{idProduct}=="2832", MODE="0660", GROUP="plugdev", TAG+="uaccess"
SUBSYSTEM=="usb", ATTRS{idVendor}=="0bda", ATTRS{idProduct}=="2838", MODE="0660", GROUP="plugdev", TAG+="uaccess"
# HackRF (Great Scott Gadgets)
SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="6089", MODE="0660", GROUP="plugdev", TAG+="uaccess"
SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="604b", MODE="0660", GROUP="plugdev", TAG+="uaccess"
SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="cc15", MODE="0660", GROUP="plugdev", TAG+="uaccess"
# Ettus USRP B2x0 (UHD)
SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0020", MODE="0660", GROUP="plugdev", TAG+="uaccess"
SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0021", MODE="0660", GROUP="plugdev", TAG+="uaccess"
SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0022", MODE="0660", GROUP="plugdev", TAG+="uaccess"
# USRP B2x0 in bootloader / uninitialized (Cypress FX3 / legacy Ettus VID)
SUBSYSTEM=="usb", ATTRS{idVendor}=="fffe", ATTRS{idProduct}=="0002", MODE="0660", GROUP="plugdev", TAG+="uaccess"
SUBSYSTEM=="usb", ATTRS{idVendor}=="04b4", ATTRS{idProduct}=="00f3", MODE="0660", GROUP="plugdev", TAG+="uaccess"
# Nuand bladeRF
SUBSYSTEM=="usb", ATTRS{idVendor}=="2cf0", ATTRS{idProduct}=="5246", MODE="0660", GROUP="plugdev", TAG+="uaccess"
SUBSYSTEM=="usb", ATTRS{idVendor}=="2cf0", ATTRS{idProduct}=="5250", MODE="0660", GROUP="plugdev", TAG+="uaccess"

View File

@ -0,0 +1,36 @@
"""Tests for `ria-agent install-udev` and the bundled udev rules."""
from __future__ import annotations
from importlib.resources import files
from unittest.mock import patch
from ria_toolkit_oss.agent import cli as agent_cli
def test_bundled_udev_rules_present_and_cover_usb_sdrs():
text = files("ria_toolkit_oss.agent").joinpath("udev", "90-ria-sdr.rules").read_text()
# ADALM-Pluto, RTL-SDR, HackRF, and USRP B2x0 VIDs must be covered.
for vid in ("0456", "0bda", "1d50", "2500"):
assert vid in text
def test_install_udev_requires_root(capsys):
args = agent_cli.argparse.Namespace(dest="/etc/udev/rules.d", group="plugdev", no_reload=True)
with patch("os.geteuid", return_value=1000):
rc = agent_cli._cmd_install_udev(args)
assert rc == 1
err = capsys.readouterr().err
assert "requires root" in err
assert "install-udev" in err
def test_install_udev_writes_rules_when_root(tmp_path, monkeypatch, capsys):
args = agent_cli.argparse.Namespace(dest=str(tmp_path), group="plugdev", no_reload=True)
# No SUDO_USER and --no-reload → no subprocess calls; just the file write.
monkeypatch.delenv("SUDO_USER", raising=False)
with patch("os.geteuid", return_value=0):
rc = agent_cli._cmd_install_udev(args)
assert rc == 0
written = (tmp_path / "90-ria-sdr.rules").read_text()
assert "SUBSYSTEM" in written

View File

@ -17,11 +17,16 @@ def test_available_devices_sorted_list():
assert "mock" in devices assert "mock" in devices
def _device_names(hardware_list):
return {e["device"] for e in hardware_list}
def test_heartbeat_payload_shape(): def test_heartbeat_payload_shape():
p = hardware.heartbeat_payload() p = hardware.heartbeat_payload()
assert p["type"] == "heartbeat" assert p["type"] == "heartbeat"
assert p["status"] == "idle" assert p["status"] == "idle"
assert "mock" in p["hardware"] # hardware is now a list of rich dict entries.
assert "mock" in _device_names(p["hardware"])
assert "app_id" not in p assert "app_id" not in p
# New fields, default shape # New fields, default shape
assert p["capabilities"] == ["rx"] assert p["capabilities"] == ["rx"]
@ -32,6 +37,24 @@ def test_heartbeat_payload_shape():
assert p2["app_id"] == "abc" assert p2["app_id"] == "abc"
def test_detect_devices_entry_shape():
devices = hardware.detect_devices(use_cache=False)
assert isinstance(devices, list)
for entry in devices:
assert set(entry) >= {"device", "identifier", "label", "connected"}
assert isinstance(entry["device"], str)
# identifier round-trips through parse_ident: None or a string.
assert entry["identifier"] is None or isinstance(entry["identifier"], str)
mock = next(e for e in devices if e["device"] == "mock")
assert mock["label"] # has a human label
def test_detect_devices_cache():
a = hardware.detect_devices(use_cache=False)
b = hardware.detect_devices(use_cache=True)
assert _device_names(a) == _device_names(b)
def test_heartbeat_payload_tx_capability_from_cfg(): def test_heartbeat_payload_tx_capability_from_cfg():
from ria_toolkit_oss.agent.config import AgentConfig from ria_toolkit_oss.agent.config import AgentConfig

View File

@ -9,11 +9,24 @@ import numpy as np
from ria_toolkit_oss.agent.streamer import ( from ria_toolkit_oss.agent.streamer import (
Streamer, Streamer,
_apply_sdr_config, _apply_sdr_config,
_friendly_sdr_error,
_samples_to_interleaved_float32, _samples_to_interleaved_float32,
) )
from ria_toolkit_oss.sdr.mock import MockSDR from ria_toolkit_oss.sdr.mock import MockSDR
def test_friendly_sdr_error_adds_udev_hint_on_permission():
msg = _friendly_sdr_error("usrp", RuntimeError("USB open failed: insufficient permissions."))
assert "install-udev" in msg
assert "insufficient permissions" in msg
def test_friendly_sdr_error_passes_through_other_errors():
msg = _friendly_sdr_error("usrp", RuntimeError("No USRP device found for identifier 'name=x'"))
assert "install-udev" not in msg
assert "No USRP device found" in msg
class FakeWs: class FakeWs:
def __init__(self): def __init__(self):
self.json_sent: list[dict] = [] self.json_sent: list[dict] = []