Push Tracker
ria-toolkit-oss/src/ria_toolkit_oss/agent/cli.py

132 lines
4.2 KiB
Python
Raw Normal View History

"""Unified ``ria-agent`` CLI.
Subcommands:
- ``ria-agent run [legacy args]`` legacy long-poll NodeAgent (unchanged).
- ``ria-agent stream`` new WebSocket-based IQ streamer.
- ``ria-agent detect`` print SDR drivers whose modules import cleanly.
- ``ria-agent register --url URL --token TOKEN`` save credentials to
``~/.ria/agent.json``.
Invoking ``ria-agent`` with no subcommand falls through to the legacy
long-poll behavior for back-compatibility with existing deployments.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import sys
from . import config as _config
from .hardware import available_devices
from .legacy_executor import main as _legacy_main
# Known legacy flags. When one of these is the first command-line argument,
# ``main`` dispatches straight to the legacy CLI instead of the subcommand
# parser.
_LEGACY_ALIASES = {"--hub", "--key", "--name", "--device", "--insecure", "--log-level", "--config"}
def _cmd_detect(_args: argparse.Namespace) -> int:
    """Handle ``ria-agent detect``: list the SDR drivers that are available.

    Prints one entry from ``available_devices()`` per line on stdout. When
    that call reports nothing, a hint about installing driver extras is
    printed instead.

    Args:
        _args: Parsed command-line arguments. Accepted so the signature
            matches the other subcommand handlers; not read.

    Returns:
        Always ``0``.
    """
    driver_names = available_devices()
    if driver_names:
        for driver_name in driver_names:
            print(driver_name)
    else:
        print("No SDR drivers available (install ria-toolkit-oss[all-sdr] or per-driver extras).")
    return 0
def _cmd_register(args: argparse.Namespace) -> int:
    """Handle ``ria-agent register``: store hub credentials in the agent config.

    The current configuration is loaded, updated from the command line, and
    saved again. The hub URL and token are always overwritten. The name and
    agent id are overwritten only when a non-empty value was supplied. The
    ``insecure`` setting is rewritten on every call from ``--insecure``.

    Args:
        args: Parsed arguments providing ``url``, ``token``, ``name``,
            ``agent_id`` and ``insecure``.

    Returns:
        Always ``0``.
    """
    settings = _config.load()
    settings.hub_url, settings.token = args.url, args.token

    # Optional fields: keep whatever is already stored unless a value was given.
    for field_name in ("name", "agent_id"):
        supplied = getattr(args, field_name)
        if supplied:
            setattr(settings, field_name, supplied)

    settings.insecure = bool(args.insecure)

    saved_to = _config.save(settings)
    print(f"Saved agent credentials to {saved_to}")
    return 0
def _cmd_stream(args: argparse.Namespace) -> int:
    """Handle ``ria-agent stream``: run the WebSocket IQ streamer.

    The WebSocket URL comes from ``--url`` when given, otherwise it is derived
    from the saved hub URL and agent id. The token comes from ``--token`` when
    given, otherwise from the saved configuration.

    Args:
        args: Parsed arguments providing ``url`` and ``token``.

    Returns:
        ``2`` when no URL could be determined (an error is written to stderr),
        otherwise ``0`` once the streamer returns or is interrupted with
        Ctrl-C.
    """
    # Imported here rather than at module level, so the streamer module is
    # only loaded when this subcommand actually runs.
    from .streamer import run_streamer

    saved = _config.load()
    ws_url = args.url or _derive_ws_url(saved.hub_url, saved.agent_id)
    bearer_token = args.token or saved.token

    if ws_url:
        try:
            asyncio.run(run_streamer(ws_url, bearer_token))
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the streamer; exit quietly.
            pass
        return 0

    print("error: --url is required (or run `ria-agent register` first)", file=sys.stderr)
    return 2
def _derive_ws_url(hub_url: str, agent_id: str) -> str:
if not hub_url:
return ""
base = hub_url.rstrip("/")
if base.startswith("https://"):
base = "wss://" + base[len("https://"):]
elif base.startswith("http://"):
base = "ws://" + base[len("http://"):]
suffix = f"/api/agent/ws/{agent_id}" if agent_id else "/api/agent/ws"
return base + suffix
def main() -> None:
    """Entry point for the unified ``ria-agent`` command.

    Dispatch rules:

    * No arguments, or a first argument that is a known legacy flag (either
      ``--flag value`` or ``--flag=value`` form): the legacy CLI is invoked
      with the command line untouched.
    * ``run``: every argument the subcommand parser does not recognise is
      forwarded to the legacy CLI via ``sys.argv``.
    * ``detect`` / ``register`` / ``stream``: logging is configured and the
      matching handler runs; the process exits with the handler's return
      code. Unrecognised arguments are reported as a usage error instead of
      being silently dropped.

    Raises:
        SystemExit: On argument errors, and with the handler's exit code for
            ``detect``, ``register`` and ``stream``.
    """
    argv = sys.argv[1:]

    # Back-compat: hand off to the legacy CLI when there are no arguments or
    # the first one is a known legacy flag. The part before any "=" is
    # compared so that ``--hub=URL`` is recognised as well as ``--hub URL``.
    if not argv or argv[0].split("=", 1)[0] in _LEGACY_ALIASES:
        _legacy_main()
        return

    parser = argparse.ArgumentParser(prog="ria-agent")
    sub = parser.add_subparsers(dest="command", required=True)

    sub.add_parser("run", help="Legacy long-poll agent (NodeAgent)")
    sub.add_parser("detect", help="List available SDR drivers")

    p_reg = sub.add_parser("register", help="Save agent credentials to ~/.ria/agent.json")
    p_reg.add_argument("--url", required=True, help="RIA Hub base URL")
    p_reg.add_argument("--token", required=True, help="Agent registration token")
    p_reg.add_argument("--name", default=None)
    p_reg.add_argument("--agent-id", dest="agent_id", default=None)
    p_reg.add_argument("--insecure", action="store_true")

    p_stream = sub.add_parser("stream", help="Run the WebSocket IQ streamer")
    p_stream.add_argument("--url", default=None, help="Override WebSocket URL")
    p_stream.add_argument("--token", default=None, help="Override bearer token")
    p_stream.add_argument("--log-level", default="INFO")

    # parse_known_args is used only so that ``run`` can forward arguments this
    # parser does not know about to the legacy CLI.
    args, extras = parser.parse_known_args(argv)

    if args.command == "run":
        # Dispatch before configuring logging so that ``ria-agent run ...``
        # behaves the same as the bare legacy invocation above, which also
        # hands off without touching the logging setup.
        sys.argv = [sys.argv[0], *extras]
        _legacy_main()
        return

    # For every other subcommand a leftover argument is a mistake (typically
    # a misspelled flag); report it the way parse_args() would.
    if extras:
        parser.error(f"unrecognized arguments: {' '.join(extras)}")

    # Resolve the level name case-insensitively and insist on an integer:
    # a lowercase name such as "info" would otherwise resolve to the function
    # ``logging.info`` and make basicConfig() raise.
    level = getattr(logging, str(getattr(args, "log_level", "INFO")).upper(), None)
    if not isinstance(level, int):
        level = logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    handlers = {
        "detect": _cmd_detect,
        "register": _cmd_register,
        "stream": _cmd_stream,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.error(f"unknown command: {args.command}")
    sys.exit(handler(args))
# Run the CLI when this module is executed as a script.
if __name__ == "__main__":
    main()