"""Split command - Split, trim, and extract portions of recordings.""" from pathlib import Path import click import numpy as np from ria_toolkit_oss.io import from_npy_legacy, load_recording from ria_toolkit_oss_cli.ria_toolkit_oss.common import ( detect_file_format, echo_progress, echo_verbose, format_sample_count, save_recording, ) def get_output_extension(format_name): """Get file extension for format name.""" extension_map = {"sigmf": ".sigmf", "npy": ".npy", "wav": ".wav", "blue": ".blue"} return extension_map[format_name] def validate_operation(split_at, split_every, split_duration, trim, extract_annotations): # Validate operation selection operations = sum( [split_at is not None, split_every is not None, split_duration is not None, trim, extract_annotations] ) if operations == 0: raise click.ClickException( "No operation specified. Use one of:\n" " --split-at SAMPLE\n" " --split-every N\n" " --split-duration SECONDS\n" " --trim (with --start and --length or --end)\n" " --extract-annotations" ) if operations > 1: raise click.ClickException( "Multiple operations specified. Use only one of:\n" " --split-at, --split-every, --split-duration, --trim, --extract-annotations" ) @click.command() @click.argument("input", type=click.Path(exists=True)) @click.option("--split-at", type=int, metavar="SAMPLE", help="Split into two files at sample index") @click.option("--split-every", type=int, metavar="N", help="Split into chunks of N samples") @click.option( "--split-duration", type=float, metavar="SECONDS", help="Split into chunks of specified duration (requires sample_rate in metadata)", ) @click.option("--trim", is_flag=True, help="Extract portion of recording (use with --start and --length or --end)") @click.option( "--start", "start_sample", type=int, default=0, show_default=True, help="Start sample for trim operation" ) @click.option("--length", "num_samples", type=int, help="Number of samples for trim operation") @click.option("--end", "end_sample", type=int, help="End sample for trim operation (alternative to --length)") @click.option("--extract-annotations", is_flag=True, help="Extract each annotated region to separate file") @click.option("--annotation-label", type=str, help="Only extract annotations with this label") @click.option("--annotation-index", type=int, help="Extract specific annotation by index") @click.option("--output-dir", type=click.Path(), help="Output directory (default: current directory)") @click.option("--output-prefix", type=str, help="Prefix for output filenames") @click.option( "--output-format", type=click.Choice(["npy", "sigmf", "wav", "blue"]), help="Force output format (default: same as input)", ) @click.option("--overwrite", is_flag=True, help="Overwrite existing output files") @click.option("--legacy", is_flag=True, help="Load input as legacy NPY format") @click.option("--verbose", "-v", is_flag=True, help="Verbose output") @click.option("--quiet", "-q", is_flag=True, help="Suppress output") def split( # noqa: C901 input, split_at, split_every, split_duration, trim, start_sample, num_samples, end_sample, extract_annotations, annotation_label, annotation_index, output_dir, output_prefix, output_format, overwrite, legacy, verbose, quiet, ): """Split, trim, and extract portions of recordings. Split recordings into multiple files, extract portions, or extract annotated regions. \b Examples: # Split at specific sample ria split recording.sigmf --split-at 500000 --output-dir split_output \b # Split into equal chunks ria split capture.npy --split-every 100000 --output-dir chunks \b # Split by duration (requires sample_rate in metadata) ria split recording.sigmf --split-duration 1.0 --output-dir segments \b # Trim recording ria split signal.npy --trim --start 1000 --length 5000 --output-dir trimmed \b # Trim with end index ria split signal.npy --trim --start 1000 --end 6000 --output-dir trimmed \b # Extract all annotated regions ria split annotated.sigmf --extract-annotations --output-dir annotations \b # Extract specific annotation label ria split annotated.sigmf --extract-annotations --annotation-label "payload" \b # Extract specific annotation by index ria split annotated.sigmf --extract-annotations --annotation-index 1 """ # Validate operation selection validate_operation(split_at, split_every, split_duration, trim, extract_annotations) # Validate trim parameters if trim: if num_samples is None and end_sample is None: raise click.ClickException("Trim operation requires either --length or --end") if num_samples is not None and end_sample is not None: raise click.ClickException("Cannot specify both --length and --end") # Load input recording input_path = Path(input) input_format = detect_file_format(input_path) echo_progress(f"Loading: {input_path.name}", quiet) echo_verbose(f"Input format: {input_format.upper()}", verbose) try: if legacy: echo_verbose("Using legacy NPY loader", verbose) recording = from_npy_legacy(input) else: recording = load_recording(input) except Exception as e: raise click.ClickException(f"Failed to load input file: {e}") # Get recording info if hasattr(recording.data, "shape") and len(recording.data.shape) == 2: total_samples = recording.data.shape[1] else: total_samples = len(recording.data) echo_progress(f"Total samples: {format_sample_count(total_samples)}", quiet) # Determine output format if output_format is None: output_format = input_format echo_verbose(f"Output format: {output_format.upper()}", verbose) # Determine output directory if output_dir: out_dir = Path(output_dir) else: out_dir = Path(".") # Current directory # Get base filename for outputs if output_prefix: base_name = output_prefix else: # Get input stem without format-specific suffixes base_name = input_path.stem if base_name.endswith(".sigmf-data") or base_name.endswith(".sigmf-meta"): base_name = base_name[:-11] elif base_name.endswith(".sigmf"): base_name = base_name[:-6] # Execute operation if split_at is not None: # Split at specific sample if split_at < 0 or split_at >= total_samples: raise click.ClickException(f"Invalid split point: {split_at}\n" f"Must be between 0 and {total_samples-1}") echo_progress(f"\nSplitting at sample {format_sample_count(split_at)}...", quiet) # Create two parts part1 = recording.trim(start_sample=0, num_samples=split_at) part2 = recording.trim(start_sample=split_at, num_samples=total_samples - split_at) # Add metadata about original file part1._metadata["original_file"] = str(input_path.name) part1._metadata["original_start_sample"] = 0 part1._metadata["original_end_sample"] = split_at part1._metadata["split_operation"] = "split_at" part2._metadata["original_file"] = str(input_path.name) part2._metadata["original_start_sample"] = split_at part2._metadata["original_end_sample"] = total_samples part2._metadata["split_operation"] = "split_at" # Save parts ext = get_output_extension(output_format) output1 = out_dir / f"{base_name}_part1{ext}" output2 = out_dir / f"{base_name}_part2{ext}" echo_progress( f" Part 1: samples 0-{format_sample_count(split_at-1)} ({format_sample_count(split_at)} samples)", quiet ) save_recording(part1, output1, output_format, overwrite, verbose) echo_progress( message=( f" Part 2: samples {format_sample_count(split_at)}-{format_sample_count(total_samples-1)} " f"({format_sample_count(total_samples - split_at)} samples)" ), quiet=quiet, ) save_recording(part2, output2, output_format, overwrite, verbose) echo_progress("\nSaved:", quiet) echo_progress(f" {output1}", quiet) echo_progress(f" {output2}", quiet) elif split_every is not None or split_duration is not None: # Split into equal chunks if split_duration is not None: # Convert duration to samples sample_rate = recording.metadata.get("sample_rate") if not sample_rate: raise click.ClickException( "Cannot split by duration: no sample_rate in metadata\n" "Use --split-every with sample count instead" ) split_samples = int(split_duration * sample_rate) echo_progress( f"\nSplitting into {split_duration}s chunks ({format_sample_count(split_samples)} samples)...", quiet ) else: split_samples = split_every echo_progress(f"\nSplitting into chunks of {format_sample_count(split_samples)} samples...", quiet) if split_samples <= 0: raise click.ClickException(f"Invalid chunk size: {split_samples}") # Calculate number of chunks num_chunks = int(np.ceil(total_samples / split_samples)) echo_progress(f"Creating {num_chunks} chunks...", quiet) # Create chunks ext = get_output_extension(output_format) created_files = [] for i in range(num_chunks): start = i * split_samples length = min(split_samples, total_samples - start) end = start + length - 1 # Trim chunk chunk = recording.trim(start_sample=start, num_samples=length) # Add metadata chunk._metadata["original_file"] = str(input_path.name) chunk._metadata["original_start_sample"] = start chunk._metadata["original_end_sample"] = start + length chunk._metadata["split_operation"] = "split_every" chunk._metadata["chunk_index"] = i + 1 chunk._metadata["total_chunks"] = num_chunks # Generate output filename chunk_num = str(i + 1).zfill(len(str(num_chunks))) output_path = out_dir / f"{base_name}_chunk{chunk_num}{ext}" echo_progress( f" Chunk {i+1}/{num_chunks}: samples {format_sample_count(start)}-{format_sample_count(end)}...", quiet, ) save_recording(chunk, output_path, output_format, overwrite, verbose) created_files.append(output_path) echo_progress(f"\nCreated {num_chunks} chunks in {out_dir}/", quiet) elif trim: # Trim operation if end_sample is not None: if end_sample <= start_sample: raise click.ClickException( f"Invalid range: end ({end_sample}) must be greater than start ({start_sample})" ) num_samples = end_sample - start_sample if start_sample < 0 or num_samples < 0: raise click.ClickException("Invalid trim range: start and length must be non-negative") if start_sample + num_samples > total_samples: raise click.ClickException( f"Invalid trim range\n" f"Start: {format_sample_count(start_sample)}, Length: {format_sample_count(num_samples)}, " f"End: {format_sample_count(start_sample + num_samples)}\n" f"Recording only has {format_sample_count(total_samples)} samples " f"(indices 0-{format_sample_count(total_samples-1)})" ) echo_progress("\nTrimming recording...", quiet) echo_progress(f" Start: {format_sample_count(start_sample)}", quiet) echo_progress(f" Length: {format_sample_count(num_samples)} samples", quiet) echo_progress(f" End: {format_sample_count(start_sample + num_samples - 1)}", quiet) # Trim recording trimmed = recording.trim(start_sample=start_sample, num_samples=num_samples) # Add metadata trimmed._metadata["original_file"] = str(input_path.name) trimmed._metadata["original_start_sample"] = start_sample trimmed._metadata["original_end_sample"] = start_sample + num_samples trimmed._metadata["split_operation"] = "trim" # Save trimmed recording ext = get_output_extension(output_format) output_path = out_dir / f"{base_name}{ext}" save_recording(trimmed, output_path, output_format, overwrite, verbose) echo_progress(f"\nOutput: {output_path}", quiet) echo_progress("Done.", quiet) elif extract_annotations: # Extract annotated regions if not recording.annotations: raise click.ClickException( "No annotations found in recording\n" "Use 'ria annotate' to add annotations first" ) # Filter annotations annotations_to_extract = recording.annotations if annotation_index is not None: if annotation_index < 0 or annotation_index >= len(annotations_to_extract): raise click.ClickException( f"Invalid annotation index: {annotation_index}\n" f"Recording has {len(annotations_to_extract)} annotations " f"(indices 0-{len(annotations_to_extract)-1})" ) annotations_to_extract = [annotations_to_extract[annotation_index]] if annotation_label is not None: filtered = [ann for ann in annotations_to_extract if ann.label == annotation_label] if not filtered: available_labels = list(set(ann.label for ann in recording.annotations)) raise click.ClickException( f"No annotations with label '{annotation_label}'\n" f"Available labels: {', '.join(available_labels)}" ) annotations_to_extract = filtered echo_progress(f"\nExtracting {len(annotations_to_extract)} annotated region(s)...", quiet) # Extract each annotation ext = get_output_extension(output_format) created_files = [] for ann in annotations_to_extract: # Get annotation bounds start = ann.sample_start count = ann.sample_count end = start + count - 1 # Trim to annotation bounds chunk = recording.trim(start_sample=start, num_samples=count) # Clear annotations - the trimmed chunk IS the annotation, # and trim() may produce invalid annotations chunk._annotations = [] # Add metadata chunk._metadata["original_file"] = str(input_path.name) chunk._metadata["original_start_sample"] = start chunk._metadata["original_end_sample"] = start + count chunk._metadata["split_operation"] = "extract_annotation" chunk._metadata["annotation_label"] = ann.label # Generate filename label_safe = ann.label.replace(" ", "_").replace("/", "_") output_filename = f"{base_name}_{label_safe}_{start}-{start+count}{ext}" output_path = out_dir / output_filename # Get original index in full annotation list if we filtered if annotation_index is not None: display_idx = annotation_index else: display_idx = recording.annotations.index(ann) echo_progress( message=( f" [{display_idx}] {ann.label} ({format_sample_count(start)}" f"-{format_sample_count(end)}): {output_filename}" ), quiet=quiet, ) save_recording(chunk, output_path, output_format, overwrite, verbose) created_files.append(output_path) echo_progress(f"\nExtracted {len(annotations_to_extract)} annotated region(s).", quiet) if __name__ == "__main__": split()