diff --git a/ria_toolkit_oss_cli/cli.py b/ria_toolkit_oss_cli/cli.py index f15f49f..bf97dfa 100644 --- a/ria_toolkit_oss_cli/cli.py +++ b/ria_toolkit_oss_cli/cli.py @@ -1,10 +1,10 @@ """ -This module contains the main group for the utils CLI. +This module contains the main group for the ria toolkit oss CLI. """ import click -from utils_cli.utils import commands +from ria_toolkit_oss_cli.ria_toolkit_oss import commands @click.group() diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py b/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py index debda31..37759a2 100644 --- a/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py +++ b/ria_toolkit_oss_cli/ria_toolkit_oss/annotate.py @@ -5,7 +5,7 @@ from pathlib import Path import click -from ria_toolkit_oss.annotations import ( +from src.ria_toolkit_oss.annotations import ( annotate_with_cusum, detect_signals_energy, split_recording_annotations, @@ -222,8 +222,8 @@ def list(input, verbose): \b Examples: - utils annotate list recording.sigmf-data - utils annotate list signal.npy --verbose + ria annotate list recording.sigmf-data + ria annotate list signal.npy --verbose """ try: recording = load_recording(input) @@ -291,8 +291,8 @@ def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_ \b Examples: - utils annotate add file.npy --start 1000 --count 500 --label wifi - utils annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal" + ria annotate add file.npy --start 1000 --count 500 --label wifi + ria annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal" """ try: recording = load_recording(input) @@ -374,12 +374,12 @@ def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_ def remove(input, index, output, overwrite, quiet): """Remove annotation by index. - Use 'utils annotate list' to see annotation indices. + Use 'ria annotate list' to see annotation indices. 
\b Examples: - utils annotate remove signal.sigmf-data 2 - utils annotate remove file.npy 0 + ria annotate remove signal.sigmf-data 2 + ria annotate remove file.npy 0 """ try: recording = load_recording(input) @@ -428,8 +428,8 @@ def clear(input, output, overwrite, force, quiet): \b Examples: - utils annotate clear signal.sigmf-data - utils annotate clear file.npy --force + ria annotate clear signal.sigmf-data + ria annotate clear file.npy --force """ try: recording = load_recording(input) @@ -522,10 +522,10 @@ def energy( \b Examples: - utils annotate energy capture.sigmf-data --label burst - utils annotate energy signal.npy --threshold 1.5 --min-distance 10000 - utils annotate energy signal.sigmf-data --freq-method obw - utils annotate energy signal.sigmf-data --freq-method full-detected + ria annotate energy capture.sigmf-data --label burst + ria annotate energy signal.npy --threshold 1.5 --min-distance 10000 + ria annotate energy signal.sigmf-data --freq-method obw + ria annotate energy signal.sigmf-data --freq-method full-detected """ try: @@ -601,8 +601,8 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o \b Examples: - utils annotate cusum signal.sigmf-data --min-duration 5.0 - utils annotate cusum data.npy --min-duration 10.0 --label state + ria annotate cusum signal.sigmf-data --min-duration 5.0 + ria annotate cusum data.npy --min-duration 10.0 --label state """ try: recording = load_recording(input) @@ -667,8 +667,8 @@ def threshold(input, threshold, label, window_size, annotation_type, output, ove \b Examples: - utils annotate threshold signal.sigmf-data --threshold 0.7 --label wifi - utils annotate threshold data.npy --threshold 0.5 --window-size 2048 + ria annotate threshold signal.sigmf-data --threshold 0.7 --label wifi + ria annotate threshold data.npy --threshold 0.5 --window-size 2048 """ if not (0.0 <= threshold <= 1.0): raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}") @@ 
-737,10 +737,10 @@ def separate(input, indices, nfft, noise_threshold_db, min_component_bw, output, \b Examples: - utils annotate separate capture.sigmf-data - utils annotate separate signal.npy --indices 0,1,2 - utils annotate separate data.sigmf-data --noise-threshold-db -70 - utils annotate separate signal.npy --min-component-bw 100000 + ria annotate separate capture.sigmf-data + ria annotate separate signal.npy --indices 0,1,2 + ria annotate separate data.sigmf-data --noise-threshold-db -70 + ria annotate separate signal.npy --min-component-bw 100000 """ try: diff --git a/ria_toolkit_oss_cli/ria_toolkit_oss/common.py b/ria_toolkit_oss_cli/ria_toolkit_oss/common.py index 5b5f61d..a55c08c 100644 --- a/ria_toolkit_oss_cli/ria_toolkit_oss/common.py +++ b/ria_toolkit_oss_cli/ria_toolkit_oss/common.py @@ -357,7 +357,7 @@ def get_sdr_device(device_type: str, ident: Optional[str] = None, tx=False): try: if device_type == "pluto": - from utils.sdr.pluto import Pluto + from src.ria_toolkit_oss.sdr.pluto import Pluto if ip_addr: return Pluto(identifier=ip_addr) @@ -365,17 +365,17 @@ def get_sdr_device(device_type: str, ident: Optional[str] = None, tx=False): return Pluto() elif device_type == "hackrf": - from utils.sdr.hackrf import HackRF + from src.ria_toolkit_oss.sdr.hackrf import HackRF return HackRF() elif device_type == "bladerf": - from utils.sdr.blade import Blade + from src.ria_toolkit_oss.sdr.blade import Blade return Blade() elif device_type == "usrp": - from utils.sdr.usrp import USRP + from src.ria_toolkit_oss.sdr.usrp import USRP if ip_addr: return USRP(identifier=f"addr={ip_addr}") @@ -385,12 +385,12 @@ def get_sdr_device(device_type: str, ident: Optional[str] = None, tx=False): return USRP() elif device_type == "rtlsdr": - from utils.sdr.rtlsdr import RTLSDR + from src.ria_toolkit_oss.sdr.rtlsdr import RTLSDR return RTLSDR() elif device_type == "thinkrf": - from utils.sdr.thinkrf import ThinkRF + from src.ria_toolkit_oss.sdr.thinkrf import ThinkRF if 
ip_addr: return ThinkRF(identifier=ip_addr) diff --git a/src/ria_toolkit_oss/annotations/__init__.py b/src/ria_toolkit_oss/annotations/__init__.py new file mode 100644 index 0000000..1542dcf --- /dev/null +++ b/src/ria_toolkit_oss/annotations/__init__.py @@ -0,0 +1,54 @@ +""" +The annotations package contains tools and utilities for creating, managing, and processing annotations. + +Provides automatic annotation generation using various signal detection algorithms: +- Energy-based detection (detect_signals_energy) +- CUSUM-based segmentation (annotate_with_cusum) +- Threshold-based qualification (threshold_qualifier) +- Signal isolation and extraction (isolate_signal) +- Occupied bandwidth analysis (calculate_occupied_bandwidth, calculate_nominal_bandwidth) + +All detection functions return Recording objects with added annotations. +""" + +__all__ = [ + # Energy-based detection + "detect_signals_energy", + "calculate_occupied_bandwidth", + "calculate_nominal_bandwidth", + "calculate_full_detected_bandwidth", + "annotate_with_obw", + # CUSUM detection + "annotate_with_cusum", + # Threshold detection + "threshold_qualifier", + # Parallel signal separation (Phase 2) + "find_spectral_components", + "split_annotation_by_components", + "split_recording_annotations", + # Signal isolation + "isolate_signal", + # Annotation transforms + "remove_contained_boxes", + "is_annotation_contained", + # Dataset creation + "qualify_slice_from_annotations", +] + +from .annotation_transforms import is_annotation_contained, remove_contained_boxes +from .cusum_annotator import annotate_with_cusum +from .energy_detector import ( + annotate_with_obw, + calculate_full_detected_bandwidth, + calculate_nominal_bandwidth, + calculate_occupied_bandwidth, + detect_signals_energy, +) +from .parallel_signal_separator import ( + find_spectral_components, + split_annotation_by_components, + split_recording_annotations, +) +from .qualify_slice import qualify_slice_from_annotations +from 
.signal_isolation import isolate_signal +from .threshold_qualifier import threshold_qualifier diff --git a/src/ria_toolkit_oss/annotations/annotation_transforms.py b/src/ria_toolkit_oss/annotations/annotation_transforms.py new file mode 100644 index 0000000..47300c1 --- /dev/null +++ b/src/ria_toolkit_oss/annotations/annotation_transforms.py @@ -0,0 +1,55 @@ +from ria_toolkit_oss.datatypes.annotation import Annotation + +# TODO figure out how to transfer labels in the merge case + + +def remove_contained_boxes(annotations: list[Annotation]): + """ + Remove all annotations (bounding boxes) that are entirely contained within other boxes in the list. + + :param annotations: A list of Annotation objects. + :type annotations: list[Annotation] + + :returns: A new list of Annotation objects. + :rtype: list[Annotation]""" + + output_boxes = [] + + for i in range(len(annotations)): + contained = False + for j in range(len(annotations)): + if i != j and is_annotation_contained(annotations[i], annotations[j]): + contained = True + break + + if not contained: + output_boxes.append(annotations[i]) + + return output_boxes + + +def is_annotation_contained(inner: Annotation, outer: Annotation) -> bool: + """ + Check if an annotation box is entirely contained within another annotation bounding box. + + :param inner: The inner box. + :type inner: Annotation. + :param outer: The outer box. + :type outer: Annotation. + + :returns: True if inner is within outer, false otherwise. 
+ :rtype: bool + """ + + inner_sample_stop = inner.sample_start + inner.sample_count + outer_sample_stop = outer.sample_start + outer.sample_count + + if inner.sample_start > outer.sample_start and inner_sample_stop < outer_sample_stop: + if inner.freq_lower_edge > outer.freq_lower_edge and inner.freq_upper_edge < outer.freq_upper_edge: + return True + + return False + + +def merge_annotations(annotations: list[Annotation], overlap_threshold) -> list[Annotation]: + raise NotImplementedError diff --git a/src/ria_toolkit_oss/annotations/cusum_annotator.py b/src/ria_toolkit_oss/annotations/cusum_annotator.py new file mode 100644 index 0000000..879abc1 --- /dev/null +++ b/src/ria_toolkit_oss/annotations/cusum_annotator.py @@ -0,0 +1,197 @@ +import itertools +import json +from itertools import groupby +from typing import Optional + +import numpy as np + +from ria_toolkit_oss.datatypes import Annotation, Recording + + +def annotate_with_cusum( + recording: Recording, + label: Optional[str] = "segment", + window_size: Optional[int] = 1, + min_duration: Optional[float] = None, + tolerance: Optional[int] = -1, + annotation_type: Optional[str] = "standalone", +): + """ + Add annotations that divide the recording into distinct time segments. + + This algorithm computes the cumulative sum of the sample magnitudes and + determines break points in the signal. + + This tool can be used to find points where a signal turns on or off, or + changes between a low and high amplitude. + + :param recording: A ``Recording`` object to annotate. + :type recording: ``ria_toolkit_oss.datatypes.Recording`` + :param label: Label for the detected segments. + :type label: str + :param window_size: The length (in samples) of the moving average window. + :type window_size: int + :param min_duration: The minimum duration (in ms) of a segment. + The algorithm will not produce annotations shorter than this length. + :type min_duration: float + :param tolerance: The minimum length (in samples) of a segment. 
+ :type tolerance: int + :param annotation_type: Annotation type (standalone, parallel, intersection). + :type annotation_type: str + """ + + sample_rate = recording.metadata["sample_rate"] + center_frequency = recording.metadata.get("center_frequency", 0) + + # Create an object of the time segmenter + time_segmenter = TimeSegmenter(sample_rate, min_duration, window_size, tolerance) + + # TODO refactor time segmentor such that the _ s are not required. + _, _, change_points, _ = time_segmenter.apply(recording.data[0]) + + time_segments_indices = np.append(np.insert(change_points, 0, 0), len(recording.data[0])) + annotations = [] + for i in range(len(time_segments_indices) - 1): + # Build comment JSON with type metadata + comment_data = { + "type": annotation_type, + "generator": "cusum_annotator", + "params": { + "window_size": window_size, + "min_duration": min_duration, + "tolerance": tolerance, + }, + } + + annotations.append( + Annotation( + sample_start=time_segments_indices[i], + sample_count=time_segments_indices[i + 1] - time_segments_indices[i], + freq_lower_edge=center_frequency - (sample_rate / 2), + freq_upper_edge=center_frequency + (sample_rate / 2), + label=label, + comment=json.dumps(comment_data), + detail={"generator": "cusum_annotator"}, + ) + ) + + return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations) + + +def _compute_cusum(_signal, sample_rate: int, tolerance: int = None, min_duration: float = -1): + """ + This function efficiently computes the cumulative sum of a give list (_signal), with an optional tolerance. + + Args: + - _signal: array of iq samples. + - Tolerance: the least acceptable length of a block, Defaults to None. + + Returns: + - cusum (array): Array of the cumulative sum of the given list + - sample_rate (int): __description_ + - change_points (array): Array of the indices at which a change in the CUSUM direction happens. 
+ - min_duration (float): The least acceptable time width of each segment (in ms). Defaults to -1. + """ + + # efficiently calculate the running sum of the signal + cusum = list(itertools.accumulate((_signal - np.mean(_signal)))) + + # 'diff' computes the differences between the consecutive values, + # then 'sign' determines if it is +ve or -ve. + change_indicators = np.sign(np.diff(cusum)) + + # TODO: Tolerance is not useful, we can get rid of it. + """ + To add the tolerance: + 1. Group the consecutive values, count them and mark their start index + """ + if tolerance: + groups_list, index = [], 0 + # 1 + for key, group in groupby(change_indicators): + group_len = len(list(group)) + groups_list.append((key, group_len, index)) + index += group_len + # 2 + for val, n, idx in groups_list: + if n <= tolerance: + change_indicators[idx : idx + n] = -1 * val + change_points = np.where(np.diff(change_indicators))[0] + 1 + + # Limit the change_points + # Reject those whose number of samples < minimum accepted #n of samples in (min duration) ms. + if min_duration is not None and min_duration > 0: + min_samples_wide = min_duration * sample_rate / 1000 + segments_lengths = np.diff(change_points) + segments_lengths = np.insert(segments_lengths, 0, change_points[0]) + change_points = change_points[np.where(segments_lengths > min_samples_wide)[0]] + return cusum, change_points + + +class TimeSegmenter: + """Time Segmenter class, it creates a segmenter object with certain\ + characteristics to easily split an input signal to segments based on\ + the cumulative sum of deviations (of the signal mean) + """ + + def __init__( + self, sample_rate: int, min_duration: float = 1, moving_average_window: int = 3, tolerance: int = None + ): + """_summary_ + + Args: + sample_rate (int): _description_ + min_duration (float, optional): _description_. Defaults to 1. + moving_average_window (int, optional): _description_. Defaults to 3. + tolerance (int, optional): _description_. 
Defaults to None. + """ + self.sample_rate = sample_rate + self.min_duration = min_duration + self.moving_average_window = moving_average_window + self._moving_avg_filter = self._init_filter() + self.tolerance = tolerance + + def _init_filter(self): + """_summary_ + + Returns: + _type_: _description_ + """ + return np.ones(self.moving_average_window) / self.moving_average_window + + def _apply_filter(self, iqsignal: np.array): + """_summary_ + + Args: + iqsignal (np.array): _description_ + + Returns: + _type_: _description_ + """ + return np.convolve(abs(iqsignal), self._moving_avg_filter, mode="same") + + def _create_segments(self, iq_signal: np.array, change_points: np.array): + """_summary_ + + Args: + iq_signal (np.array): _description_ + change_points (np.array): _description_ + + Returns: + _type_: _description_ + """ + return np.split(iq_signal, change_points) + + def apply(self, iq_signal: np.array): + """_summary_ + + Args: + iq_signal (np.array): _description_ + + Returns: + _type_: _description_ + """ + smoothed_signal = self._apply_filter(iq_signal) + cusum, change_points = _compute_cusum(smoothed_signal, self.sample_rate, self.tolerance, self.min_duration) + segments = self._create_segments(iq_signal, change_points) + return smoothed_signal, cusum, change_points, segments diff --git a/src/ria_toolkit_oss/annotations/energy_detector.py b/src/ria_toolkit_oss/annotations/energy_detector.py new file mode 100644 index 0000000..c27a5f7 --- /dev/null +++ b/src/ria_toolkit_oss/annotations/energy_detector.py @@ -0,0 +1,520 @@ +""" +Energy-based signal detection and bandwidth analysis. + +Provides automatic annotation generation using energy-based signal detection +and occupied bandwidth calculation following ITU-R SM.328 standard. 
+""" + +import json +from typing import Tuple + +import numpy as np + +from ria_toolkit_oss.datatypes import Annotation, Recording + + +def detect_signals_energy( + recording: Recording, + k: int = 10, + threshold_factor: float = 1.2, + window_size: int = 200, + min_distance: int = 5000, + label: str = "signal", + annotation_type: str = "standalone", + freq_method: str = "nbw", + nfft: int = 65536, + obw_power: float = 0.9999, +) -> Recording: + """ + Detect signal bursts using energy-based method with adaptive noise floor estimation. + + This algorithm smooths the signal with a moving average filter, estimates the noise + floor from k segments, applies a threshold to detect regions above noise, and merges + nearby detections. Detected time boundaries are then assigned frequency bounds based + on the selected frequency method. + + Time Detection Algorithm: + 1. Smooth signal using moving average (envelope detection) + 2. Divide smoothed signal into k segments + 3. Estimate noise floor as median of segment mean powers + 4. Detect regions where power exceeds threshold_factor * noise_floor + 5. 
+ Merge regions closer than min_distance samples + + Frequency Bounding (freq_method): + - 'nbw': Nominal bandwidth (OBW + center frequency) - DEFAULT + - 'obw': Occupied bandwidth (99.99% power, includes sidelobes) + - 'full-detected': Lowest to highest spectral component + - 'full-bandwidth': Entire Nyquist span (center_freq ± sample_rate/2) + + :param recording: Recording to analyze + :type recording: Recording + :param k: Number of segments for noise floor estimation (default: 10) + :type k: int + :param threshold_factor: Threshold multiplier above noise floor (typical: 1.2-2.0, default: 1.2) + :type threshold_factor: float + :param window_size: Moving average window size in samples (default: 200) + :type window_size: int + :param min_distance: Minimum distance between separate signals in samples (default: 5000) + :type min_distance: int + :param label: Label for detected annotations (default: "signal") + :type label: str + :param annotation_type: Annotation type (standalone, parallel, intersection, default: standalone) + :type annotation_type: str + :param freq_method: How to calculate frequency bounds (default: 'nbw') + :type freq_method: str + :param nfft: FFT size for frequency calculations (default: 65536) + :type nfft: int + :param obw_power: Power percentage for OBW (0.9999 = 99.99%, default: 0.9999) + :type obw_power: float + + :returns: New Recording with added annotations + :rtype: Recording + + **Example**:: + + >>> from ria_toolkit_oss.io import load_recording + >>> from ria_toolkit_oss.annotations import detect_signals_energy + >>> recording = load_recording("capture.sigmf") + + >>> # Detect with NBW frequency bounds (default, best for real signals) + >>> annotated = detect_signals_energy(recording, label="burst") + + >>> # Detect with OBW (more conservative, includes sidelobes) + >>> annotated = detect_signals_energy( + ... recording, label="burst", freq_method="obw" + ... 
) + + >>> # Detect with full detected range (captures all spectral components) + >>> annotated = detect_signals_energy( + ... recording, label="burst", freq_method="full-detected" + ... ) + """ + # Extract signal data (use first channel only) + signal = recording.data[0] + + # Smooth signal using moving average filter (envelope detection) + smoothed_signal = np.convolve(np.abs(signal), np.ones(window_size) / window_size, mode="valid") + + # Estimate noise floor using segment-based median (robust to signal presence) + segment_length = len(smoothed_signal) // k + segment_powers = [np.mean(smoothed_signal[i * segment_length : (i + 1) * segment_length] ** 2) for i in range(k)] + noise_floor = np.median(segment_powers) + threshold = noise_floor * threshold_factor + + # Detect signal boundaries (regions above threshold) + boundaries = [] + start = None + + for i, power in enumerate(smoothed_signal**2): + if power > threshold and start is None: + # Signal starts + start = i + elif power <= threshold and start is not None: + # Signal ends + boundaries.append((start, i - start)) + start = None + + # Handle case where signal extends to end + if start is not None: + boundaries.append((start, len(smoothed_signal) - start)) + + # Merge boundaries that are closer than min_distance + merged_boundaries = [] + if boundaries: + start, length = boundaries[0] + for next_start, next_length in boundaries[1:]: + if next_start - (start + length) < min_distance: + # Merge with current boundary + length = next_start + next_length - start + else: + # Save current and start new boundary + merged_boundaries.append((start, length)) + start, length = next_start, next_length + # Add final boundary + merged_boundaries.append((start, length)) + + # Create annotations from detected boundaries + sample_rate = recording.metadata["sample_rate"] + center_frequency = recording.metadata.get("center_frequency", 0) + + # Validate frequency method + valid_freq_methods = ["nbw", "obw", "full-detected", 
"full-bandwidth"] + if freq_method not in valid_freq_methods: + raise ValueError(f"Invalid freq_method '{freq_method}'. " f"Must be one of: {', '.join(valid_freq_methods)}") + + annotations = [] + for start_sample, sample_count in merged_boundaries: + # Calculate frequency bounds based on method + freq_lower, freq_upper = calculate_frequency_bounds( + freq_method, center_frequency, sample_rate, nfft, signal, start_sample, sample_count, obw_power + ) + # Build comment JSON with type metadata + comment_data = { + "type": annotation_type, + "generator": "energy_detector", + "freq_method": freq_method, + "params": { + "threshold_factor": threshold_factor, + "window_size": window_size, + "noise_floor": float(noise_floor), + "threshold": float(threshold), + }, + } + + anno = Annotation( + sample_start=start_sample, + sample_count=sample_count, + freq_lower_edge=freq_lower, + freq_upper_edge=freq_upper, + label=label, + comment=json.dumps(comment_data), + detail={"generator": "energy_detector", "freq_method": freq_method}, + ) + annotations.append(anno) + + # Return new Recording with combined annotations + return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations) + + +def calculate_occupied_bandwidth( + signal: np.ndarray, + sampling_rate: float, + nfft: int = 65536, + power_percentage: float = 0.9999, + start_offset: int = 1000, +) -> Tuple[float, float, float]: + """ + Calculate occupied bandwidth following ITU-R SM.328 standard. + + Occupied bandwidth (OBW) is defined as the bandwidth containing a specified + percentage of the total signal power. This implementation uses FFT-based + power spectral density analysis. + + The default power_percentage of 99.99% (0.9999) captures the main lobe and + first sidelobe, providing a realistic measure of actual spectral occupancy. + + ITU-R SM.328 defines OBW as bandwidth containing 99% of power, but this + implementation allows customization. 
+ + :param signal: Complex IQ signal samples + :type signal: np.ndarray + :param sampling_rate: Sample rate in Hz + :type sampling_rate: float + :param nfft: FFT size (larger = better frequency resolution) + :type nfft: int + :param power_percentage: Fraction of power to contain (0.99 = 99%, 0.9999 = 99.99%) + :type power_percentage: float + :param start_offset: Skip this many samples at start (avoid transients) + :type start_offset: int + + :returns: Tuple of (occupied_bandwidth_hz, lower_edge_hz, upper_edge_hz) + :rtype: Tuple[float, float, float] + + :raises ValueError: If signal is too short for nfft + start_offset + + **Example**:: + + >>> import numpy as np + >>> from ria_toolkit_oss.annotations import calculate_occupied_bandwidth + >>> # Generate 1 MHz QPSK signal at 10 Msps + >>> signal = np.random.randn(100000) + 1j * np.random.randn(100000) + >>> obw, f_lower, f_upper = calculate_occupied_bandwidth( + ... signal, sampling_rate=10e6, power_percentage=0.99 + ... ) + >>> print(f"99% OBW: {obw/1e6:.3f} MHz") + >>> print(f"Range: {f_lower/1e6:.3f} to {f_upper/1e6:.3f} MHz") + + **References**: + ITU-R SM.328: "Spectra and bandwidth of emissions" + FCC Part 15: Uses 99% power containment for bandwidth limits + """ + # Validate input + if len(signal) < nfft + start_offset: + raise ValueError( + f"Signal too short: need {nfft + start_offset} samples, " + f"got {len(signal)}. Reduce nfft or start_offset." 
+ ) + + # Extract segment (skip transients at start) + signal_segment = signal[start_offset : nfft + start_offset] + + # Compute FFT and power spectral density + freq_spectrum = np.fft.fft(signal_segment, n=nfft) + psd = np.abs(freq_spectrum) ** 2 + + # Shift to center DC at middle (makes freq interpretation easier) + psd_shifted = np.fft.fftshift(psd) + freq_bins = np.fft.fftshift(np.fft.fftfreq(nfft, 1 / sampling_rate)) + + # Compute total power + total_power = np.sum(psd_shifted) + + # Find frequency range containing specified power percentage + # Use cumulative sum to find edges + cumulative_power = np.cumsum(psd_shifted) + + # Lower edge: where cumulative power reaches (1 - power_percentage) / 2 + # Upper edge: where cumulative power reaches 1 - (1 - power_percentage) / 2 + # This centers the power percentage window + lower_threshold = total_power * (1 - power_percentage) / 2 + upper_threshold = total_power * (1 - (1 - power_percentage) / 2) + + # Find indices + lower_idx = np.where(cumulative_power >= lower_threshold)[0][0] + upper_idx = np.where(cumulative_power <= upper_threshold)[0][-1] + + # Get frequency values + lower_freq = freq_bins[lower_idx] + upper_freq = freq_bins[upper_idx] + + # Calculate occupied bandwidth + occupied_bandwidth = upper_freq - lower_freq + + return occupied_bandwidth, lower_freq, upper_freq + + +def calculate_nominal_bandwidth( + signal: np.ndarray, + sampling_rate: float, + nfft: int = 65536, + power_percentage: float = 0.9999, + start_offset: int = 1000, +) -> Tuple[float, float]: + """ + Calculate nominal bandwidth and center frequency. + + Nominal bandwidth (NBW) is the occupied bandwidth along with the center + frequency of the signal's spectral occupancy. Useful for characterizing + signals with unknown or drifting center frequencies. 
+ + :param signal: Complex IQ signal samples + :type signal: np.ndarray + :param sampling_rate: Sample rate in Hz + :type sampling_rate: float + :param nfft: FFT size + :type nfft: int + :param power_percentage: Fraction of power to contain + :type power_percentage: float + :param start_offset: Skip samples at start + :type start_offset: int + + :returns: Tuple of (nominal_bandwidth_hz, center_frequency_hz) + :rtype: Tuple[float, float] + + **Example**:: + + >>> from ria_toolkit_oss.annotations import calculate_nominal_bandwidth + >>> nbw, center = calculate_nominal_bandwidth(signal, sampling_rate=10e6) + >>> print(f"NBW: {nbw/1e6:.3f} MHz, Center: {center/1e6:.3f} MHz") + """ + obw, lower_freq, upper_freq = calculate_occupied_bandwidth( + signal, sampling_rate, nfft, power_percentage, start_offset + ) + + # Center frequency is midpoint of occupied band + center_freq = (lower_freq + upper_freq) / 2 + + return obw, center_freq + + +def calculate_full_detected_bandwidth( + signal: np.ndarray, + sampling_rate: float, + nfft: int = 65536, + start_offset: int = 1000, +) -> Tuple[float, float, float]: + """ + Calculate frequency range from lowest to highest spectral component. + + Unlike OBW/NBW which define a power-based bandwidth, this calculates + the absolute frequency span from the lowest non-zero spectral component + to the highest non-zero component. + + Useful for: + - Signals with spectral gaps + - Multiple parallel signals (captures all of them) + - Understanding total occupied spectrum vs. 
actual bandwidth + + :param signal: Complex IQ signal samples + :type signal: np.ndarray + :param sampling_rate: Sample rate in Hz + :type sampling_rate: float + :param nfft: FFT size + :type nfft: int + :param start_offset: Skip samples at start + :type start_offset: int + + :returns: Tuple of (bandwidth_hz, lower_freq_hz, upper_freq_hz) + :rtype: Tuple[float, float, float] + + **Example**:: + + >>> # Signal with two components at different frequencies + >>> bw, f_low, f_high = calculate_full_detected_bandwidth( + ... signal, sampling_rate=10e6, nfft=65536 + ... ) + >>> print(f"Full span: {f_low/1e6:.3f} to {f_high/1e6:.3f} MHz") + """ + # Validate input + if len(signal) < nfft + start_offset: + raise ValueError( + f"Signal too short: need {nfft + start_offset} samples, " + f"got {len(signal)}. Reduce nfft or start_offset." + ) + + # Extract segment + signal_segment = signal[start_offset : nfft + start_offset] + + # Compute FFT and power spectral density + freq_spectrum = np.fft.fft(signal_segment, n=nfft) + psd = np.abs(freq_spectrum) ** 2 + + # Shift to center DC + psd_shifted = np.fft.fftshift(psd) + freq_bins = np.fft.fftshift(np.fft.fftfreq(nfft, 1 / sampling_rate)) + + # Find noise floor (mean of lowest 10% of bins) + noise_floor = np.mean(np.sort(psd_shifted)[: int(len(psd_shifted) * 0.1)]) + + # Find all bins above noise floor + above_noise = np.where(psd_shifted > noise_floor * 1.5)[0] + + if len(above_noise) == 0: + # No signal above noise, return zero bandwidth + return 0.0, 0.0, 0.0 + + # Get frequency range of signal components + lower_idx = above_noise[0] + upper_idx = above_noise[-1] + + lower_freq = freq_bins[lower_idx] + upper_freq = freq_bins[upper_idx] + + bandwidth = upper_freq - lower_freq + + return bandwidth, lower_freq, upper_freq + + +def annotate_with_obw( + recording: Recording, + label: str = "signal", + annotation_type: str = "standalone", + nfft: int = 65536, + power_percentage: float = 0.9999, + start_offset: int = 1000, +) -> 
Recording: + """ + Create a single annotation spanning the occupied bandwidth of the entire recording. + + Analyzes the full recording to find its occupied bandwidth and creates an annotation + covering that frequency range for the entire time duration. + + :param recording: Recording to analyze + :type recording: Recording + :param label: Annotation label + :type label: str + :param annotation_type: Annotation type + :type annotation_type: str + :param nfft: FFT size + :type nfft: int + :param power_percentage: Power percentage for OBW calculation + :type power_percentage: float + :param start_offset: Sample offset + :type start_offset: int + + :returns: Recording with OBW annotation added + :rtype: Recording + + **Example**:: + + >>> from ria_toolkit_oss.annotations import annotate_with_obw + >>> annotated = annotate_with_obw(recording, label="signal_obw") + """ + signal = recording.data[0] + sample_rate = recording.metadata["sample_rate"] + center_freq = recording.metadata.get("center_frequency", 0) + + # Calculate OBW + obw, lower_offset, upper_offset = calculate_occupied_bandwidth( + signal, sample_rate, nfft, power_percentage, start_offset + ) + + # Convert baseband offsets to absolute frequencies + freq_lower = center_freq + lower_offset + freq_upper = center_freq + upper_offset + + # Create comment JSON + comment_data = { + "type": annotation_type, + "generator": "obw_annotator", + "obw_hz": float(obw), + "power_percentage": power_percentage, + "params": {"nfft": nfft, "start_offset": start_offset}, + } + + # Create annotation spanning entire recording + anno = Annotation( + sample_start=0, + sample_count=len(signal), + freq_lower_edge=freq_lower, + freq_upper_edge=freq_upper, + label=label, + comment=json.dumps(comment_data), + detail={"generator": "obw_annotator", "obw_hz": float(obw)}, + ) + + return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + [anno]) + + +def calculate_frequency_bounds( + freq_method, 
def calculate_frequency_bounds(
    freq_method, center_frequency, sample_rate, nfft, signal, start_sample, sample_count, obw_power
):
    """
    Compute absolute (RF) frequency bounds for an annotated signal segment.

    :param freq_method: One of "full-bandwidth", "nbw", "obw", "full-detected".
        Unknown methods fall back to the full Nyquist span.
    :param center_frequency: RF center frequency in Hz
    :param sample_rate: Sample rate in Hz
    :param nfft: FFT size required for bandwidth estimation
    :param signal: Full signal array (complex IQ)
    :param start_sample: Start index of the segment within ``signal``
    :param sample_count: Number of samples in the segment
    :param obw_power: Power fraction for OBW/NBW estimation

    :returns: Tuple of (freq_lower_hz, freq_upper_hz) in absolute frequency
    :rtype: tuple
    """
    # Full Nyquist span around the center frequency -- the common fallback.
    full_span = (center_frequency - sample_rate / 2, center_frequency + sample_rate / 2)

    if freq_method == "full-bandwidth":
        return full_span

    # Extract the segment for frequency analysis, clamped to the signal length.
    segment_end = min(start_sample + sample_count, len(signal))
    segment = signal[start_sample:segment_end]

    if len(segment) < nfft:
        # Segment too short for FFT analysis; use the full span.
        return full_span

    try:
        if freq_method == "nbw":
            # Nominal bandwidth centered on the detected component.
            nbw_val, center = calculate_nominal_bandwidth(segment, sample_rate, nfft, obw_power)
            return (
                center_frequency + center - (nbw_val / 2),
                center_frequency + center + (nbw_val / 2),
            )
        if freq_method == "obw":
            # Occupied bandwidth: edges containing obw_power of total power.
            _, f_lower, f_upper = calculate_occupied_bandwidth(segment, sample_rate, nfft, obw_power)
            return (center_frequency + f_lower, center_frequency + f_upper)
        if freq_method == "full-detected":
            # Full detected range: lowest to highest spectral component.
            _, f_lower, f_upper = calculate_full_detected_bandwidth(segment, sample_rate, nfft)
            return (center_frequency + f_lower, center_frequency + f_upper)
    except (ValueError, IndexError):
        # Bandwidth estimation failed; fall back to the full span.
        return full_span

    # Unknown freq_method: fall back to the full span. (The original code left
    # the bounds unbound here, raising UnboundLocalError at the return.)
    return full_span
a/src/ria_toolkit_oss/annotations/parallel_signal_separator.py b/src/ria_toolkit_oss/annotations/parallel_signal_separator.py new file mode 100644 index 0000000..90c2744 --- /dev/null +++ b/src/ria_toolkit_oss/annotations/parallel_signal_separator.py @@ -0,0 +1,484 @@ +""" +Parallel signal separation for multi-component frequency-offset signals. + +Provides methods to detect and separate overlapping frequency-domain signals +that occupy the same time window but different frequency bands. + +This module implements **spectral peak detection** to identify distinct frequency +components and split single time-domain annotations into frequency-specific +sub-annotations. + +**Key Design Decisions** (per Codex review): + +1. **Complex IQ Support**: Uses `scipy.signal.welch` with `return_onesided=False` + for proper complex signal handling. Window length automatically adapts to + signal length via `nperseg=min(nfft, len(signal))` to handle bursts >> from utils.annotations import find_spectral_components + >>> # Detect the two distinct channels (returns relative frequencies) + >>> components = find_spectral_components(signal, sampling_rate=20e6) + >>> print(f"Found {len(components)} components") + Found 2 components + +The module is designed to work with detected time-domain annotations, +allowing splitting of overlapping signals into separate training samples. +""" + +import json +from typing import List, Optional, Tuple + +import numpy as np +from scipy import ndimage +from scipy import signal as scipy_signal + +from ria_toolkit_oss.datatypes import Annotation, Recording + + +def find_spectral_components( + signal_data: np.ndarray, + sampling_rate: float, + nfft: int = 65536, + noise_threshold_db: Optional[float] = None, + min_component_bw: float = 50e3, + power_threshold: float = 0.99, +) -> List[Tuple[float, float, float]]: + """ + Find distinct frequency components using spectral peak detection. 
+ + Identifies separate frequency components in a signal by analyzing the power + spectral density and finding peaks corresponding to distinct signals. This is + useful for separating parallel signals that occupy different frequency bands. + + **Frequency Representation**: Returns frequencies in **baseband/relative** Hz + (centered at 0). To get absolute RF frequencies, add center_frequency_hz from + recording metadata to all returned values. + + Algorithm: + 1. Compute power spectral density using Welch (properly handles complex IQ) + 2. Auto-estimate noise floor from data if not specified + 3. Smooth PSD to reduce spurious peaks + 4. Find local maxima above noise floor + 5. Estimate bandwidth per peak using -3dB (fallback: cumulative power) + 6. Filter components below minimum bandwidth threshold + + :param signal_data: Complex IQ signal samples (np.complex64/128) + :type signal_data: np.ndarray + :param sampling_rate: Sample rate in Hz + :type sampling_rate: float + :param nfft: FFT size / window length for Welch. Automatically capped at + signal length to handle bursts (default: 65536) + :type nfft: int + :param noise_threshold_db: Minimum SNR threshold in dB. If None (default), + auto-estimates as np.percentile(psd_db, 10). + Adapt this across hardware (Pluto: ~-100, ThinkRF: ~-60). + :type noise_threshold_db: Optional[float] + :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz) + :type min_component_bw: float + :param power_threshold: Cumulative power threshold for fallback bandwidth + estimation (default: 0.99 = 99% power, like OBW) + :type power_threshold: float + + :returns: List of (center_freq_hz, lower_freq_hz, upper_freq_hz) tuples. + **All frequencies are relative (baseband, 0-centered).** + Add recording metadata['center_frequency'] to get absolute RF frequencies. 
+ :rtype: List[Tuple[float, float, float]] + + :raises ValueError: If signal has fewer than 256 samples + + **Example**:: + + >>> from utils.io import load_recording + >>> from utils.annotations import find_spectral_components + >>> recording = load_recording("capture.sigmf") + >>> segment = recording.data[0][start:end] + >>> # Components in relative (baseband) frequency + >>> components = find_spectral_components(segment, sampling_rate=20e6) + >>> for center_rel, lower_rel, upper_rel in components: + ... # Convert to absolute RF frequency + ... center_abs = recording.metadata['center_frequency'] + center_rel + ... print(f"Component @ {center_abs/1e9:.3f} GHz") + + **Key Implementation Notes**: + + 1. **Welch for complex signals**: Uses `return_onesided=False` to properly + handle complex IQ pairs. Window length auto-capped via + `nperseg = min(nfft, len(signal))` so bursts = threshold_3db)[0] + right_indices = np.where(psd_smooth[peak_idx:] >= threshold_3db)[0] + + if len(left_indices) > 0 and len(right_indices) > 0: + lower_idx = left_indices[0] + upper_idx = peak_idx + right_indices[-1] + lower_freq = freqs_shifted[lower_idx] + upper_freq = freqs_shifted[upper_idx] + bw_3db = upper_freq - lower_freq + + # Sanity check: if -3dB BW is unreasonably wide (>sampling_rate/2), + # fallback to power-based estimation + if bw_3db > sampling_rate / 4: + # Strategy 2: Use cumulative power (like OBW) for wide signals + psd_around_peak = psd_smooth[max(0, peak_idx - 100) : peak_idx + 100] + if len(psd_around_peak) > 0: + total_power = np.sum(psd_around_peak) + cumulative = np.cumsum(psd_around_peak) + # Find edges where cumulative power = power_threshold * total + # threshold_power = power_threshold * total_power + + # Crude but effective: find where we cross the threshold + left_local = np.where(cumulative >= (1 - power_threshold) / 2 * total_power)[0] + right_local = np.where(cumulative <= (1 + power_threshold) / 2 * total_power)[0] + if len(left_local) > 0 and 
len(right_local) > 0: + lower_idx_local = left_local[0] + upper_idx_local = right_local[-1] + lower_freq = freqs_shifted[max(0, peak_idx - 100) + lower_idx_local] + upper_freq = freqs_shifted[max(0, peak_idx - 100) + upper_idx_local] + else: + # Fallback: estimate bandwidth from spectral width heuristic + lower_freq = peak_freq - min_component_bw / 2 + upper_freq = peak_freq + min_component_bw / 2 + + center_freq = (lower_freq + upper_freq) / 2 + bw = upper_freq - lower_freq + + # Filter by minimum bandwidth + if bw >= min_component_bw: + # All frequencies are relative (baseband) + components.append((center_freq, lower_freq, upper_freq)) + + return components + + +def split_annotation_by_components( + annotation: Annotation, + signal: np.ndarray, + sampling_rate: float, + center_frequency_hz: float = 0.0, + nfft: int = 65536, + noise_threshold_db: Optional[float] = None, + min_component_bw: float = 50e3, +) -> List[Annotation]: + """ + Split an annotation into multiple annotations by detected frequency components. + + Takes an existing annotation spanning multiple frequency components and + analyzes the frequency content to create separate sub-annotations for + each distinct frequency component. + + **Use case**: Energy detection found a time window with 2-3 parallel WiFi + channels. This function splits it into separate annotations per channel. + + **Frequency Handling**: `find_spectral_components` returns relative (baseband) + frequencies. This function adds `center_frequency_hz` to convert to absolute + RF frequencies for SigMF annotation bounds. This ensures correct frequency + context across baseband and RF domains. 
+ + :param annotation: Original annotation to split + :type annotation: Annotation + :param signal: Full signal array (complex IQ) + :type signal: np.ndarray + :param sampling_rate: Sample rate in Hz + :type sampling_rate: float + :param center_frequency_hz: RF center frequency to add to relative frequencies + from peak detection (default: 0.0 = baseband) + :type center_frequency_hz: float + :param nfft: FFT size for analysis (default: 65536, auto-capped at signal length) + :type nfft: int + :param noise_threshold_db: Noise floor threshold in dB. If None (default), + auto-estimates from data. + :type noise_threshold_db: Optional[float] + :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz) + :type min_component_bw: float + + :returns: List of new annotations (one per detected component). + Returns empty list if no components found or segment too short. + :rtype: List[Annotation] + + **Example**:: + + >>> from utils.io import load_recording + >>> from utils.annotations import split_annotation_by_components + >>> recording = load_recording("capture.sigmf") + >>> # Original annotation spans multiple channels + >>> original = recording.annotations[0] + >>> # Split using RF center frequency from metadata + >>> components = split_annotation_by_components( + ... original, + ... recording.data[0], + ... recording.metadata['sample_rate'], + ... center_frequency_hz=recording.metadata.get('center_frequency', 0.0) + ... ) + >>> print(f"Split into {len(components)} components") + Split into 2 components + + **Algorithm**: + 1. Extract segment corresponding to annotation time bounds + 2. Find frequency components in that segment (returns relative frequencies) + 3. Add center_frequency_hz to get absolute RF frequencies + 4. Create new annotation for each component + 5. Preserve original metadata (label, type, etc.) + 6. 
Add component info to comment JSON + + **Notes**: + - Original annotation is not modified + - Returns empty list if segment too short (<256 samples) + - Segments Recording: + """ + Split multiple annotations in a recording by frequency components. + + Processes specified annotations (or all if indices=None), replacing each + with its frequency-separated components. Uses RF center_frequency from + recording metadata for proper absolute frequency conversion. + + :param recording: Recording to process + :type recording: Recording + :param indices: Annotation indices to split (None = all, default: None). + Use indices=[] to skip splitting (returns unchanged recording). + :type indices: Optional[List[int]] + :param nfft: FFT size for spectral analysis (default: 65536, + auto-capped at signal segment length) + :type nfft: int + :param noise_threshold_db: Noise floor threshold in dB. If None (default), + auto-estimates from each segment. + :type noise_threshold_db: Optional[float] + :param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz). + Components narrower than this are filtered out. + :type min_component_bw: float + + :returns: New Recording with split annotations + :rtype: Recording + + **Example**:: + + >>> from utils.io import load_recording + >>> from utils.annotations import split_recording_annotations + >>> recording = load_recording("capture.sigmf") + >>> # Split all annotations + >>> split_rec = split_recording_annotations(recording) + >>> print(f"Original: {len(recording.annotations)} annotations") + >>> print(f"Split: {len(split_rec.annotations)} annotations") + Original: 5 annotations + Split: 9 annotations + + **Algorithm**: + 1. For each annotation in indices (or all if None): + 2. Call split_annotation_by_components with RF center_frequency + 3. If components found, replace annotation with components + 4. If no components found, keep original annotation + 5. 
def split_recording_annotations(
    recording: Recording,
    indices: Optional[List[int]] = None,
    nfft: int = 65536,
    noise_threshold_db: Optional[float] = None,
    min_component_bw: float = 50e3,
) -> Recording:
    """
    Split multiple annotations in a recording by frequency components.

    Each selected annotation is analyzed with ``split_annotation_by_components``;
    when distinct frequency components are found, the annotation is replaced by
    one annotation per component. Annotations that cannot be split (too short,
    no components, or any error during splitting) are kept unchanged, as are
    annotations whose index is not listed in ``indices``.

    :param recording: Recording to process (not modified)
    :type recording: Recording
    :param indices: Annotation indices to split (None = all). Pass an empty
        list to skip splitting and return the recording unchanged.
    :type indices: Optional[List[int]]
    :param nfft: FFT size for spectral analysis (auto-capped at segment length)
    :type nfft: int
    :param noise_threshold_db: Noise floor threshold in dB; None auto-estimates
        from each segment
    :type noise_threshold_db: Optional[float]
    :param min_component_bw: Minimum component bandwidth in Hz; narrower
        components are filtered out
    :type min_component_bw: float

    :returns: New Recording with split annotations
    :rtype: Recording
    """
    if indices is None:
        # Default: consider every annotation for splitting.
        indices = list(range(len(recording.annotations)))

    if not recording.annotations:
        # Nothing to split.
        return recording

    iq_data = recording.data[0]
    fs = recording.metadata["sample_rate"]
    # RF center frequency ensures component bounds come out in absolute Hz.
    rf_center = recording.metadata.get("center_frequency", 0.0)
    selected = set(indices)

    rebuilt = []
    for position, original in enumerate(recording.annotations):
        if position not in selected:
            # Not selected for splitting; keep as-is.
            rebuilt.append(original)
            continue
        try:
            pieces = split_annotation_by_components(
                original,
                iq_data,
                fs,
                center_frequency_hz=rf_center,
                nfft=nfft,
                noise_threshold_db=noise_threshold_db,
                min_component_bw=min_component_bw,
            )
        except Exception:
            # Best-effort: any failure keeps the original annotation.
            rebuilt.append(original)
            continue
        if pieces:
            rebuilt.extend(pieces)
        else:
            rebuilt.append(original)

    return Recording(data=recording.data, metadata=recording.metadata, annotations=rebuilt)
def qualify_slice_from_annotations(recording: Recording, slice_length: int):
    """
    Slice a recording into many smaller recordings, discarding any slices
    which do not overlap at least one annotation.
    Used together with an annotation based qualifier.

    :param recording: The recording to slice.
    :type recording: Recording
    :param slice_length: The length in samples of a slice.
    :type slice_length: int

    :returns: List of slice recordings that overlap an annotation.
    :rtype: list[Recording]
    """
    if len(recording.annotations) == 0:
        print("Warning, no annotations.")

    # Boolean mask marking which samples are covered by any annotation.
    annotation_mask = np.zeros(len(recording.data[0]), dtype=bool)
    for annotation in recording.annotations:
        annotation_mask[annotation.sample_start : annotation.sample_start + annotation.sample_count] = True

    output_recordings = []

    # BUG FIX: the original iterated range((n // slice_length) - 1), which
    # silently dropped the last full slice of every recording.
    for i in range(len(recording.data[0]) // slice_length):
        start_index = slice_length * i
        end_index = start_index + slice_length

        # Keep the slice only if it overlaps annotated samples.
        if annotation_mask[start_index:end_index].any():
            sl = recording.data[:, start_index:end_index]
            output_recordings.append(Recording(data=sl, metadata=recording.metadata))

    return output_recordings
+ :rtype: Recording""" + + sample_start = max(0, annotation.sample_start) + sample_stop = min(len(recording), annotation.sample_start + annotation.sample_count) + + anno_base_center_freq = (annotation.freq_lower_edge + annotation.freq_upper_edge) / 2 - recording.metadata.get( + "center_frequency", 0 + ) + + anno_bw = annotation.freq_upper_edge - annotation.freq_lower_edge + + signal_slice = recording.data[0, sample_start:sample_stop] + + # normalize + signal_slice = signal_slice / np.max(np.abs(signal_slice)) + + isolation_bw = anno_bw + + # frequency shift the center of the box about zero + shifted_signal_slice = frequency_shift_iq_samples( + iq_samples=signal_slice, + sample_rate=recording.metadata["sample_rate"], + shift_frequency=-1 * anno_base_center_freq, + ) + + # filter + if isolation_bw < recording.metadata["sample_rate"] - 1: + filtered_signal = apply_complex_lowpass_filter( + signal=shifted_signal_slice, cutoff_frequency=isolation_bw, sample_rate=recording.metadata["sample_rate"] + ) + + else: + filtered_signal = shifted_signal_slice + + output = Recording(data=[filtered_signal], metadata=recording.metadata) + return output + + +def frequency_shift_iq_samples(iq_samples, sample_rate, shift_frequency): + # Number of samples + num_samples = len(iq_samples) + + # Create a time vector from 0 to the total duration in seconds + time_vector = np.arange(num_samples) / sample_rate + + # Generate the complex exponential for the frequency shift + complex_exponential = np.exp(1j * 2 * np.pi * shift_frequency * time_vector) + + # Apply the frequency shift to the IQ samples + shifted_samples = iq_samples * complex_exponential + + return shifted_samples + + +# Function to apply a lowpass Butterworth filter to a complex signal +def apply_complex_lowpass_filter(signal, cutoff_frequency, sample_rate, order=5): + # Design the lowpass filter + b, a = design_complex_lowpass_filter(cutoff_frequency, sample_rate, order) + + # Apply the lowpass filter + filtered_signal = 
lfilter(b, a, signal) + return filtered_signal + + +def design_complex_lowpass_filter(cutoff_frequency, sample_rate, order=5): + # Nyquist frequency for complex signals is the sample rate + nyquist = sample_rate + + # Ensure the cutoff frequency is positive and within the Nyquist limit + if cutoff_frequency <= 0 or cutoff_frequency > nyquist: + raise ValueError("Cutoff frequency must be between 0 and the Nyquist frequency.") + + # Normalize the cutoff frequency to the Nyquist frequency + cutoff_normalized = cutoff_frequency / nyquist + + # Create a Butterworth lowpass filter + b, a = butter(order, cutoff_normalized, btype="low") + return b, a diff --git a/src/ria_toolkit_oss/annotations/threshold_qualifier.py b/src/ria_toolkit_oss/annotations/threshold_qualifier.py new file mode 100644 index 0000000..e20bc0e --- /dev/null +++ b/src/ria_toolkit_oss/annotations/threshold_qualifier.py @@ -0,0 +1,114 @@ +import json +import warnings +from typing import Optional + +import numpy as np + +from ria_toolkit_oss.datatypes import Annotation, Recording + + +def _find_ranges(indices, window_size): + if len(indices) == 0: + return [] + + ranges = [] + start = indices[0] # Initialize the start of the first range + in_range = False # Track if we are currently in a valid range + + for i in range(1, len(indices)): + # Check if the current index is within `window_size` of the previous one + if indices[i] - indices[i - 1] <= window_size: + if not in_range: + # Start a new range + start = indices[i - 1] + in_range = True + else: + # If we were in a range, close it + if in_range: + ranges.append((start, indices[i - 1])) + in_range = False + + # Handle the last range if it is still open + if in_range: + ranges.append((start, indices[-1])) + + return ranges + + +def threshold_qualifier( + recording: Recording, + threshold: float, + window_size: Optional[int] = 1024, + label: Optional[str] = "signal", + annotation_type: Optional[str] = "standalone", +): + """ + Annotate a recording with 
def threshold_qualifier(
    recording: Recording,
    threshold: float,
    window_size: Optional[int] = 1024,
    label: Optional[str] = "signal",
    annotation_type: Optional[str] = "standalone",
):
    """
    Annotate a recording with bounding boxes labeled "signal" for regions above a threshold.
    Threshold is defined as a fraction of the maximum sample magnitude.
    This algorithm searches for samples above the threshold and combines them into ranges if they
    are within window_size of each other.

    :param recording: The recording to annotate.
    :type recording: Recording
    :param threshold: The threshold value, range 0-1.
        The actual threshold value used is this number multiplied by the max sample magnitude.
    :type threshold: float
    :param window_size: The size of the sliding window. Defaults to 1024 samples.
    :type window_size: int
    :param label: The label of the output annotations. Defaults to "signal".
    :type label: str
    :param annotation_type: Annotation type (standalone, parallel, intersection).
    :type annotation_type: str

    :returns: Recording with added annotations.
    :rtype: Recording
    """

    sample_data = recording.data
    if sample_data.shape[0] > 1:
        warnings.warn(
            "Warning: Multichannel recording input to threshold_qualifier. "
            "Only the first channel will be considered by this algorithm."
        )

    # Compute magnitudes once for efficiency; the threshold is relative to the peak.
    abs_channel_data = np.abs(sample_data[0])
    max_sample = np.max(abs_channel_data)
    threshold_value = threshold * max_sample
    indices = np.where(abs_channel_data > threshold_value)[0]
    ranges = _find_ranges(indices=indices, window_size=window_size)

    annotations = []

    # Frequency edges span the full Nyquist bandwidth around the center frequency.
    sample_rate = recording.metadata["sample_rate"]
    center_frequency = recording.metadata.get("center_frequency", 0)

    # NOTE: loop variable renamed -- the original `for range in ranges:` shadowed
    # the builtin `range`.
    for start, stop in ranges:
        # Build comment JSON with type metadata
        comment_data = {
            "type": annotation_type,
            "generator": "threshold_qualifier",
            "params": {
                "threshold": threshold,
                "threshold_value": float(threshold_value),
                "max_sample": float(max_sample),
                "window_size": window_size,
            },
        }

        anno = Annotation(
            sample_start=start,
            sample_count=stop - start,
            freq_lower_edge=center_frequency - (sample_rate / 2),
            freq_upper_edge=center_frequency + (sample_rate / 2),
            label=label,
            comment=json.dumps(comment_data),
            detail={"generator": "threshold_qualifier"},
        )

        annotations.append(anno)

    return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations)