G
2025-12-11 15:59:08 -05:00
|
|
|
"""Tests for split CLI command."""
|
|
|
|
|
|
|
|
|
|
import tempfile
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
import pytest
|
|
|
|
|
from click.testing import CliRunner
|
|
|
|
|
|
|
|
|
|
from ria_toolkit_oss.datatypes import Annotation, Recording
|
|
|
|
|
from ria_toolkit_oss.io import load_recording, to_sigmf
|
M
2025-12-15 15:59:48 -05:00
|
|
|
from ria_toolkit_oss_cli.cli import cli
|
G
2025-12-11 15:59:08 -05:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestSplitHelp:
|
|
|
|
|
"""Test split command help and basic functionality."""
|
|
|
|
|
|
|
|
|
|
def test_split_help(self):
|
|
|
|
|
"""Test split command help."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
result = runner.invoke(cli, ["split", "--help"])
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
assert "Split, trim, and extract portions of recordings" in result.output
|
|
|
|
|
assert "--split-at" in result.output
|
|
|
|
|
assert "--split-every" in result.output
|
|
|
|
|
assert "--split-duration" in result.output
|
|
|
|
|
assert "--trim" in result.output
|
|
|
|
|
assert "--extract-annotations" in result.output
|
|
|
|
|
|
|
|
|
|
def test_missing_arguments(self):
|
|
|
|
|
"""Test that missing arguments show error."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
result = runner.invoke(cli, ["split"])
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Missing argument" in result.output or "Error" in result.output
|
|
|
|
|
|
|
|
|
|
def test_no_operation_specified(self):
|
|
|
|
|
"""Test error when no operation is specified."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
# Create a test file
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.ones(1000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={"sample_rate": 1e6})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
|
|
|
|
|
test_file = str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
result = runner.invoke(cli, ["split", test_file])
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "No operation specified" in result.output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestSplitTrim:
|
|
|
|
|
"""Test trim operations."""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def test_recording(self):
|
|
|
|
|
"""Create a test recording file."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(10000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
yield str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
|
|
|
|
|
def test_trim_with_length(self, test_recording):
|
|
|
|
|
"""Test trim with --start and --length."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
test_recording,
|
|
|
|
|
"--trim",
|
|
|
|
|
"--start",
|
|
|
|
|
"1000",
|
|
|
|
|
"--length",
|
|
|
|
|
"5000",
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"-q",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify output file exists
|
|
|
|
|
output_files = list(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 1
|
|
|
|
|
|
|
|
|
|
# Verify output has correct length
|
|
|
|
|
output_rec = load_recording(str(output_files[0]))
|
|
|
|
|
assert output_rec.data.shape[1] == 5000
|
|
|
|
|
assert output_rec.metadata["original_start_sample"] == 1000
|
|
|
|
|
assert output_rec.metadata["original_end_sample"] == 6000
|
|
|
|
|
assert output_rec.metadata["split_operation"] == "trim"
|
|
|
|
|
|
|
|
|
|
def test_trim_with_end(self, test_recording):
|
|
|
|
|
"""Test trim with --start and --end."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
["split", test_recording, "--trim", "--start", "2000", "--end", "7000", "--output-dir", outdir, "-q"],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
output_files = list(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 1
|
|
|
|
|
|
|
|
|
|
output_rec = load_recording(str(output_files[0]))
|
|
|
|
|
assert output_rec.data.shape[1] == 5000
|
|
|
|
|
|
|
|
|
|
def test_trim_without_length_or_end(self, test_recording):
|
|
|
|
|
"""Test that trim requires --length or --end."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(cli, ["split", test_recording, "--trim", "--start", "1000"])
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "requires either --length or --end" in result.output
|
|
|
|
|
|
|
|
|
|
def test_trim_with_both_length_and_end(self, test_recording):
|
|
|
|
|
"""Test that trim rejects both --length and --end."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli, ["split", test_recording, "--trim", "--start", "1000", "--length", "5000", "--end", "6000"]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Cannot specify both --length and --end" in result.output
|
|
|
|
|
|
|
|
|
|
def test_trim_invalid_range(self, test_recording):
|
|
|
|
|
"""Test trim with invalid range."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
["split", test_recording, "--trim", "--start", "1000", "--length", "50000"], # Exceeds recording length
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Invalid trim range" in result.output
|
|
|
|
|
|
|
|
|
|
def test_trim_end_before_start(self, test_recording):
|
|
|
|
|
"""Test trim with end < start."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(cli, ["split", test_recording, "--trim", "--start", "5000", "--end", "1000"])
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Invalid range" in result.output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestSplitAt:
|
|
|
|
|
"""Test split-at operations."""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def test_recording(self):
|
|
|
|
|
"""Create a test recording file."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(10000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
yield str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
|
|
|
|
|
def test_split_at_middle(self, test_recording):
|
|
|
|
|
"""Test splitting at middle of recording."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(cli, ["split", test_recording, "--split-at", "5000", "--output-dir", outdir, "-q"])
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify two output files exist
|
|
|
|
|
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 2
|
|
|
|
|
|
|
|
|
|
# Verify part1
|
|
|
|
|
part1 = load_recording(str(output_files[0]))
|
|
|
|
|
assert part1.data.shape[1] == 5000
|
|
|
|
|
assert part1.metadata["original_start_sample"] == 0
|
|
|
|
|
assert part1.metadata["original_end_sample"] == 5000
|
|
|
|
|
|
|
|
|
|
# Verify part2
|
|
|
|
|
part2 = load_recording(str(output_files[1]))
|
|
|
|
|
assert part2.data.shape[1] == 5000
|
|
|
|
|
assert part2.metadata["original_start_sample"] == 5000
|
|
|
|
|
assert part2.metadata["original_end_sample"] == 10000
|
|
|
|
|
|
|
|
|
|
def test_split_at_invalid_point(self, test_recording):
|
|
|
|
|
"""Test split-at with invalid sample point."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(cli, ["split", test_recording, "--split-at", "50000"]) # Exceeds recording length
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Invalid split point" in result.output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestSplitEvery:
|
|
|
|
|
"""Test split-every operations."""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def test_recording(self):
|
|
|
|
|
"""Create a test recording file."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(10000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
yield str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
|
|
|
|
|
def test_split_every_equal_chunks(self, test_recording):
|
|
|
|
|
"""Test splitting into equal chunks."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli, ["split", test_recording, "--split-every", "2500", "--output-dir", outdir, "-q"]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify 4 chunks created
|
|
|
|
|
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 4
|
|
|
|
|
|
|
|
|
|
# Verify all chunks have correct size
|
|
|
|
|
for i, file in enumerate(output_files):
|
|
|
|
|
chunk = load_recording(str(file))
|
|
|
|
|
assert chunk.data.shape[1] == 2500
|
|
|
|
|
assert chunk.metadata["chunk_index"] == i + 1
|
|
|
|
|
assert chunk.metadata["total_chunks"] == 4
|
|
|
|
|
|
|
|
|
|
def test_split_every_unequal_chunks(self, test_recording):
|
|
|
|
|
"""Test splitting with remainder chunk."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli, ["split", test_recording, "--split-every", "3000", "--output-dir", outdir, "-q"]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify 4 chunks created (3x3000 + 1x1000)
|
|
|
|
|
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 4
|
|
|
|
|
|
|
|
|
|
# Last chunk should be smaller
|
|
|
|
|
last_chunk = load_recording(str(output_files[-1]))
|
|
|
|
|
assert last_chunk.data.shape[1] == 1000
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestSplitDuration:
|
|
|
|
|
"""Test split-duration operations."""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def test_recording(self):
|
|
|
|
|
"""Create a test recording file with known sample rate."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(10000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(
|
|
|
|
|
data=signal, metadata={"sample_rate": 10000, "center_frequency": 915e6} # 10kHz for easy math
|
|
|
|
|
)
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
yield str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
|
|
|
|
|
def test_split_duration_basic(self, test_recording):
|
|
|
|
|
"""Test splitting by duration."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
test_recording,
|
|
|
|
|
"--split-duration",
|
|
|
|
|
"0.25", # 0.25s = 2500 samples at 10kHz
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"-q",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify chunks created
|
|
|
|
|
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 4
|
|
|
|
|
|
|
|
|
|
# Verify chunk sizes
|
|
|
|
|
for file in output_files[:-1]:
|
|
|
|
|
chunk = load_recording(str(file))
|
|
|
|
|
assert chunk.data.shape[1] == 2500
|
|
|
|
|
|
|
|
|
|
def test_split_duration_no_sample_rate(self):
|
|
|
|
|
"""Test that split-duration requires sample_rate in metadata."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
# Create recording without sample_rate
|
|
|
|
|
signal = np.arange(1000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
|
|
|
|
|
test_file = str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
result = runner.invoke(cli, ["split", test_file, "--split-duration", "1.0"])
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Cannot split by duration" in result.output
|
|
|
|
|
assert "no sample_rate" in result.output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestExtractAnnotations:
|
|
|
|
|
"""Test extract-annotations operations."""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def annotated_recording(self):
|
|
|
|
|
"""Create a test recording with annotations."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(100000, dtype=np.complex64)
|
|
|
|
|
|
|
|
|
|
annotations = [
|
|
|
|
|
Annotation(
|
|
|
|
|
sample_start=0, sample_count=10000, freq_lower_edge=914e6, freq_upper_edge=916e6, label="preamble"
|
|
|
|
|
),
|
|
|
|
|
Annotation(
|
|
|
|
|
sample_start=10000,
|
|
|
|
|
sample_count=50000,
|
|
|
|
|
freq_lower_edge=914e6,
|
|
|
|
|
freq_upper_edge=916e6,
|
|
|
|
|
label="payload",
|
|
|
|
|
),
|
|
|
|
|
Annotation(
|
|
|
|
|
sample_start=60000, sample_count=5000, freq_lower_edge=914e6, freq_upper_edge=916e6, label="crc"
|
|
|
|
|
),
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
recording = Recording(
|
|
|
|
|
data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6}, annotations=annotations
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
to_sigmf(recording, filename="annotated", path=tmpdir, overwrite=True)
|
|
|
|
|
yield str(Path(tmpdir) / "annotated.sigmf-data")
|
|
|
|
|
|
|
|
|
|
def test_extract_all_annotations(self, annotated_recording):
|
|
|
|
|
"""Test extracting all annotations."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli, ["split", annotated_recording, "--extract-annotations", "--output-dir", outdir, "-q"]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify 3 files created
|
|
|
|
|
output_files = sorted(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 3
|
|
|
|
|
|
|
|
|
|
# Verify each annotation was extracted
|
|
|
|
|
preamble = [f for f in output_files if "preamble" in str(f)][0]
|
|
|
|
|
payload = [f for f in output_files if "payload" in str(f)][0]
|
|
|
|
|
crc = [f for f in output_files if "crc" in str(f)][0]
|
|
|
|
|
|
|
|
|
|
preamble_rec = load_recording(str(preamble))
|
|
|
|
|
assert preamble_rec.data.shape[1] == 10000
|
|
|
|
|
assert preamble_rec.metadata["annotation_label"] == "preamble"
|
|
|
|
|
assert len(preamble_rec.annotations) == 0 # Annotations cleared
|
|
|
|
|
|
|
|
|
|
payload_rec = load_recording(str(payload))
|
|
|
|
|
assert payload_rec.data.shape[1] == 50000
|
|
|
|
|
assert payload_rec.metadata["annotation_label"] == "payload"
|
|
|
|
|
|
|
|
|
|
crc_rec = load_recording(str(crc))
|
|
|
|
|
assert crc_rec.data.shape[1] == 5000
|
|
|
|
|
assert crc_rec.metadata["annotation_label"] == "crc"
|
|
|
|
|
|
|
|
|
|
def test_extract_annotation_by_label(self, annotated_recording):
|
|
|
|
|
"""Test extracting annotations by label."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
annotated_recording,
|
|
|
|
|
"--extract-annotations",
|
|
|
|
|
"--annotation-label",
|
|
|
|
|
"payload",
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"-q",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify only 1 file created
|
|
|
|
|
output_files = list(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 1
|
|
|
|
|
assert "payload" in str(output_files[0])
|
|
|
|
|
|
|
|
|
|
def test_extract_annotation_by_index(self, annotated_recording):
|
|
|
|
|
"""Test extracting annotation by index."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
annotated_recording,
|
|
|
|
|
"--extract-annotations",
|
|
|
|
|
"--annotation-index",
|
|
|
|
|
"1",
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"-q",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify only 1 file created (payload at index 1)
|
|
|
|
|
output_files = list(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert len(output_files) == 1
|
|
|
|
|
assert "payload" in str(output_files[0])
|
|
|
|
|
|
|
|
|
|
def test_extract_annotations_invalid_label(self, annotated_recording):
|
|
|
|
|
"""Test error with non-existent label."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli, ["split", annotated_recording, "--extract-annotations", "--annotation-label", "nonexistent"]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "No annotations with label" in result.output
|
|
|
|
|
|
|
|
|
|
def test_extract_annotations_invalid_index(self, annotated_recording):
|
|
|
|
|
"""Test error with invalid index."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli, ["split", annotated_recording, "--extract-annotations", "--annotation-index", "99"]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Invalid annotation index" in result.output
|
|
|
|
|
|
|
|
|
|
def test_extract_annotations_no_annotations(self):
|
|
|
|
|
"""Test error when recording has no annotations."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(1000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={"sample_rate": 1e6})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
|
|
|
|
|
test_file = str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
result = runner.invoke(cli, ["split", test_file, "--extract-annotations"])
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "No annotations found" in result.output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestOutputOptions:
|
|
|
|
|
"""Test output-related options."""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def test_recording(self):
|
|
|
|
|
"""Create a test recording file."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(10000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
yield str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
|
|
|
|
|
def test_output_prefix(self, test_recording):
|
|
|
|
|
"""Test custom output prefix."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
test_recording,
|
|
|
|
|
"--split-every",
|
|
|
|
|
"3000",
|
|
|
|
|
"--output-prefix",
|
|
|
|
|
"custom",
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"-q",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
output_files = list(Path(outdir).glob("*.sigmf-data"))
|
|
|
|
|
assert all("custom" in str(f) for f in output_files)
|
|
|
|
|
|
|
|
|
|
def test_output_format_conversion(self, test_recording):
|
|
|
|
|
"""Test format conversion during split."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
test_recording,
|
|
|
|
|
"--split-every",
|
|
|
|
|
"5000",
|
|
|
|
|
"--output-format",
|
|
|
|
|
"npy",
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"-q",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Verify NPY files created
|
|
|
|
|
output_files = list(Path(outdir).glob("*.npy"))
|
|
|
|
|
assert len(output_files) == 2
|
|
|
|
|
|
|
|
|
|
def test_overwrite_protection(self, test_recording):
|
|
|
|
|
"""Test overwrite protection."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
# First split should succeed
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
["split", test_recording, "--trim", "--start", "0", "--length", "1000", "--output-dir", outdir, "-q"],
|
|
|
|
|
)
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
# Second split without --overwrite should fail
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli, ["split", test_recording, "--trim", "--start", "0", "--length", "1000", "--output-dir", outdir]
|
|
|
|
|
)
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "exist" in result.output.lower()
|
|
|
|
|
|
|
|
|
|
# Third split with --overwrite should succeed
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
test_recording,
|
|
|
|
|
"--trim",
|
|
|
|
|
"--start",
|
|
|
|
|
"0",
|
|
|
|
|
"--length",
|
|
|
|
|
"1000",
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"--overwrite",
|
|
|
|
|
"-q",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMultipleOperations:
|
|
|
|
|
"""Test that multiple operations are rejected."""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def test_recording(self):
|
|
|
|
|
"""Create a test recording file."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(10000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
yield str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
|
|
|
|
|
def test_trim_and_split_at(self, test_recording):
|
|
|
|
|
"""Test that trim and split-at cannot be used together."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(cli, ["split", test_recording, "--trim", "--split-at", "5000"])
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Multiple operations specified" in result.output
|
|
|
|
|
|
|
|
|
|
def test_split_every_and_extract(self, test_recording):
|
|
|
|
|
"""Test that split-every and extract-annotations cannot be used together."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
result = runner.invoke(cli, ["split", test_recording, "--split-every", "1000", "--extract-annotations"])
|
|
|
|
|
|
|
|
|
|
assert result.exit_code != 0
|
|
|
|
|
assert "Multiple operations specified" in result.output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestVerboseQuiet:
|
|
|
|
|
"""Test verbose and quiet modes."""
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def test_recording(self):
|
|
|
|
|
"""Create a test recording file."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
signal = np.arange(10000, dtype=np.complex64)
|
|
|
|
|
recording = Recording(data=signal, metadata={"sample_rate": 2e6, "center_frequency": 915e6})
|
|
|
|
|
to_sigmf(recording, filename="test", path=tmpdir, overwrite=True)
|
|
|
|
|
yield str(Path(tmpdir) / "test.sigmf-data")
|
|
|
|
|
|
|
|
|
|
def test_verbose_mode(self, test_recording):
|
|
|
|
|
"""Test verbose output."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
test_recording,
|
|
|
|
|
"--trim",
|
|
|
|
|
"--start",
|
|
|
|
|
"0",
|
|
|
|
|
"--length",
|
|
|
|
|
"1000",
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"--verbose",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
assert "Input format: SIGMF" in result.output
|
|
|
|
|
assert "Output format: SIGMF" in result.output
|
|
|
|
|
|
|
|
|
|
def test_quiet_mode(self, test_recording):
|
|
|
|
|
"""Test quiet output (minimal output)."""
|
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as outdir:
|
|
|
|
|
result = runner.invoke(
|
|
|
|
|
cli,
|
|
|
|
|
[
|
|
|
|
|
"split",
|
|
|
|
|
test_recording,
|
|
|
|
|
"--trim",
|
|
|
|
|
"--start",
|
|
|
|
|
"0",
|
|
|
|
|
"--length",
|
|
|
|
|
"1000",
|
|
|
|
|
"--output-dir",
|
|
|
|
|
outdir,
|
|
|
|
|
"--quiet",
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert result.exit_code == 0
|
|
|
|
|
# Output should be minimal in quiet mode
|
|
|
|
|
assert len(result.output.strip()) < 100 or result.output.strip() == ""
|