Push Tracker
ria-toolkit-oss/src/ria_toolkit_oss/agent/cli.py
J jrhughes003 543517f0ca fix(agent): address review findings on register flow
- Restore agent_id in success output. Pre-PR the user saw the hub's
  canonical identifier; the merge had reduced this to just `(name)`,
  which made it impossible to correlate the registered agent with
  anything on the hub side without inspecting the config file.
- Add a 15s timeout to the register POST. urllib's default is none,
  so a stuck hub would block the CLI indefinitely.
- Read User-Agent version from package metadata instead of hardcoding
  "0.1", so it tracks releases. Also corrected the URL to the canonical
  Source URL listed in pyproject.toml (was pointing at a likely-404
  github.com path).
- Add two tests guarding the User-Agent. The whole point of the Cloudflare
  fix was to set a non-default UA; previously no test asserted this, so
  a refactor could silently reintroduce the 403 code 1010 bug.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 12:40:29 -04:00

308 lines
10 KiB
Python

"""Unified ``ria-agent`` CLI.
Subcommands:
- ``ria-agent run [legacy args]`` — legacy long-poll NodeAgent (unchanged).
- ``ria-agent stream`` — new WebSocket-based IQ streamer.
- ``ria-agent detect`` — print SDR drivers whose modules import cleanly.
- ``ria-agent register --hub URL --api-key KEY`` — register with the hub
using a personal registration key (minted from **Settings → RIA Agents**
on the hub, shown once at mint time) and save credentials (and optional
TX interlocks) to ``~/.ria/agent.json``. The hub also accepts the legacy
shared ``[wac] API_KEY`` for back-compat, but that path is deprecated.
Invoking ``ria-agent`` with no subcommand falls through to the legacy
long-poll behavior for back-compatibility with existing deployments.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import sys
from . import config as _config
from .hardware import available_devices
from .legacy_executor import main as _legacy_main
from .namegen import generate_agent_name
# Flags that mark an invocation as targeting the legacy CLI. ``main`` checks
# the first command-line argument against this set and, on a match, hands the
# whole invocation to the legacy entry point instead of parsing a subcommand.
_LEGACY_ALIASES = {"--hub", "--key", "--name", "--device", "--insecure", "--log-level", "--config"}
def _user_agent() -> str:
    """Return the ``User-Agent`` value to send with hub requests.

    An explicit value is sent so requests never carry Python's default
    ``Python-urllib/<ver>``, which Cloudflare's Browser Integrity Check on
    ``riahub.ai`` rejects (HTTP 403, edge code 1010).

    The version component comes from the installed ``ria-toolkit-oss``
    package metadata so it follows releases; when that metadata is not
    available the literal ``unknown`` is used instead.
    """
    from importlib import metadata

    try:
        release = metadata.version("ria-toolkit-oss")
    except metadata.PackageNotFoundError:
        # Not installed as a distribution (e.g. running from a source tree).
        release = "unknown"
    return f"ria-agent/{release} (+https://riahub.ai/qoherent/ria-toolkit-oss)"
# Timeout, in seconds, for the register request (passed as ``timeout`` to
# ``urlopen`` in ``_cmd_register``). The register endpoint is a small DB
# lookup + insert; anything past this is a stuck hub, not a slow one.
_REGISTER_TIMEOUT_S = 15
# Maps the ``reason`` code found in a rejected register response to the
# message shown to the person running ``ria-agent register``. Codes that are
# not listed here are reported verbatim by ``_explain_registration_failure``.
REGISTRATION_REASON_MESSAGES = {
    "invalid_key": (
        "Registration key not recognized. "
        "Generate a fresh key from Settings → RIA Agents on the hub."
    ),
    "expired": (
        "This registration key has expired. "
        "Generate a new one from Settings → RIA Agents on the hub."
    ),
    "revoked": (
        "This registration key was revoked. "
        "Generate a new one from Settings → RIA Agents on the hub."
    ),
    "already_consumed": (
        "This single-use registration key has already been used. "
        "Generate a new one, or mint a reusable key instead."
    ),
}
def _explain_registration_failure(status: int, body: bytes) -> str:
    """Return a human-readable explanation for a failed register call.

    This runs while an HTTP error is already being reported, so it is written
    to never raise on an odd response body.

    Args:
        status: HTTP status code of the failed response.
        body: Raw response body; may be empty or not JSON at all.

    Returns:
        A one-line message. In order of preference:

        * for 429, the ``detail`` value (or the raw body) as a rate-limit
          message;
        * for a non-object body, ``HTTP <status>: <text>``;
        * for ``detail`` objects, the friendly text registered for the
          ``reason`` code, or the code itself when it is unknown;
        * for ``detail`` strings, that string;
        * otherwise ``HTTP <status>: <parsed body>``.
    """
    try:
        parsed = json.loads(body) if body else None
    except ValueError:
        # Malformed JSON and undecodable bytes both surface as ValueError.
        parsed = None

    if status == 429:
        # 429 carries a plain string detail, never a reason code.
        if isinstance(parsed, dict) and parsed.get("detail"):
            detail = parsed["detail"]
        else:
            detail = body.decode("utf-8", "replace") or "rate limited"
        return f"Registration rate-limited by the hub: {detail}"

    if not isinstance(parsed, dict):
        text = body.decode("utf-8", "replace")
        return f"HTTP {status}: {text or 'no body'}"

    detail = parsed.get("detail")
    if isinstance(detail, dict):
        reason = detail.get("reason")
        # Only strings can be known reason codes. Guarding on the type also
        # keeps an unhashable value (list/dict) from raising TypeError in the
        # dict membership test.
        if isinstance(reason, str) and reason in REGISTRATION_REASON_MESSAGES:
            return REGISTRATION_REASON_MESSAGES[reason]
        if reason:
            return f"Registration rejected ({reason})"
    if isinstance(detail, str) and detail:
        return f"Registration rejected: {detail}"
    return f"HTTP {status}: {parsed}"
def _cmd_detect(_args: argparse.Namespace) -> int:
    """Handle ``ria-agent detect``: print one available SDR driver per line.

    When ``available_devices()`` reports nothing, a hint about installing the
    driver extras is printed instead.

    Args:
        _args: Parsed subcommand arguments (unused).

    Returns:
        Always ``0``; finding no drivers is not treated as an error.
    """
    found = available_devices()
    if found:
        for driver in found:
            print(driver)
    else:
        print("No SDR drivers available (install ria-toolkit-oss[all-sdr] or per-driver extras).")
    return 0
def _cmd_register(args: argparse.Namespace) -> int:
    """Handle ``ria-agent register``: register with the hub and save credentials.

    Sends ``{"name": ...}`` to ``<hub>/screens/agents/register`` with the
    registration key in the ``X-API-Key`` header, then stores the hub URL,
    agent id, token, key, name, TLS flag and any TX interlocks through
    ``_config.save``.

    Args:
        args: Parsed ``register`` arguments. ``hub``, ``api_key``, ``name``
            and ``insecure`` are read directly; the TX options are optional.

    Returns:
        ``0`` on success; ``1`` when the request fails or the hub's reply
        does not contain ``agent_id`` and ``token``. Failures are reported on
        stderr.
    """
    import ssl
    import urllib.error
    import urllib.request

    hub_url = args.hub.rstrip("/")
    url = f"{hub_url}/screens/agents/register"
    name = args.name or generate_agent_name()
    body = json.dumps({"name": name}).encode()
    req = urllib.request.Request(
        url,
        data=body,
        headers={
            "Content-Type": "application/json",
            "X-API-Key": args.api_key,
            "User-Agent": _user_agent(),
        },
    )

    open_kwargs = {"timeout": _REGISTER_TIMEOUT_S}
    if args.insecure:
        # --insecure promises "Skip TLS verification"; apply it to this
        # request too, not only to the saved config. The extra keyword is
        # passed only in this case so the default call stays unchanged.
        tls_context = ssl.create_default_context()
        tls_context.check_hostname = False  # must be cleared before CERT_NONE
        tls_context.verify_mode = ssl.CERT_NONE
        open_kwargs["context"] = tls_context

    try:
        with urllib.request.urlopen(req, **open_kwargs) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as e:
        try:
            err_body = e.read()
        except Exception:
            # Best effort: the status code alone still yields a usable message.
            err_body = b""
        msg = _explain_registration_failure(e.code, err_body)
        print(f"error: registration failed: {msg}", file=sys.stderr)
        return 1
    except Exception as e:
        # CLI boundary: network errors, timeouts and bad JSON all end here.
        print(f"error: registration failed: {e}", file=sys.stderr)
        return 1

    # A 2xx reply with an unexpected shape must not crash with a traceback.
    try:
        agent_id = data["agent_id"]
        token = data["token"]
    except (KeyError, TypeError, IndexError):
        print(
            "error: registration failed: unexpected response from hub "
            "(missing agent_id or token)",
            file=sys.stderr,
        )
        return 1

    cfg = _config.load()
    cfg.hub_url = hub_url
    cfg.agent_id = agent_id
    cfg.token = token
    cfg.api_key = args.api_key
    cfg.name = name
    cfg.insecure = bool(args.insecure)

    # TX interlocks: only values given on the command line overwrite config.
    cfg.tx_enabled = bool(getattr(args, "allow_tx", False))
    if (v := getattr(args, "tx_max_gain_db", None)) is not None:
        cfg.tx_max_gain_db = float(v)
    if (v := getattr(args, "tx_max_duration_s", None)) is not None:
        cfg.tx_max_duration_s = float(v)
    freq_ranges = getattr(args, "tx_freq_range", None) or []
    if freq_ranges:
        cfg.tx_allowed_freq_ranges = [[float(lo), float(hi)] for lo, hi in freq_ranges]

    path = _config.save(cfg)
    print(f"Registered agent: {agent_id} ({name})")
    if cfg.tx_enabled:
        caps: list[str] = []
        if cfg.tx_max_gain_db is not None:
            caps.append(f"gain<={cfg.tx_max_gain_db} dB")
        if cfg.tx_max_duration_s is not None:
            caps.append(f"duration<={cfg.tx_max_duration_s} s")
        if cfg.tx_allowed_freq_ranges:
            caps.append(f"freq in {cfg.tx_allowed_freq_ranges}")
        tail = f" ({', '.join(caps)})" if caps else ""
        print(f"TX enabled{tail}")
    print(f"Credentials saved to {path}")
    return 0
def _cmd_stream(args: argparse.Namespace) -> int:
    """Handle ``ria-agent stream``: run the WebSocket streamer.

    The URL and token come from ``--url`` / ``--token`` when given, otherwise
    from the saved config (the URL is derived from the stored hub URL and
    agent id). ``--allow-tx`` enables TX on the in-memory config only;
    nothing is written back.

    Args:
        args: Parsed ``stream`` arguments.

    Returns:
        ``2`` when no WebSocket URL can be determined; ``0`` once the
        streamer returns or is interrupted with Ctrl-C.
    """
    from .streamer import run_streamer

    cfg = _config.load()
    ws_url = args.url if args.url else _derive_ws_url(cfg.hub_url, cfg.agent_id)
    bearer = args.token if args.token else cfg.token

    if not ws_url:
        print("error: --url is required (or run `ria-agent register` first)", file=sys.stderr)
        return 2

    tx_override = getattr(args, "allow_tx", False)
    if tx_override:
        cfg.tx_enabled = True

    try:
        asyncio.run(run_streamer(ws_url, bearer, cfg=cfg))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the streamer; exit quietly.
        pass
    return 0
def _derive_ws_url(hub_url: str, agent_id: str) -> str:
    """Build the agent WebSocket URL from the hub's HTTP(S) base URL.

    ``https://`` becomes ``wss://`` and ``http://`` becomes ``ws://``; the
    scheme is matched case-insensitively. Any other prefix is left untouched.
    The agent id, when present, is percent-encoded before being placed in the
    query string so characters such as ``&``, ``#`` or spaces cannot corrupt
    the URL.

    Args:
        hub_url: Hub base URL; trailing slashes are ignored.
        agent_id: Agent identifier to append as ``?agent_id=...``; omitted
            from the URL when empty.

    Returns:
        The WebSocket URL, or ``""`` when ``hub_url`` is empty.
    """
    from urllib.parse import quote

    if not hub_url:
        return ""

    base = hub_url.rstrip("/")
    scheme, sep, rest = base.partition("://")
    if sep:
        ws_scheme = {"https": "wss", "http": "ws"}.get(scheme.lower())
        if ws_scheme is not None:
            base = f"{ws_scheme}://{rest}"

    if not agent_id:
        return base + "/screens/agent/ws"
    # str() keeps non-string ids working as they did with plain formatting.
    encoded_id = quote(str(agent_id), safe="")
    return f"{base}/screens/agent/ws?agent_id={encoded_id}"
def main() -> None:
    """Entry point for the ``ria-agent`` command.

    With no arguments, or with a legacy flag as the first argument, the
    invocation is handed to the legacy CLI unchanged. Otherwise a subcommand
    (``run``, ``detect``, ``register``, ``stream``) is parsed and dispatched;
    the integer returned by a subcommand handler becomes the process exit
    status via ``sys.exit``.
    """
    # Back-compat: if the first argument is a known legacy flag, or there is
    # no subcommand at all, dispatch to the legacy CLI. Only the flag name is
    # compared so the ``--flag=value`` spelling is recognized as well.
    argv = sys.argv[1:]
    if not argv or argv[0].split("=", 1)[0] in _LEGACY_ALIASES:
        _legacy_main()
        return

    parser = argparse.ArgumentParser(prog="ria-agent")
    sub = parser.add_subparsers(dest="command", required=True)
    sub.add_parser("run", help="Legacy long-poll agent (NodeAgent)")
    sub.add_parser("detect", help="List available SDR drivers")

    p_reg = sub.add_parser("register", help="Register agent with RIA Hub and save credentials")
    p_reg.add_argument("--hub", required=True, help="RIA Hub URL (e.g. http://whitehorse:3005)")
    p_reg.add_argument(
        "--api-key",
        dest="api_key",
        required=True,
        help=(
            "Personal registration key from the RIA Agents page on the hub "
            "(format: ria_reg_...). Shown once when generated; save it then. "
            "The legacy shared API key is also accepted but deprecated."
        ),
    )
    p_reg.add_argument("--name", default=None, help="Human-friendly agent name")
    p_reg.add_argument("--insecure", action="store_true", help="Skip TLS verification")
    p_reg.add_argument(
        "--allow-tx",
        dest="allow_tx",
        action="store_true",
        help="Opt this agent in to TX (required for any transmission from the hub)",
    )
    p_reg.add_argument(
        "--tx-max-gain-db",
        dest="tx_max_gain_db",
        type=float,
        default=None,
        help="Reject tx_start frames whose tx_gain exceeds this cap (dB)",
    )
    p_reg.add_argument(
        "--tx-max-duration-s",
        dest="tx_max_duration_s",
        type=float,
        default=None,
        help="Auto-stop any TX session after this many seconds",
    )
    p_reg.add_argument(
        "--tx-freq-range",
        dest="tx_freq_range",
        type=float,
        nargs=2,
        action="append",
        metavar=("LO", "HI"),
        default=None,
        help="Allowed TX center-frequency range in Hz (repeat for multiple bands)",
    )

    p_stream = sub.add_parser("stream", help="Run the WebSocket IQ streamer")
    p_stream.add_argument("--url", default=None, help="Override WebSocket URL")
    p_stream.add_argument("--token", default=None, help="Override bearer token")
    p_stream.add_argument("--log-level", default="INFO")
    p_stream.add_argument(
        "--allow-tx",
        dest="allow_tx",
        action="store_true",
        help="Runtime override: enable TX for this process without writing config",
    )

    # Unknown extras are forwarded to the legacy CLI when command == "run".
    args, extras = parser.parse_known_args(argv)

    # Resolve the level name case-insensitively and accept only real numeric
    # levels. A bare ``getattr(logging, "debug")`` returns the *function*
    # ``logging.debug``, which ``basicConfig`` rejects with a TypeError.
    # Anything unrecognized falls back to INFO.
    level_name = str(getattr(args, "log_level", "INFO")).upper()
    level = getattr(logging, level_name, None)
    if not isinstance(level, int):
        level = logging.INFO
    # NOTE(review): this also runs before the legacy CLI on the ``run`` path;
    # if the legacy entry point configures logging with ``basicConfig`` too,
    # its call would have no effect — confirm against the legacy module.
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    if args.command == "run":
        # Present the leftover arguments to the legacy CLI as its own argv.
        sys.argv = [sys.argv[0], *extras]
        _legacy_main()
        return
    if args.command == "detect":
        sys.exit(_cmd_detect(args))
    if args.command == "register":
        sys.exit(_cmd_register(args))
    if args.command == "stream":
        sys.exit(_cmd_stream(args))
    # Defensive: every registered subcommand is handled above.
    parser.error(f"unknown command: {args.command}")
# Run the CLI when this module is executed as a script.
if __name__ == "__main__":
    main()