Compare commits

...

3 Commits

Author SHA1 Message Date
806fcf8293 added tests for cli 2025-12-11 15:59:08 -05:00
155b13928b changed file to a string 2025-12-11 15:12:01 -05:00
54b41c246e changed load_rec to load_recording 2025-12-11 13:37:16 -05:00
19 changed files with 4569 additions and 38 deletions

87
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]]
name = "alabaster"
@ -1116,6 +1116,89 @@ files = [
{file = "pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3"},
]
[[package]]
name = "pyyaml"
version = "6.0.3"
description = "YAML parser and emitter for Python"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "PyYAML-6.0.3-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:c2514fceb77bc5e7a2f7adfaa1feb2fb311607c9cb518dbc378688ec73d8292f"},
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9c57bb8c96f6d1808c030b1687b9b5fb476abaa47f0db9c0101f5e9f394e97f4"},
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:efd7b85f94a6f21e4932043973a7ba2613b059c4a000551892ac9f1d11f5baf3"},
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:22ba7cfcad58ef3ecddc7ed1db3409af68d023b7f940da23c6c2a1890976eda6"},
{file = "PyYAML-6.0.3-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:6344df0d5755a2c9a276d4473ae6b90647e216ab4757f8426893b5dd2ac3f369"},
{file = "PyYAML-6.0.3-cp38-cp38-win32.whl", hash = "sha256:3ff07ec89bae51176c0549bc4c63aa6202991da2d9a6129d7aef7f1407d3f295"},
{file = "PyYAML-6.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:5cf4e27da7e3fbed4d6c3d8e797387aaad68102272f8f9752883bc32d61cb87b"},
{file = "pyyaml-6.0.3-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:214ed4befebe12df36bcc8bc2b64b396ca31be9304b8f59e25c11cf94a4c033b"},
{file = "pyyaml-6.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:02ea2dfa234451bbb8772601d7b8e426c2bfa197136796224e50e35a78777956"},
{file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b30236e45cf30d2b8e7b3e85881719e98507abed1011bf463a8fa23e9c3e98a8"},
{file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:66291b10affd76d76f54fad28e22e51719ef9ba22b29e1d7d03d6777a9174198"},
{file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9c7708761fccb9397fe64bbc0395abcae8c4bf7b0eac081e12b809bf47700d0b"},
{file = "pyyaml-6.0.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:418cf3f2111bc80e0933b2cd8cd04f286338bb88bdc7bc8e6dd775ebde60b5e0"},
{file = "pyyaml-6.0.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:5e0b74767e5f8c593e8c9b5912019159ed0533c70051e9cce3e8b6aa699fcd69"},
{file = "pyyaml-6.0.3-cp310-cp310-win32.whl", hash = "sha256:28c8d926f98f432f88adc23edf2e6d4921ac26fb084b028c733d01868d19007e"},
{file = "pyyaml-6.0.3-cp310-cp310-win_amd64.whl", hash = "sha256:bdb2c67c6c1390b63c6ff89f210c8fd09d9a1217a465701eac7316313c915e4c"},
{file = "pyyaml-6.0.3-cp311-cp311-macosx_10_13_x86_64.whl", hash = "sha256:44edc647873928551a01e7a563d7452ccdebee747728c1080d881d68af7b997e"},
{file = "pyyaml-6.0.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:652cb6edd41e718550aad172851962662ff2681490a8a711af6a4d288dd96824"},
{file = "pyyaml-6.0.3-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:10892704fc220243f5305762e276552a0395f7beb4dbf9b14ec8fd43b57f126c"},
{file = "pyyaml-6.0.3-cp311-cp311-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:850774a7879607d3a6f50d36d04f00ee69e7fc816450e5f7e58d7f17f1ae5c00"},
{file = "pyyaml-6.0.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b8bb0864c5a28024fac8a632c443c87c5aa6f215c0b126c449ae1a150412f31d"},
{file = "pyyaml-6.0.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1d37d57ad971609cf3c53ba6a7e365e40660e3be0e5175fa9f2365a379d6095a"},
{file = "pyyaml-6.0.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:37503bfbfc9d2c40b344d06b2199cf0e96e97957ab1c1b546fd4f87e53e5d3e4"},
{file = "pyyaml-6.0.3-cp311-cp311-win32.whl", hash = "sha256:8098f252adfa6c80ab48096053f512f2321f0b998f98150cea9bd23d83e1467b"},
{file = "pyyaml-6.0.3-cp311-cp311-win_amd64.whl", hash = "sha256:9f3bfb4965eb874431221a3ff3fdcddc7e74e3b07799e0e84ca4a0f867d449bf"},
{file = "pyyaml-6.0.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:7f047e29dcae44602496db43be01ad42fc6f1cc0d8cd6c83d342306c32270196"},
{file = "pyyaml-6.0.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:fc09d0aa354569bc501d4e787133afc08552722d3ab34836a80547331bb5d4a0"},
{file = "pyyaml-6.0.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9149cad251584d5fb4981be1ecde53a1ca46c891a79788c0df828d2f166bda28"},
{file = "pyyaml-6.0.3-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:5fdec68f91a0c6739b380c83b951e2c72ac0197ace422360e6d5a959d8d97b2c"},
{file = "pyyaml-6.0.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ba1cc08a7ccde2d2ec775841541641e4548226580ab850948cbfda66a1befcdc"},
{file = "pyyaml-6.0.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8dc52c23056b9ddd46818a57b78404882310fb473d63f17b07d5c40421e47f8e"},
{file = "pyyaml-6.0.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:41715c910c881bc081f1e8872880d3c650acf13dfa8214bad49ed4cede7c34ea"},
{file = "pyyaml-6.0.3-cp312-cp312-win32.whl", hash = "sha256:96b533f0e99f6579b3d4d4995707cf36df9100d67e0c8303a0c55b27b5f99bc5"},
{file = "pyyaml-6.0.3-cp312-cp312-win_amd64.whl", hash = "sha256:5fcd34e47f6e0b794d17de1b4ff496c00986e1c83f7ab2fb8fcfe9616ff7477b"},
{file = "pyyaml-6.0.3-cp312-cp312-win_arm64.whl", hash = "sha256:64386e5e707d03a7e172c0701abfb7e10f0fb753ee1d773128192742712a98fd"},
{file = "pyyaml-6.0.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:8da9669d359f02c0b91ccc01cac4a67f16afec0dac22c2ad09f46bee0697eba8"},
{file = "pyyaml-6.0.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:2283a07e2c21a2aa78d9c4442724ec1eb15f5e42a723b99cb3d822d48f5f7ad1"},
{file = "pyyaml-6.0.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ee2922902c45ae8ccada2c5b501ab86c36525b883eff4255313a253a3160861c"},
{file = "pyyaml-6.0.3-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a33284e20b78bd4a18c8c2282d549d10bc8408a2a7ff57653c0cf0b9be0afce5"},
{file = "pyyaml-6.0.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0f29edc409a6392443abf94b9cf89ce99889a1dd5376d94316ae5145dfedd5d6"},
{file = "pyyaml-6.0.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:f7057c9a337546edc7973c0d3ba84ddcdf0daa14533c2065749c9075001090e6"},
{file = "pyyaml-6.0.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:eda16858a3cab07b80edaf74336ece1f986ba330fdb8ee0d6c0d68fe82bc96be"},
{file = "pyyaml-6.0.3-cp313-cp313-win32.whl", hash = "sha256:d0eae10f8159e8fdad514efdc92d74fd8d682c933a6dd088030f3834bc8e6b26"},
{file = "pyyaml-6.0.3-cp313-cp313-win_amd64.whl", hash = "sha256:79005a0d97d5ddabfeeea4cf676af11e647e41d81c9a7722a193022accdb6b7c"},
{file = "pyyaml-6.0.3-cp313-cp313-win_arm64.whl", hash = "sha256:5498cd1645aa724a7c71c8f378eb29ebe23da2fc0d7a08071d89469bf1d2defb"},
{file = "pyyaml-6.0.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:8d1fab6bb153a416f9aeb4b8763bc0f22a5586065f86f7664fc23339fc1c1fac"},
{file = "pyyaml-6.0.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:34d5fcd24b8445fadc33f9cf348c1047101756fd760b4dacb5c3e99755703310"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:501a031947e3a9025ed4405a168e6ef5ae3126c59f90ce0cd6f2bfc477be31b7"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:b3bc83488de33889877a0f2543ade9f70c67d66d9ebb4ac959502e12de895788"},
{file = "pyyaml-6.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c458b6d084f9b935061bc36216e8a69a7e293a2f1e68bf956dcd9e6cbcd143f5"},
{file = "pyyaml-6.0.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:7c6610def4f163542a622a73fb39f534f8c101d690126992300bf3207eab9764"},
{file = "pyyaml-6.0.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:5190d403f121660ce8d1d2c1bb2ef1bd05b5f68533fc5c2ea899bd15f4399b35"},
{file = "pyyaml-6.0.3-cp314-cp314-win_amd64.whl", hash = "sha256:4a2e8cebe2ff6ab7d1050ecd59c25d4c8bd7e6f400f5f82b96557ac0abafd0ac"},
{file = "pyyaml-6.0.3-cp314-cp314-win_arm64.whl", hash = "sha256:93dda82c9c22deb0a405ea4dc5f2d0cda384168e466364dec6255b293923b2f3"},
{file = "pyyaml-6.0.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:02893d100e99e03eda1c8fd5c441d8c60103fd175728e23e431db1b589cf5ab3"},
{file = "pyyaml-6.0.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:c1ff362665ae507275af2853520967820d9124984e0f7466736aea23d8611fba"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6adc77889b628398debc7b65c073bcb99c4a0237b248cacaf3fe8a557563ef6c"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a80cb027f6b349846a3bf6d73b5e95e782175e52f22108cfa17876aaeff93702"},
{file = "pyyaml-6.0.3-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:00c4bdeba853cc34e7dd471f16b4114f4162dc03e6b7afcc2128711f0eca823c"},
{file = "pyyaml-6.0.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:66e1674c3ef6f541c35191caae2d429b967b99e02040f5ba928632d9a7f0f065"},
{file = "pyyaml-6.0.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:16249ee61e95f858e83976573de0f5b2893b3677ba71c9dd36b9cf8be9ac6d65"},
{file = "pyyaml-6.0.3-cp314-cp314t-win_amd64.whl", hash = "sha256:4ad1906908f2f5ae4e5a8ddfce73c320c2a1429ec52eafd27138b7f1cbe341c9"},
{file = "pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b"},
{file = "pyyaml-6.0.3-cp39-cp39-macosx_10_13_x86_64.whl", hash = "sha256:b865addae83924361678b652338317d1bd7e79b1f4596f96b96c77a5a34b34da"},
{file = "pyyaml-6.0.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c3355370a2c156cffb25e876646f149d5d68f5e0a3ce86a5084dd0b64a994917"},
{file = "pyyaml-6.0.3-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3c5677e12444c15717b902a5798264fa7909e41153cdf9ef7ad571b704a63dd9"},
{file = "pyyaml-6.0.3-cp39-cp39-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:5ed875a24292240029e4483f9d4a4b8a1ae08843b9c54f43fcc11e404532a8a5"},
{file = "pyyaml-6.0.3-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0150219816b6a1fa26fb4699fb7daa9caf09eb1999f3b70fb6e786805e80375a"},
{file = "pyyaml-6.0.3-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:fa160448684b4e94d80416c0fa4aac48967a969efe22931448d853ada8baf926"},
{file = "pyyaml-6.0.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:27c0abcb4a5dac13684a37f76e701e054692a9b2d3064b70f5e4eb54810553d7"},
{file = "pyyaml-6.0.3-cp39-cp39-win32.whl", hash = "sha256:1ebe39cb5fc479422b83de611d14e2c0d3bb2a18bbcb01f229ab3cfbd8fee7a0"},
{file = "pyyaml-6.0.3-cp39-cp39-win_amd64.whl", hash = "sha256:2e71d11abed7344e42a8849600193d15b6def118602c4c176f748e4583246007"},
{file = "pyyaml-6.0.3.tar.gz", hash = "sha256:d76623373421df22fb4cf8817020cbb7ef15c725b9d5e45f17e189bfc384190f"},
]
[[package]]
name = "pyzmq"
version = "27.1.0"
@ -2136,4 +2219,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = ">=3.10"
content-hash = "546dd85a2ad750359310ff22acfe7bfd3ca764f025d19e3fd48a50cd431e64e5"
content-hash = "65a8b8214ef247a1f9b8e936e1e2a8253d9a2c21517138f3bf41b5289d1208a0"

View File

@ -47,6 +47,7 @@ dependencies = [
"h5py (>=3.14.0,<4.0.0)",
"pandas (>=2.3.2,<3.0.0)",
"pyzmq (>=27.1.0,<28.0.0)",
"pyyaml (>=6.0.3,<7.0.0)",
]
# [project.optional-dependencies] Commented out to prevent Tox tests from failing

View File

@ -326,6 +326,7 @@ def from_sigmf(file: os.PathLike | str) -> Recording:
:rtype: ria_toolkit_oss.datatypes.Recording
"""
file = str(file)
if len(file) > 11:
if file[-11:-5] != ".sigmf":
file = file + ".sigmf-data"

View File

@ -6,6 +6,7 @@ This module contains all the CLI bindings for the ria package.
from .capture import capture
from .combine import combine
from .convert import convert
from .generate import generate
# Import all command functions
from .discover import discover
@ -18,7 +19,7 @@ from .transmit import transmit
from .view import view
# Aliases
# synth = generate
synth = generate
# All commands will be automatically registered by cli.py
# Commands must be click.Command instances

View File

@ -5,58 +5,42 @@ from typing import Optional
import click
import numpy as np
import utils.signal.basic_signal_generator as basic_gen
import ria_toolkit_oss.signal.basic_signal_generator as basic_gen
import yaml
from utils.data import Recording
from utils.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator
from utils.signal.block_generator.basic import FrequencyShift
from utils.signal.block_generator.data_types import DataType
from utils.signal.block_generator.mapping.apsk_mapper import _APSKMapper
from utils.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper
from utils.signal.block_generator.mapping.mapper import Mapper
from utils.signal.block_generator.modulation import (
from ria_toolkit_oss.datatypes import Recording
from ria_toolkit_oss.signal.block_generator.continuous_modulation.fsk_modulator import FSKModulator
from ria_toolkit_oss.signal.block_generator.basic import FrequencyShift
from ria_toolkit_oss.signal.block_generator.data_types import DataType
from ria_toolkit_oss.signal.block_generator.mapping.apsk_mapper import _APSKMapper
from ria_toolkit_oss.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper
from ria_toolkit_oss.signal.block_generator.mapping.mapper import Mapper
from ria_toolkit_oss.signal.block_generator.symbol_modulation import (
GMSKModulator,
OOKModulator,
OQPSKModulator,
)
from utils.signal.block_generator.pulse_shaping import (
from ria_toolkit_oss.signal.block_generator.pulse_shaping import (
RaisedCosineFilter,
RootRaisedCosineFilter,
Upsampling,
)
from utils.signal.block_generator.source import (
from ria_toolkit_oss.signal.block_generator.source import (
LFMChirpSource,
RandomBinarySource,
BinarySource,
RecordingSource,
SawtoothSource,
SquareSource,
)
# Block Generator Imports
from utils.signal.block_generator.source_block import SourceBlock
from ria_toolkit_oss.signal.block_generator.source_block import SourceBlock
# Transforms for impairments
from utils.transforms.iq_channel_models import (
complex_multipath_rayleigh_channel,
rician_fading_channel,
)
from utils.transforms.iq_impairments import (
add_compression,
add_doppler,
add_gain_fluctuation,
add_phase_noise,
from ria_toolkit_oss.transforms.iq_impairments import (
iq_imbalance,
)
# NR 5G Import
try:
from utils.signal.block_gen.nr_5g.nr_5g_generator import NR5GGenerator
HAS_NR5G = True
except ImportError:
HAS_NR5G = False
from utils_cli.utils.common import (
from ria_toolkit_oss_cli.ria_toolkit_oss.common import (
echo_progress,
echo_verbose,
format_frequency,
@ -64,7 +48,7 @@ from utils_cli.utils.common import (
parse_metadata_args,
save_recording,
)
from utils_cli.utils.config import load_user_config
from ria_toolkit_oss_cli.ria_toolkit_oss.config import load_user_config
# Extend Mapper to support new types

View File

@ -210,7 +210,7 @@ def generate_recording(generate, input_file, sample_rate, verbose, legacy):
# Generate signal or load from file
if generate or input_file is None:
# Generate signal instead of loading from file
from utils.signal.basic_signal_generator import (
from ria_toolkit_oss.signal.basic_signal_generator import (
chirp,
lfm_chirp_complex,
sine,

0
tests/__init__.py Normal file
View File

View File

@ -4,7 +4,7 @@ from ria_toolkit_oss.datatypes import Annotation, Recording
from ria_toolkit_oss.io.recording import (
from_npy,
from_sigmf,
load_rec,
load_recording,
to_npy,
to_sigmf,
)
@ -116,7 +116,7 @@ def test_load_recording_npy(tmp_path):
recording1.to_npy(path=tmp_path, filename=filename.name, overwrite=True)
# Load from tmp_path
recording2 = load_rec(filename)
recording2 = load_recording(filename)
assert recording1.annotations == recording2.annotations

View File

@ -0,0 +1,126 @@
# CLI Tests
Comprehensive test suite for the utils CLI commands.
## Test Structure
- `test_common.py` - Tests for common CLI utilities (YAML loading, metadata parsing, frequency formatting)
- `test_discover.py` - Tests for device discovery functionality
- `test_capture.py` - Tests for the capture command
- `test_transmit.py` - Tests for the transmit command
## Running Tests
### Run all CLI tests:
```bash
poetry run pytest tests/utils_cli/ -v
```
### Run specific test file:
```bash
poetry run pytest tests/utils_cli/test_common.py -v
poetry run pytest tests/utils_cli/test_discover.py -v
poetry run pytest tests/utils_cli/test_capture.py -v
```
### Run specific test class or function:
```bash
poetry run pytest tests/utils_cli/test_capture.py::TestCaptureCommand::test_capture_basic -v
poetry run pytest tests/utils_cli/test_common.py::test_parse_frequency -v
```
### Run with coverage:
```bash
poetry run pytest tests/utils_cli/ --cov=utils_cli --cov-report=html
```
## Test Coverage
### test_common.py
- YAML configuration file loading
- Metadata KEY=VALUE parsing
- Frequency parsing (scientific notation, suffixes)
- Frequency and sample rate formatting
### test_discover.py
- Device discovery for all supported SDR types (PlutoSDR, HackRF, BladeRF, USRP, RTL-SDR, ThinkRF)
- Device auto-selection logic
- Device connection testing
- CLI command options (--verbose, --json-output, --test, --type)
- Error handling for missing devices and multiple devices
### test_capture.py
- Basic capture functionality
- Parameter validation (sample rate, center frequency, duration/num-samples)
- Device auto-selection
- Multiple output formats (SigMF, NPY, WAV, Blue)
- Format auto-detection from file extension
- YAML configuration file loading
- Custom metadata handling
- Gain and bandwidth configuration
- Visualization saving (--save-image)
- Chunked capture for large recordings
- Verbose and quiet output modes
- Proper device cleanup on errors
### test_transmit.py
- TX device initialization (PlutoSDR, HackRF, BladeRF, USRP only)
- RX-only device rejection (RTL-SDR, ThinkRF)
- TX device auto-selection
- Input file loading (SigMF, NPY, WAV, Blue)
- Legacy NPY format support
- TX gain validation and range checking
- Sample rate mismatch warnings
- Transmission modes:
- Single transmission
- Repeat mode with delays
- Continuous transmission with safety warnings
- YAML configuration file loading
- Safety confirmations for continuous mode
- Proper device cleanup on errors
- Verbose and quiet output modes
## Mock Strategy
Tests use `unittest.mock` to:
- Mock SDR device instances to avoid requiring actual hardware
- Mock file I/O operations
- Mock discovery functions to simulate different device scenarios
- Verify proper function calls and parameters
## Adding New Tests
When adding new CLI commands, follow this pattern:
1. Create `test_<command>.py` in this directory
2. Use Click's `CliRunner` for testing CLI commands
3. Mock external dependencies (SDR devices, file I/O)
4. Test both success and error cases
5. Verify proper resource cleanup (device.close(), file handles, etc.)
Example:
```python
from click.testing import CliRunner
from unittest.mock import patch, MagicMock
def test_new_command():
runner = CliRunner()
with patch('module.dependency') as mock_dep:
mock_dep.return_value = expected_value
result = runner.invoke(command, ['--option', 'value'])
assert result.exit_code == 0
assert 'expected output' in result.output
```
## CI/CD Integration
These tests are designed to run in CI/CD pipelines without requiring actual SDR hardware. All hardware interactions are mocked.
## Notes
- Tests use temporary directories for file operations (cleaned up automatically)
- Device mocks simulate real SDR behavior without hardware dependencies
- Tests verify both command-line interface and underlying function behavior

View File

@ -0,0 +1 @@
"""Tests for utils CLI commands."""

View File

@ -0,0 +1,963 @@
"""Tests for the combine command."""
import tempfile
from pathlib import Path
import numpy as np
import pytest
from click.testing import CliRunner
from ria_toolkit_oss.datatypes import Annotation, Recording
from ria_toolkit_oss.io import load_recording, to_npy, to_sigmf
from ria_toolkit_oss_cli.cli import cli
class TestCombineHelp:
"""Test help and basic command structure."""
def test_help(self):
"""Test combine help."""
runner = CliRunner()
result = runner.invoke(cli, ["combine", "--help"])
assert result.exit_code == 0
assert "Combine multiple recordings" in result.output
assert "--mode" in result.output
assert "--align-mode" in result.output
def test_no_inputs(self):
"""Test error with no inputs."""
runner = CliRunner()
result = runner.invoke(cli, ["combine"])
assert result.exit_code != 0
def test_single_input(self):
"""Test error with only one input."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as tmpdir:
# Create test file
signal = np.arange(1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6})
to_npy(recording, filename=str(Path(tmpdir) / "test.npy"), overwrite=True)
result = runner.invoke(cli, ["combine", str(Path(tmpdir) / "test.npy"), str(Path(tmpdir) / "output.npy")])
assert result.exit_code != 0
class TestCombineConcat:
"""Test concatenate mode."""
@pytest.fixture
def test_recordings(self):
"""Create multiple test recording files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create 3 recordings with different data
for i in range(3):
signal = np.arange(i * 1000, (i + 1) * 1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6})
to_npy(recording, filename=str(Path(tmpdir) / f"chunk{i}.npy"), overwrite=True)
yield tmpdir
def test_concat_basic(self, test_recordings):
"""Test basic concatenation."""
runner = CliRunner()
tmpdir = test_recordings
output_path = str(Path(tmpdir) / "combined.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "chunk0.npy"),
str(Path(tmpdir) / "chunk1.npy"),
str(Path(tmpdir) / "chunk2.npy"),
output_path,
],
)
assert result.exit_code == 0
assert Path(output_path).exists()
# Verify result
combined = load_recording(output_path)
assert combined.data.shape[1] == 3000
assert combined._metadata["combine_mode"] == "concat"
assert combined._metadata["num_inputs"] == 3
# Check data is correctly concatenated
assert np.allclose(combined.data[0, :1000], np.arange(0, 1000))
assert np.allclose(combined.data[0, 1000:2000], np.arange(1000, 2000))
assert np.allclose(combined.data[0, 2000:3000], np.arange(2000, 3000))
def test_concat_verbose(self, test_recordings):
"""Test concatenation with verbose output."""
runner = CliRunner()
tmpdir = test_recordings
output_path = str(Path(tmpdir) / "combined.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "chunk0.npy"),
str(Path(tmpdir) / "chunk1.npy"),
str(Path(tmpdir) / "chunk2.npy"),
output_path,
"--verbose",
],
)
assert result.exit_code == 0
assert "Combining 3 recordings" in result.output
assert "concat mode" in result.output
assert "Concatenating..." in result.output
def test_concat_with_annotations(self):
"""Test that annotations are preserved and shifted in concat mode."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create recordings with annotations
rec1 = Recording(data=np.ones(1000, dtype=np.complex64), metadata={"sample_rate": 2e6})
rec1._annotations.append(
Annotation(
sample_start=100, sample_count=200, freq_lower_edge=900e6, freq_upper_edge=920e6, label="test1"
)
)
rec2 = Recording(data=np.ones(1000, dtype=np.complex64) * 2, metadata={"sample_rate": 2e6})
rec2._annotations.append(
Annotation(
sample_start=100, sample_count=200, freq_lower_edge=900e6, freq_upper_edge=920e6, label="test2"
)
)
to_sigmf(rec1, filename="rec1", path=tmpdir, overwrite=True)
to_sigmf(rec2, filename="rec2", path=tmpdir, overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "combined.sigmf-data")
result = runner.invoke(
cli,
["combine", str(Path(tmpdir) / "rec1.sigmf-data"), str(Path(tmpdir) / "rec2.sigmf-data"), output_path],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert len(combined._annotations) == 2
# First annotation unchanged
assert combined._annotations[0].sample_start == 100
assert combined._annotations[0].label == "test1"
# Second annotation shifted by 1000 samples
assert combined._annotations[1].sample_start == 1100
assert combined._annotations[1].label == "test2"
class TestCombineAddSameLength:
"""Test add mode with same-length recordings."""
def test_add_basic(self):
"""Test basic add with same-length recordings."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create two recordings with same length
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "added.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code == 0
# Verify result
combined = load_recording(output_path)
assert combined.data.shape[1] == 1000
assert np.allclose(combined.data, 3 + 0j)
assert combined._metadata["combine_mode"] == "add"
assert combined._metadata["align_mode"] == "error"
def test_add_three_recordings(self):
"""Test adding three same-length recordings."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create three recordings
for i in range(1, 4):
sig = np.ones(1000, dtype=np.complex64) * i
rec = Recording(data=sig, metadata={"sample_rate": 2e6})
to_npy(rec, filename=str(Path(tmpdir) / f"sig{i}.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "added.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
str(Path(tmpdir) / "sig3.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
# 1 + 2 + 3 = 6
assert np.allclose(combined.data, 6 + 0j)
class TestCombineAddAlignError:
"""Test add mode with error alignment (default)."""
def test_different_length_error(self):
"""Test that different lengths cause error by default."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create recordings with different lengths
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code != 0
assert "different lengths" in result.output
assert "--align-mode" in result.output
class TestCombineAddAlignTruncate:
"""Test add mode with truncate alignment."""
def test_truncate(self):
"""Test truncate to shortest recording."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "truncated.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"truncate",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 5000
assert np.allclose(combined.data, 3 + 0j)
class TestCombineAddAlignPad:
"""Test add mode with pad alignment."""
def test_pad(self):
"""Test zero-padding to longest recording."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "padded.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# First 5000: 1 + 2 = 3
assert np.allclose(combined.data[0, :5000], 3 + 0j)
# Last 5000: 1 + 0 = 1
assert np.allclose(combined.data[0, 5000:], 1 + 0j)
class TestCombineAddAlignPadStart:
"""Test add mode with pad-start alignment."""
def test_pad_start(self):
"""Test pad-start at specific sample."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "pad_start.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad-start",
"--pad-start-sample",
"3000",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# Before 3000: 1 + 0 = 1
assert np.allclose(combined.data[0, :3000], 1 + 0j)
# 3000-8000: 1 + 2 = 3
assert np.allclose(combined.data[0, 3000:8000], 3 + 0j)
# After 8000: 1 + 0 = 1
assert np.allclose(combined.data[0, 8000:], 1 + 0j)
def test_pad_start_invalid(self):
"""Test invalid pad-start-sample."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad-start",
"--pad-start-sample",
"7000", # Too large
],
)
assert result.exit_code != 0
assert "exceeds max length" in result.output
class TestCombineAddAlignPadCenter:
"""Test add mode with pad-center alignment."""
def test_pad_center(self):
"""Test centering shorter recording."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "pad_center.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad-center",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# Before 2500: 1 + 0 = 1
assert np.allclose(combined.data[0, :2500], 1 + 0j)
# 2500-7500: 1 + 2 = 3
assert np.allclose(combined.data[0, 2500:7500], 3 + 0j)
# After 7500: 1 + 0 = 1
assert np.allclose(combined.data[0, 7500:], 1 + 0j)
class TestCombineAddAlignPadEnd:
"""Test add mode with pad-end alignment."""
def test_pad_end(self):
"""Test aligning end of recordings."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "pad_end.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"pad-end",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# First 5000: 1 + 0 = 1
assert np.allclose(combined.data[0, :5000], 1 + 0j)
# Last 5000: 1 + 2 = 3
assert np.allclose(combined.data[0, 5000:], 3 + 0j)
class TestCombineAddAlignRepeat:
"""Test add mode with repeat alignment."""
def test_repeat(self):
"""Test repeating shorter recording."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "repeated.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"repeat",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# Entire recording: 1 + 2 = 3 (pattern repeated)
assert np.allclose(combined.data, 3 + 0j)
def test_repeat_partial(self):
"""Test repeat with non-exact multiple."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.arange(3000, dtype=np.complex64)
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "repeated.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"repeat",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
# Check pattern repeats correctly
# First 3000: 1 + [0,1,2,...,2999]
assert np.allclose(combined.data[0, :3000], 1 + np.arange(3000))
# Next 3000: 1 + [0,1,2,...,2999]
assert np.allclose(combined.data[0, 3000:6000], 1 + np.arange(3000))
# Next 3000: 1 + [0,1,2,...,2999]
assert np.allclose(combined.data[0, 6000:9000], 1 + np.arange(3000))
# Last 1000: 1 + [0,1,2,...,999]
assert np.allclose(combined.data[0, 9000:10000], 1 + np.arange(1000))
class TestCombineAddAlignRepeatSpaced:
"""Test add mode with repeat-spaced alignment."""
def test_repeat_spaced(self):
"""Test repeating with spacing."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(2000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "repeat_spaced.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"repeat-spaced",
"--repeat-spacing",
"1000",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined.data.shape[1] == 10000
# First 2000: 1 + 2 = 3
assert np.allclose(combined.data[0, :2000], 3 + 0j)
# Next 1000 (gap): 1 + 0 = 1
assert np.allclose(combined.data[0, 2000:3000], 1 + 0j)
# Next 2000: 1 + 2 = 3
assert np.allclose(combined.data[0, 3000:5000], 3 + 0j)
# Next 1000 (gap): 1 + 0 = 1
assert np.allclose(combined.data[0, 5000:6000], 1 + 0j)
def test_repeat_spaced_missing_spacing(self):
"""Test error when spacing not provided."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(10000, dtype=np.complex64)
sig2 = np.ones(5000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "long.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "short.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "long.npy"),
str(Path(tmpdir) / "short.npy"),
output_path,
"--mode",
"add",
"--align-mode",
"repeat-spaced",
# Missing --repeat-spacing
],
)
assert result.exit_code != 0
assert "requires --repeat-spacing" in result.output
class TestCombineValidation:
"""Test validation and error handling."""
def test_sample_rate_mismatch(self):
"""Test error on sample rate mismatch in add mode."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 1e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code != 0
assert "different sample rates" in result.output
def test_channel_count_mismatch(self):
"""Test error on channel count mismatch."""
with tempfile.TemporaryDirectory() as tmpdir:
# Single channel
sig1 = np.ones((1, 1000), dtype=np.complex64)
# Two channels
sig2 = np.ones((2, 1000), dtype=np.complex64)
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code != 0
assert "different channel counts" in result.output
def test_overwrite_protection(self):
"""Test overwrite protection."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test recordings
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
# Create existing output file
existing = Recording(data=np.zeros(100, dtype=np.complex64), metadata={})
output_path = str(Path(tmpdir) / "output.npy")
to_npy(existing, filename=output_path, overwrite=True)
runner = CliRunner()
# Should fail without --overwrite
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
],
)
assert result.exit_code != 0
assert "already exists" in result.output
# Should succeed with --overwrite
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--overwrite",
],
)
assert result.exit_code == 0
class TestCombineOutputOptions:
"""Test output format and options."""
def test_output_formats(self):
"""Test different output formats."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test recordings
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
# Test SigMF output
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
str(Path(tmpdir) / "output.sigmf-data"),
"--mode",
"add",
],
)
assert result.exit_code == 0
assert Path(tmpdir, "output.sigmf-data").exists()
assert Path(tmpdir, "output.sigmf-meta").exists()
def test_normalize(self):
"""Test normalize option."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64) * 10
sig2 = np.ones(1000, dtype=np.complex64) * 20
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "normalized.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--normalize",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
# Should be normalized to max magnitude 1
assert np.allclose(np.max(np.abs(combined.data)), 1.0)
assert combined._metadata.get("normalized") is True
def test_custom_metadata(self):
"""Test adding custom metadata."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--metadata",
"test_id=test123",
"--metadata",
"author=tester",
],
)
assert result.exit_code == 0
combined = load_recording(output_path)
assert combined._metadata["test_id"] == "test123"
assert combined._metadata["author"] == "tester"
class TestCombineVerboseQuiet:
"""Test verbose and quiet modes."""
def test_verbose(self):
"""Test verbose output."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--verbose",
],
)
assert result.exit_code == 0
assert "Loading" in result.output
assert "Done" in result.output
def test_quiet(self):
"""Test quiet output."""
with tempfile.TemporaryDirectory() as tmpdir:
sig1 = np.ones(1000, dtype=np.complex64)
sig2 = np.ones(1000, dtype=np.complex64) * 2
rec1 = Recording(data=sig1, metadata={"sample_rate": 2e6})
rec2 = Recording(data=sig2, metadata={"sample_rate": 2e6})
to_npy(rec1, filename=str(Path(tmpdir) / "sig1.npy"), overwrite=True)
to_npy(rec2, filename=str(Path(tmpdir) / "sig2.npy"), overwrite=True)
runner = CliRunner()
output_path = str(Path(tmpdir) / "output.npy")
result = runner.invoke(
cli,
[
"combine",
str(Path(tmpdir) / "sig1.npy"),
str(Path(tmpdir) / "sig2.npy"),
output_path,
"--mode",
"add",
"--quiet",
],
)
assert result.exit_code == 0
assert result.output == ""

View File

@ -0,0 +1,165 @@
# flake8: noqa
"""Tests for capture command."""
import os
import tempfile
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
import yaml
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture import (
auto_select_device,
capture,
get_sdr_device,
save_visualization,
)
class TestGetSdrDevice:
"""Tests for get_sdr_device function."""
def test_get_pluto_device(self):
"""Test getting PlutoSDR device."""
mock_sdr_class = MagicMock()
mock_sdr_instance = MagicMock()
mock_sdr_class.return_value = mock_sdr_instance
with patch.dict("sys.modules", {"src.ria_toolkit_oss.sdr.pluto": MagicMock(Pluto=mock_sdr_class)}):
device = get_sdr_device("pluto")
assert device is mock_sdr_instance
def test_get_hackrf_device(self):
"""Test getting HackRF device."""
mock_sdr_class = MagicMock()
mock_sdr_instance = MagicMock()
mock_sdr_class.return_value = mock_sdr_instance
with patch.dict("sys.modules", {"src.ria_toolkit_oss.sdr.hackrf": MagicMock(HackRF=mock_sdr_class)}):
device = get_sdr_device("hackrf")
assert device is mock_sdr_instance
def test_get_unknown_device(self):
"""Test getting unknown device type."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
get_sdr_device("unknown_device")
assert "Unknown device type" in str(exc_info.value)
class TestAutoSelectDevice:
"""Tests for auto_select_device function."""
def test_auto_select_no_devices(self):
"""Test auto-select with no devices found."""
from click.exceptions import ClickException
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover:
mock_discover.return_value = []
with pytest.raises(ClickException) as exc_info:
auto_select_device()
assert "No SDR devices found" in str(exc_info.value)
def test_auto_select_single_device(self):
"""Test auto-select with single device."""
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover:
mock_discover.return_value = [{"type": "HackRF", "serial": "123456"}]
device_type = auto_select_device(quiet=True)
assert device_type == "hackrf"
def test_auto_select_single_device_with_warning(self):
"""Test auto-select shows warning when not quiet."""
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.click.echo") as mock_echo,
):
mock_discover.return_value = [{"type": "PlutoSDR", "uri": "ip:pluto.local"}]
device_type = auto_select_device(quiet=False)
assert device_type == "pluto"
# Should have called echo twice (warning + hint)
assert mock_echo.call_count == 2
def test_auto_select_multiple_devices(self):
"""Test auto-select with multiple devices raises error."""
from click.exceptions import ClickException
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover:
mock_discover.return_value = [
{"type": "HackRF", "serial": "123456"},
{"type": "PlutoSDR", "uri": "ip:pluto.local"},
]
with pytest.raises(ClickException) as exc_info:
auto_select_device()
assert "Multiple devices found" in str(exc_info.value)
def test_auto_select_device_name_mapping(self):
"""Test device name mapping."""
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.list_all_devices") as mock_discover:
# Test various device name formats
test_cases = [
("PlutoSDR", "pluto"),
("HackRF", "hackrf"),
("BladeRF", "bladerf"),
("RTL-SDR", "rtlsdr"),
]
for device_name, expected_type in test_cases:
mock_discover.return_value = [{"type": device_name}]
device_type = auto_select_device(quiet=True)
assert device_type == expected_type
class TestSaveVisualization:
"""Tests for save_visualization function."""
def test_save_visualization_success(self):
"""Test successful visualization save."""
mock_recording = MagicMock()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.view_simple_sig") as mock_view:
save_visualization(mock_recording, "test.png", quiet=True)
mock_view.assert_called_once_with(
mock_recording, output_path="test.png", saveplot=True, fast_mode=False, labels_mode=True
)
def test_save_visualization_import_error(self):
"""Test visualization save with import error."""
mock_recording = MagicMock()
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.view_simple_sig", side_effect=ImportError("Module not found")),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.click.echo") as mock_echo,
):
save_visualization(mock_recording, "test.png", quiet=True)
# Should catch error and echo warning
mock_echo.assert_called_once()
assert "Warning" in str(mock_echo.call_args)
def test_save_visualization_general_error(self):
"""Test visualization save with general error."""
mock_recording = MagicMock()
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.view_simple_sig", side_effect=Exception("Failed to plot")),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.capture.click.echo") as mock_echo,
):
save_visualization(mock_recording, "test.png", quiet=True)
mock_echo.assert_called_once()
assert "Failed to save visualization" in str(mock_echo.call_args)

View File

@ -0,0 +1,118 @@
"""Tests for common CLI utilities."""
import os
import tempfile
import pytest
import yaml
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.common import (
format_frequency,
format_sample_rate,
load_yaml_config,
parse_frequency,
parse_metadata_args,
)
def test_load_yaml_config():
"""Test loading YAML configuration files."""
config_data = {
"device": "pluto",
"sample_rate": 2e6,
"center_frequency": "915e6",
"gain": 30,
"metadata": {"location": "test_lab", "experiment": "test_001"},
}
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
yaml.dump(config_data, f)
config_file = f.name
try:
loaded_config = load_yaml_config(config_file)
assert loaded_config == config_data
assert loaded_config["device"] == "pluto"
assert loaded_config["sample_rate"] == 2e6
assert loaded_config["metadata"]["location"] == "test_lab"
finally:
os.unlink(config_file)
def test_load_yaml_config_empty():
"""Test loading empty YAML file."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
f.write("")
config_file = f.name
try:
loaded_config = load_yaml_config(config_file)
assert loaded_config == {}
finally:
os.unlink(config_file)
def test_parse_metadata_args():
"""Test parsing metadata KEY=VALUE arguments."""
metadata_args = ["location=test_lab", "experiment=001", "power=30", "frequency=2.4e9", "description=Test Signal"]
result = parse_metadata_args(metadata_args)
assert result["location"] == "test_lab"
assert result["experiment"] == "001" # String because doesn't parse as number
assert result["power"] == 30 # Integer
assert result["frequency"] == 2.4e9 # Float
assert result["description"] == "Test Signal"
def test_parse_metadata_args_invalid():
"""Test invalid metadata format raises error."""
from click.exceptions import ClickException
with pytest.raises(ClickException):
parse_metadata_args(["invalid_format"])
with pytest.raises(ClickException):
parse_metadata_args(["key1=value1", "invalid", "key2=value2"])
def test_parse_frequency():
"""Test frequency parsing with different formats."""
# Scientific notation
assert parse_frequency("915e6") == 915e6
assert parse_frequency("2.4e9") == 2.4e9
assert parse_frequency("433e6") == 433e6
# With suffixes
assert parse_frequency("915M") == 915e6
assert parse_frequency("2.4G") == 2.4e9
assert parse_frequency("433M") == 433e6
assert parse_frequency("100k") == 100e3
assert parse_frequency("100K") == 100e3
# Plain numbers
assert parse_frequency("915000000") == 915e6
assert parse_frequency("2400000000") == 2.4e9
# Edge cases
assert parse_frequency("0.915G") == 915e6
assert parse_frequency("915.0M") == 915e6
def test_format_frequency():
"""Test frequency formatting."""
assert format_frequency(915e6) == "915.00 MHz"
assert format_frequency(2.4e9) == "2.40 GHz"
assert format_frequency(433e6) == "433.00 MHz"
assert format_frequency(100e3) == "100.00 kHz"
assert format_frequency(1e3) == "1.00 kHz"
assert format_frequency(500) == "500.00 Hz"
def test_format_sample_rate():
"""Test sample rate formatting."""
assert format_sample_rate(20e6) == "20.00 MS/s"
assert format_sample_rate(2e6) == "2.00 MS/s"
assert format_sample_rate(100e3) == "100.00 kS/s"
assert format_sample_rate(1e3) == "1.00 kS/s"
assert format_sample_rate(500) == "500.00 S/s"

View File

@ -0,0 +1,190 @@
"""Tests for convert command."""
import os
import tempfile
from pathlib import Path
import pytest
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.cli import cli
class TestConvert:
"""Test convert command functionality."""
def test_convert_help(self):
"""Test convert command help."""
runner = CliRunner()
result = runner.invoke(cli, ["convert", "--help"])
assert result.exit_code == 0
assert "Convert recordings between file formats" in result.output
assert "--format" in result.output
assert "--legacy" in result.output
assert "--wav-sample-rate" in result.output
assert "--blue-format" in result.output
assert "--overwrite" in result.output
assert "--metadata" in result.output
def test_missing_arguments(self):
"""Test that missing arguments show error."""
runner = CliRunner()
result = runner.invoke(cli, ["convert"])
assert result.exit_code != 0
assert "Missing argument" in result.output or "Error" in result.output
def test_invalid_input_format(self):
"""Test handling of invalid input format."""
runner = CliRunner()
with tempfile.NamedTemporaryFile(suffix=".xyz", delete=False) as f:
try:
result = runner.invoke(cli, ["convert", f.name, "output.npy"])
assert result.exit_code != 0
assert "Unknown format" in result.output or "Supported" in result.output
finally:
os.unlink(f.name)
def test_overwrite_protection(self):
"""Test that overwrite protection works."""
runner = CliRunner()
# Create a dummy input file (will use actual test data if available)
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "test.sigmf")
# First conversion should succeed
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy", "-q"])
assert result.exit_code == 0
# Second conversion without --overwrite should fail
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy"])
assert result.exit_code != 0
assert "exist" in result.output.lower()
assert "--overwrite" in result.output
# Third conversion with --overwrite should succeed
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy", "--overwrite", "-q"])
assert result.exit_code == 0
def test_metadata_override(self):
"""Test metadata override functionality."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
output_file = os.path.join(tmpdir, "test.sigmf")
result = runner.invoke(
cli,
[
"convert",
test_input,
output_file,
"--legacy",
"--metadata",
"test_key=test_value",
"--metadata",
"number=42",
"--metadata",
"float_val=3.14",
"-v",
],
)
assert result.exit_code == 0
assert "test_key" in result.output
assert "number" in result.output
assert "float_val" in result.output
def test_format_detection(self):
"""Test that format detection works for different extensions."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
# Test NPY to SigMF
sigmf_out = os.path.join(tmpdir, "test.sigmf")
result = runner.invoke(cli, ["convert", test_input, sigmf_out, "--legacy", "-q"])
assert result.exit_code == 0
assert Path(sigmf_out).with_suffix(".sigmf-data").exists()
assert Path(sigmf_out).with_suffix(".sigmf-meta").exists()
# Test NPY to NPY
npy_out = os.path.join(tmpdir, "test.npy")
result = runner.invoke(cli, ["convert", test_input, npy_out, "--legacy", "-q"])
assert result.exit_code == 0
assert Path(npy_out).exists()
def test_wav_conversion_with_decimation(self):
"""Test WAV conversion with sample rate decimation."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
wav_out = os.path.join(tmpdir, "test.wav")
result = runner.invoke(
cli, ["convert", test_input, wav_out, "--legacy", "--wav-sample-rate", "48000", "--wav-bits", "16"]
)
assert result.exit_code == 0
assert "Decimation factor" in result.output
assert Path(wav_out).exists()
# Check file is non-empty
assert os.path.getsize(wav_out) > 0
def test_blue_format_conversion(self):
"""Test MIDAS Blue format conversion."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
# Test each Blue format
for blue_fmt in ["CI", "CF", "CD"]:
blue_out = os.path.join(tmpdir, f"test_{blue_fmt}.blue")
result = runner.invoke(
cli, ["convert", test_input, blue_out, "--legacy", "--blue-format", blue_fmt, "-q"]
)
assert result.exit_code == 0
assert Path(blue_out).exists()
# Check file is non-empty
assert os.path.getsize(blue_out) > 0
def test_quiet_and_verbose_modes(self):
"""Test quiet and verbose output modes."""
runner = CliRunner()
test_input = "/home/qrf/workarea/ash/signal-testbed/recordings/iq2440MHz234233.npy"
if not os.path.exists(test_input):
pytest.skip("Test recording file not found")
with tempfile.TemporaryDirectory() as tmpdir:
# Test verbose mode
output_file = os.path.join(tmpdir, "test_verbose.sigmf")
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy", "-v"])
assert result.exit_code == 0
assert "Reading input" in result.output
assert "Metadata preserved" in result.output
# Test quiet mode
output_file = os.path.join(tmpdir, "test_quiet.npy")
result = runner.invoke(cli, ["convert", test_input, output_file, "--legacy", "-q"])
assert result.exit_code == 0
# Should have minimal output
assert len(result.output) < 100 or result.output.strip() == ""

View File

@ -0,0 +1,287 @@
"""Tests for discover command."""
import json
import re
from unittest.mock import MagicMock, patch
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover import ( # find_bladerf_devices,; find_thinkrf_devices,; find_uhd_devices,
discover,
discover_all_devices,
find_hackrf_devices,
find_pluto_devices,
find_rtlsdr_devices,
load_sdr_drivers,
)
def test_discover_pluto_no_devices():
"""Test PlutoSDR discovery with no devices."""
with (
patch.dict("sys.modules", {"iio": MagicMock()}) as mock_modules,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.get_usb_devices") as mock_usb,
):
mock_iio = mock_modules["iio"]
mock_iio.scan_contexts.return_value = {}
mock_usb.return_value = []
devices = find_pluto_devices()
assert devices == []
def test_discover_pluto_with_device():
"""Test PlutoSDR discovery with device present."""
with patch.dict("sys.modules", {"iio": MagicMock()}) as mock_modules:
mock_iio = mock_modules["iio"]
mock_ctx = MagicMock()
mock_ctx.attrs = {"hw_serial": "123456", "fw_version": "1.0"}
mock_ctx._destroy = MagicMock()
mock_iio.scan_contexts.return_value = {"ip:pluto.local": "PlutoSDR (ADALM-PLUTO)"}
mock_iio.Context.return_value = mock_ctx
devices = find_pluto_devices()
assert len(devices) == 1
assert devices[0]["type"] == "PlutoSDR"
assert devices[0]["serial"] == "123456"
assert devices[0]["firmware"] == "1.0"
assert devices[0]["uri"] == "ip:pluto.local"
def test_discover_hackrf_no_devices():
"""Test HackRF discovery with no devices."""
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.subprocess") as mock_subprocess:
mock_subprocess.check_output.return_value = ""
devices = find_hackrf_devices()
assert devices == []
def test_discover_hackrf_with_devices():
"""Test HackRF discovery with devices present."""
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.subprocess") as mock_subprocess:
mock_subprocess.check_output.return_value = """
hackrf_info version: 2023.01.1
libhackrf version: 2023.01.1 (0.8)
Found HackRF
Index: 0
Serial number: serial123
Board ID Number: 2 (HackRF One)
Firmware Version: v2.1.0 (API:1.08)
Part ID Number: 0xa000cb3c 0x005d4761
Index: 1
Serial number: serial456
Board ID Number: 2 (HackRF One)
Firmware Version: v2.1.0 (API:1.08)
Part ID Number: 0xa000cb3c 0x005d4761
"""
devices = find_hackrf_devices()
assert len(devices) == 2
assert devices[0]["type"] == "HackRF One"
assert devices[0]["serial"] == "serial123"
assert devices[0]["device_index"] == 0 or devices[0]["device_index"] == "0"
assert devices[1]["serial"] == "serial456"
assert devices[1]["device_index"] == 1 or devices[1]["device_index"] == "1"
def test_discover_rtlsdr_no_devices():
"""Test RTL-SDR discovery with no devices."""
with patch("ria_toolkit_oss.ria_toolkit_oss.ria_toolkit_oss.discover.subprocess") as mock_subprocess:
mock_subprocess.check_output.return_value = ""
devices = find_rtlsdr_devices()
assert devices == []
def test_discover_rtlsdr_with_devices():
"""Test RTL-SDR discovery with devices present."""
with patch("ria_toolkit_oss.ria_toolkit_oss.ria_toolkit_oss.discover.subprocess") as mock_subprocess:
mock_subprocess.check_output.return_value = """
Found 2 device(s):
0: RTLSDRBlog, Blog V4, SN: 00000001
1: RTLSDRBlog, Blog V4, SN: 00000002
Using device 0: Generic RTL2832U OEM
Found Rafael Micro R828D tuner
RTL-SDR Blog V4 Detected
"""
devices = find_rtlsdr_devices()
assert len(devices) == 2
assert devices[0]["type"] == "RTL-SDR"
assert devices[0]["serial"] == "00000001"
assert devices[0]["device_index"] == 0 or devices[0]["device_index"] == "0"
def test_discover_all_devices_filter():
"""Test discovering devices with type filter."""
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_pluto_devices") as mock_pluto,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_hackrf_devices") as mock_hackrf,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_bladerf_devices") as mock_bladerf,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_uhd_devices") as mock_usrp,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_rtlsdr_devices") as mock_rtlsdr,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_thinkrf_devices") as mock_thinkrf,
):
mock_pluto.return_value = [{"type": "PlutoSDR", "uri": "ip:pluto.local"}]
mock_hackrf.return_value = []
mock_bladerf.return_value = []
mock_usrp.return_value = []
mock_rtlsdr.return_value = []
mock_thinkrf.return_value = []
# Test filtering by pluto
load_sdr_drivers(verbose=False)
devices = discover_all_devices()
mock_pluto.assert_called_once()
mock_hackrf.assert_called_once()
mock_bladerf.assert_called_once()
assert len(devices["devices"]) == 1
assert len(devices["pluto_devices"]) == 1
def test_discover_all_devices_no_filter():
"""Test discovering all device types."""
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_pluto_devices") as mock_pluto,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_hackrf_devices") as mock_hackrf,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_bladerf_devices") as mock_bladerf,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_uhd_devices") as mock_usrp,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_rtlsdr_devices") as mock_rtlsdr,
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.find_thinkrf_devices") as mock_thinkrf,
):
mock_pluto.return_value = [{"type": "PlutoSDR", "uri": "ip:pluto.local"}]
mock_hackrf.return_value = [{"type": "HackRF"}]
mock_bladerf.return_value = []
mock_usrp.return_value = []
mock_rtlsdr.return_value = []
mock_thinkrf.return_value = []
load_sdr_drivers(verbose=False)
devices = discover_all_devices()
mock_pluto.assert_called_once()
mock_hackrf.assert_called_once()
mock_bladerf.assert_called_once()
mock_usrp.assert_called_once()
mock_rtlsdr.assert_called_once()
assert len(devices["devices"]) == 2
assert len(devices["pluto_devices"]) == 1
assert len(devices["hackrf_devices"]) == 1
def test_discover_command_no_devices():
"""Test discover CLI command with no devices."""
runner = CliRunner()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.discover_all_devices") as mock_discover:
mock_discover.return_value = {
"loaded_drivers": [],
"failed_drivers": [],
"devices": [],
"total_devices": 0,
"uhd_devices": [],
"pluto_devices": [],
"rtlsdr_devices": [],
"bladerf_devices": [],
"hackrf_devices": [],
}
result = runner.invoke(discover)
assert result.exit_code == 0
assert "No devices detected" in result.output
def test_discover_command():
"""Test discover CLI command."""
runner = CliRunner()
result = runner.invoke(discover)
radios = ["USRP/UHD", "PlutoSDR", "RTL-SDR", "BladeRF", "HackRF", "ThinkRF"]
match = re.search(r"Detected devices: (\d+)", result.output)
if match:
total_devices = int(match.group(1))
else:
total_devices = 0
if result.exit_code == 0:
assert "Attached Devices" in result.output
assert "Discovery Summary" in result.output
if total_devices > 0:
assert any(radio in result.output for radio in radios)
else:
assert not any(radio in result.output for radio in radios)
else:
assert result.exit_code == 1
assert isinstance(result.exception, AttributeError)
assert "undefined symbol: iio_get_backends_count" in str(result.exception)
def test_discover_command_json_output():
"""Test discover CLI command with JSON output."""
runner = CliRunner()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.discover_all_devices") as mock_discover:
mock_discover.return_value = {
"loaded_drivers": [],
"failed_drivers": [],
"devices": [{"type": "HackRF", "serial": "123456", "status": "available"}],
"total_devices": 1,
"uhd_devices": [],
"pluto_devices": [],
"rtlsdr_devices": [],
"bladerf_devices": [],
"hackrf_devices": [{"type": "HackRF", "serial": "123456", "status": "available"}],
}
result = runner.invoke(discover, ["--json-output"])
output_data = json.loads(result.output)
assert result.exit_code == 0
assert output_data["total_devices"] == 1
assert len(output_data["devices"]) == 1
assert output_data["devices"][0]["type"] == "HackRF"
def test_discover_command_verbose():
"""Test discover CLI command with verbose output."""
runner = CliRunner()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.discover.discover_all_devices") as mock_discover:
mock_discover.return_value = {
"loaded_drivers": [],
"failed_drivers": [],
"devices": [
{
"type": "PlutoSDR",
"serial": "123456",
"firmware": "1.0",
"uri": "ip:pluto.local",
"status": "available",
}
],
"total_devices": 1,
"uhd_devices": [],
"pluto_devices": [],
"rtlsdr_devices": [],
"bladerf_devices": [],
"hackrf_devices": [
{
"type": "PlutoSDR",
"serial": "123456",
"firmware": "1.0",
"uri": "ip:pluto.local",
"status": "available",
}
],
}
result = runner.invoke(discover, ["--verbose"])
assert result.exit_code == 0
assert "RTL-SDR devices: None found" in result.output or "\n rtlsdr:" in result.output

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,670 @@
"""Tests for split CLI command."""
import tempfile
from pathlib import Path
import numpy as np
import pytest
from click.testing import CliRunner
from ria_toolkit_oss.datatypes import Annotation, Recording
from ria_toolkit_oss.io import load_recording, to_sigmf
from ria_toolkit_oss.ria_toolkit_oss_cli.cli import cli
class TestSplitHelp:
"""Test split command help and basic functionality."""
def test_split_help(self):
"""Test split command help."""
runner = CliRunner()
result = runner.invoke(cli, ["split", "--help"])
assert result.exit_code == 0
assert "Split, trim, and extract portions of recordings" in result.output
assert "--split-at" in result.output
assert "--split-every" in result.output
assert "--split-duration" in result.output
assert "--trim" in result.output
assert "--extract-annotations" in result.output
def test_missing_arguments(self):
"""Test that missing arguments show error."""
runner = CliRunner()
result = runner.invoke(cli, ["split"])
assert result.exit_code != 0
assert "Missing argument" in result.output or "Error" in result.output
def test_no_operation_specified(self):
"""Test error when no operation is specified."""
runner = CliRunner()
# Create a test file
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.ones(1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 1e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
test_file = str(Path(tmpdir) / "test.sigmf-data")
result = runner.invoke(cli, ["split", test_file])
assert result.exit_code != 0
assert "No operation specified" in result.output
class TestSplitTrim:
"""Test trim operations."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_trim_with_length(self, test_recording):
"""Test trim with --start and --length."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--trim",
"--start",
"1000",
"--length",
"5000",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify output file exists
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 1
# Verify output has correct length
output_rec = load_recording(str(output_files[0]))
assert output_rec.data.shape[1] == 5000
assert output_rec.metadata["original_start_sample"] == 1000
assert output_rec.metadata["original_end_sample"] == 6000
assert output_rec.metadata["split_operation"] == "trim"
def test_trim_with_end(self, test_recording):
"""Test trim with --start and --end."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
["split", test_recording, "--trim", "--start", "2000", "--end", "7000", "--output-dir", outdir, "-q"],
)
assert result.exit_code == 0
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 1
output_rec = load_recording(str(output_files[0]))
assert output_rec.data.shape[1] == 5000
def test_trim_without_length_or_end(self, test_recording):
"""Test that trim requires --length or --end."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--trim", "--start", "1000"])
assert result.exit_code != 0
assert "requires either --length or --end" in result.output
def test_trim_with_both_length_and_end(self, test_recording):
"""Test that trim rejects both --length and --end."""
runner = CliRunner()
result = runner.invoke(
cli, ["split", test_recording, "--trim", "--start", "1000", "--length", "5000", "--end", "6000"]
)
assert result.exit_code != 0
assert "Cannot specify both --length and --end" in result.output
def test_trim_invalid_range(self, test_recording):
"""Test trim with invalid range."""
runner = CliRunner()
result = runner.invoke(
cli,
["split", test_recording, "--trim", "--start", "1000", "--length", "50000"], # Exceeds recording length
)
assert result.exit_code != 0
assert "Invalid trim range" in result.output
def test_trim_end_before_start(self, test_recording):
"""Test trim with end < start."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--trim", "--start", "5000", "--end", "1000"])
assert result.exit_code != 0
assert "Invalid range" in result.output
class TestSplitAt:
"""Test split-at operations."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_split_at_middle(self, test_recording):
"""Test splitting at middle of recording."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(cli, ["split", test_recording, "--split-at", "5000", "--output-dir", outdir, "-q"])
assert result.exit_code == 0
# Verify two output files exist
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 2
# Verify part1
part1 = load_recording(str(output_files[0]))
assert part1.data.shape[1] == 5000
assert part1.metadata["original_start_sample"] == 0
assert part1.metadata["original_end_sample"] == 5000
# Verify part2
part2 = load_recording(str(output_files[1]))
assert part2.data.shape[1] == 5000
assert part2.metadata["original_start_sample"] == 5000
assert part2.metadata["original_end_sample"] == 10000
def test_split_at_invalid_point(self, test_recording):
"""Test split-at with invalid sample point."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--split-at", "50000"]) # Exceeds recording length
assert result.exit_code != 0
assert "Invalid split point" in result.output
class TestSplitEvery:
"""Test split-every operations."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_split_every_equal_chunks(self, test_recording):
"""Test splitting into equal chunks."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli, ["split", test_recording, "--split-every", "2500", "--output-dir", outdir, "-q"]
)
assert result.exit_code == 0
# Verify 4 chunks created
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 4
# Verify all chunks have correct size
for i, file in enumerate(output_files):
chunk = load_recording(str(file))
assert chunk.data.shape[1] == 2500
assert chunk.metadata["chunk_index"] == i + 1
assert chunk.metadata["total_chunks"] == 4
def test_split_every_unequal_chunks(self, test_recording):
"""Test splitting with remainder chunk."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli, ["split", test_recording, "--split-every", "3000", "--output-dir", outdir, "-q"]
)
assert result.exit_code == 0
# Verify 4 chunks created (3x3000 + 1x1000)
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 4
# Last chunk should be smaller
last_chunk = load_recording(str(output_files[-1]))
assert last_chunk.data.shape[1] == 1000
class TestSplitDuration:
"""Test split-duration operations."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file with known sample rate."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(
data=signal, metadata={"sample_rate": 10000, "center_frequency": 915e6} # 10kHz for easy math
)
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_split_duration_basic(self, test_recording):
"""Test splitting by duration."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--split-duration",
"0.25", # 0.25s = 2500 samples at 10kHz
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify chunks created
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 4
# Verify chunk sizes
for file in output_files[:-1]:
chunk = load_recording(str(file))
assert chunk.data.shape[1] == 2500
def test_split_duration_no_sample_rate(self):
"""Test that split-duration requires sample_rate in metadata."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as tmpdir:
# Create recording without sample_rate
signal = np.arange(1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
test_file = str(Path(tmpdir) / "test.sigmf-data")
result = runner.invoke(cli, ["split", test_file, "--split-duration", "1.0"])
assert result.exit_code != 0
assert "Cannot split by duration" in result.output
assert "no sample_rate" in result.output
class TestExtractAnnotations:
"""Test extract-annotations operations."""
@pytest.fixture
def annotated_recording(self):
"""Create a test recording with annotations."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(100000, dtype=np.complex64)
annotations = [
Annotation(
sample_start=0, sample_count=10000, freq_lower_edge=914e6, freq_upper_edge=916e6, label="preamble"
),
Annotation(
sample_start=10000,
sample_count=50000,
freq_lower_edge=914e6,
freq_upper_edge=916e6,
label="payload",
),
Annotation(
sample_start=60000, sample_count=5000, freq_lower_edge=914e6, freq_upper_edge=916e6, label="crc"
),
]
recording = Recording(
data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6}, annotations=annotations
)
to_sigmf(recording, filename="annotated", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "annotated.sigmf-data")
def test_extract_all_annotations(self, annotated_recording):
"""Test extracting all annotations."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli, ["split", annotated_recording, "--extract-annotations", "--output-dir", outdir, "-q"]
)
assert result.exit_code == 0
# Verify 3 files created
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 3
# Verify each annotation was extracted
preamble = [f for f in output_files if "preamble" in str(f)][0]
payload = [f for f in output_files if "payload" in str(f)][0]
crc = [f for f in output_files if "crc" in str(f)][0]
preamble_rec = load_recording(str(preamble))
assert preamble_rec.data.shape[1] == 10000
assert preamble_rec.metadata["annotation_label"] == "preamble"
assert len(preamble_rec.annotations) == 0 # Annotations cleared
payload_rec = load_recording(str(payload))
assert payload_rec.data.shape[1] == 50000
assert payload_rec.metadata["annotation_label"] == "payload"
crc_rec = load_recording(str(crc))
assert crc_rec.data.shape[1] == 5000
assert crc_rec.metadata["annotation_label"] == "crc"
def test_extract_annotation_by_label(self, annotated_recording):
"""Test extracting annotations by label."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
annotated_recording,
"--extract-annotations",
"--annotation-label",
"payload",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify only 1 file created
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 1
assert "payload" in str(output_files[0])
def test_extract_annotation_by_index(self, annotated_recording):
"""Test extracting annotation by index."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
annotated_recording,
"--extract-annotations",
"--annotation-index",
"1",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify only 1 file created (payload at index 1)
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert len(output_files) == 1
assert "payload" in str(output_files[0])
def test_extract_annotations_invalid_label(self, annotated_recording):
"""Test error with non-existent label."""
runner = CliRunner()
result = runner.invoke(
cli, ["split", annotated_recording, "--extract-annotations", "--annotation-label", "nonexistent"]
)
assert result.exit_code != 0
assert "No annotations with label" in result.output
def test_extract_annotations_invalid_index(self, annotated_recording):
"""Test error with invalid index."""
runner = CliRunner()
result = runner.invoke(
cli, ["split", annotated_recording, "--extract-annotations", "--annotation-index", "99"]
)
assert result.exit_code != 0
assert "Invalid annotation index" in result.output
def test_extract_annotations_no_annotations(self):
"""Test error when recording has no annotations."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(1000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 1e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
test_file = str(Path(tmpdir) / "test.sigmf-data")
result = runner.invoke(cli, ["split", test_file, "--extract-annotations"])
assert result.exit_code != 0
assert "No annotations found" in result.output
class TestOutputOptions:
"""Test output-related options."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_output_prefix(self, test_recording):
"""Test custom output prefix."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--split-every",
"3000",
"--output-prefix",
"custom",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
output_files = list(Path(outdir).glob("*.sigmf-data"))
assert all("custom" in str(f) for f in output_files)
def test_output_format_conversion(self, test_recording):
"""Test format conversion during split."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--split-every",
"5000",
"--output-format",
"npy",
"--output-dir",
outdir,
"-q",
],
)
assert result.exit_code == 0
# Verify NPY files created
output_files = list(Path(outdir).glob("*.npy"))
assert len(output_files) == 2
def test_overwrite_protection(self, test_recording):
"""Test overwrite protection."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
# First split should succeed
result = runner.invoke(
cli,
["split", test_recording, "--trim", "--start", "0", "--length", "1000", "--output-dir", outdir, "-q"],
)
assert result.exit_code == 0
# Second split without --overwrite should fail
result = runner.invoke(
cli, ["split", test_recording, "--trim", "--start", "0", "--length", "1000", "--output-dir", outdir]
)
assert result.exit_code != 0
assert "exist" in result.output.lower()
# Third split with --overwrite should succeed
result = runner.invoke(
cli,
[
"split",
test_recording,
"--trim",
"--start",
"0",
"--length",
"1000",
"--output-dir",
outdir,
"--overwrite",
"-q",
],
)
assert result.exit_code == 0
class TestMultipleOperations:
"""Test that multiple operations are rejected."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_trim_and_split_at(self, test_recording):
"""Test that trim and split-at cannot be used together."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--trim", "--split-at", "5000"])
assert result.exit_code != 0
assert "Multiple operations specified" in result.output
def test_split_every_and_extract(self, test_recording):
"""Test that split-every and extract-annotations cannot be used together."""
runner = CliRunner()
result = runner.invoke(cli, ["split", test_recording, "--split-every", "1000", "--extract-annotations"])
assert result.exit_code != 0
assert "Multiple operations specified" in result.output
class TestVerboseQuiet:
"""Test verbose and quiet modes."""
@pytest.fixture
def test_recording(self):
"""Create a test recording file."""
with tempfile.TemporaryDirectory() as tmpdir:
signal = np.arange(10000, dtype=np.complex64)
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
yield str(Path(tmpdir) / "test.sigmf-data")
def test_verbose_mode(self, test_recording):
"""Test verbose output."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--trim",
"--start",
"0",
"--length",
"1000",
"--output-dir",
outdir,
"--verbose",
],
)
assert result.exit_code == 0
assert "Input format: SIGMF" in result.output
assert "Output format: SIGMF" in result.output
def test_quiet_mode(self, test_recording):
"""Test quiet output (minimal output)."""
runner = CliRunner()
with tempfile.TemporaryDirectory() as outdir:
result = runner.invoke(
cli,
[
"split",
test_recording,
"--trim",
"--start",
"0",
"--length",
"1000",
"--output-dir",
outdir,
"--quiet",
],
)
assert result.exit_code == 0
# Output should be minimal in quiet mode
assert len(result.output.strip()) < 100 or result.output.strip() == ""

View File

@ -0,0 +1,345 @@
"""Tests for transmit command."""
import os
import tempfile
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
import yaml
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.common import get_sdr_device
from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit import (
auto_select_tx_device,
check_sample_rate_mismatch,
load_input_file,
transmit,
validate_tx_gain,
)
class TestGetTxDevice:
"""Tests for get_sdr_device function."""
def test_get_pluto_device(self):
"""Test getting PlutoSDR device."""
mock_sdr_class = MagicMock()
mock_sdr_instance = MagicMock()
mock_sdr_class.return_value = mock_sdr_instance
with patch.dict("sys.modules", {"src.ria_toolkit_oss.sdr.pluto": MagicMock(Pluto=mock_sdr_class)}):
device = get_sdr_device("pluto")
assert device is mock_sdr_instance
def test_get_hackrf_device(self):
"""Test getting HackRF device."""
mock_sdr_class = MagicMock()
mock_sdr_instance = MagicMock()
mock_sdr_class.return_value = mock_sdr_instance
with patch.dict("sys.modules", {"src.ria_toolkit_oss.sdr.hackrf": MagicMock(HackRF=mock_sdr_class)}):
device = get_sdr_device("hackrf")
assert device is mock_sdr_instance
def test_get_unknown_device(self):
"""Test getting unknown device type."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
get_sdr_device("unknown_device")
assert "Unknown device type" in str(exc_info.value)
class TestAutoSelectTxDevice:
"""Tests for auto_select_tx_device function."""
def test_auto_select_no_devices(self):
"""Test auto-select with no TX devices found."""
from click.exceptions import ClickException
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_sdr_drivers"),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_uhd_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_pluto_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_hackrf_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_bladerf_devices", return_value=[]),
):
with pytest.raises(ClickException) as exc_info:
auto_select_tx_device()
assert "No TX-capable SDR devices found" in str(exc_info.value)
def test_auto_select_single_device(self):
"""Test auto-select with single TX device."""
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_sdr_drivers"),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_uhd_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_pluto_devices", return_value=[]),
patch(
"ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_hackrf_devices",
return_value=[{"type": "HackRF One", "serial": "123456"}],
),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_bladerf_devices", return_value=[]),
):
device_type = auto_select_tx_device(quiet=True)
assert device_type == "hackrf"
def test_auto_select_multiple_devices(self):
"""Test auto-select with multiple TX devices raises error."""
from click.exceptions import ClickException
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_sdr_drivers"),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_uhd_devices", return_value=[]),
patch(
"ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_pluto_devices",
return_value=[{"type": "PlutoSDR", "uri": "ip:pluto.local"}],
),
patch(
"ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_hackrf_devices",
return_value=[{"type": "HackRF One", "serial": "123456"}],
),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_bladerf_devices", return_value=[]),
):
with pytest.raises(ClickException) as exc_info:
auto_select_tx_device()
assert "Multiple TX-capable devices found" in str(exc_info.value)
def test_auto_select_device_mapping(self):
"""Test device type name mapping."""
test_cases = [
("PlutoSDR", "pluto"),
("HackRF One", "hackrf"),
("BladeRF", "bladerf"),
("b200", "usrp"),
("B210", "usrp"),
]
for device_name, expected_type in test_cases:
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_sdr_drivers"),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_uhd_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_pluto_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_hackrf_devices", return_value=[]),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.find_bladerf_devices", return_value=[{"type": device_name}]),
):
device_type = auto_select_tx_device(quiet=True)
assert device_type == expected_type
class TestLoadInputFile:
"""Tests for load_input_file function."""
def test_load_file_not_found(self):
"""Test loading non-existent file."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
load_input_file("nonexistent.sigmf")
assert "Input file not found" in str(exc_info.value)
def test_load_sigmf_file(self):
"""Test loading SigMF file."""
with tempfile.NamedTemporaryFile(suffix=".sigmf-data", delete=False) as f:
test_file = f.name
try:
mock_recording = MagicMock()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_recording", return_value=mock_recording):
recording = load_input_file(test_file, legacy=False)
assert recording == mock_recording
finally:
os.unlink(test_file)
def test_load_legacy_npy_file(self):
"""Test loading legacy NPY file."""
with tempfile.NamedTemporaryFile(suffix=".npy", delete=False) as f:
test_file = f.name
try:
mock_recording = MagicMock()
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.from_npy_legacy", return_value=mock_recording):
recording = load_input_file(test_file, legacy=True)
assert recording == mock_recording
finally:
os.unlink(test_file)
def test_load_unsupported_format(self):
"""Test loading unsupported file format."""
from click.exceptions import ClickException
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as f:
test_file = f.name
try:
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_recording", side_effect=Exception("Unsupported format")):
with pytest.raises(ClickException) as exc_info:
load_input_file(test_file)
assert "Could not load" in str(exc_info.value)
assert "Supported formats" in str(exc_info.value)
finally:
os.unlink(test_file)
class TestValidateTxGain:
"""Tests for validate_tx_gain function."""
def test_valid_pluto_gain(self):
"""Test valid PlutoSDR gain."""
validate_tx_gain("pluto", -30)
validate_tx_gain("pluto", 0)
validate_tx_gain("pluto", -89)
def test_invalid_pluto_gain_too_high(self):
"""Test PlutoSDR gain too high."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
validate_tx_gain("pluto", 10)
assert "out of range" in str(exc_info.value)
def test_invalid_pluto_gain_too_low(self):
"""Test PlutoSDR gain too low."""
from click.exceptions import ClickException
with pytest.raises(ClickException) as exc_info:
validate_tx_gain("pluto", -100)
assert "out of range" in str(exc_info.value)
def test_valid_hackrf_gain(self):
"""Test valid HackRF gain."""
validate_tx_gain("hackrf", 0)
validate_tx_gain("hackrf", 20)
validate_tx_gain("hackrf", 47)
def test_invalid_hackrf_gain(self):
"""Test invalid HackRF gain."""
from click.exceptions import ClickException
with pytest.raises(ClickException):
validate_tx_gain("hackrf", -10)
with pytest.raises(ClickException):
validate_tx_gain("hackrf", 50)
def test_high_gain_warning(self):
"""Test warning for high gain levels."""
import click
with patch.object(click, "echo") as mock_echo:
validate_tx_gain("hackrf", 45)
mock_echo.assert_called()
args = str(mock_echo.call_args)
assert "WARNING" in args
assert "high gain" in args.lower()
class TestCheckSampleRateMismatch:
"""Tests for check_sample_rate_mismatch function."""
def test_no_mismatch(self):
"""Test when sample rates match."""
mock_recording = MagicMock()
mock_recording.metadata = {"sample_rate": 2e6}
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.click.echo") as mock_echo:
check_sample_rate_mismatch(mock_recording, 2e6, quiet=False)
mock_echo.assert_not_called()
def test_mismatch_warning(self):
"""Test warning when sample rates differ."""
mock_recording = MagicMock()
mock_recording.metadata = {"sample_rate": 1e6}
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.click.echo") as mock_echo:
check_sample_rate_mismatch(mock_recording, 2e6, quiet=False)
mock_echo.assert_called_once()
args = str(mock_echo.call_args)
assert "Warning" in args
assert "differs" in args
def test_mismatch_quiet_mode(self):
"""Test no warning in quiet mode."""
mock_recording = MagicMock()
mock_recording.metadata = {"sample_rate": 1e6}
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.click.echo") as mock_echo:
check_sample_rate_mismatch(mock_recording, 2e6, quiet=True)
mock_echo.assert_not_called()
def test_no_metadata(self):
"""Test when recording has no metadata."""
mock_recording = MagicMock()
mock_recording.metadata = None
with patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.click.echo") as mock_echo:
check_sample_rate_mismatch(mock_recording, 2e6, quiet=False)
mock_echo.assert_not_called()
class TestTransmitCommand:
"""Tests for transmit CLI command."""
def setup_method(self):
"""Set up test fixtures."""
self.runner = CliRunner()
self.temp_dir = tempfile.mkdtemp()
def teardown_method(self):
"""Clean up test fixtures."""
import shutil
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def test_transmit_basic(self):
"""Test basic transmit command."""
test_file = os.path.join(self.temp_dir, "test.npy")
open(test_file, "w").close()
mock_sdr = MagicMock()
mock_recording = MagicMock()
mock_recording.data = np.array([[0.1 + 0.1j] * 1000])
mock_recording.metadata = {}
with (
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.get_sdr_device", return_value=mock_sdr),
patch("ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss.transmit.load_input_file", return_value=mock_recording),
):
result = self.runner.invoke(
transmit,
[
"--device",
"hackrf",
"--sample-rate",
"2e6",
"--center-frequency",
"915M",
"--gain",
"10",
"--input",
test_file,
"--quiet",
],
)
assert result.exit_code == 0
mock_sdr.tx_recording.assert_called_once()
mock_sdr.close.assert_called_once()

View File

@ -0,0 +1,94 @@
"""Tests for transmit command signal generation."""
from click.testing import CliRunner
from ria_toolkit_oss.ria_toolkit_oss_cli.cli import cli
class TestTransmitGenerate:
"""Test signal generation in transmit command."""
def test_transmit_help(self):
"""Test transmit command help."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--help"])
assert result.exit_code == 0
assert "Generate signal instead of loading from file" in result.output
assert "lfm" in result.output
assert "chirp" in result.output
assert "sine" in result.output
assert "pulse" in result.output
def test_generate_lfm_chirp(self):
"""Test LFM chirp generation (should fail without device)."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--generate", "lfm", "--device", "pluto", "-v"])
# Should fail because no device is connected, but should show it's generating LFM
# Error will be about device initialization, not about missing input file
assert "Generating LFM chirp signal" in result.output or "Failed to initialize" in result.output
def test_generate_sine_wave(self):
"""Test sine wave generation (should fail without device)."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--generate", "sine", "--device", "pluto", "-v"])
# Should fail because no device is connected, but should show it's generating sine
assert "Generating sine wave signal" in result.output or "Failed to initialize" in result.output
def test_generate_chirp(self):
"""Test simple chirp generation (should fail without device)."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--generate", "chirp", "--device", "pluto", "-v"])
# Should fail because no device is connected, but should show it's generating chirp
assert "Generating chirp signal" in result.output or "Failed to initialize" in result.output
def test_generate_pulse(self):
"""Test pulse generation (should fail without device)."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--generate", "pulse", "--device", "pluto", "-v"])
# Should fail because no device is connected, but should show it's generating pulse
assert "Generating pulse signal" in result.output or "Failed to initialize" in result.output
def test_default_generates_lfm_when_no_input(self):
"""Test that default generates LFM chirp when no input file specified."""
runner = CliRunner()
result = runner.invoke(cli, ["transmit", "--device", "pluto", "-v"])
# Should default to LFM chirp when no input file or --generate specified
assert "Generating LFM chirp signal" in result.output or "Failed to initialize" in result.output
def test_generate_overrides_input_file(self):
"""Test that --generate overrides --input file."""
runner = CliRunner()
result = runner.invoke(
cli, ["transmit", "--device", "pluto", "--input", "nonexistent.sigmf", "--generate", "lfm", "-v"]
)
# Should generate LFM, not try to load nonexistent.sigmf
assert "Generating LFM chirp signal" in result.output or "Failed to initialize" in result.output
# Should NOT say "Input file not found"
assert "Input file not found" not in result.output
def test_signal_generation_parameters(self):
"""Test that signal generation uses correct parameters from CLI."""
runner = CliRunner()
result = runner.invoke(
cli,
[
"transmit",
"--device",
"pluto",
"--generate",
"lfm",
"--sample-rate",
"10e6",
"--center-frequency",
"915M",
"--gain",
"-20",
"-v",
],
)
# Check that parameters are shown in output
if "Failed to initialize" in result.output:
# Device initialization failed (expected without real device)
assert "10.00 MHz" in result.output or "10.000 MHz" in result.output or "10.00 MS/s" in result.output
assert "915" in result.output
assert "-20 dB" in result.output or "-20.0 dB" in result.output