From 18395a0af8b4c9367397a92d79db33ee7c07d0c3 Mon Sep 17 00:00:00 2001 From: gillian Date: Tue, 9 Dec 2025 14:12:11 -0500 Subject: [PATCH] adding view channels to view_signal --- pyproject.toml | 7 +- .../ria_toolkit_oss/annotate.py | 803 ------------------ src/ria_toolkit_oss/annotations/__init__.py | 54 -- .../annotations/annotation_transforms.py | 55 -- .../annotations/cusum_annotator.py | 197 ----- .../annotations/energy_detector.py | 520 ------------ .../annotations/parallel_signal_separator.py | 484 ----------- .../annotations/qualify_slice.py | 35 - .../annotations/signal_isolation.py | 97 --- .../annotations/threshold_qualifier.py | 114 --- src/ria_toolkit_oss/io/common.py | 83 ++ .../ria_toolkit_oss_cli}/__init__.py | 0 .../ria_toolkit_oss_cli}/cli.py | 2 +- .../ria_toolkit_oss/__init__.py | 0 .../ria_toolkit_oss/capture.py | 6 +- .../ria_toolkit_oss/combine.py | 0 .../ria_toolkit_oss/commands.py | 1 - .../ria_toolkit_oss/common.py | 0 .../ria_toolkit_oss/config.py | 0 .../ria_toolkit_oss/convert.py | 0 .../ria_toolkit_oss/discover.py | 0 .../ria_toolkit_oss/generate.py | 18 +- .../ria_toolkit_oss/init.py | 0 .../ria_toolkit_oss/split.py | 0 .../ria_toolkit_oss/transform.py | 0 .../ria_toolkit_oss/transmit.py | 0 .../ria_toolkit_oss/view.py | 0 src/ria_toolkit_oss/view/__init__.py | 3 +- src/ria_toolkit_oss/view/dataset.py | 2 +- src/ria_toolkit_oss/view/recording.py | 2 +- src/ria_toolkit_oss/view/view_signal.py | 60 ++ 31 files changed, 165 insertions(+), 2378 deletions(-) delete mode 100644 ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py delete mode 100644 src/ria_toolkit_oss/annotations/__init__.py delete mode 100644 src/ria_toolkit_oss/annotations/annotation_transforms.py delete mode 100644 src/ria_toolkit_oss/annotations/cusum_annotator.py delete mode 100644 src/ria_toolkit_oss/annotations/energy_detector.py delete mode 100644 src/ria_toolkit_oss/annotations/parallel_signal_separator.py delete mode 100644 src/ria_toolkit_oss/annotations/qualify_slice.py delete mode 100644 src/ria_toolkit_oss/annotations/signal_isolation.py delete mode 100644 src/ria_toolkit_oss/annotations/threshold_qualifier.py create mode 100644 src/ria_toolkit_oss/io/common.py rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/__init__.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/cli.py (87%) create mode 100644 src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/__init__.py rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/capture.py (98%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/combine.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/commands.py (94%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/common.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/config.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/convert.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/discover.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/generate.py (98%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/init.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/split.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/transform.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/transmit.py (100%) rename {ria_toolkit_oss_cli => src/ria_toolkit_oss/ria_toolkit_oss_cli}/ria_toolkit_oss/view.py (100%) diff --git a/pyproject.toml b/pyproject.toml index d06e3be..1f0308d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ maintainers = [ { name = "Benjamin Chinnery", email = "ben@qoherent.ai" }, { name = "Ashkan Beigi", email = "ash@qoherent.ai" }, { name = "Madrigal Weersink", email = "madrigal@qoherent.ai" }, + { name = "Gillian Ford", email = "gillian@qoherent.ai" } ] keywords = [ "radio", @@ -67,7 +68,8 @@ all-sdr = [ [tool.poetry] packages = [ - { include = "ria_toolkit_oss", from = "src" } + { include = "ria_toolkit_oss", from = "src" }, + { include = "ria_toolkit_oss_cli", from = "src/ria_toolkit_oss" } ] include = [ "**/*.so", # Required for Nuitkaification @@ -97,6 +99,9 @@ pylint = "^3.2.6" # For pyreverse, to automate the creation of UML diagrams "Source" = "https://riahub.ai/qoherent/ria-toolkit-oss" "Issues Board" = "https://riahub.ai/qoherent/ria-toolkit-oss/issues" +[tool.poetry.scripts] +ria = "ria_toolkit_oss.ria_toolkit_oss_cli.cli:cli" + [tool.black] line-length = 119 target-version = ["py310"] diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py b/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py deleted file mode 100644 index 37759a2..0000000 --- a/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py +++ /dev/null @@ -1,803 +0,0 @@ -"""Annotate command - Automatic detection and manual annotation management.""" - -import json -from pathlib import Path - -import click - -from src.ria_toolkit_oss.annotations import ( - annotate_with_cusum, - detect_signals_energy, - split_recording_annotations, - threshold_qualifier, -) -from ria_toolkit_oss.datatypes import Annotation -from ria_toolkit_oss.datatypes.recording import Recording -from ria_toolkit_oss.io import load_recording, to_blue, to_npy, to_sigmf, to_wav -from ria_toolkit_oss.common import format_frequency, format_sample_count - - -def normalize_sigmf_path(filepath): - """Normalize SigMF path to base name without extension.""" - path = Path(filepath) - - # Handle .sigmf-data, .sigmf-meta, or .sigmf - if ".sigmf" in path.suffix: - # Remove the suffix to get base name - return path.with_suffix("") - else: - return path - - -def detect_input_format(filepath): - """Detect file format from extension.""" - path = Path(filepath) - ext = path.suffix.lower() - - if ext in [".sigmf-data", ".sigmf-meta"]: - return "sigmf" - elif path.name.endswith(".sigmf"): - return "sigmf" - elif ext == ".npy": - return "npy" - elif ext == ".wav": - return "wav" - elif ext == ".blue": - return "blue" - else: - raise click.ClickException(f"Unknown format for '{filepath}'. Supported: .sigmf, .npy, .wav, .blue") - - -def determine_output_path(input_path, output_path, fmt, quiet, overwrite): - if fmt == "sigmf": - if not quiet: - click.echo(f"Saving annotations to: {output_path}.sigmf-meta") - - if output_path: - # Normalize the output path for consistency - return normalize_sigmf_path(output_path) - else: - # Auto-generate from input path - return normalize_sigmf_path(input_path) - else: - if not quiet: - click.echo(f"Saving to: {output_path}") - - if output_path: - return Path(output_path) - else: - # Other formats: add _annotated suffix unless --overwrite - if overwrite: - return input_path - else: - return input_path.with_name(input_path.stem + "_annotated" + input_path.suffix) - - -def save_recording_auto(recording, output_path, input_path, quiet=False, overwrite=False): - """Save recording, auto-detecting format from extension. - - For SigMF: Only overwrites metadata file, data file is unchanged - For other formats: Creates _annotated copy by default, unless overwrite=True - """ - input_path = Path(input_path) - fmt = detect_input_format(input_path) - - # Determine output path - output_path = determine_output_path( - input_path=input_path, output_path=output_path, fmt=fmt, quiet=quiet, overwrite=overwrite - ) - - if fmt == "sigmf": - # Normalize path for SigMF - base_path = output_path - stem = base_path.name - parent = base_path.parent - - # For SigMF: only save metadata, copy data if needed - meta_path = parent / f"{stem}.sigmf-meta" - data_path = parent / f"{stem}.sigmf-data" - - # If output is different from input, copy data file - input_base = normalize_sigmf_path(input_path) - if input_base != base_path: - import shutil - - # Construct input data path correctly - # input_base is like /path/to/recording or /path/to/recording.sigmf - # We need /path/to/recording.sigmf-data - if str(input_base).endswith(".sigmf"): - input_data = Path(str(input_base).replace(".sigmf", ".sigmf-data")) - else: - input_data = input_base.parent / f"{input_base.name}.sigmf-data" - if not quiet: - click.echo(f" Copying: {data_path}") - shutil.copy2(input_data, data_path) - - # Always save metadata (this is the whole point) - to_sigmf(recording, filename=stem, path=parent, overwrite=True) - - if not quiet: - click.echo(f" Updated: {meta_path}") - if input_base != base_path: - click.echo(f" Created: {data_path}") - - elif fmt == "npy": - to_npy(recording, filename=output_path.stem, path=output_path.parent, overwrite=True) - if not quiet: - click.echo(f" Created: {output_path}") - elif fmt == "wav": - to_wav(recording, filename=output_path.stem, path=output_path.parent, overwrite=True) - if not quiet: - click.echo(f" Created: {output_path}") - elif fmt == "blue": - to_blue(recording, filename=output_path.stem, path=output_path.parent, overwrite=True) - if not quiet: - click.echo(f" Created: {output_path}") - - -def determine_frequency_bounds(recording: Recording, freq_lower, freq_upper): - # Handle frequency bounds - if (freq_lower is None) != (freq_upper is None): - raise click.ClickException("Must specify both --freq-lower and --freq-upper, or neither") - - if freq_lower is None: - # Default to full bandwidth - sample_rate = recording.metadata.get("sample_rate", 1) - center_freq = recording.metadata.get("center_frequency", 0) - freq_lower = center_freq - (sample_rate / 2) - freq_upper = center_freq + (sample_rate / 2) - freq_default = True - else: - freq_default = False - if freq_lower >= freq_upper: - raise click.ClickException( - f"Invalid frequency range: lower ({format_frequency(freq_lower)}) " - f"must be < upper ({format_frequency(freq_upper)})" - ) - - return freq_lower, freq_upper, freq_default - - -def get_indices_list(indices, recording: Recording): - if indices: - try: - indices_list = [int(idx.strip()) for idx in indices.split(",")] - # Validate indices - for idx in indices_list: - if idx < 0 or idx >= len(recording.annotations): - raise click.ClickException( - f"Invalid index {idx}. Recording has {len(recording.annotations)} annotation(s)" - ) - except ValueError as e: - raise click.ClickException(f"Invalid indices format. Expected comma-separated integers: {e}") - - return indices_list - else: - return None - - -# ============================================================================ -# Main command group -# ============================================================================ - - -@click.group() -def annotate(): - """Manage and auto-detect annotations on RF recordings. - - \b - Subcommands: - list - List annotations - add - Add manual annotation - remove - Remove annotation by index - clear - Clear all annotations - energy - Auto-detect using energy method - cusum - Auto-detect using CUSUM method - threshold - Auto-detect using threshold method - separate - Split annotations by frequency components (Phase 2) - - \b - File Path Handling: - - SigMF files: Pass .sigmf-data, .sigmf-meta, or base name - - Other formats: .npy, .wav, .blue files - - \b - Output Behavior: - - SigMF: Updates .sigmf-meta only (data unchanged), in-place - - Other: Creates _annotated copy unless --overwrite specified - """ - pass - - -# ============================================================================ -# List subcommand -# ============================================================================ - - -@annotate.command() -@click.argument("input", type=click.Path(exists=True)) -@click.option("--verbose", is_flag=True, help="Show detailed annotation info") -def list(input, verbose): - """List all annotations in a recording. - - \b - Examples: - ria annotate list recording.sigmf-data - ria annotate list signal.npy --verbose - """ - try: - recording = load_recording(input) - except Exception as e: - raise click.ClickException(f"Failed to load recording: {e}") - - if len(recording.annotations) == 0: - click.echo(f"No annotations in {Path(input).name}") - return - - click.echo(f"\nAnnotations in {Path(input).name}:") - for i, ann in enumerate(recording.annotations): - # Parse type from comment JSON - try: - comment_data = json.loads(ann.comment) - ann_type = comment_data.get("type", "unknown") - user_comment = comment_data.get("user_comment", "") - except (json.JSONDecodeError, TypeError): - ann_type = "unknown" - user_comment = ann.comment or "" - - # Basic info - freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}" - click.echo( - f" [{i}] Samples {format_sample_count(ann.sample_start)}-" - f"{format_sample_count(ann.sample_start + ann.sample_count)}: {ann.label}" - ) - click.echo(f" Type: {ann_type}") - - if verbose: - if user_comment: - click.echo(f" Comment: {user_comment}") - click.echo(f" Frequency: {freq_range}") - if ann.detail: - click.echo(f" Detail: {ann.detail}") - - click.echo(f"\nTotal: {len(recording.annotations)} annotation(s)") - - -# ============================================================================ -# Add subcommand -# ============================================================================ - - -@annotate.command(context_settings={"max_content_width": 200}) -@click.argument("input", type=click.Path(exists=True)) -@click.option("--start", type=int, required=True, help="Start sample index") -@click.option("--count", type=int, required=True, help="Sample count") -@click.option("--label", type=str, required=True, help="Annotation label") -@click.option("--freq-lower", type=float, help="Lower frequency edge (Hz)") -@click.option("--freq-upper", type=float, help="Upper frequency edge (Hz)") -@click.option("--comment", type=str, help="Human-readable comment") -@click.option( - "--type", - "annotation_type", - type=click.Choice(["standalone", "parallel", "intersection"]), - default="standalone", - help="Annotation type", -) -@click.option("--output", "-o", type=click.Path(), help="Output file path") -@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") -@click.option("--quiet", is_flag=True, help="Quiet mode") -def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_type, output, overwrite, quiet): - """Add a manual annotation. - - \b - Examples: - ria annotate add file.npy --start 1000 --count 500 --label wifi - ria annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal" - """ - try: - recording = load_recording(input) - if not quiet: - click.echo(f"Loaded: {input}") - except Exception as e: - raise click.ClickException(f"Failed to load recording: {e}") - - # Validate sample range - n_samples = len(recording.data[0]) - if start < 0: - raise click.ClickException(f"--start must be >= 0, got {start}") - if count <= 0: - raise click.ClickException(f"--count must be > 0, got {count}") - if start + count > n_samples: - raise click.ClickException( - f"Invalid annotation range:\n" - f" Start: {start:,}\n" - f" Count: {count:,}\n" - f" End: {start + count:,}\n" - f"Recording only has {n_samples:,} samples" - ) - - # Handle frequency bounds - freq_lower, freq_upper, freq_default = determine_frequency_bounds( - recording=recording, freq_lower=freq_lower, freq_upper=freq_upper - ) - - # Build comment JSON - comment_data = {"type": annotation_type} - if comment: - comment_data["user_comment"] = comment - - # Create annotation - ann = Annotation( - sample_start=start, - sample_count=count, - freq_lower_edge=freq_lower, - freq_upper_edge=freq_upper, - label=label, - comment=json.dumps(comment_data), - detail={}, - ) - - recording._annotations.append(ann) - - if not quiet: - click.echo("\nAdding annotation:") - click.echo(f" Start: {format_sample_count(start)}") - click.echo(f" Count: {format_sample_count(count)} samples") - freq_str = ( - "full bandwidth" if freq_default else f"{format_frequency(freq_lower)} - {format_frequency(freq_upper)}" - ) - click.echo(f" Frequency: {freq_str}") - click.echo(f" Label: {label}") - click.echo(f" Type: {annotation_type}") - if comment: - click.echo(f" Comment: {comment}") - - try: - save_recording_auto(recording, output, input, quiet, overwrite) - if not quiet: - click.echo(" ✓ Saved") - except Exception as e: - raise click.ClickException(f"Failed to save: {e}") - - -# ============================================================================ -# Remove subcommand -# ============================================================================ - - -@annotate.command(context_settings={"max_content_width": 200}) -@click.argument("input", type=click.Path(exists=True)) -@click.argument("index", type=int) -@click.option("--output", "-o", type=click.Path(), help="Output file path") -@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") -@click.option("--quiet", is_flag=True, help="Quiet mode") -def remove(input, index, output, overwrite, quiet): - """Remove annotation by index. - - Use 'ria annotate list' to see annotation indices. - - \b - Examples: - ria annotate remove signal.sigmf-data 2 - ria annotate remove file.npy 0 - """ - try: - recording = load_recording(input) - if not quiet: - click.echo(f"Loaded: {input}") - except Exception as e: - raise click.ClickException(f"Failed to load recording: {e}") - - if index < 0 or index >= len(recording.annotations): - raise click.ClickException( - f"Cannot remove annotation at index {index}\n" - f"Recording has {len(recording.annotations)} annotation(s) (indices 0-{len(recording.annotations)-1})" - ) - - removed_ann = recording.annotations[index] - recording._annotations.pop(index) - - if not quiet: - click.echo(f"\nRemoving annotation [{index}]:") - click.echo( - f" Removed: samples {format_sample_count(removed_ann.sample_start)}-" - f"{format_sample_count(removed_ann.sample_start + removed_ann.sample_count)} ({removed_ann.label})" - ) - - try: - save_recording_auto(recording, output, input, quiet, overwrite) - if not quiet: - click.echo(" ✓ Saved") - except Exception as e: - raise click.ClickException(f"Failed to save: {e}") - - -# ============================================================================ -# Clear subcommand -# ============================================================================ - - -@annotate.command(context_settings={"max_content_width": 175}) -@click.argument("input", type=click.Path(exists=True)) -@click.option("--output", "-o", type=click.Path(), help="Output file path") -@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") -@click.option("--force", is_flag=True, help="Skip confirmation") -@click.option("--quiet", is_flag=True, help="Quiet mode") -def clear(input, output, overwrite, force, quiet): - """Clear all annotations. - - \b - Examples: - ria annotate clear signal.sigmf-data - ria annotate clear file.npy --force - """ - try: - recording = load_recording(input) - if not quiet: - click.echo(f"Loaded: {input}") - except Exception as e: - raise click.ClickException(f"Failed to load recording: {e}") - - count_before = len(recording.annotations) - - if count_before == 0: - if not quiet: - click.echo("No annotations to clear") - return - - # Confirm unless --force - if not force and not quiet: - click.echo(f"\nWarning: This will remove all {count_before} annotation(s)") - click.confirm("Continue?", abort=True) - - recording._annotations = [] - - if not quiet: - click.echo(f"\nCleared {count_before} annotation(s)") - - try: - save_recording_auto(recording, output, input, quiet, overwrite) - if not quiet: - click.echo(" ✓ Saved") - except Exception as e: - raise click.ClickException(f"Failed to save: {e}") - - -# ============================================================================ -# Energy detection subcommand -# ============================================================================ - - -@annotate.command(context_settings={"max_content_width": 200}) -@click.argument("input", type=click.Path(exists=True)) -@click.option("--label", type=str, default="signal", help="Annotation label") -@click.option("--threshold", type=float, default=1.2, help="Threshold multiplier above noise floor") -@click.option("--segments", type=int, default=10, help="Number of segments for noise estimation") -@click.option("--window-size", type=int, default=200, help="Smoothing window size") -@click.option("--min-distance", type=int, default=5000, help="Min distance between detections") -@click.option( - "--freq-method", - type=click.Choice(["nbw", "obw", "full-detected", "full-bandwidth"]), - default="nbw", - help="Frequency bounding method", -) -@click.option("--nfft", type=int, default=65536, help="FFT size for frequency calculation") -@click.option("--obw-power", type=float, default=0.9999, help="Power percentage for OBW/NBW (0.99-0.9999)") -@click.option( - "--type", - "annotation_type", - type=click.Choice(["standalone", "parallel", "intersection"]), - default="standalone", - help="Annotation type", -) -@click.option("--output", "-o", type=click.Path(), help="Output file path") -@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") -@click.option("--quiet", is_flag=True, help="Quiet mode") -def energy( - input, - label, - threshold, - segments, - window_size, - min_distance, - freq_method, - nfft, - obw_power, - annotation_type, - output, - overwrite, - quiet, -): - """Auto-detect signals using energy-based method. - - Detects bursts based on energy above noise floor. Best for bursty signals - and intermittent transmissions. - - \b - Frequency Bounding Methods: - nbw - Nominal bandwidth (default, best for real signals) - obw - Occupied bandwidth (more conservative, includes sidelobes) - full-detected - Lowest to highest spectral component - full-bandwidth - Entire Nyquist span - - \b - Examples: - ria annotate energy capture.sigmf-data --label burst - ria annotate energy signal.npy --threshold 1.5 --min-distance 10000 - ria annotate energy signal.sigmf-data --freq-method obw - ria annotate energy signal.sigmf-data --freq-method full-detected - - """ - try: - recording = load_recording(input) - if not quiet: - click.echo(f"Loaded: {input}") - except Exception as e: - raise click.ClickException(f"Failed to load recording: {e}") - - if not quiet: - click.echo("\nDetecting signals using energy-based method...") - click.echo(" Time detection:") - click.echo(f" Segments: {segments}") - click.echo(f" Threshold: {threshold}x noise floor") - click.echo(f" Window size: {window_size} samples") - click.echo(f" Min distance: {min_distance} samples") - click.echo(f" Frequency bounds: {freq_method}") - - try: - initial_count = len(recording.annotations) - recording = detect_signals_energy( - recording, - k=segments, - threshold_factor=threshold, - window_size=window_size, - min_distance=min_distance, - label=label, - annotation_type=annotation_type, - freq_method=freq_method, - nfft=nfft, - obw_power=obw_power, - ) - added = len(recording.annotations) - initial_count - - if not quiet: - click.echo(f" ✓ Added {added} annotation(s)") - - save_recording_auto(recording, output, input, quiet, overwrite) - if not quiet: - click.echo(" ✓ Saved") - except Exception as e: - raise click.ClickException(f"Energy detection failed: {e}") - - -# ============================================================================ -# CUSUM detection subcommand -# ============================================================================ - - -@annotate.command() -@click.argument("input", type=click.Path(exists=True)) -@click.option("--label", type=str, default="segment", help="Annotation label") -@click.option("--min-duration", type=float, default=5.0, help="Min duration in ms (prevents over-segmentation)") -@click.option("--window-size", type=int, default=1, help="Smoothing window size") -@click.option("--tolerance", type=int, default=-1, help="Sample tolerance for merging") -@click.option( - "--type", - "annotation_type", - type=click.Choice(["standalone", "parallel", "intersection"]), - default="standalone", - help="Annotation type", -) -@click.option("--output", "-o", type=click.Path(), help="Output file path") -@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") -@click.option("--quiet", is_flag=True, help="Quiet mode") -def cusum(input, label, min_duration, window_size, tolerance, annotation_type, output, overwrite, quiet): - """Auto-detect segments using CUSUM method. - - Detects signal state changes (on/off, amplitude transitions). Best for - segmenting continuous signals. - - IMPORTANT: Always specify --min-duration to prevent excessive segmentation. - - \b - Examples: - ria annotate cusum signal.sigmf-data --min-duration 5.0 - ria annotate cusum data.npy --min-duration 10.0 --label state - """ - try: - recording = load_recording(input) - if not quiet: - click.echo(f"Loaded: {input}") - except Exception as e: - raise click.ClickException(f"Failed to load recording: {e}") - - if not quiet: - click.echo("\nDetecting segments using CUSUM...") - click.echo(f" Min duration: {min_duration} ms") - if window_size != 1: - click.echo(f" Window size: {window_size} samples") - - try: - initial_count = len(recording.annotations) - recording = annotate_with_cusum( - recording, - label=label, - window_size=window_size, - min_duration=min_duration, - tolerance=tolerance, - annotation_type=annotation_type, - ) - added = len(recording.annotations) - initial_count - - if not quiet: - click.echo(f" ✓ Added {added} annotation(s)") - - save_recording_auto(recording, output, input, quiet, overwrite) - if not quiet: - click.echo(" ✓ Saved") - except Exception as e: - raise click.ClickException(f"CUSUM detection failed: {e}") - - -# ============================================================================ -# Threshold detection subcommand -# ============================================================================ - - -@annotate.command() -@click.argument("input", type=click.Path(exists=True)) -@click.option("--threshold", type=float, required=True, help="Threshold (0.0-1.0, fraction of max magnitude)") -@click.option("--label", type=str, default="signal", help="Annotation label") -@click.option("--window-size", type=int, default=1024, help="Smoothing window size") -@click.option( - "--type", - "annotation_type", - type=click.Choice(["standalone", "parallel", "intersection"]), - default="standalone", - help="Annotation type", -) -@click.option("--output", "-o", type=click.Path(), help="Output file path") -@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") -@click.option("--quiet", is_flag=True, help="Quiet mode") -def threshold(input, threshold, label, window_size, annotation_type, output, overwrite, quiet): - """Auto-detect signals using threshold method. - - Detects samples above a percentage of maximum magnitude. Best for simple - power-based detection. - - \b - Examples: - ria annotate threshold signal.sigmf-data --threshold 0.7 --label wifi - ria annotate threshold data.npy --threshold 0.5 --window-size 2048 - """ - if not (0.0 <= threshold <= 1.0): - raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}") - - try: - recording = load_recording(input) - if not quiet: - click.echo(f"Loaded: {input}") - except Exception as e: - raise click.ClickException(f"Failed to load recording: {e}") - - if not quiet: - click.echo("\nDetecting signals using threshold qualifier...") - click.echo(f" Threshold: {threshold * 100:.1f}% of max magnitude") - click.echo(f" Window size: {window_size} samples") - - try: - initial_count = len(recording.annotations) - recording = threshold_qualifier( - recording, - threshold=threshold, - window_size=window_size, - label=label, - annotation_type=annotation_type, - ) - added = len(recording.annotations) - initial_count - - if not quiet: - click.echo(f" ✓ Added {added} annotation(s)") - - save_recording_auto(recording, output, input, quiet, overwrite) - if not quiet: - click.echo(" ✓ Saved") - except Exception as e: - raise click.ClickException(f"Threshold detection failed: {e}") - - -# ============================================================================ -# Separate subcommand (Phase 2: Parallel signal separation) -# ============================================================================ - - -@annotate.command() -@click.argument("input", type=click.Path(exists=True)) -@click.option("--indices", type=str, help="Comma-separated annotation indices to split (default: all)") -@click.option("--nfft", type=int, default=65536, help="FFT size for spectral analysis") -@click.option("--noise-threshold-db", type=float, help="Noise floor threshold in dB (auto-estimated if not specified)") -@click.option("--min-component-bw", type=float, default=50e3, help="Min component bandwidth in Hz") -@click.option("--output", "-o", type=click.Path(), help="Output file path") -@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") -@click.option("--quiet", is_flag=True, help="Quiet mode") -@click.option("--verbose", is_flag=True, help="Verbose output (show detected components)") -def separate(input, indices, nfft, noise_threshold_db, min_component_bw, output, overwrite, quiet, verbose): - """Split annotations by frequency components (Phase 2). - - Detects multiple frequency components within single annotations and splits - them into separate annotations. Uses spectral peak detection with dual - bandwidth estimation. - - \b - Key Features: - - Spectral peak detection for frequency components - - Auto noise floor estimation (or user-specified) - - Dual bandwidth estimation: -3dB primary, cumulative power fallback - - Handles narrowband and wide signals (OFDM) - - \b - Examples: - ria annotate separate capture.sigmf-data - ria annotate separate signal.npy --indices 0,1,2 - ria annotate separate data.sigmf-data --noise-threshold-db -70 - ria annotate separate signal.npy --min-component-bw 100000 - - """ - try: - recording = load_recording(input) - if not quiet: - click.echo(f"Loaded: {input}") - except Exception as e: - raise click.ClickException(f"Failed to load recording: {e}") - - # Parse indices if specified - indices_list = get_indices_list(indices=indices, recording=recording) - - if len(recording.annotations) == 0: - if not quiet: - click.echo("No annotations to split") - return - - if not quiet: - click.echo("\nSplitting annotations by frequency components...") - click.echo(f" Input annotations: {len(recording.annotations)}") - if indices_list: - click.echo(f" Splitting indices: {indices_list}") - click.echo(f" FFT size: {nfft}") - if noise_threshold_db is not None: - click.echo(f" Noise threshold: {noise_threshold_db} dB") - else: - click.echo(" Noise threshold: auto-estimated") - click.echo(f" Min component BW: {format_frequency(min_component_bw)}") - - try: - initial_count = len(recording.annotations) - - recording = split_recording_annotations( - recording, - indices=indices_list, - nfft=nfft, - noise_threshold_db=noise_threshold_db, - min_component_bw=min_component_bw, - ) - - final_count = len(recording.annotations) - added = final_count - initial_count - - if not quiet: - click.echo(f" ✓ Output annotations: {final_count} ({'+' if added >= 0 else ''}{added} change)") - if verbose and added > 0: - click.echo("\n Details:") - for i in range(initial_count, final_count): - ann = recording.annotations[i] - freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}" - click.echo( - f" [{i}] samples {format_sample_count(ann.sample_start)}-" - f"{format_sample_count(ann.sample_start + ann.sample_count)}: {freq_range}" - ) - - save_recording_auto(recording, output, input, quiet, overwrite) - if not quiet: - click.echo(" ✓ Saved") - except Exception as e: - raise click.ClickException(f"Spectral separation failed: {e}") diff --git a/src/ria_toolkit_oss/annotations/__init__.py b/src/ria_toolkit_oss/annotations/__init__.py deleted file mode 100644 index 1542dcf..0000000 --- a/src/ria_toolkit_oss/annotations/__init__.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -The annotations package contains tools and utilities for creating, managing, and processing annotations. - -Provides automatic annotation generation using various signal detection algorithms: -- Energy-based detection (detect_signals_energy) -- CUSUM-based segmentation (annotate_with_cusum) -- Threshold-based qualification (threshold_qualifier) -- Signal isolation and extraction (isolate_signal) -- Occupied bandwidth analysis (calculate_occupied_bandwidth, calculate_nominal_bandwidth) - -All detection functions return Recording objects with added annotations. -""" - -__all__ = [ - # Energy-based detection - "detect_signals_energy", - "calculate_occupied_bandwidth", - "calculate_nominal_bandwidth", - "calculate_full_detected_bandwidth", - "annotate_with_obw", - # CUSUM detection - "annotate_with_cusum", - # Threshold detection - "threshold_qualifier", - # Parallel signal separation (Phase 2) - "find_spectral_components", - "split_annotation_by_components", - "split_recording_annotations", - # Signal isolation - "isolate_signal", - # Annotation transforms - "remove_contained_boxes", - "is_annotation_contained", - # Dataset creation - "qualify_slice_from_annotations", -] - -from .annotation_transforms import is_annotation_contained, remove_contained_boxes -from .cusum_annotator import annotate_with_cusum -from .energy_detector import ( - annotate_with_obw, - calculate_full_detected_bandwidth, - calculate_nominal_bandwidth, - calculate_occupied_bandwidth, - detect_signals_energy, -) -from .parallel_signal_separator import ( - find_spectral_components, - split_annotation_by_components, - split_recording_annotations, -) -from .qualify_slice import qualify_slice_from_annotations -from .signal_isolation import isolate_signal -from .threshold_qualifier import threshold_qualifier diff --git a/src/ria_toolkit_oss/annotations/annotation_transforms.py b/src/ria_toolkit_oss/annotations/annotation_transforms.py deleted file mode 100644 index 47300c1..0000000 --- a/src/ria_toolkit_oss/annotations/annotation_transforms.py +++ /dev/null @@ -1,55 +0,0 @@ -from ria_toolkit_oss.datatypes.annotation import Annotation - -# TODO figure out how to transfer labels in the merge case - - -def remove_contained_boxes(annotations: list[Annotation]): - """ - Remove all annotations (bounding boxes) that are entirely contained within other boxes in the list. - - :param annotations: A list of Annotation objects. - :type annotations: list[Annotation] - - :returns: A new list of Annotation objects. - :rtype: list[Annotation]""" - - output_boxes = [] - - for i in range(len(annotations)): - contained = False - for j in range(len(annotations)): - if i != j and is_annotation_contained(annotations[i], annotations[j]): - contained = True - break - - if not contained: - output_boxes.append(annotations[i]) - - return output_boxes - - -def is_annotation_contained(inner: Annotation, outer: Annotation) -> bool: - """ - Check if an annotation box is entirely contained within another annotation bounding box. - - :param inner: The inner box. - :type inner: Annotation. - :param outer: The outer box. - :type outer: Annotation. - - :returns: True if inner is within outer, false otherwise. - :rtype: bool - """ - - inner_sample_stop = inner.sample_start + inner.sample_count - outer_sample_stop = outer.sample_start + outer.sample_count - - if inner.sample_start > outer.sample_start and inner_sample_stop < outer_sample_stop: - if inner.freq_lower_edge > outer.freq_lower_edge and inner.freq_upper_edge < outer.freq_upper_edge: - return True - - return False - - -def merge_annotations(annotations: list[Annotation], overlap_threshold) -> list[Annotation]: - raise NotImplementedError diff --git a/src/ria_toolkit_oss/annotations/cusum_annotator.py b/src/ria_toolkit_oss/annotations/cusum_annotator.py deleted file mode 100644 index 879abc1..0000000 --- a/src/ria_toolkit_oss/annotations/cusum_annotator.py +++ /dev/null @@ -1,197 +0,0 @@ -import itertools -import json -from itertools import groupby -from typing import Optional - -import numpy as np - -from ria_toolkit_oss.datatypes import Annotation, Recording - - -def annotate_with_cusum( - recording: Recording, - label: Optional[str] = "segment", - window_size: Optional[int] = 1, - min_duration: Optional[float] = None, - tolerance: Optional[int] = -1, - annotation_type: Optional[str] = "standalone", -): - """ - Add annotations that divide the recording into distinct time segments. - - This algorithm computes the cumulative sum of the sample magnitudes and - determines break points in the signal. - - This tool can be used to find points where a signal turns on or off, or - changes between a low and high amplitude. - - :param recording: A ``Recording`` object to annotate. - :type recording: ``utils.data.Recording`` - :param label: Label for the detected segments. - :type label: str - :param window_size: The length (in samples) of the moving average window. - :type window_size: int - :param min_duration: The minimum duration (in ms) of a segment. - The algorithm will not produce annotations shorter than this length. - :type min_duration: float - :param tolerance: The minimum length (in samples) of a segment. - :type tolerance: int - :param annotation_type: Annotation type (standalone, parallel, intersection). - :type annotation_type: str - """ - - sample_rate = recording.metadata["sample_rate"] - center_frequency = recording.metadata.get("center_frequency", 0) - - # Create an object of the time segmenter - time_segmenter = TimeSegmenter(sample_rate, min_duration, window_size, tolerance) - - # TODO refactor time segmentor such that the _ s are not required. - _, _, change_points, _ = time_segmenter.apply(recording.data[0]) - - time_segments_indices = np.append(np.insert(change_points, 0, 0), len(recording.data[0])) - annotations = [] - for i in range(len(time_segments_indices) - 1): - # Build comment JSON with type metadata - comment_data = { - "type": annotation_type, - "generator": "cusum_annotator", - "params": { - "window_size": window_size, - "min_duration": min_duration, - "tolerance": tolerance, - }, - } - - annotations.append( - Annotation( - sample_start=time_segments_indices[i], - sample_count=time_segments_indices[i + 1] - time_segments_indices[i], - freq_lower_edge=center_frequency - (sample_rate / 2), - freq_upper_edge=center_frequency + (sample_rate / 2), - label=label, - comment=json.dumps(comment_data), - detail={"generator": "cusum_annotator"}, - ) - ) - - return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations) - - -def _compute_cusum(_signal, sample_rate: int, tolerance: int = None, min_duration: float = -1): - """ - This function efficiently computes the cumulative sum of a give list (_signal), with an optional tolerance. - - Args: - - _signal: array of iq samples. - - Tolerance: the least acceptable length of a block, Defaults to None. - - Returns: - - cusum (array): Array of the cumulative sum of the given list - - sample_rate (int): __description_ - - change_points (array): Array of the indices at which a change in the CUSUM direction happens. - - min_duration (float): The least acceptable time width of each segment (in ms). Defaults to -1. - """ - - # efficiently calculate the running sum of the signal - cusum = list(itertools.accumulate((_signal - np.mean(_signal)))) - - # 'diff' computes the differences between the consecutive values, - # then 'sign' determines if it is +ve or -ve. - change_indicators = np.sign(np.diff(cusum)) - - # TODO: Tolerance is not useful, we can get rid of it. - """ - To add the tolerance: - 1. Group the consecutive values, count them and mark their start index - """ - if tolerance: - groups_list, index = [], 0 - # 1 - for key, group in groupby(change_indicators): - group_len = len(list(group)) - groups_list.append((key, group_len, index)) - index += group_len - # 2 - for val, n, idx in groups_list: - if n <= tolerance: - change_indicators[idx : idx + n] = -1 * val - change_points = np.where(np.diff(change_indicators))[0] + 1 - - # Limit the change_points - # Reject those whose number of samples < minimum accepted #n of samples in (min duration) ms. - if min_duration is not None and min_duration > 0: - min_samples_wide = min_duration * sample_rate / 1000 - segments_lengths = np.diff(change_points) - segments_lengths = np.insert(segments_lengths, 0, change_points[0]) - change_points = change_points[np.where(segments_lengths > min_samples_wide)[0]] - return cusum, change_points - - -class TimeSegmenter: - """Time Segmenter class, it creates a segmenter object with certain\ - characteristics to easily split an input signal to segments based on\ - the cumulative sum of deviations (of the signal mean) - """ - - def __init__( - self, sample_rate: int, min_duration: float = 1, moving_average_window: int = 3, tolerance: int = None - ): - """_summary_ - - Args: - sample_rate (int): _description_ - min_duration (float, optional): _description_. Defaults to 1. - moving_average_window (int, optional): _description_. Defaults to 3. - tolerance (int, optional): _description_. Defaults to None. - """ - self.sample_rate = sample_rate - self.min_duration = min_duration - self.moving_average_window = moving_average_window - self._moving_avg_filter = self._init_filter() - self.tolerance = tolerance - - def _init_filter(self): - """_summary_ - - Returns: - _type_: _description_ - """ - return np.ones(self.moving_average_window) / self.moving_average_window - - def _apply_filter(self, iqsignal: np.array): - """_summary_ - - Args: - iqsignal (np.array): _description_ - - Returns: - _type_: _description_ - """ - return np.convolve(abs(iqsignal), self._moving_avg_filter, mode="same") - - def _create_segments(self, iq_signal: np.array, change_points: np.array): - """_summary_ - - Args: - iq_signal (np.array): _description_ - change_points (np.array): _description_ - - Returns: - _type_: _description_ - """ - return np.split(iq_signal, change_points) - - def apply(self, iq_signal: np.array): - """_summary_ - - Args: - iq_signal (np.array): _description_ - - Returns: - _type_: _description_ - """ - smoothed_signal = self._apply_filter(iq_signal) - cusum, change_points = _compute_cusum(smoothed_signal, self.sample_rate, self.tolerance, self.min_duration) - segments = self._create_segments(iq_signal, change_points) - return smoothed_signal, cusum, change_points, segments diff --git a/src/ria_toolkit_oss/annotations/energy_detector.py b/src/ria_toolkit_oss/annotations/energy_detector.py deleted file mode 100644 index c27a5f7..0000000 --- a/src/ria_toolkit_oss/annotations/energy_detector.py +++ /dev/null @@ -1,520 +0,0 @@ -""" -Energy-based signal detection and bandwidth analysis. - -Provides automatic annotation generation using energy-based signal detection -and occupied bandwidth calculation following ITU-R SM.328 standard. -""" - -import json -from typing import Tuple - -import numpy as np - -from ria_toolkit_oss.datatypes import Annotation, Recording - - -def detect_signals_energy( - recording: Recording, - k: int = 10, - threshold_factor: float = 1.2, - window_size: int = 200, - min_distance: int = 5000, - label: str = "signal", - annotation_type: str = "standalone", - freq_method: str = "nbw", - nfft: int = 65536, - obw_power: float = 0.9999, -) -> Recording: - """ - Detect signal bursts using energy-based method with adaptive noise floor estimation. - - This algorithm smooths the signal with a moving average filter, estimates the noise - floor from k segments, applies a threshold to detect regions above noise, and merges - nearby detections. Detected time boundaries are then assigned frequency bounds based - on the selected frequency method. - - Time Detection Algorithm: - 1. Smooth signal using moving average (envelope detection) - 2. Divide smoothed signal into k segments - 3. Estimate noise floor as median of segment mean powers - 4. Detect regions where power exceeds threshold_factor * noise_floor - 5. Merge regions closer than min_distance samples - - Frequency Bounding (freq_method): - - 'nbw': Nominal bandwidth (OBW + center frequency) - DEFAULT - - 'obw': Occupied bandwidth (99.99% power, includes siedelobes) - - 'full-detected': Lowest to highest spectral component - - 'full-bandwidth': Entire Nyquist span (center_freq ± sample_rate/2) - - :param recording: Recording to analyze - :type recording: Recording - :param k: Number of segments for noise floor estimation (default: 10) - :type k: int - :param threshold_factor: Threshold multiplier above noise floor (typical: 1.2-2.0, default: 1.2) - :type threshold_factor: float - :param window_size: Moving average window size in samples (default: 200) - :type window_size: int - :param min_distance: Minimum distance between separate signals in samples (default: 5000) - :type min_distance: int - :param label: Label for detected annotations (default: "signal") - :type label: str - :param annotation_type: Annotation type (standalone, parallel, intersection, default: standalone) - :type annotation_type: str - :param freq_method: How to calculate frequency bounds (default: 'nbw') - :type freq_method: str - :param nfft: FFT size for frequency calculations (default: 65536) - :type nfft: int - :param obw_power: Power percentage for OBW (0.9999 = 99.99%, default: 0.9999) - :type obw_power: float - - :returns: New Recording with added annotations - :rtype: Recording - - **Example**:: - - >>> from utils.io import load_recording - >>> from utils.annotations import detect_signals_energy - >>> recording = load_recording("capture.sigmf") - - >>> # Detect with NBW frequency bounds (default, best for real signals) - >>> annotated = detect_signals_energy(recording, label="burst") - - >>> # Detect with OBW (more conservative, includes siedelobes) - >>> annotated = detect_signals_energy( - ... recording, label="burst", freq_method="obw" - ... ) - - >>> # Detect with full detected range (captures all spectral components) - >>> annotated = detect_signals_energy( - ... recording, label="burst", freq_method="full-detected" - ... ) - """ - # Extract signal data (use first channel only) - signal = recording.data[0] - - # Smooth signal using moving average filter (envelope detection) - smoothed_signal = np.convolve(np.abs(signal), np.ones(window_size) / window_size, mode="valid") - - # Estimate noise floor using segment-based median (robust to signal presence) - segment_length = len(smoothed_signal) // k - segment_powers = [np.mean(smoothed_signal[i * segment_length : (i + 1) * segment_length] ** 2) for i in range(k)] - noise_floor = np.median(segment_powers) - threshold = noise_floor * threshold_factor - - # Detect signal boundaries (regions above threshold) - boundaries = [] - start = None - - for i, power in enumerate(smoothed_signal**2): - if power > threshold and start is None: - # Signal starts - start = i - elif power <= threshold and start is not None: - # Signal ends - boundaries.append((start, i - start)) - start = None - - # Handle case where signal extends to end - if start is not None: - boundaries.append((start, len(smoothed_signal) - start)) - - # Merge boundaries that are closer than min_distance - merged_boundaries = [] - if boundaries: - start, length = boundaries[0] - for next_start, next_length in boundaries[1:]: - if next_start - (start + length) < min_distance: - # Merge with current boundary - length = next_start + next_length - start - else: - # Save current and start new boundary - merged_boundaries.append((start, length)) - start, length = next_start, next_length - # Add final boundary - merged_boundaries.append((start, length)) - - # Create annotations from detected boundaries - sample_rate = recording.metadata["sample_rate"] - center_frequency = recording.metadata.get("center_frequency", 0) - - # Validate frequency method - valid_freq_methods = ["nbw", "obw", "full-detected", "full-bandwidth"] - if freq_method not in valid_freq_methods: - raise ValueError(f"Invalid freq_method '{freq_method}'. " f"Must be one of: {', '.join(valid_freq_methods)}") - - annotations = [] - for start_sample, sample_count in merged_boundaries: - # Calculate frequency bounds based on method - freq_lower, freq_upper = calculate_frequency_bounds( - freq_method, center_frequency, sample_rate, nfft, signal, start_sample, sample_count, obw_power - ) - # Build comment JSON with type metadata - comment_data = { - "type": annotation_type, - "generator": "energy_detector", - "freq_method": freq_method, - "params": { - "threshold_factor": threshold_factor, - "window_size": window_size, - "noise_floor": float(noise_floor), - "threshold": float(threshold), - }, - } - - anno = Annotation( - sample_start=start_sample, - sample_count=sample_count, - freq_lower_edge=freq_lower, - freq_upper_edge=freq_upper, - label=label, - comment=json.dumps(comment_data), - detail={"generator": "energy_detector", "freq_method": freq_method}, - ) - annotations.append(anno) - - # Return new Recording with combined annotations - return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations) - - -def calculate_occupied_bandwidth( - signal: np.ndarray, - sampling_rate: float, - nfft: int = 65536, - power_percentage: float = 0.9999, - start_offset: int = 1000, -) -> Tuple[float, float, float]: - """ - Calculate occupied bandwidth following ITU-R SM.328 standard. - - Occupied bandwidth (OBW) is defined as the bandwidth containing a specified - percentage of the total signal power. This implementation uses FFT-based - power spectral density analysis. - - The default power_percentage of 99.99% (0.9999) captures the main lobe and - first sidelobe, providing a realistic measure of actual spectral occupancy. - - ITU-R SM.328 defines OBW as bandwidth containing 99% of power, but this - implementation allows customization. - - :param signal: Complex IQ signal samples - :type signal: np.ndarray - :param sampling_rate: Sample rate in Hz - :type sampling_rate: float - :param nfft: FFT size (larger = better frequency resolution) - :type nfft: int - :param power_percentage: Fraction of power to contain (0.99 = 99%, 0.9999 = 99.99%) - :type power_percentage: float - :param start_offset: Skip this many samples at start (avoid transients) - :type start_offset: int - - :returns: Tuple of (occupied_bandwidth_hz, lower_edge_hz, upper_edge_hz) - :rtype: Tuple[float, float, float] - - :raises ValueError: If signal is too short for nfft + start_offset - - **Example**:: - - >>> import numpy as np - >>> from utils.annotations import calculate_occupied_bandwidth - >>> # Generate 1 MHz QPSK signal at 10 Msps - >>> signal = np.random.randn(100000) + 1j * np.random.randn(100000) - >>> obw, f_lower, f_upper = calculate_occupied_bandwidth( - ... signal, sampling_rate=10e6, power_percentage=0.99 - ... ) - >>> print(f"99% OBW: {obw/1e6:.3f} MHz") - >>> print(f"Range: {f_lower/1e6:.3f} to {f_upper/1e6:.3f} MHz") - - **References**: - ITU-R SM.328: "Spectra and bandwidth of emissions" - FCC Part 15: Uses 99% power containment for bandwidth limits - """ - # Validate input - if len(signal) < nfft + start_offset: - raise ValueError( - f"Signal too short: need {nfft + start_offset} samples, " - f"got {len(signal)}. Reduce nfft or start_offset." - ) - - # Extract segment (skip transients at start) - signal_segment = signal[start_offset : nfft + start_offset] - - # Compute FFT and power spectral density - freq_spectrum = np.fft.fft(signal_segment, n=nfft) - psd = np.abs(freq_spectrum) ** 2 - - # Shift to center DC at middle (makes freq interpretation easier) - psd_shifted = np.fft.fftshift(psd) - freq_bins = np.fft.fftshift(np.fft.fftfreq(nfft, 1 / sampling_rate)) - - # Compute total power - total_power = np.sum(psd_shifted) - - # Find frequency range containing specified power percentage - # Use cumulative sum to find edges - cumulative_power = np.cumsum(psd_shifted) - - # Lower edge: where cumulative power reaches (1 - power_percentage) / 2 - # Upper edge: where cumulative power reaches 1 - (1 - power_percentage) / 2 - # This centers the power percentage window - lower_threshold = total_power * (1 - power_percentage) / 2 - upper_threshold = total_power * (1 - (1 - power_percentage) / 2) - - # Find indices - lower_idx = np.where(cumulative_power >= lower_threshold)[0][0] - upper_idx = np.where(cumulative_power <= upper_threshold)[0][-1] - - # Get frequency values - lower_freq = freq_bins[lower_idx] - upper_freq = freq_bins[upper_idx] - - # Calculate occupied bandwidth - occupied_bandwidth = upper_freq - lower_freq - - return occupied_bandwidth, lower_freq, upper_freq - - -def calculate_nominal_bandwidth( - signal: np.ndarray, - sampling_rate: float, - nfft: int = 65536, - power_percentage: float = 0.9999, - start_offset: int = 1000, -) -> Tuple[float, float]: - """ - Calculate nominal bandwidth and center frequency. - - Nominal bandwidth (NBW) is the occupied bandwidth along with the center - frequency of the signal's spectral occupancy. Useful for characterizing - signals with unknown or drifting center frequencies. - - :param signal: Complex IQ signal samples - :type signal: np.ndarray - :param sampling_rate: Sample rate in Hz - :type sampling_rate: float - :param nfft: FFT size - :type nfft: int - :param power_percentage: Fraction of power to contain - :type power_percentage: float - :param start_offset: Skip samples at start - :type start_offset: int - - :returns: Tuple of (nominal_bandwidth_hz, center_frequency_hz) - :rtype: Tuple[float, float] - - **Example**:: - - >>> from utils.annotations import calculate_nominal_bandwidth - >>> nbw, center = calculate_nominal_bandwidth(signal, sampling_rate=10e6) - >>> print(f"NBW: {nbw/1e6:.3f} MHz, Center: {center/1e6:.3f} MHz") - """ - obw, lower_freq, upper_freq = calculate_occupied_bandwidth( - signal, sampling_rate, nfft, power_percentage, start_offset - ) - - # Center frequency is midpoint of occupied band - center_freq = (lower_freq + upper_freq) / 2 - - return obw, center_freq - - -def calculate_full_detected_bandwidth( - signal: np.ndarray, - sampling_rate: float, - nfft: int = 65536, - start_offset: int = 1000, -) -> Tuple[float, float, float]: - """ - Calculate frequency range from lowest to highest spectral component. - - Unlike OBW/NBW which define a power-based bandwidth, this calculates - the absolute frequency span from the lowest non-zero spectral component - to the highest non-zero component. - - Useful for: - - Signals with spectral gaps - - Multiple parallel signals (captures all of them) - - Understanding total occupied spectrum vs. actual bandwidth - - :param signal: Complex IQ signal samples - :type signal: np.ndarray - :param sampling_rate: Sample rate in Hz - :type sampling_rate: float - :param nfft: FFT size - :type nfft: int - :param start_offset: Skip samples at start - :type start_offset: int - - :returns: Tuple of (bandwidth_hz, lower_freq_hz, upper_freq_hz) - :rtype: Tuple[float, float, float] - - **Example**:: - - >>> # Signal with two components at different frequencies - >>> bw, f_low, f_high = calculate_full_detected_bandwidth( - ... signal, sampling_rate=10e6, nfft=65536 - ... ) - >>> print(f"Full span: {f_low/1e6:.3f} to {f_high/1e6:.3f} MHz") - """ - # Validate input - if len(signal) < nfft + start_offset: - raise ValueError( - f"Signal too short: need {nfft + start_offset} samples, " - f"got {len(signal)}. Reduce nfft or start_offset." - ) - - # Extract segment - signal_segment = signal[start_offset : nfft + start_offset] - - # Compute FFT and power spectral density - freq_spectrum = np.fft.fft(signal_segment, n=nfft) - psd = np.abs(freq_spectrum) ** 2 - - # Shift to center DC - psd_shifted = np.fft.fftshift(psd) - freq_bins = np.fft.fftshift(np.fft.fftfreq(nfft, 1 / sampling_rate)) - - # Find noise floor (mean of lowest 10% of bins) - noise_floor = np.mean(np.sort(psd_shifted)[: int(len(psd_shifted) * 0.1)]) - - # Find all bins above noise floor - above_noise = np.where(psd_shifted > noise_floor * 1.5)[0] - - if len(above_noise) == 0: - # No signal above noise, return zero bandwidth - return 0.0, 0.0, 0.0 - - # Get frequency range of signal components - lower_idx = above_noise[0] - upper_idx = above_noise[-1] - - lower_freq = freq_bins[lower_idx] - upper_freq = freq_bins[upper_idx] - - bandwidth = upper_freq - lower_freq - - return bandwidth, lower_freq, upper_freq - - -def annotate_with_obw( - recording: Recording, - label: str = "signal", - annotation_type: str = "standalone", - nfft: int = 65536, - power_percentage: float = 0.9999, - start_offset: int = 1000, -) -> Recording: - """ - Create a single annotation spanning the occupied bandwidth of the entire recording. - - Analyzes the full recording to find its occupied bandwidth and creates an annotation - covering that frequency range for the entire time duration. - - :param recording: Recording to analyze - :type recording: Recording - :param label: Annotation label - :type label: str - :param annotation_type: Annotation type - :type annotation_type: str - :param nfft: FFT size - :type nfft: int - :param power_percentage: Power percentage for OBW calculation - :type power_percentage: float - :param start_offset: Sample offset - :type start_offset: int - - :returns: Recording with OBW annotation added - :rtype: Recording - - **Example**:: - - >>> from utils.annotations import annotate_with_obw - >>> annotated = annotate_with_obw(recording, label="signal_obw") - """ - signal = recording.data[0] - sample_rate = recording.metadata["sample_rate"] - center_freq = recording.metadata.get("center_frequency", 0) - - # Calculate OBW - obw, lower_offset, upper_offset = calculate_occupied_bandwidth( - signal, sample_rate, nfft, power_percentage, start_offset - ) - - # Convert baseband offsets to absolute frequencies - freq_lower = center_freq + lower_offset - freq_upper = center_freq + upper_offset - - # Create comment JSON - comment_data = { - "type": annotation_type, - "generator": "obw_annotator", - "obw_hz": float(obw), - "power_percentage": power_percentage, - "params": {"nfft": nfft, "start_offset": start_offset}, - } - - # Create annotation spanning entire recording - anno = Annotation( - sample_start=0, - sample_count=len(signal), - freq_lower_edge=freq_lower, - freq_upper_edge=freq_upper, - label=label, - comment=json.dumps(comment_data), - detail={"generator": "obw_annotator", "obw_hz": float(obw)}, - ) - - return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + [anno]) - - -def calculate_frequency_bounds( - freq_method, center_frequency, sample_rate, nfft, signal, start_sample, sample_count, obw_power -): - if freq_method == "full-bandwidth": - # Full Nyquist span - freq_lower = center_frequency - (sample_rate / 2) - freq_upper = center_frequency + (sample_rate / 2) - else: - # Extract segment for frequency analysis - segment_start = start_sample - segment_end = min(start_sample + sample_count, len(signal)) - segment = signal[segment_start:segment_end] - - if len(segment) >= nfft: - if freq_method == "nbw": - # Nominal bandwidth (OBW + center frequency) - try: - nbw_val, center = calculate_nominal_bandwidth(segment, sample_rate, nfft, obw_power) - freq_lower = center_frequency + center - (nbw_val / 2) - freq_upper = center_frequency + center + (nbw_val / 2) - except (ValueError, IndexError): - # Fallback if calculation fails - freq_lower = center_frequency - (sample_rate / 2) - freq_upper = center_frequency + (sample_rate / 2) - - elif freq_method == "obw": - # Occupied bandwidth - try: - _, f_lower, f_upper = calculate_occupied_bandwidth(segment, sample_rate, nfft, obw_power) - freq_lower = center_frequency + f_lower - freq_upper = center_frequency + f_upper - except (ValueError, IndexError): - # Fallback if calculation fails - freq_lower = center_frequency - (sample_rate / 2) - freq_upper = center_frequency + (sample_rate / 2) - - elif freq_method == "full-detected": - # Full detected range (lowest to highest component) - try: - _, f_lower, f_upper = calculate_full_detected_bandwidth(segment, sample_rate, nfft) - freq_lower = center_frequency + f_lower - freq_upper = center_frequency + f_upper - except (ValueError, IndexError): - # Fallback if calculation fails - freq_lower = center_frequency - (sample_rate / 2) - freq_upper = center_frequency + (sample_rate / 2) - else: - # Segment too short for FFT, use full bandwidth - freq_lower = center_frequency - (sample_rate / 2) - freq_upper = center_frequency + (sample_rate / 2) - - return freq_lower, freq_upper diff --git a/src/ria_toolkit_oss/annotations/parallel_signal_separator.py b/src/ria_toolkit_oss/annotations/parallel_signal_separator.py deleted file mode 100644 index 90c2744..0000000 --- a/src/ria_toolkit_oss/annotations/parallel_signal_separator.py +++ /dev/null @@ -1,484 +0,0 @@ -""" -Parallel signal separation for multi-component frequency-offset signals. - -Provides methods to detect and separate overlapping frequency-domain signals -that occupy the same time window but different frequency bands. - -This module implements **spectral peak detection** to identify distinct frequency -components and split single time-domain annotations into frequency-specific -sub-annotations. - -**Key Design Decisions** (per Codex review): - -1. **Complex IQ Support**: Uses `scipy.signal.welch` with `return_onesided=False` - for proper complex signal handling. Window length automatically adapts to - signal length via `nperseg=min(nfft, len(signal))` to handle bursts >> from utils.annotations import find_spectral_components - >>> # Detect the two distinct channels (returns relative frequencies) - >>> components = find_spectral_components(signal, sampling_rate=20e6) - >>> print(f"Found {len(components)} components") - Found 2 components - -The module is designed to work with detected time-domain annotations, -allowing splitting of overlapping signals into separate training samples. -""" - -import json -from typing import List, Optional, Tuple - -import numpy as np -from scipy import ndimage -from scipy import signal as scipy_signal - -from ria_toolkit_oss.datatypes import Annotation, Recording - - -def find_spectral_components( - signal_data: np.ndarray, - sampling_rate: float, - nfft: int = 65536, - noise_threshold_db: Optional[float] = None, - min_component_bw: float = 50e3, - power_threshold: float = 0.99, -) -> List[Tuple[float, float, float]]: - """ - Find distinct frequency components using spectral peak detection. - - Identifies separate frequency components in a signal by analyzing the power - spectral density and finding peaks corresponding to distinct signals. This is - useful for separating parallel signals that occupy different frequency bands. - - **Frequency Representation**: Returns frequencies in **baseband/relative** Hz - (centered at 0). To get absolute RF frequencies, add center_frequency_hz from - recording metadata to all returned values. - - Algorithm: - 1. Compute power spectral density using Welch (properly handles complex IQ) - 2. Auto-estimate noise floor from data if not specified - 3. Smooth PSD to reduce spurious peaks - 4. Find local maxima above noise floor - 5. Estimate bandwidth per peak using -3dB (fallback: cumulative power) - 6. Filter components below minimum bandwidth threshold - - :param signal_data: Complex IQ signal samples (np.complex64/128) - :type signal_data: np.ndarray - :param sampling_rate: Sample rate in Hz - :type sampling_rate: float - :param nfft: FFT size / window length for Welch. Automatically capped at - signal length to handle bursts (default: 65536) - :type nfft: int - :param noise_threshold_db: Minimum SNR threshold in dB. If None (default), - auto-estimates as np.percentile(psd_db, 10). - Adapt this across hardware (Pluto: ~-100, ThinkRF: ~-60). - :type noise_threshold_db: Optional[float] - :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz) - :type min_component_bw: float - :param power_threshold: Cumulative power threshold for fallback bandwidth - estimation (default: 0.99 = 99% power, like OBW) - :type power_threshold: float - - :returns: List of (center_freq_hz, lower_freq_hz, upper_freq_hz) tuples. - **All frequencies are relative (baseband, 0-centered).** - Add recording metadata['center_frequency'] to get absolute RF frequencies. - :rtype: List[Tuple[float, float, float]] - - :raises ValueError: If signal has fewer than 256 samples - - **Example**:: - - >>> from utils.io import load_recording - >>> from utils.annotations import find_spectral_components - >>> recording = load_recording("capture.sigmf") - >>> segment = recording.data[0][start:end] - >>> # Components in relative (baseband) frequency - >>> components = find_spectral_components(segment, sampling_rate=20e6) - >>> for center_rel, lower_rel, upper_rel in components: - ... # Convert to absolute RF frequency - ... center_abs = recording.metadata['center_frequency'] + center_rel - ... print(f"Component @ {center_abs/1e9:.3f} GHz") - - **Key Implementation Notes**: - - 1. **Welch for complex signals**: Uses `return_onesided=False` to properly - handle complex IQ pairs. Window length auto-capped via - `nperseg = min(nfft, len(signal))` so bursts = threshold_3db)[0] - right_indices = np.where(psd_smooth[peak_idx:] >= threshold_3db)[0] - - if len(left_indices) > 0 and len(right_indices) > 0: - lower_idx = left_indices[0] - upper_idx = peak_idx + right_indices[-1] - lower_freq = freqs_shifted[lower_idx] - upper_freq = freqs_shifted[upper_idx] - bw_3db = upper_freq - lower_freq - - # Sanity check: if -3dB BW is unreasonably wide (>sampling_rate/2), - # fallback to power-based estimation - if bw_3db > sampling_rate / 4: - # Strategy 2: Use cumulative power (like OBW) for wide signals - psd_around_peak = psd_smooth[max(0, peak_idx - 100) : peak_idx + 100] - if len(psd_around_peak) > 0: - total_power = np.sum(psd_around_peak) - cumulative = np.cumsum(psd_around_peak) - # Find edges where cumulative power = power_threshold * total - # threshold_power = power_threshold * total_power - - # Crude but effective: find where we cross the threshold - left_local = np.where(cumulative >= (1 - power_threshold) / 2 * total_power)[0] - right_local = np.where(cumulative <= (1 + power_threshold) / 2 * total_power)[0] - if len(left_local) > 0 and len(right_local) > 0: - lower_idx_local = left_local[0] - upper_idx_local = right_local[-1] - lower_freq = freqs_shifted[max(0, peak_idx - 100) + lower_idx_local] - upper_freq = freqs_shifted[max(0, peak_idx - 100) + upper_idx_local] - else: - # Fallback: estimate bandwidth from spectral width heuristic - lower_freq = peak_freq - min_component_bw / 2 - upper_freq = peak_freq + min_component_bw / 2 - - center_freq = (lower_freq + upper_freq) / 2 - bw = upper_freq - lower_freq - - # Filter by minimum bandwidth - if bw >= min_component_bw: - # All frequencies are relative (baseband) - components.append((center_freq, lower_freq, upper_freq)) - - return components - - -def split_annotation_by_components( - annotation: Annotation, - signal: np.ndarray, - sampling_rate: float, - center_frequency_hz: float = 0.0, - nfft: int = 65536, - noise_threshold_db: Optional[float] = None, - min_component_bw: float = 50e3, -) -> List[Annotation]: - """ - Split an annotation into multiple annotations by detected frequency components. - - Takes an existing annotation spanning multiple frequency components and - analyzes the frequency content to create separate sub-annotations for - each distinct frequency component. - - **Use case**: Energy detection found a time window with 2-3 parallel WiFi - channels. This function splits it into separate annotations per channel. - - **Frequency Handling**: `find_spectral_components` returns relative (baseband) - frequencies. This function adds `center_frequency_hz` to convert to absolute - RF frequencies for SigMF annotation bounds. This ensures correct frequency - context across baseband and RF domains. - - :param annotation: Original annotation to split - :type annotation: Annotation - :param signal: Full signal array (complex IQ) - :type signal: np.ndarray - :param sampling_rate: Sample rate in Hz - :type sampling_rate: float - :param center_frequency_hz: RF center frequency to add to relative frequencies - from peak detection (default: 0.0 = baseband) - :type center_frequency_hz: float - :param nfft: FFT size for analysis (default: 65536, auto-capped at signal length) - :type nfft: int - :param noise_threshold_db: Noise floor threshold in dB. If None (default), - auto-estimates from data. - :type noise_threshold_db: Optional[float] - :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz) - :type min_component_bw: float - - :returns: List of new annotations (one per detected component). - Returns empty list if no components found or segment too short. - :rtype: List[Annotation] - - **Example**:: - - >>> from utils.io import load_recording - >>> from utils.annotations import split_annotation_by_components - >>> recording = load_recording("capture.sigmf") - >>> # Original annotation spans multiple channels - >>> original = recording.annotations[0] - >>> # Split using RF center frequency from metadata - >>> components = split_annotation_by_components( - ... original, - ... recording.data[0], - ... recording.metadata['sample_rate'], - ... center_frequency_hz=recording.metadata.get('center_frequency', 0.0) - ... ) - >>> print(f"Split into {len(components)} components") - Split into 2 components - - **Algorithm**: - 1. Extract segment corresponding to annotation time bounds - 2. Find frequency components in that segment (returns relative frequencies) - 3. Add center_frequency_hz to get absolute RF frequencies - 4. Create new annotation for each component - 5. Preserve original metadata (label, type, etc.) - 6. Add component info to comment JSON - - **Notes**: - - Original annotation is not modified - - Returns empty list if segment too short (<256 samples) - - Segments Recording: - """ - Split multiple annotations in a recording by frequency components. - - Processes specified annotations (or all if indices=None), replacing each - with its frequency-separated components. Uses RF center_frequency from - recording metadata for proper absolute frequency conversion. - - :param recording: Recording to process - :type recording: Recording - :param indices: Annotation indices to split (None = all, default: None). - Use indices=[] to skip splitting (returns unchanged recording). - :type indices: Optional[List[int]] - :param nfft: FFT size for spectral analysis (default: 65536, - auto-capped at signal segment length) - :type nfft: int - :param noise_threshold_db: Noise floor threshold in dB. If None (default), - auto-estimates from each segment. - :type noise_threshold_db: Optional[float] - :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz). - Components narrower than this are filtered out. - :type min_component_bw: float - - :returns: New Recording with split annotations - :rtype: Recording - - **Example**:: - - >>> from utils.io import load_recording - >>> from utils.annotations import split_recording_annotations - >>> recording = load_recording("capture.sigmf") - >>> # Split all annotations - >>> split_rec = split_recording_annotations(recording) - >>> print(f"Original: {len(recording.annotations)} annotations") - >>> print(f"Split: {len(split_rec.annotations)} annotations") - Original: 5 annotations - Split: 9 annotations - - **Algorithm**: - 1. For each annotation in indices (or all if None): - 2. Call split_annotation_by_components with RF center_frequency - 3. If components found, replace annotation with components - 4. If no components found, keep original annotation - 5. Annotations not in indices are kept unchanged - - **Notes**: - - Original recording is not modified - - Returns empty Recording.annotations if recording has no annotations - - RF center_frequency from metadata ensures correct absolute frequencies - - If an annotation can't be split (too short, wrong format), original kept - """ - if indices is None: - # Split all annotations - indices = list(range(len(recording.annotations))) - - if not recording.annotations: - # No annotations to split - return recording - - signal = recording.data[0] - sample_rate = recording.metadata["sample_rate"] - center_frequency = recording.metadata.get("center_frequency", 0.0) - - # Build new annotation list - new_annotations = [] - for i, anno in enumerate(recording.annotations): - if i in indices: - # Attempt to split this annotation - try: - components = split_annotation_by_components( - anno, - signal, - sample_rate, - center_frequency_hz=center_frequency, - nfft=nfft, - noise_threshold_db=noise_threshold_db, - min_component_bw=min_component_bw, - ) - if components: - # Split successful, use components - new_annotations.extend(components) - else: - # No components found, keep original - new_annotations.append(anno) - except Exception: - # Split failed for any reason, keep original - new_annotations.append(anno) - else: - # Not in split list, keep as-is - new_annotations.append(anno) - - return Recording(data=recording.data, metadata=recording.metadata, annotations=new_annotations) diff --git a/src/ria_toolkit_oss/annotations/qualify_slice.py b/src/ria_toolkit_oss/annotations/qualify_slice.py deleted file mode 100644 index 2336fe5..0000000 --- a/src/ria_toolkit_oss/annotations/qualify_slice.py +++ /dev/null @@ -1,35 +0,0 @@ -import numpy as np - -from ria_toolkit_oss.datatypes import Recording - - -def qualify_slice_from_annotations(recording: Recording, slice_length: int): - """ - Slice a recording into many smaller recordings, - discarding any slices which do not have annotations that apply to those samples. - Used together with an annotation based qualifier. - - :param recording: The recording to slice. - :type recording: Recording - :param slice_length: The length in samples of a slice. - :type slice_length: int""" - - if len(recording.annotations) == 0: - print("Warning, no annotations.") - - annotation_mask = np.zeros(len(recording.data[0])) - - for annotation in recording.annotations: - annotation_mask[annotation.sample_start : annotation.sample_start + annotation.sample_count] = 1 - - output_recordings = [] - - for i in range((len(recording.data[0]) // slice_length) - 1): - start_index = slice_length * i - end_index = slice_length * (i + 1) - - if 1 in annotation_mask[start_index:end_index]: - sl = recording.data[:, start_index:end_index] - output_recordings.append(Recording(data=sl, metadata=recording.metadata)) - - return output_recordings diff --git a/src/ria_toolkit_oss/annotations/signal_isolation.py b/src/ria_toolkit_oss/annotations/signal_isolation.py deleted file mode 100644 index 47852ae..0000000 --- a/src/ria_toolkit_oss/annotations/signal_isolation.py +++ /dev/null @@ -1,97 +0,0 @@ -import numpy as np -from scipy.signal import butter, lfilter - -from ria_toolkit_oss.datatypes.annotation import Annotation -from ria_toolkit_oss.datatypes.recording import Recording - - -def isolate_signal(recording: Recording, annotation: Annotation) -> Recording: - """ - Slice, filter and frequency shift the input recording according to the bounding box defined by the annotation. - - :param recording: The input Recording to be sliced. - :type recording: Recording - :param annotation: The Annotation object defining the area of the recording to isolate. - :type annotation: Annotation - :param decimate: Decimate the input signal after filtering to reduce the sample rate. - :type decimate: bool - - :returns: The subsection of the original recording defined by the annotation. - :rtype: Recording""" - - sample_start = max(0, annotation.sample_start) - sample_stop = min(len(recording), annotation.sample_start + annotation.sample_count) - - anno_base_center_freq = (annotation.freq_lower_edge + annotation.freq_upper_edge) / 2 - recording.metadata.get( - "center_frequency", 0 - ) - - anno_bw = annotation.freq_upper_edge - annotation.freq_lower_edge - - signal_slice = recording.data[0, sample_start:sample_stop] - - # normalize - signal_slice = signal_slice / np.max(np.abs(signal_slice)) - - isolation_bw = anno_bw - - # frequency shift the center of the box about zero - shifted_signal_slice = frequency_shift_iq_samples( - iq_samples=signal_slice, - sample_rate=recording.metadata["sample_rate"], - shift_frequency=-1 * anno_base_center_freq, - ) - - # filter - if isolation_bw < recording.metadata["sample_rate"] - 1: - filtered_signal = apply_complex_lowpass_filter( - signal=shifted_signal_slice, cutoff_frequency=isolation_bw, sample_rate=recording.metadata["sample_rate"] - ) - - else: - filtered_signal = shifted_signal_slice - - output = Recording(data=[filtered_signal], metadata=recording.metadata) - return output - - -def frequency_shift_iq_samples(iq_samples, sample_rate, shift_frequency): - # Number of samples - num_samples = len(iq_samples) - - # Create a time vector from 0 to the total duration in seconds - time_vector = np.arange(num_samples) / sample_rate - - # Generate the complex exponential for the frequency shift - complex_exponential = np.exp(1j * 2 * np.pi * shift_frequency * time_vector) - - # Apply the frequency shift to the IQ samples - shifted_samples = iq_samples * complex_exponential - - return shifted_samples - - -# Function to apply a lowpass Butterworth filter to a complex signal -def apply_complex_lowpass_filter(signal, cutoff_frequency, sample_rate, order=5): - # Design the lowpass filter - b, a = design_complex_lowpass_filter(cutoff_frequency, sample_rate, order) - - # Apply the lowpass filter - filtered_signal = lfilter(b, a, signal) - return filtered_signal - - -def design_complex_lowpass_filter(cutoff_frequency, sample_rate, order=5): - # Nyquist frequency for complex signals is the sample rate - nyquist = sample_rate - - # Ensure the cutoff frequency is positive and within the Nyquist limit - if cutoff_frequency <= 0 or cutoff_frequency > nyquist: - raise ValueError("Cutoff frequency must be between 0 and the Nyquist frequency.") - - # Normalize the cutoff frequency to the Nyquist frequency - cutoff_normalized = cutoff_frequency / nyquist - - # Create a Butterworth lowpass filter - b, a = butter(order, cutoff_normalized, btype="low") - return b, a diff --git a/src/ria_toolkit_oss/annotations/threshold_qualifier.py b/src/ria_toolkit_oss/annotations/threshold_qualifier.py deleted file mode 100644 index e20bc0e..0000000 --- a/src/ria_toolkit_oss/annotations/threshold_qualifier.py +++ /dev/null @@ -1,114 +0,0 @@ -import json -import warnings -from typing import Optional - -import numpy as np - -from ria_toolkit_oss.datatypes import Annotation, Recording - - -def _find_ranges(indices, window_size): - if len(indices) == 0: - return [] - - ranges = [] - start = indices[0] # Initialize the start of the first range - in_range = False # Track if we are currently in a valid range - - for i in range(1, len(indices)): - # Check if the current index is within `window_size` of the previous one - if indices[i] - indices[i - 1] <= window_size: - if not in_range: - # Start a new range - start = indices[i - 1] - in_range = True - else: - # If we were in a range, close it - if in_range: - ranges.append((start, indices[i - 1])) - in_range = False - - # Handle the last range if it is still open - if in_range: - ranges.append((start, indices[-1])) - - return ranges - - -def threshold_qualifier( - recording: Recording, - threshold: float, - window_size: Optional[int] = 1024, - label: Optional[str] = "signal", - annotation_type: Optional[str] = "standalone", -): - """ - Annotate a recording with bounding boxes labeled "signal" for regions above a threshold. - Threshold is defined as a fraction of the maximum sample magnitude. - This algorithm searches for samples above the threshold and combines them into ranges if they - are within window_size of each other. - - :param recording: The recording to annotate. - :type recording: utils.data.Recording - :param threshold: The threshold value, range 0-1. - The actual threshold value used is this number multiplied by the max sample magnitude. - :type threshold: float - :param window_size: The size of the sliding window. Defaults to 1024 samples. - :type window_size: int - :param label: The label of the output annotations. Defaults to "signal". - :type label: str - :param annotation_type: Annotation type (standalone, parallel, intersection). - :type annotation_type: str - - :returns: Recording with added annotations. - :rtype: Recording - """ - - sample_data = recording.data - if sample_data.shape[0] > 1: - warnings.warn( - "Warning: Multichannel recording input to threshold_qualifier. " - "Only the first channel will be considered by this algorithm." - ) - - # do the absolute value calculation once for efficiency - abs_channel_data = np.abs(sample_data[0]) - max_sample = np.max(abs_channel_data) - threshold_value = threshold * max_sample - indices = np.where(abs_channel_data > threshold_value)[0] - ranges = _find_ranges(indices=indices, window_size=window_size) - - annotations = [] - - # to input freq upper and lower based on center frequency and sample rate - sample_rate = recording.metadata["sample_rate"] - center_frequency = recording.metadata.get("center_frequency", 0) - - for range in ranges: - start, stop = range - - # Build comment JSON with type metadata - comment_data = { - "type": annotation_type, - "generator": "threshold_qualifier", - "params": { - "threshold": threshold, - "threshold_value": float(threshold_value), - "max_sample": float(max_sample), - "window_size": window_size, - }, - } - - anno = Annotation( - sample_start=start, - sample_count=stop - start, - freq_lower_edge=center_frequency - (sample_rate / 2), - freq_upper_edge=center_frequency + (sample_rate / 2), - label=label, - comment=json.dumps(comment_data), - detail={"generator": "threshold_qualifier"}, - ) - - annotations.append(anno) - - return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations) diff --git a/src/ria_toolkit_oss/io/common.py b/src/ria_toolkit_oss/io/common.py new file mode 100644 index 0000000..5dcab12 --- /dev/null +++ b/src/ria_toolkit_oss/io/common.py @@ -0,0 +1,83 @@ +""" +Utilities for common input/output operations. +""" + +import os + +import ria_toolkit_oss + + +def exists(fid: str | os.PathLike) -> bool: + """Check if the file or directory exists. + + .. todo:: + + This method is not yet implemented. + + :param fid: The path to the file or directory to check for existence. + :type fid: str or os.PathLike + + :return: True if the file or directory exists, False otherwise. + :rtype: bool + """ + raise NotImplementedError + + +def validate(fid: str | os.PathLike) -> bool: + """Validate the contents of the file or directory to ensure it is not corrupted, + the correct format for its extension, and readable RIA. + + .. todo:: + + This method is not yet implemented. + + :param fid: The path to the file or directory to validate. + :type fid: str or os.PathLike + + :return: True if the file or directory is valid and readable, False otherwise. + """ + raise NotImplementedError + + +def move(source_path: str | os.PathLike, destination_path: str | os.PathLike, copy: bool = False) -> None: + """Recursively move a file or directory at source_path to destination_path. + + .. todo:: + + This method is not yet implemented. + + :param source_path: The path to the source file or directory. + :type source_path: str or os.PathLike + :param destination_path: The path to the destination directory. + :type destination_path: str or os.PathLike + :param copy: If True, perform a copy instead of a move. Default is False. + :type copy: bool, optional + + :raises RuntimeError: If the move was unsuccessful. + + :return: None + """ + if copy: + ria_toolkit_oss.io.common.copy(source_path=source_path, destination_path=destination_path) + return + + raise NotImplementedError + + +def copy(source_path: str | os.PathLike, destination_path: str | os.PathLike) -> None: + """Copy the file or directory at source_path to destination_path. + + .. todo:: + + This function is not yet implemented. + + :param source_path: The path to the source file or directory. + :type source_path: str or os.PathLike + :param destination_path: The path to the destination directory. + :type destination_path: str or os.PathLike + + :raises RuntimeError: If the copy was unsuccessful. + + :return: None + """ + raise NotImplementedError diff --git a/ria_toolkit_oss_cli/__init__.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/__init__.py similarity index 100% rename from ria_toolkit_oss_cli/__init__.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/__init__.py diff --git a/ria_toolkit_oss_cli/cli.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/cli.py similarity index 87% rename from ria_toolkit_oss_cli/cli.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/cli.py index bf97dfa..8062f94 100644 --- a/ria_toolkit_oss_cli/cli.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/cli.py @@ -4,7 +4,7 @@ This module contains the main group for the ria toolkit oss CLI. import click -from ria_toolkit_oss_cli.ria_toolkit_oss import commands +from ria_toolkit_oss.ria_toolkit_oss_cli.ria_toolkit_oss import commands @click.group() diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/__init__.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py similarity index 98% rename from ria_toolkit_oss_cli/ria_toolkit_oss/capture.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py index f2781e6..b89dc46 100644 --- a/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py @@ -4,9 +4,9 @@ import os import click -from utils.io import to_blue, to_npy, to_sigmf, to_wav -from utils.io.recording import generate_filename -from utils.view.view_signal_simple import view_simple_sig +from ria_toolkit_oss.io import to_blue, to_npy, to_sigmf, to_wav +from ria_toolkit_oss.io.recording import generate_filename +from ria_toolkit_oss.view.view_signal_simple import view_simple_sig from .common import ( echo_progress, diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/combine.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/combine.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/combine.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/combine.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py similarity index 94% rename from ria_toolkit_oss_cli/ria_toolkit_oss/commands.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py index 101be50..2223880 100644 --- a/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py @@ -3,7 +3,6 @@ This module contains all the CLI bindings for the utils package. """ -from .annotate import annotate from .capture import capture from .combine import combine from .convert import convert diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/common.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/common.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/common.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/common.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/config.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/config.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/config.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/config.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/convert.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/convert.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/convert.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/convert.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/discover.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/discover.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/discover.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/discover.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py similarity index 98% rename from ria_toolkit_oss_cli/ria_toolkit_oss/generate.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py index 9a10d0f..83fe2e9 100644 --- a/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py @@ -7,15 +7,15 @@ import click import numpy as np import yaml -import utils.signal.basic_signal_generator as basic_gen -from utils.data import Recording -from utils.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator -from utils.signal.block_generator.basic import FrequencyShift -from utils.signal.block_generator.data_types import DataType -from utils.signal.block_generator.mapping.apsk_mapper import _APSKMapper -from utils.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper -from utils.signal.block_generator.mapping.mapper import Mapper -from utils.signal.block_generator.modulation import ( +import ria_toolkit_oss.signal.basic_signal_generator as basic_gen +from ria_toolkit_oss.datatypes import Recording +from ria_toolkit_oss.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator +from ria_toolkit_oss.signal.block_generator.basic import FrequencyShift +from ria_toolkit_oss.signal.block_generator.data_types import DataType +from ria_toolkit_oss.signal.block_generator.mapping.apsk_mapper import _APSKMapper +from ria_toolkit_oss.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper +from ria_toolkit_oss.signal.block_generator.mapping.mapper import Mapper +from ria_toolkit_oss.signal.block_generator.modulation import ( GMSKModulator, OOKModulator, OQPSKModulator, diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/init.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/init.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/init.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/init.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/split.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/split.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/split.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/split.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/transform.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transform.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/transform.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transform.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/transmit.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transmit.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/transmit.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transmit.py diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/view.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/view.py similarity index 100% rename from ria_toolkit_oss_cli/ria_toolkit_oss/view.py rename to src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/view.py diff --git a/src/ria_toolkit_oss/view/__init__.py b/src/ria_toolkit_oss/view/__init__.py index f9db472..0b09e0a 100644 --- a/src/ria_toolkit_oss/view/__init__.py +++ b/src/ria_toolkit_oss/view/__init__.py @@ -4,9 +4,8 @@ recordings and radio datasets. """ __all__ = [ - "view_annotations", "view_channels", "view_sig", ] -from .view_signal import view_annotations, view_channels, view_sig +from .view_signal import view_channels, view_sig diff --git a/src/ria_toolkit_oss/view/dataset.py b/src/ria_toolkit_oss/view/dataset.py index ae41b7a..714174b 100644 --- a/src/ria_toolkit_oss/view/dataset.py +++ b/src/ria_toolkit_oss/view/dataset.py @@ -4,7 +4,7 @@ import matplotlib.pyplot as plt import numpy as np from matplotlib.backends.backend_pdf import PdfPages -from utils.io.recording import from_npy +from ria_toolkit_oss.io.recording import from_npy def create_dataset_pdf(dataset_path, output_path, div=64, metadata_keys=None): diff --git a/src/ria_toolkit_oss/view/recording.py b/src/ria_toolkit_oss/view/recording.py index 77d5282..381f07e 100644 --- a/src/ria_toolkit_oss/view/recording.py +++ b/src/ria_toolkit_oss/view/recording.py @@ -4,7 +4,7 @@ import scipy.signal as signal from plotly.graph_objs import Figure from scipy.fft import fft, fftshift -from utils.data import Recording +from ria_toolkit_oss.datatypes import Recording def spectrogram(rec: Recording, thumbnail: bool = False) -> Figure: diff --git a/src/ria_toolkit_oss/view/view_signal.py b/src/ria_toolkit_oss/view/view_signal.py index 2d94efa..1826962 100644 --- a/src/ria_toolkit_oss/view/view_signal.py +++ b/src/ria_toolkit_oss/view/view_signal.py @@ -38,6 +38,66 @@ def set_spines(ax, spines): ax.spines["bottom"].set_visible(False) ax.spines["left"].set_visible(False) +def view_channels( + recording: Recording, + output_path: Optional[str] = "images/signal.png", + title: Optional[str] = "Multichannel Signal Plot", +) -> None: + """Create a PNG of the recording samples, spectrogram, and constellation plot. + Plot is automatically saved to file at output_path. + + :param recording: The recording object to plot + :type recording: Recording + :param output_path: The path to save the image. Defaults to "images/signal.png". + :type output_path: str, optional + :param title: The plot title. Defaults to "Multichannel Signal Plot". + :type title: str, optional + + :return: None + + **Examples:** + + .. todo:: Usage examples coming soon. + """ + num_channels = recording.data.shape[0] + + fig, axes = plt.subplots(nrows=num_channels, ncols=2) + + fig.subplots_adjust(wspace=0.5, hspace=0.5) + + plt.style.use("dark_background") + + fig.suptitle(title, fontsize=16) + axes[0, 0].set_title("IQ Signal", color=COLORS["light"]) + axes[0, 1].set_title("Spectrogram", color=COLORS["light"]) + + linewidth = 0.5 + tick_fontsize = 4 + center_frequency = recording.metadata.get("center_frequency", 0) + sample_rate = recording.metadata.get("sample_rate", 1) + + sample_indexes = np.arange(0, len(recording.data[0]), 1) + t = sample_indexes / sample_rate + + for i in range(num_channels): + axes[i, 0].plot(t, np.real(recording.data[i]), linewidth=linewidth) + axes[i, 0].plot(t, np.imag(recording.data[i]), linewidth=linewidth) + axes[i, 1].specgram(recording.data[i], Fs=sample_rate, Fc=center_frequency) + axes[i, 0].tick_params(labelsize=tick_fontsize, colors=COLORS["light"]) + axes[i, 1].tick_params(labelsize=tick_fontsize, colors=COLORS["light"]) + axes[i, 0].set_ylabel("Amplitude", fontsize=6, color=COLORS["light"]) + axes[i, 1].set_ylabel("Freq (Hz)", fontsize=6, color=COLORS["light"]) + if i != num_channels - 1: + axes[i, 0].set_xticks([]) + axes[i, 1].set_xticks([]) + else: + axes[i, 0].set_xlabel("Time (s)", fontsize=6, color=COLORS["light"]) + axes[i, 1].set_xlabel("Time (s)", fontsize=6, color=COLORS["light"]) + + output_path, _ = set_path(output_path=output_path) + plt.savefig(output_path, dpi=1000) + print(f"Saved signal plot to {output_path}") + def view_sig( recording: Recording,