From bf64604bcf8a62ea840b2d8d792c697da0662260 Mon Sep 17 00:00:00 2001 From: jrhughes003 Date: Fri, 5 Jun 2026 10:03:14 -0400 Subject: [PATCH] feat(agent): agent-owned device discovery, identifiers, and udev install MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make `ria-agent stream` work with any SDR the agent has drivers for, with no per-device config in the hub: - heartbeat advertises rich `hardware` entries {device, identifier, label, connected} via hardware.detect_devices(); USRP is enumerated into concrete instances (uhd_find_devices), others advertise driver-only entries. The identifier is chosen to round-trip through parse_ident (None=auto-select or name=...), so a device address is never a bare value. - ship udev rules (Pluto/RTL-SDR/HackRF/USRP B2x0/bladeRF) + `ria-agent install-udev` so USB radios open without sudo — one privileged step, all inside the toolkit. - streamer surfaces a "run: sudo ria-agent install-udev" hint on USB permission errors instead of the cryptic UHD message. Co-Authored-By: Claude Opus 4.8 (1M context) --- pyproject.toml | 1 + src/ria_toolkit_oss/agent/cli.py | 87 ++++++++++ src/ria_toolkit_oss/agent/hardware.py | 151 +++++++++++++++++- src/ria_toolkit_oss/agent/streamer.py | 23 ++- .../agent/udev/90-ria-sdr.rules | 34 ++++ tests/agent/test_cli_install_udev.py | 36 +++++ tests/agent/test_hardware.py | 25 ++- tests/agent/test_streamer.py | 13 ++ 8 files changed, 365 insertions(+), 5 deletions(-) create mode 100644 src/ria_toolkit_oss/agent/udev/90-ria-sdr.rules create mode 100644 tests/agent/test_cli_install_udev.py diff --git a/pyproject.toml b/pyproject.toml index 678be00..1764ca6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ packages = [ ] include = [ "**/*.so", # Required for Nuitkaification + "src/ria_toolkit_oss/agent/udev/*.rules", # Shipped SDR udev rules (ria-agent install-udev) ] [build-system] diff --git a/src/ria_toolkit_oss/agent/cli.py b/src/ria_toolkit_oss/agent/cli.py index 0d73761..9ba7635 100644 --- a/src/ria_toolkit_oss/agent/cli.py +++ b/src/ria_toolkit_oss/agent/cli.py @@ -205,6 +205,71 @@ def _cmd_stream(args: argparse.Namespace) -> int: return 0 +_UDEV_RULES_NAME = "90-ria-sdr.rules" + + +def _cmd_install_udev(args: argparse.Namespace) -> int: + """Install the bundled SDR udev rules so USB radios open without sudo. + + This is the one OS-level step needed for USB SDRs (B2x0 / RTL-SDR / HackRF / + bladeRF). It ships inside ria-toolkit-oss — no separate tool to install — but + writing to ``/etc/udev/rules.d`` and reloading udev requires root, so run it + once with sudo. Network radios (Pluto/USRP over IP) need nothing here. + """ + import os + import shutil + import subprocess + from importlib.resources import files + + try: + src = files("ria_toolkit_oss.agent").joinpath("udev", _UDEV_RULES_NAME) + rules_text = src.read_text() + except Exception as e: + print(f"error: bundled udev rules not found: {e}", file=sys.stderr) + return 1 + + dest_dir = args.dest + dest = os.path.join(dest_dir, _UDEV_RULES_NAME) + + if os.geteuid() != 0: + print( + "error: installing udev rules requires root.\n" + f" run once: sudo {os.path.basename(sys.argv[0])} install-udev", + file=sys.stderr, + ) + return 1 + + try: + os.makedirs(dest_dir, exist_ok=True) + with open(dest, "w") as f: + f.write(rules_text) + print(f"Installed udev rules -> {dest}") + except OSError as e: + print(f"error: failed to write {dest}: {e}", file=sys.stderr) + return 1 + + if not args.no_reload and shutil.which("udevadm"): + for cmd in (["udevadm", "control", "--reload-rules"], ["udevadm", "trigger"]): + try: + subprocess.run(cmd, check=True) + except Exception as e: + print(f"warning: '{' '.join(cmd)}' failed: {e}", file=sys.stderr) + + # Add the invoking (pre-sudo) user to the access group so group-based rules + # apply even without a local logind session. + target_user = os.environ.get("SUDO_USER") or "" + if target_user and shutil.which("usermod"): + try: + subprocess.run(["usermod", "-aG", args.group, target_user], check=True) + print(f"Added user '{target_user}' to group '{args.group}'.") + print(f"Log out and back in (or run 'newgrp {args.group}') for the group to take effect.") + except Exception as e: + print(f"warning: could not add '{target_user}' to '{args.group}': {e}", file=sys.stderr) + + print("Done. Unplug and replug your USB SDR, then run `ria-agent stream`.") + return 0 + + def _derive_ws_url(hub_url: str, agent_id: str) -> str: if not hub_url: return "" @@ -231,6 +296,26 @@ def main() -> None: sub.add_parser("run", help="Legacy long-poll agent (NodeAgent)") sub.add_parser("detect", help="List available SDR drivers") + p_udev = sub.add_parser( + "install-udev", + help="Install SDR udev rules so USB radios open without sudo (run once, with sudo)", + ) + p_udev.add_argument( + "--dest", + default="/etc/udev/rules.d", + help="Directory to install the rules file into (default: /etc/udev/rules.d)", + ) + p_udev.add_argument( + "--group", + default="plugdev", + help="Group granted device access; the invoking user is added to it (default: plugdev)", + ) + p_udev.add_argument( + "--no-reload", + action="store_true", + help="Skip 'udevadm control --reload-rules' / 'udevadm trigger'", + ) + p_reg = sub.add_parser("register", help="Register agent with RIA Hub and save credentials") p_reg.add_argument( "--hub", @@ -308,6 +393,8 @@ def main() -> None: return if args.command == "detect": sys.exit(_cmd_detect(args)) + if args.command == "install-udev": + sys.exit(_cmd_install_udev(args)) if args.command == "register": sys.exit(_cmd_register(args)) if args.command == "stream": diff --git a/src/ria_toolkit_oss/agent/hardware.py b/src/ria_toolkit_oss/agent/hardware.py index 98b4683..d4c1414 100644 --- a/src/ria_toolkit_oss/agent/hardware.py +++ b/src/ria_toolkit_oss/agent/hardware.py @@ -1,17 +1,164 @@ -"""Hardware detection and heartbeat payload construction for the streamer.""" +"""Hardware detection and heartbeat payload construction for the streamer. + +The heartbeat advertises a ``hardware`` list the hub uses to populate the +radio-device picker. Each entry is a dict:: + + {"device": "usrp", "identifier": "name=MyB210", + "label": "Ettus USRP B210 (35D7CAD)", "connected": True} + +- ``device`` — driver/device-type name (``"usrp"``, ``"pluto"``, …). +- ``identifier`` — the exact addressing string this driver wants, or ``None`` + to let the driver auto-select the sole device of its type. + The hub forwards this verbatim in ``radio_config`` so the + identifier is always agent-owned — never derived from the + composer graph (which is what used to leak a Pluto IP into a + USRP open). It must round-trip through + ``ria_toolkit_oss_cli.common.parse_ident``: a bare value is + read as an IP address, so non-network devices use ``None`` or + ``name=``. +- ``label`` — human-friendly text for the hub dropdown. +- ``connected`` — ``True`` when the device was physically enumerated, + ``False`` when only the driver is importable (no hardware + probed/found), ``None`` when presence is unknown. + +The hub tolerates plain string entries from older agents (see +``_agent_device_names`` / ``hwName``), so this richer shape is backward +compatible. +""" from __future__ import annotations +import logging +import time + from ria_toolkit_oss.sdr import detect_available from .config import AgentConfig +logger = logging.getLogger(__name__) + +# Human-friendly names for the hub dropdown, keyed by device-type name. +DEVICE_LABELS: dict[str, str] = { + "usrp": "Ettus USRP (UHD)", + "pluto": "ADALM-Pluto", + "rtlsdr": "RTL-SDR", + "hackrf": "HackRF One", + "blade": "BladeRF", + "thinkrf": "ThinkRF (RTSA)", + "mock": "Mock SDR (synthetic)", +} + +# Enumeration can shell out (e.g. ``uhd_find_devices``), so cache results for a +# short window rather than re-probing on every ~30s heartbeat. Hot-plug shows up +# within one TTL. +_PROBE_TTL_S = 60.0 +_probe_cache: tuple[float, list[dict]] | None = None + def available_devices() -> list[str]: """Return a sorted list of device names whose driver modules import cleanly.""" return sorted(detect_available().keys()) +def _label_for(device: str, suffix: str = "") -> str: + base = DEVICE_LABELS.get(device, device) + return f"{base} ({suffix})" if suffix else base + + +def _enumerate_usrp() -> list[dict] | None: + """Probe for connected USRPs via ``uhd_find_devices``. + + Returns a list of concrete device entries (``connected=True``), an empty + list when UHD ran but found nothing, or ``None`` when probing is not + possible (UHD/driver unavailable) so the caller can fall back to a + driver-only entry. + """ + try: + from ria_toolkit_oss.sdr.usrp import _parse_uhd_find_devices + except Exception: + return None + + try: + found = _parse_uhd_find_devices() or [] + except Exception as exc: + logger.debug("USRP enumeration failed: %s", exc) + return None + + entries: list[dict] = [] + for dev in found: + serial = dev.get("serial") or "" + name = dev.get("name") or "" + product = dev.get("product") or dev.get("type") or "USRP" + # parse_ident only round-trips IP (bare) or ``name=`` — UHD has no + # serial= addressing here, so prefer name; otherwise auto-select (None). + identifier = f"name={name}" if name else None + suffix = serial or name or product + entries.append( + { + "device": "usrp", + "identifier": identifier, + "label": _label_for("usrp", suffix), + "connected": True, + } + ) + return entries + + +# Device types we can cheaply enumerate into concrete instances. Anything not +# listed is advertised as a single driver-only entry (presence unknown). +_PROBERS = { + "usrp": _enumerate_usrp, +} + + +def _detect_devices_uncached() -> list[dict]: + out: list[dict] = [] + for device in available_devices(): + prober = _PROBERS.get(device) + if prober is not None: + probed = prober() + if probed: # one or more concrete instances found + out.extend(probed) + continue + if probed == []: # prober ran but found no hardware + out.append( + { + "device": device, + "identifier": None, + "label": _label_for(device), + "connected": False, + } + ) + continue + # probed is None — couldn't probe; fall through to unknown entry. + out.append( + { + "device": device, + "identifier": None, + "label": _label_for(device), + "connected": None, + } + ) + return out + + +def detect_devices(*, use_cache: bool = True) -> list[dict]: + """Return enriched ``hardware`` entries for the heartbeat. + + Results are cached for ``_PROBE_TTL_S`` seconds because enumeration may shell + out to hardware tools. Pass ``use_cache=False`` to force a fresh probe. + """ + global _probe_cache + now = time.monotonic() + if use_cache and _probe_cache is not None: + ts, cached = _probe_cache + if now - ts < _PROBE_TTL_S: + return cached + devices = _detect_devices_uncached() + _probe_cache = (now, devices) + return devices + + def heartbeat_payload( status: str = "idle", app_id: str | None = None, @@ -32,7 +179,7 @@ def heartbeat_payload( payload: dict = { "type": "heartbeat", - "hardware": available_devices(), + "hardware": detect_devices(), "status": status, "capabilities": capabilities, "tx_enabled": bool(c.tx_enabled), diff --git a/src/ria_toolkit_oss/agent/streamer.py b/src/ria_toolkit_oss/agent/streamer.py index 6f727ff..1e82a5f 100644 --- a/src/ria_toolkit_oss/agent/streamer.py +++ b/src/ria_toolkit_oss/agent/streamer.py @@ -254,7 +254,7 @@ class Streamer: _apply_sdr_config(sdr, radio_config) except Exception as exc: logger.exception("Failed to open SDR %r", device) - await self._send_error(app_id, f"SDR init failed: {exc}") + await self._send_error(app_id, f"SDR init failed: {_friendly_sdr_error(device, exc)}") return # Inherit any pending config that was queued before start. @@ -417,7 +417,7 @@ class Streamer: except Exception: pass logger.exception("Failed to init TX on %r", device) - await self._send_tx_status(app_id, "error", f"tx init failed: {exc}") + await self._send_tx_status(app_id, "error", f"tx init failed: {_friendly_sdr_error(device, exc)}") return self._loop = asyncio.get_running_loop() @@ -732,6 +732,25 @@ def _default_sdr_factory(device: str, identifier: str | None): return get_sdr_device(device, ident=identifier) +def _friendly_sdr_error(device: str, exc: Exception) -> str: + """Add an actionable hint when an SDR open fails on USB permissions. + + UHD/libusb surface this as 'insufficient permissions' / EACCES, which is + cryptic to operators. Point them at the one-time fix that ships with the + toolkit instead of leaving them to discover udev rules on their own. + """ + text = str(exc).lower() + permission_markers = ("insufficient permissions", "permission denied", "eacces", "access denied") + is_perm = isinstance(exc, PermissionError) or any(m in text for m in permission_markers) + if is_perm: + return ( + f"{exc}\n" + f"USB permission denied opening '{device}'. Run this once, then replug the device:\n" + f" sudo ria-agent install-udev" + ) + return str(exc) + + # --------------------------------------------------------------------------- # Top-level entry diff --git a/src/ria_toolkit_oss/agent/udev/90-ria-sdr.rules b/src/ria_toolkit_oss/agent/udev/90-ria-sdr.rules new file mode 100644 index 0000000..05dee67 --- /dev/null +++ b/src/ria_toolkit_oss/agent/udev/90-ria-sdr.rules @@ -0,0 +1,34 @@ +# RIA Toolkit SDR udev rules +# +# Grants non-root access to the USB SDRs ria-agent can drive, so `ria-agent +# stream` can open them without sudo. Installed by `ria-agent install-udev`. +# +# Access is granted two ways for portability: +# - GROUP="plugdev", MODE="0660" — classic group-based access. +# - TAG+="uaccess" — systemd-logind grants the active local +# session user access dynamically. +# A user in `plugdev` (or logged in locally) can open the device after replug. + +# ADALM-Pluto (Analog Devices) +SUBSYSTEM=="usb", ATTRS{idVendor}=="0456", ATTRS{idProduct}=="b673", MODE="0660", GROUP="plugdev", TAG+="uaccess" + +# RTL-SDR (Realtek RTL2832U) +SUBSYSTEM=="usb", ATTRS{idVendor}=="0bda", ATTRS{idProduct}=="2832", MODE="0660", GROUP="plugdev", TAG+="uaccess" +SUBSYSTEM=="usb", ATTRS{idVendor}=="0bda", ATTRS{idProduct}=="2838", MODE="0660", GROUP="plugdev", TAG+="uaccess" + +# HackRF (Great Scott Gadgets) +SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="6089", MODE="0660", GROUP="plugdev", TAG+="uaccess" +SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="604b", MODE="0660", GROUP="plugdev", TAG+="uaccess" +SUBSYSTEM=="usb", ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="cc15", MODE="0660", GROUP="plugdev", TAG+="uaccess" + +# Ettus USRP B2x0 (UHD) +SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0020", MODE="0660", GROUP="plugdev", TAG+="uaccess" +SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0021", MODE="0660", GROUP="plugdev", TAG+="uaccess" +SUBSYSTEM=="usb", ATTRS{idVendor}=="2500", ATTRS{idProduct}=="0022", MODE="0660", GROUP="plugdev", TAG+="uaccess" +# USRP B2x0 in bootloader / uninitialized (Cypress FX3 / legacy Ettus VID) +SUBSYSTEM=="usb", ATTRS{idVendor}=="fffe", ATTRS{idProduct}=="0002", MODE="0660", GROUP="plugdev", TAG+="uaccess" +SUBSYSTEM=="usb", ATTRS{idVendor}=="04b4", ATTRS{idProduct}=="00f3", MODE="0660", GROUP="plugdev", TAG+="uaccess" + +# Nuand bladeRF +SUBSYSTEM=="usb", ATTRS{idVendor}=="2cf0", ATTRS{idProduct}=="5246", MODE="0660", GROUP="plugdev", TAG+="uaccess" +SUBSYSTEM=="usb", ATTRS{idVendor}=="2cf0", ATTRS{idProduct}=="5250", MODE="0660", GROUP="plugdev", TAG+="uaccess" diff --git a/tests/agent/test_cli_install_udev.py b/tests/agent/test_cli_install_udev.py new file mode 100644 index 0000000..5f54bd6 --- /dev/null +++ b/tests/agent/test_cli_install_udev.py @@ -0,0 +1,36 @@ +"""Tests for `ria-agent install-udev` and the bundled udev rules.""" + +from __future__ import annotations + +from importlib.resources import files +from unittest.mock import patch + +from ria_toolkit_oss.agent import cli as agent_cli + + +def test_bundled_udev_rules_present_and_cover_usb_sdrs(): + text = files("ria_toolkit_oss.agent").joinpath("udev", "90-ria-sdr.rules").read_text() + # ADALM-Pluto, RTL-SDR, HackRF, and USRP B2x0 VIDs must be covered. + for vid in ("0456", "0bda", "1d50", "2500"): + assert vid in text + + +def test_install_udev_requires_root(capsys): + args = agent_cli.argparse.Namespace(dest="/etc/udev/rules.d", group="plugdev", no_reload=True) + with patch("os.geteuid", return_value=1000): + rc = agent_cli._cmd_install_udev(args) + assert rc == 1 + err = capsys.readouterr().err + assert "requires root" in err + assert "install-udev" in err + + +def test_install_udev_writes_rules_when_root(tmp_path, monkeypatch, capsys): + args = agent_cli.argparse.Namespace(dest=str(tmp_path), group="plugdev", no_reload=True) + # No SUDO_USER and --no-reload → no subprocess calls; just the file write. + monkeypatch.delenv("SUDO_USER", raising=False) + with patch("os.geteuid", return_value=0): + rc = agent_cli._cmd_install_udev(args) + assert rc == 0 + written = (tmp_path / "90-ria-sdr.rules").read_text() + assert "SUBSYSTEM" in written diff --git a/tests/agent/test_hardware.py b/tests/agent/test_hardware.py index 6a9cdf3..9e2af07 100644 --- a/tests/agent/test_hardware.py +++ b/tests/agent/test_hardware.py @@ -17,11 +17,16 @@ def test_available_devices_sorted_list(): assert "mock" in devices +def _device_names(hardware_list): + return {e["device"] for e in hardware_list} + + def test_heartbeat_payload_shape(): p = hardware.heartbeat_payload() assert p["type"] == "heartbeat" assert p["status"] == "idle" - assert "mock" in p["hardware"] + # hardware is now a list of rich dict entries. + assert "mock" in _device_names(p["hardware"]) assert "app_id" not in p # New fields, default shape assert p["capabilities"] == ["rx"] @@ -32,6 +37,24 @@ def test_heartbeat_payload_shape(): assert p2["app_id"] == "abc" +def test_detect_devices_entry_shape(): + devices = hardware.detect_devices(use_cache=False) + assert isinstance(devices, list) + for entry in devices: + assert set(entry) >= {"device", "identifier", "label", "connected"} + assert isinstance(entry["device"], str) + # identifier round-trips through parse_ident: None or a string. + assert entry["identifier"] is None or isinstance(entry["identifier"], str) + mock = next(e for e in devices if e["device"] == "mock") + assert mock["label"] # has a human label + + +def test_detect_devices_cache(): + a = hardware.detect_devices(use_cache=False) + b = hardware.detect_devices(use_cache=True) + assert _device_names(a) == _device_names(b) + + def test_heartbeat_payload_tx_capability_from_cfg(): from ria_toolkit_oss.agent.config import AgentConfig diff --git a/tests/agent/test_streamer.py b/tests/agent/test_streamer.py index 44f98e0..153a265 100644 --- a/tests/agent/test_streamer.py +++ b/tests/agent/test_streamer.py @@ -9,11 +9,24 @@ import numpy as np from ria_toolkit_oss.agent.streamer import ( Streamer, _apply_sdr_config, + _friendly_sdr_error, _samples_to_interleaved_float32, ) from ria_toolkit_oss.sdr.mock import MockSDR +def test_friendly_sdr_error_adds_udev_hint_on_permission(): + msg = _friendly_sdr_error("usrp", RuntimeError("USB open failed: insufficient permissions.")) + assert "install-udev" in msg + assert "insufficient permissions" in msg + + +def test_friendly_sdr_error_passes_through_other_errors(): + msg = _friendly_sdr_error("usrp", RuntimeError("No USRP device found for identifier 'name=x'")) + assert "install-udev" not in msg + assert "No USRP device found" in msg + + class FakeWs: def __init__(self): self.json_sent: list[dict] = []