annotationsfix #19
813
src/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py
Normal file
813
src/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py
Normal file
|
|
@ -0,0 +1,813 @@
|
||||||
|
"""Annotate command - Automatic detection and manual annotation management."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import click
|
||||||
|
|
||||||
|
from ria_toolkit_oss.annotations import (
|
||||||
|
annotate_with_cusum,
|
||||||
|
detect_signals_energy,
|
||||||
|
split_recording_annotations,
|
||||||
|
threshold_qualifier,
|
||||||
|
)
|
||||||
|
from ria_toolkit_oss.data import Annotation
|
||||||
|
from ria_toolkit_oss.data.recording import Recording
|
||||||
|
from ria_toolkit_oss.io import load_recording, to_blue, to_npy, to_sigmf, to_wav
|
||||||
|
from ria_toolkit_oss_cli.common import format_frequency, format_sample_count
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_sigmf_path(filepath):
|
||||||
|
"""Normalize SigMF path to base name without extension."""
|
||||||
|
path = Path(filepath)
|
||||||
|
|
||||||
|
# Handle .sigmf-data, .sigmf-meta, or .sigmf
|
||||||
|
if ".sigmf" in path.suffix:
|
||||||
|
# Remove the suffix to get base name
|
||||||
|
return path.with_suffix("")
|
||||||
|
else:
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def detect_input_format(filepath):
|
||||||
|
"""Detect file format from extension."""
|
||||||
|
path = Path(filepath)
|
||||||
|
ext = path.suffix.lower()
|
||||||
|
|
||||||
|
if ext in [".sigmf-data", ".sigmf-meta"]:
|
||||||
|
return "sigmf"
|
||||||
|
elif path.name.endswith(".sigmf"):
|
||||||
|
return "sigmf"
|
||||||
|
elif ext == ".npy":
|
||||||
|
return "npy"
|
||||||
|
elif ext == ".wav":
|
||||||
|
return "wav"
|
||||||
|
elif ext == ".blue":
|
||||||
|
return "blue"
|
||||||
|
else:
|
||||||
|
raise click.ClickException(f"Unknown format for '{filepath}'. Supported: .sigmf, .npy, .wav, .blue")
|
||||||
|
|
||||||
|
|
||||||
|
def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
|
||||||
|
input_path = Path(input_path)
|
||||||
|
|
||||||
|
if output_path:
|
||||||
|
target = Path(output_path)
|
||||||
|
final_path = target
|
||||||
|
else:
|
||||||
|
annotated_name = f"{input_path.stem}_annotated"
|
||||||
|
target = input_path.with_name(f"{annotated_name}{input_path.suffix}")
|
||||||
|
|
||||||
|
if fmt == "sigmf":
|
||||||
|
final_path = normalize_sigmf_path(target)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Saving SigMF metadata to: {final_path}")
|
||||||
|
else:
|
||||||
|
final_path = target
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Saving to: {final_path}")
|
||||||
|
|
||||||
|
if final_path.exists() and not overwrite and final_path != input_path:
|
||||||
|
click.echo(f"Error: {final_path} already exists. Use --overwrite to replace it.", err=True)
|
||||||
|
return None
|
||||||
|
|
||||||
|
return final_path
|
||||||
|
|
||||||
|
|
||||||
|
def save_recording_auto(recording, output_path, input_path, quiet=False, overwrite=False):
|
||||||
|
"""Save recording, auto-detecting format from extension.
|
||||||
|
|
||||||
|
For SigMF: Only overwrites metadata file, data file is unchanged
|
||||||
|
For other formats: Creates _annotated copy by default, unless overwrite=True
|
||||||
|
"""
|
||||||
|
input_path = Path(input_path)
|
||||||
|
fmt = detect_input_format(input_path)
|
||||||
|
|
||||||
|
# Determine output path
|
||||||
|
output_path = determine_output_path(
|
||||||
|
input_path=input_path, output_path=output_path, fmt=fmt, quiet=quiet, overwrite=overwrite
|
||||||
|
)
|
||||||
|
|
||||||
|
if fmt == "sigmf":
|
||||||
|
# Normalize path for SigMF
|
||||||
|
base_path = output_path
|
||||||
|
stem = base_path.name
|
||||||
|
parent = base_path.parent
|
||||||
|
|
||||||
|
# For SigMF: only save metadata, copy data if needed
|
||||||
|
meta_path = parent / f"{stem}.sigmf-meta"
|
||||||
|
data_path = parent / f"{stem}.sigmf-data"
|
||||||
|
|
||||||
|
# If output is different from input, copy data file
|
||||||
|
input_base = normalize_sigmf_path(input_path)
|
||||||
|
if input_base != base_path:
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
# Construct input data path correctly
|
||||||
|
# input_base is like /path/to/recording or /path/to/recording.sigmf
|
||||||
|
# We need /path/to/recording.sigmf-data
|
||||||
|
if str(input_base).endswith(".sigmf"):
|
||||||
|
input_data = Path(str(input_base).replace(".sigmf", ".sigmf-data"))
|
||||||
|
else:
|
||||||
|
input_data = input_base.parent / f"{input_base.name}.sigmf-data"
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" Copying: {data_path}")
|
||||||
|
shutil.copy2(input_data, data_path)
|
||||||
|
|
||||||
|
# Always save metadata (this is the whole point)
|
||||||
|
to_sigmf(recording, filename=stem, path=parent, overwrite=True)
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" Updated: {meta_path}")
|
||||||
|
if input_base != base_path:
|
||||||
|
click.echo(f" Created: {data_path}")
|
||||||
|
|
||||||
|
elif fmt == "npy":
|
||||||
|
to_npy(recording, filename=output_path.stem, path=output_path.parent, overwrite=True)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" Created: {output_path}")
|
||||||
|
elif fmt == "wav":
|
||||||
|
to_wav(recording, filename=output_path.stem, path=output_path.parent, overwrite=True)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" Created: {output_path}")
|
||||||
|
elif fmt == "blue":
|
||||||
|
to_blue(recording, filename=output_path.stem, path=output_path.parent, overwrite=True)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" Created: {output_path}")
|
||||||
|
|
||||||
|
|
||||||
|
def determine_frequency_bounds(recording: Recording, freq_lower, freq_upper):
|
||||||
|
# Handle frequency bounds
|
||||||
|
if (freq_lower is None) != (freq_upper is None):
|
||||||
|
raise click.ClickException("Must specify both --freq-lower and --freq-upper, or neither")
|
||||||
|
|
||||||
|
if freq_lower is None:
|
||||||
|
# Default to full bandwidth
|
||||||
|
sample_rate = recording.metadata.get("sample_rate", 1)
|
||||||
|
center_freq = recording.metadata.get("center_frequency", 0)
|
||||||
|
freq_lower = center_freq - (sample_rate / 2)
|
||||||
|
freq_upper = center_freq + (sample_rate / 2)
|
||||||
|
freq_default = True
|
||||||
|
else:
|
||||||
|
freq_default = False
|
||||||
|
if freq_lower >= freq_upper:
|
||||||
|
raise click.ClickException(
|
||||||
|
f"Invalid frequency range: lower ({format_frequency(freq_lower)}) "
|
||||||
|
f"must be < upper ({format_frequency(freq_upper)})"
|
||||||
|
)
|
||||||
|
|
||||||
|
return freq_lower, freq_upper, freq_default
|
||||||
|
|
||||||
|
|
||||||
|
def get_indices_list(indices, recording: Recording):
|
||||||
|
if indices:
|
||||||
|
try:
|
||||||
|
indices_list = [int(idx.strip()) for idx in indices.split(",")]
|
||||||
|
# Validate indices
|
||||||
|
for idx in indices_list:
|
||||||
|
if idx < 0 or idx >= len(recording.annotations):
|
||||||
|
raise click.ClickException(
|
||||||
|
f"Invalid index {idx}. Recording has {len(recording.annotations)} annotation(s)"
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
raise click.ClickException(f"Invalid indices format. Expected comma-separated integers: {e}")
|
||||||
|
|
||||||
|
return indices_list
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Main command group
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@click.group()
|
||||||
|
def annotate():
|
||||||
|
"""Manage and auto-detect annotations on RF recordings.
|
||||||
|
|
||||||
|
\b
|
||||||
|
MANUAL MANAGEMENT:
|
||||||
|
list - List all current annotations
|
||||||
|
add - Manually add a specific annotation
|
||||||
|
remove - Delete an annotation by its index
|
||||||
|
clear - Remove all annotations from the recording
|
||||||
|
|
||||||
|
\b
|
||||||
|
DETECTION & SEPARATION:
|
||||||
|
energy - Auto-detect using energy-based thresholding
|
||||||
|
cusum - Auto-detect segments using signal state changes
|
||||||
|
threshold - Auto-detect samples above magnitude percentage
|
||||||
|
separate - Auto-detect parallel frequency-offset signals, split into sub-bands
|
||||||
|
|
||||||
|
\b
|
||||||
|
File Path Handling:
|
||||||
|
- SigMF files: Pass .sigmf-data, .sigmf-meta, or base name
|
||||||
|
- Other formats: .npy, .wav, .blue files
|
||||||
|
|
||||||
|
\b
|
||||||
|
Output Behavior:
|
||||||
|
- SigMF: Updates .sigmf-meta only (data unchanged), in-place
|
||||||
|
- Other: Creates _annotated copy unless --overwrite specified
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# List subcommand
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@annotate.command()
|
||||||
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
|
@click.option("--verbose", is_flag=True, help="Show detailed annotation info")
|
||||||
|
def list(input, verbose):
|
||||||
|
"""List all annotations in a recording.
|
||||||
|
|
||||||
|
\b
|
||||||
|
Examples:
|
||||||
|
utils annotate list recording.sigmf-data
|
||||||
|
utils annotate list signal.npy --verbose
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
recording = load_recording(input)
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to load recording: {e}")
|
||||||
|
|
||||||
|
if len(recording.annotations) == 0:
|
||||||
|
click.echo(f"No annotations in {Path(input).name}")
|
||||||
|
return
|
||||||
|
|
||||||
|
click.echo(f"\nAnnotations in {Path(input).name}:")
|
||||||
|
for i, ann in enumerate(recording.annotations):
|
||||||
|
# Parse type from comment JSON
|
||||||
|
try:
|
||||||
|
comment_data = json.loads(ann.comment)
|
||||||
|
ann_type = comment_data.get("type", "unknown")
|
||||||
|
user_comment = comment_data.get("user_comment", "")
|
||||||
|
except (json.JSONDecodeError, TypeError):
|
||||||
|
ann_type = "unknown"
|
||||||
|
user_comment = ann.comment or ""
|
||||||
|
|
||||||
|
# Basic info
|
||||||
|
freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
|
||||||
|
click.echo(
|
||||||
|
f" [{i}] Samples {format_sample_count(ann.sample_start)}-"
|
||||||
|
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {ann.label}"
|
||||||
|
)
|
||||||
|
click.echo(f" Type: {ann_type}")
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
if user_comment:
|
||||||
|
click.echo(f" Comment: {user_comment}")
|
||||||
|
click.echo(f" Frequency: {freq_range}")
|
||||||
|
if ann.detail:
|
||||||
|
click.echo(f" Detail: {ann.detail}")
|
||||||
|
|
||||||
|
click.echo(f"\nTotal: {len(recording.annotations)} annotation(s)")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Add subcommand
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@annotate.command(context_settings={"max_content_width": 200})
|
||||||
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
|
@click.option("--start", type=int, required=True, help="Start sample index")
|
||||||
|
@click.option("--count", type=int, required=True, help="Sample count")
|
||||||
|
@click.option("--label", type=str, required=True, help="Annotation label")
|
||||||
|
@click.option("--freq-lower", type=float, help="Lower frequency edge (Hz)")
|
||||||
|
@click.option("--freq-upper", type=float, help="Upper frequency edge (Hz)")
|
||||||
|
@click.option("--comment", type=str, help="Human-readable comment")
|
||||||
|
@click.option(
|
||||||
|
"--type",
|
||||||
|
"annotation_type",
|
||||||
|
type=click.Choice(["standalone", "parallel", "intersection"]),
|
||||||
|
default="standalone",
|
||||||
|
help="Annotation type",
|
||||||
|
)
|
||||||
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
||||||
|
def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_type, output, overwrite, quiet):
|
||||||
|
"""Add a manual annotation.
|
||||||
|
|
||||||
|
\b
|
||||||
|
Examples:
|
||||||
|
utils annotate add file.npy --start 1000 --count 500 --label wifi
|
||||||
|
utils annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
recording = load_recording(input)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Loaded: {input}")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to load recording: {e}")
|
||||||
|
|
||||||
|
# Validate sample range
|
||||||
|
n_samples = len(recording.data[0])
|
||||||
|
if start < 0:
|
||||||
|
raise click.ClickException(f"--start must be >= 0, got {start}")
|
||||||
|
if count <= 0:
|
||||||
|
raise click.ClickException(f"--count must be > 0, got {count}")
|
||||||
|
if start + count > n_samples:
|
||||||
|
raise click.ClickException(
|
||||||
|
f"Invalid annotation range:\n"
|
||||||
|
f" Start: {start:,}\n"
|
||||||
|
f" Count: {count:,}\n"
|
||||||
|
f" End: {start + count:,}\n"
|
||||||
|
f"Recording only has {n_samples:,} samples"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Handle frequency bounds
|
||||||
|
freq_lower, freq_upper, freq_default = determine_frequency_bounds(
|
||||||
|
recording=recording, freq_lower=freq_lower, freq_upper=freq_upper
|
||||||
|
)
|
||||||
|
|
||||||
|
# Build comment JSON
|
||||||
|
comment_data = {"type": annotation_type}
|
||||||
|
if comment:
|
||||||
|
comment_data["user_comment"] = comment
|
||||||
|
|
||||||
|
# Create annotation
|
||||||
|
ann = Annotation(
|
||||||
|
sample_start=start,
|
||||||
|
sample_count=count,
|
||||||
|
freq_lower_edge=freq_lower,
|
||||||
|
freq_upper_edge=freq_upper,
|
||||||
|
label=label,
|
||||||
|
comment=json.dumps(comment_data),
|
||||||
|
detail={},
|
||||||
|
)
|
||||||
|
|
||||||
|
recording._annotations.append(ann)
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo("\nAdding annotation:")
|
||||||
|
click.echo(f" Start: {format_sample_count(start)}")
|
||||||
|
click.echo(f" Count: {format_sample_count(count)} samples")
|
||||||
|
freq_str = (
|
||||||
|
"full bandwidth" if freq_default else f"{format_frequency(freq_lower)} - {format_frequency(freq_upper)}"
|
||||||
|
)
|
||||||
|
click.echo(f" Frequency: {freq_str}")
|
||||||
|
click.echo(f" Label: {label}")
|
||||||
|
click.echo(f" Type: {annotation_type}")
|
||||||
|
if comment:
|
||||||
|
click.echo(f" Comment: {comment}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
save_recording_auto(recording, output, input, quiet, overwrite)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(" ✓ Saved")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to save: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Remove subcommand
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@annotate.command(context_settings={"max_content_width": 200})
|
||||||
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
|
@click.argument("index", type=int)
|
||||||
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
||||||
|
def remove(input, index, output, overwrite, quiet):
|
||||||
|
"""Remove annotation by index.
|
||||||
|
|
||||||
|
Use 'utils annotate list' to see annotation indices.
|
||||||
|
|
||||||
|
\b
|
||||||
|
Examples:
|
||||||
|
utils annotate remove signal.sigmf-data 2
|
||||||
|
utils annotate remove file.npy 0
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
recording = load_recording(input)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Loaded: {input}")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to load recording: {e}")
|
||||||
|
|
||||||
|
if index < 0 or index >= len(recording.annotations):
|
||||||
|
raise click.ClickException(
|
||||||
|
f"Cannot remove annotation at index {index}\n"
|
||||||
|
f"Recording has {len(recording.annotations)} annotation(s) (indices 0-{len(recording.annotations)-1})"
|
||||||
|
)
|
||||||
|
|
||||||
|
removed_ann = recording.annotations[index]
|
||||||
|
recording._annotations.pop(index)
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"\nRemoving annotation [{index}]:")
|
||||||
|
click.echo(
|
||||||
|
f" Removed: samples {format_sample_count(removed_ann.sample_start)}-"
|
||||||
|
f"{format_sample_count(removed_ann.sample_start + removed_ann.sample_count)} ({removed_ann.label})"
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
save_recording_auto(recording, output_path=input, input_path=input, quiet=quiet, overwrite=True)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(" ✓ Saved")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to save: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Clear subcommand
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@annotate.command(context_settings={"max_content_width": 175})
|
||||||
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
@click.option("--force", is_flag=True, help="Skip confirmation")
|
||||||
|
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
||||||
|
def clear(input, output, overwrite, force, quiet):
|
||||||
|
"""Clear all annotations.
|
||||||
|
|
||||||
|
\b
|
||||||
|
Examples:
|
||||||
|
utils annotate clear signal.sigmf-data
|
||||||
|
utils annotate clear file.npy --force
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
recording = load_recording(input)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Loaded: {input}")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to load recording: {e}")
|
||||||
|
|
||||||
|
count_before = len(recording.annotations)
|
||||||
|
|
||||||
|
if count_before == 0:
|
||||||
|
if not quiet:
|
||||||
|
click.echo("No annotations to clear")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Confirm unless --force
|
||||||
|
if not force and not quiet:
|
||||||
|
click.echo(f"\nWarning: This will remove all {count_before} annotation(s)")
|
||||||
|
click.confirm("Continue?", abort=True)
|
||||||
|
|
||||||
|
recording._annotations = []
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"\nCleared {count_before} annotation(s)")
|
||||||
|
|
||||||
|
recording._annotations = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
save_recording_auto(recording, output_path=input, input_path=input, quiet=quiet, overwrite=True)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(" ✓ Saved")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to save: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Energy detection subcommand
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@annotate.command(context_settings={"max_content_width": 200})
|
||||||
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
|
@click.option("--label", type=str, default="signal", help="Annotation label")
|
||||||
|
@click.option("--threshold", type=float, default=1.2, help="Threshold multiplier above noise floor")
|
||||||
|
@click.option("--segments", type=int, default=10, help="Number of segments for noise estimation")
|
||||||
|
@click.option("--window-size", type=int, default=200, help="Smoothing window size")
|
||||||
|
@click.option("--min-distance", type=int, default=5000, help="Min distance between detections")
|
||||||
|
@click.option(
|
||||||
|
"--freq-method",
|
||||||
|
type=click.Choice(["nbw", "obw", "full-detected", "full-bandwidth"]),
|
||||||
|
default="nbw",
|
||||||
|
help="Frequency bounding method",
|
||||||
|
)
|
||||||
|
@click.option("--nfft", type=int, default=None, help="FFT size for frequency calculation")
|
||||||
|
@click.option("--obw-power", type=float, default=0.99, help="Power percentage for OBW/NBW (0.98-0.9999)")
|
||||||
|
@click.option(
|
||||||
|
"--type",
|
||||||
|
"annotation_type",
|
||||||
|
type=click.Choice(["standalone", "parallel", "intersection"]),
|
||||||
|
default="standalone",
|
||||||
|
help="Annotation type",
|
||||||
|
)
|
||||||
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
||||||
|
def energy(
|
||||||
|
input,
|
||||||
|
label,
|
||||||
|
threshold,
|
||||||
|
segments,
|
||||||
|
window_size,
|
||||||
|
min_distance,
|
||||||
|
freq_method,
|
||||||
|
nfft,
|
||||||
|
obw_power,
|
||||||
|
annotation_type,
|
||||||
|
output,
|
||||||
|
overwrite,
|
||||||
|
quiet,
|
||||||
|
):
|
||||||
|
"""Auto-detect signals using energy-based method.
|
||||||
|
|
||||||
|
Detects bursts based on energy above noise floor. Best for bursty signals
|
||||||
|
and intermittent transmissions.
|
||||||
|
|
||||||
|
\b
|
||||||
|
Frequency Bounding Methods:
|
||||||
|
nbw - Nominal bandwidth (default, best for real signals)
|
||||||
|
obw - Occupied bandwidth (more conservative, includes sidelobes)
|
||||||
|
full-detected - Lowest to highest spectral component
|
||||||
|
full-bandwidth - Entire Nyquist span
|
||||||
|
|
||||||
|
\b
|
||||||
|
Examples:
|
||||||
|
utils annotate energy capture.sigmf-data --label burst
|
||||||
|
utils annotate energy signal.npy --threshold 1.5 --min-distance 10000
|
||||||
|
utils annotate energy signal.sigmf-data --freq-method obw
|
||||||
|
utils annotate energy signal.sigmf-data --freq-method full-detected
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
recording = load_recording(input)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Loaded: {input}")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to load recording: {e}")
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo("\nDetecting signals using energy-based method...")
|
||||||
|
click.echo(" Time detection:")
|
||||||
|
click.echo(f" Segments: {segments}")
|
||||||
|
click.echo(f" Threshold: {threshold}x noise floor")
|
||||||
|
click.echo(f" Window size: {window_size} samples")
|
||||||
|
click.echo(f" Min distance: {min_distance} samples")
|
||||||
|
click.echo(f" Frequency bounds: {freq_method}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
initial_count = len(recording.annotations)
|
||||||
|
recording = detect_signals_energy(
|
||||||
|
recording,
|
||||||
|
k=segments,
|
||||||
|
threshold_factor=threshold,
|
||||||
|
window_size=window_size,
|
||||||
|
min_distance=min_distance,
|
||||||
|
label=label,
|
||||||
|
annotation_type=annotation_type,
|
||||||
|
freq_method=freq_method,
|
||||||
|
nfft=nfft,
|
||||||
|
obw_power=obw_power,
|
||||||
|
)
|
||||||
|
added = len(recording.annotations) - initial_count
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" ✓ Added {added} annotation(s)")
|
||||||
|
|
||||||
|
save_recording_auto(recording, output, input, quiet, overwrite)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(" ✓ Saved")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Energy detection failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# CUSUM detection subcommand
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@annotate.command()
|
||||||
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
|
@click.option("--label", type=str, default="segment", help="Annotation label")
|
||||||
|
@click.option("--min-duration", type=float, default=5.0, help="Min duration in ms (prevents over-segmentation)")
|
||||||
|
@click.option("--window-size", type=int, default=1, help="Smoothing window size")
|
||||||
|
@click.option("--tolerance", type=int, default=-1, help="Sample tolerance for merging")
|
||||||
|
@click.option(
|
||||||
|
"--type",
|
||||||
|
"annotation_type",
|
||||||
|
type=click.Choice(["standalone", "parallel", "intersection"]),
|
||||||
|
default="standalone",
|
||||||
|
help="Annotation type",
|
||||||
|
)
|
||||||
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
||||||
|
def cusum(input, label, min_duration, window_size, tolerance, annotation_type, output, overwrite, quiet):
|
||||||
|
"""Auto-detect segments using CUSUM method.
|
||||||
|
|
||||||
|
Detects signal state changes (on/off, amplitude transitions). Best for
|
||||||
|
segmenting continuous signals.
|
||||||
|
|
||||||
|
IMPORTANT: Always specify --min-duration to prevent excessive segmentation.
|
||||||
|
|
||||||
|
\b
|
||||||
|
Examples:
|
||||||
|
utils annotate cusum signal.sigmf-data --min-duration 5.0
|
||||||
|
utils annotate cusum data.npy --min-duration 10.0 --label state
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
recording = load_recording(input)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Loaded: {input}")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to load recording: {e}")
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo("\nDetecting segments using CUSUM...")
|
||||||
|
click.echo(f" Min duration: {min_duration} ms")
|
||||||
|
if window_size != 1:
|
||||||
|
click.echo(f" Window size: {window_size} samples")
|
||||||
|
|
||||||
|
try:
|
||||||
|
initial_count = len(recording.annotations)
|
||||||
|
recording = annotate_with_cusum(
|
||||||
|
recording,
|
||||||
|
label=label,
|
||||||
|
window_size=window_size,
|
||||||
|
min_duration=min_duration,
|
||||||
|
tolerance=tolerance,
|
||||||
|
annotation_type=annotation_type,
|
||||||
|
)
|
||||||
|
added = len(recording.annotations) - initial_count
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" ✓ Added {added} annotation(s)")
|
||||||
|
|
||||||
|
save_recording_auto(recording, output, input, quiet, overwrite)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(" ✓ Saved")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"CUSUM detection failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Threshold detection subcommand
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@annotate.command()
|
||||||
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
|
@click.option("--threshold", type=float, required=True, help="Threshold (0.0-1.0, fraction of max magnitude)")
|
||||||
|
@click.option("--label", type=str, default=None, help="Annotation label")
|
||||||
|
@click.option("--window-size", type=int, default=1024, help="Smoothing window size")
|
||||||
|
@click.option(
|
||||||
|
"--type",
|
||||||
|
"annotation_type",
|
||||||
|
type=click.Choice(["standalone", "parallel", "intersection"]),
|
||||||
|
default="standalone",
|
||||||
|
help="Annotation type",
|
||||||
|
)
|
||||||
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
||||||
|
def threshold(input, threshold, label, window_size, annotation_type, output, overwrite, quiet):
|
||||||
|
"""Auto-detect signals using threshold method.
|
||||||
|
|
||||||
|
Detects samples above a percentage of maximum magnitude. Best for simple
|
||||||
|
power-based detection.
|
||||||
|
|
||||||
|
\b
|
||||||
|
Examples:
|
||||||
|
utils annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
|
||||||
|
utils annotate threshold data.npy --threshold 0.5 --window-size 2048
|
||||||
|
"""
|
||||||
|
if not (0.0 <= threshold <= 1.0):
|
||||||
|
raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
recording = load_recording(input)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Loaded: {input}")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to load recording: {e}")
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo("\nDetecting signals using threshold qualifier...")
|
||||||
|
click.echo(f" Threshold: {threshold * 100:.1f}% of max magnitude")
|
||||||
|
click.echo(f" Window size: {window_size} samples")
|
||||||
|
|
||||||
|
try:
|
||||||
|
initial_count = len(recording.annotations)
|
||||||
|
recording = threshold_qualifier(
|
||||||
|
recording,
|
||||||
|
threshold=threshold,
|
||||||
|
window_size=window_size,
|
||||||
|
label=label,
|
||||||
|
annotation_type=annotation_type,
|
||||||
|
)
|
||||||
|
added = len(recording.annotations) - initial_count
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" ✓ Added {added} annotation(s)")
|
||||||
|
|
||||||
|
save_recording_auto(recording, output, input, quiet, overwrite)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(" ✓ Saved")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Threshold detection failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Separate subcommand (Phase 2: Parallel signal separation)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
@annotate.command()
|
||||||
|
@click.argument("input", type=click.Path(exists=True))
|
||||||
|
@click.option("--indices", type=str, help="Comma-separated annotation indices to split (default: all)")
|
||||||
|
@click.option("--nfft", type=int, default=65536, help="FFT size for spectral analysis")
|
||||||
|
@click.option("--noise-threshold-db", type=float, help="Noise floor threshold in dB (auto-estimated if not specified)")
|
||||||
|
@click.option("--min-component-bw", type=float, default=50e3, help="Min component bandwidth in Hz")
|
||||||
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
@click.option("--quiet", is_flag=True, help="Quiet mode")
|
||||||
|
@click.option("--verbose", is_flag=True, help="Verbose output (show detected components)")
|
||||||
|
def separate(input, indices, nfft, noise_threshold_db, min_component_bw, output, overwrite, quiet, verbose):
|
||||||
|
"""
|
||||||
|
Auto-detect parallel frequency-offset signals and split into sub-bands.
|
||||||
|
|
||||||
|
Provides methods to detect and separate overlapping frequency-domain signals
|
||||||
|
that occupy the same time window but different frequency bands.
|
||||||
|
|
||||||
|
Detects multiple frequency components within single annotations and splits
|
||||||
|
them into separate annotations. Uses spectral peak detection with dual
|
||||||
|
bandwidth estimation.
|
||||||
|
|
||||||
|
\b
|
||||||
|
Key Features:
|
||||||
|
- Spectral peak detection for frequency components
|
||||||
|
- Auto noise floor estimation (or user-specified)
|
||||||
|
- Dual bandwidth estimation: -3dB primary, cumulative power fallback
|
||||||
|
- Handles narrowband and wide signals (OFDM)
|
||||||
|
|
||||||
|
\b
|
||||||
|
Examples:
|
||||||
|
utils annotate separate capture.sigmf-data
|
||||||
|
utils annotate separate signal.npy --indices 0,1,2
|
||||||
|
utils annotate separate data.sigmf-data --noise-threshold-db -70
|
||||||
|
utils annotate separate signal.npy --min-component-bw 100000
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
recording = load_recording(input)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f"Loaded: {input}")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Failed to load recording: {e}")
|
||||||
|
|
||||||
|
# Parse indices if specified
|
||||||
|
indices_list = get_indices_list(indices=indices, recording=recording)
|
||||||
|
|
||||||
|
if len(recording.annotations) == 0:
|
||||||
|
if not quiet:
|
||||||
|
click.echo("No annotations to split")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo("\nSplitting annotations by frequency components...")
|
||||||
|
click.echo(f" Input annotations: {len(recording.annotations)}")
|
||||||
|
if indices_list:
|
||||||
|
click.echo(f" Splitting indices: {indices_list}")
|
||||||
|
click.echo(f" FFT size: {nfft}")
|
||||||
|
if noise_threshold_db is not None:
|
||||||
|
click.echo(f" Noise threshold: {noise_threshold_db} dB")
|
||||||
|
else:
|
||||||
|
click.echo(" Noise threshold: auto-estimated")
|
||||||
|
click.echo(f" Min component BW: {format_frequency(min_component_bw)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
initial_count = len(recording.annotations)
|
||||||
|
|
||||||
|
recording = split_recording_annotations(
|
||||||
|
recording,
|
||||||
|
indices=indices_list,
|
||||||
|
nfft=nfft,
|
||||||
|
noise_threshold_db=noise_threshold_db,
|
||||||
|
min_component_bw=min_component_bw,
|
||||||
|
)
|
||||||
|
|
||||||
|
final_count = len(recording.annotations)
|
||||||
|
added = final_count - initial_count
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
click.echo(f" ✓ Output annotations: {final_count} ({'+' if added >= 0 else ''}{added} change)")
|
||||||
|
if verbose and added > 0:
|
||||||
|
click.echo("\n Details:")
|
||||||
|
for i in range(initial_count, final_count):
|
||||||
|
ann = recording.annotations[i]
|
||||||
|
freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
|
||||||
|
click.echo(
|
||||||
|
f" [{i}] samples {format_sample_count(ann.sample_start)}-"
|
||||||
|
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {freq_range}"
|
||||||
|
)
|
||||||
|
|
||||||
|
save_recording_auto(recording, output, input, quiet, overwrite)
|
||||||
|
if not quiet:
|
||||||
|
click.echo(" ✓ Saved")
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Spectral separation failed: {e}")
|
||||||
Loading…
Reference in New Issue
Block a user