diff --git a/src/ria_toolkit_oss/annotations/annotation_transforms.py b/src/ria_toolkit_oss/annotations/annotation_transforms.py index af48465..47300c1 100644 --- a/src/ria_toolkit_oss/annotations/annotation_transforms.py +++ b/src/ria_toolkit_oss/annotations/annotation_transforms.py @@ -1,4 +1,4 @@ -from utils.data.annotation import Annotation +from ria_toolkit_oss.datatypes.annotation import Annotation # TODO figure out how to transfer labels in the merge case diff --git a/src/ria_toolkit_oss/annotations/cusum_annotator.py b/src/ria_toolkit_oss/annotations/cusum_annotator.py index a32162b..afd92cf 100644 --- a/src/ria_toolkit_oss/annotations/cusum_annotator.py +++ b/src/ria_toolkit_oss/annotations/cusum_annotator.py @@ -3,7 +3,7 @@ from typing import Optional import numpy as np -from utils.data import Annotation, Recording +from ria_toolkit_oss.datatypes import Annotation, Recording def annotate_with_cusum( diff --git a/src/ria_toolkit_oss/annotations/energy_detector.py b/src/ria_toolkit_oss/annotations/energy_detector.py index 6cc2466..0f8cc3e 100644 --- a/src/ria_toolkit_oss/annotations/energy_detector.py +++ b/src/ria_toolkit_oss/annotations/energy_detector.py @@ -11,7 +11,7 @@ from typing import Tuple import numpy as np from scipy.signal import filtfilt -from utils.data import Annotation, Recording +from ria_toolkit_oss.datatypes import Annotation, Recording def detect_signals_energy( diff --git a/src/ria_toolkit_oss/annotations/parallel_signal_separator.py b/src/ria_toolkit_oss/annotations/parallel_signal_separator.py index b75a28f..21b1956 100644 --- a/src/ria_toolkit_oss/annotations/parallel_signal_separator.py +++ b/src/ria_toolkit_oss/annotations/parallel_signal_separator.py @@ -55,7 +55,7 @@ import numpy as np from scipy import ndimage from scipy import signal as scipy_signal -from utils.data import Annotation, Recording +from ria_toolkit_oss.datatypes import Annotation, Recording def find_spectral_components( diff --git a/src/ria_toolkit_oss/annotations/qualify_slice.py b/src/ria_toolkit_oss/annotations/qualify_slice.py index 10ff369..2336fe5 100644 --- a/src/ria_toolkit_oss/annotations/qualify_slice.py +++ b/src/ria_toolkit_oss/annotations/qualify_slice.py @@ -1,6 +1,6 @@ import numpy as np -from utils.data import Recording +from ria_toolkit_oss.datatypes import Recording def qualify_slice_from_annotations(recording: Recording, slice_length: int): diff --git a/src/ria_toolkit_oss/annotations/signal_isolation.py b/src/ria_toolkit_oss/annotations/signal_isolation.py index 8d6c9ac..47852ae 100644 --- a/src/ria_toolkit_oss/annotations/signal_isolation.py +++ b/src/ria_toolkit_oss/annotations/signal_isolation.py @@ -1,8 +1,8 @@ import numpy as np from scipy.signal import butter, lfilter -from utils.data.annotation import Annotation -from utils.data.recording import Recording +from ria_toolkit_oss.datatypes.annotation import Annotation +from ria_toolkit_oss.datatypes.recording import Recording def isolate_signal(recording: Recording, annotation: Annotation) -> Recording: diff --git a/src/ria_toolkit_oss/annotations/threshold_qualifier.py b/src/ria_toolkit_oss/annotations/threshold_qualifier.py index 200c9e8..804c5e1 100644 --- a/src/ria_toolkit_oss/annotations/threshold_qualifier.py +++ b/src/ria_toolkit_oss/annotations/threshold_qualifier.py @@ -46,17 +46,17 @@ from typing import Optional import numpy as np -from utils.data import Annotation, Recording +from ria_toolkit_oss.datatypes import Annotation, Recording -def _find_ranges(indices, window_size): +def _find_ranges(indices, max_gap): """ Groups individual indices into continuous temporal ranges. Args: indices: Array of indices where the signal exceeded a threshold. - window_size: Maximum gap allowed between indices to consider them part - of the same range. + max_gap: Maximum gap allowed between indices to consider them part + of the same range. Returns: A list of (start, stop) tuples representing detected signal segments. @@ -65,38 +65,130 @@ def _find_ranges(indices, window_size): if len(indices) == 0: return [] + start = indices[0] + prev = indices[0] ranges = [] - start = indices[0] - in_range = False - for i in range(1, len(indices)): - # If the gap between current and previous index is within window_size, - # keep the range alive. - if indices[i] - indices[i - 1] <= window_size: - if not in_range: - # Start a new range - start = indices[i - 1] - in_range = True - else: - # Gap is too large; close the current range if one was active. - if in_range: - ranges.append((start, indices[i - 1])) - in_range = False + if indices[i] - prev > max_gap: + ranges.append((start, prev)) + start = indices[i] + prev = indices[i] - # Ensure the final segment is captured if the loop ends while in_range. - if in_range: - ranges.append((start, indices[-1])) + ranges.append((start, prev)) return ranges +def _expand_and_filter_ranges( + smoothed_power: np.ndarray, + initial_ranges: list[tuple[int, int]], + boundary_val: float, + min_duration_samples: int, +) -> list[tuple[int, int]]: + """Apply hysteresis expansion and minimum-duration filtering.""" + out: list[tuple[int, int]] = [] + n = len(smoothed_power) + for start, stop in initial_ranges: + if (stop - start) < min_duration_samples: + continue + + true_start = start + while true_start > 0 and smoothed_power[true_start] > boundary_val: + true_start -= 1 + + true_stop = stop + while true_stop < n - 1 and smoothed_power[true_stop] > boundary_val: + true_stop += 1 + + if (true_stop - true_start) >= min_duration_samples: + out.append((true_start, true_stop)) + return out + + +def _merge_ranges(ranges: list[tuple[int, int]], max_gap: int) -> list[tuple[int, int]]: + """Merge overlapping or near-adjacent ranges.""" + if not ranges: + return [] + ranges = sorted(ranges, key=lambda r: r[0]) + merged = [ranges[0]] + for s, e in ranges[1:]: + last_s, last_e = merged[-1] + if s <= last_e + max_gap: + merged[-1] = (last_s, max(last_e, e)) + else: + merged.append((s, e)) + return merged + + +def _estimate_noise_floor(power: np.ndarray, quantile: float = 20.0) -> float: + """Estimate baseline from the quieter portion of the envelope.""" + return float(np.percentile(power, quantile)) + + +def _estimate_group_gap(sample_rate: float) -> int: + """Use a fixed temporal grouping gap instead of reusing the smoothing window.""" + return max(1, int(0.001 * sample_rate)) + + +def _estimate_spectral_bounds(signal_segment: np.ndarray, sample_rate: float) -> tuple[float, float]: + """Estimate occupied bandwidth from a smoothed magnitude spectrum.""" + if len(signal_segment) == 0: + return -sample_rate / 4, sample_rate / 4 + + window = np.hanning(len(signal_segment)) + windowed = signal_segment * window + + fft_data = np.abs(np.fft.fftshift(np.fft.fft(windowed))) + fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate)) + + # Smooth the spectrum so noise-like wideband bursts form a contiguous mask + # instead of thousands of tiny isolated runs. + spectral_smooth_bins = max(5, min(257, (len(signal_segment) // 512) | 1)) + spectral_kernel = np.ones(spectral_smooth_bins, dtype=np.float64) / spectral_smooth_bins + smoothed_fft = np.convolve(fft_data, spectral_kernel, mode="same") + + spectral_floor = float(np.percentile(smoothed_fft, 20)) + spectral_peak = float(np.max(smoothed_fft)) + spectral_ratio = spectral_peak / max(spectral_floor, 1e-12) + + if spectral_ratio < 1.2: + return -sample_rate / 4, sample_rate / 4 + + spectral_thresh = spectral_floor + 0.1 * (spectral_peak - spectral_floor) + sig_indices = np.where(smoothed_fft > spectral_thresh)[0] + + if len(sig_indices) == 0: + peak_idx = int(np.argmax(smoothed_fft)) + bin_hz = sample_rate / len(signal_segment) + half_bins = max(1, int(np.ceil(10_000.0 / bin_hz))) + lo_idx = max(0, peak_idx - half_bins) + hi_idx = min(len(smoothed_fft) - 1, peak_idx + half_bins) + else: + runs = _find_ranges(sig_indices, max_gap=max(1, spectral_smooth_bins // 2)) + peak_idx = int(np.argmax(smoothed_fft)) + lo_idx, hi_idx = min( + runs, + key=lambda run: 0 if run[0] <= peak_idx <= run[1] else min(abs(run[0] - peak_idx), abs(run[1] - peak_idx)), + ) + + # Prevent extremely narrow tone boxes from collapsing to just a few bins. + min_total_bw_hz = 20_000.0 + min_half_bins = max(1, int(np.ceil((min_total_bw_hz / 2) / (sample_rate / len(signal_segment))))) + center_idx = int(round((lo_idx + hi_idx) / 2)) + lo_idx = max(0, min(lo_idx, center_idx - min_half_bins)) + hi_idx = min(len(smoothed_fft) - 1, max(hi_idx, center_idx + min_half_bins)) + + return float(fft_freqs[lo_idx]), float(fft_freqs[hi_idx]) + + def threshold_qualifier( recording: Recording, threshold: float, - window_size: Optional[int] = 1024, + window_size: Optional[int] = None, label: Optional[str] = None, annotation_type: Optional[str] = "standalone", + channel: int = 0, ) -> Recording: """ Annotate a recording with bounding boxes for regions above a threshold. @@ -114,78 +206,133 @@ def threshold_qualifier( Args: recording: The Recording object containing IQ or real signal data. threshold: Sensitivity multiplier (0.0 to 1.0) applied to max power. - window_size: Size of the smoothing filter and max gap for merging hits. + window_size: Size of the smoothing filter in samples. Defaults to 1ms worth of samples. label: Custom string label for annotations. annotation_type: Metadata string for the 'type' field in the annotation. + channel: Index of the channel to annotate. Defaults to 0. Returns: A new Recording object populated with detected Annotations. """ # Extract signal and metadata - sample_data = recording.data[0] + sample_data = recording.data[channel] sample_rate = recording.metadata["sample_rate"] center_frequency = recording.metadata.get("center_frequency", 0) + if window_size is None: + window_size = max(64, int(sample_rate * 0.001)) + # --- 1. SIGNAL CONDITIONING --- # Convert to power (Magnitude squared) power_data = np.abs(sample_data) ** 2 smoothing_window = np.ones(window_size) / window_size smoothed_power = np.convolve(power_data, smoothing_window, mode="same") + group_gap_samples = _estimate_group_gap(sample_rate) - # Define thresholds based on the global peak of the smoothed signal + # Define thresholds using peak relative to baseline. max_power = np.max(smoothed_power) - trigger_val = threshold * max_power # High threshold to trigger detection - boundary_val = (threshold / 2) * max_power # Low threshold to define signal edges + noise_floor = _estimate_noise_floor(smoothed_power) + dynamic_range_ratio = max_power / max(noise_floor, 1e-12) + + # Soft early exit: keep a guard for low-contrast noise, but compute it from + # the quieter tail of the envelope so burst-heavy captures are not rejected. + if dynamic_range_ratio < 1.5: + return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations) + + trigger_val = noise_floor + threshold * (max_power - noise_floor) + boundary_val = noise_floor + 0.5 * threshold * (max_power - noise_floor) # --- 2. INITIAL DETECTION --- - # Identify indices that strictly exceed the high trigger - indices = np.where(smoothed_power > trigger_val)[0] - initial_ranges = _find_ranges(indices=indices, window_size=window_size) - + # Enforce an explicit minimum duration in seconds; this is stable across + # varying capture lengths and avoids over-fitting to recording length. + min_duration_samples = max(1, int(0.005 * sample_rate)) annotations = [] - threshold_base = min(sample_rate, len(sample_data)) + # Pass 1: Detect stronger bursts. + indices = np.where(smoothed_power > trigger_val)[0] + pass1_initial = _find_ranges(indices=indices, max_gap=group_gap_samples) + pass1_ranges = _expand_and_filter_ranges( + smoothed_power=smoothed_power, + initial_ranges=pass1_initial, + boundary_val=boundary_val, + min_duration_samples=min_duration_samples, + ) - for start, stop in initial_ranges: - if (stop - start) < (threshold_base * 0.01): - continue + # Pass 2: Recover weaker bursts on residual power not already covered. + # This improves recall in mixed-amplitude captures. + # Expand each Pass-1 range by the smoothing window on both sides so the + # smoothing skirts of a strong burst are not re-detected as a weak burst + # immediately adjacent to it (mirrors the guard used in Pass 3). + mask = np.ones_like(smoothed_power, dtype=np.float32) + pass2_mask_expand = window_size + for s, e in pass1_ranges: + mask[max(0, s - pass2_mask_expand) : min(len(mask), e + pass2_mask_expand)] = 0.0 + residual_power = smoothed_power * mask - # --- 3. HYSTERESIS (Boundary Expansion) --- - # Search backward from 'start' until power drops below the low boundary_val - true_start = start - while true_start > 0 and smoothed_power[true_start] > boundary_val: - true_start -= 1 + residual_max = float(np.max(residual_power)) + residual_ratio = residual_max / max(noise_floor, 1e-12) - # Search forward from 'stop' until power drops below the low boundary_val - true_stop = stop - while true_stop < len(smoothed_power) - 1 and smoothed_power[true_stop] > boundary_val: - true_stop += 1 + pass2_ranges: list[tuple[int, int]] = [] + if residual_ratio >= 2.0: + weak_threshold = max(0.3, threshold * 0.7) + weak_trigger = noise_floor + weak_threshold * (residual_max - noise_floor) + weak_boundary = noise_floor + 0.5 * weak_threshold * (residual_max - noise_floor) + weak_indices = np.where(residual_power > weak_trigger)[0] + pass2_initial = _find_ranges(indices=weak_indices, max_gap=group_gap_samples) + pass2_ranges = _expand_and_filter_ranges( + smoothed_power=residual_power, + initial_ranges=pass2_initial, + boundary_val=weak_boundary, + min_duration_samples=min_duration_samples, + ) + + # Pass 3: Detect sustained faint bursts via macro-window averaging. + # Targets bursts whose peak power is near the trigger level but whose + # *average* power is consistently elevated above the noise floor — these + # are missed by peak-based detection because only a few short spikes exceed + # the trigger, all too brief to pass the minimum-duration filter. + # + # The mask is applied to power_data *before* convolving so that bright + # burst energy does not bleed through the long window into adjacent regions, + # which would inflate macro_residual_max and push the trigger above the + # faint burst's average power. + macro_window_size = max(window_size * 16, int(sample_rate * 0.02)) + macro_kernel = np.ones(macro_window_size, dtype=np.float64) / macro_window_size + # Expand each annotated range by half the macro window on both sides so that + # the long convolution cannot "see" the leading/trailing edges of already- + # annotated bursts, which would produce spurious short fragments in Pass 3. + macro_expand = macro_window_size * 2 + masked_power_for_macro = power_data.copy() + n = len(masked_power_for_macro) + for s, e in pass1_ranges + pass2_ranges: + masked_power_for_macro[max(0, s - macro_expand) : min(n, e + macro_expand)] = 0.0 + macro_residual = np.convolve(masked_power_for_macro, macro_kernel, mode="same") + + macro_residual_max = float(np.max(macro_residual)) + + pass3_ranges: list[tuple[int, int]] = [] + if macro_residual_max / max(noise_floor, 1e-12) >= 1.3: + macro_trigger = noise_floor + threshold * (macro_residual_max - noise_floor) + macro_boundary = noise_floor + 0.5 * threshold * (macro_residual_max - noise_floor) + macro_indices = np.where(macro_residual > macro_trigger)[0] + macro_initial = _find_ranges(indices=macro_indices, max_gap=group_gap_samples) + pass3_ranges = _expand_and_filter_ranges( + smoothed_power=macro_residual, + initial_ranges=macro_initial, + boundary_val=macro_boundary, + min_duration_samples=min_duration_samples, + ) + + all_ranges = _merge_ranges(pass1_ranges + pass2_ranges + pass3_ranges, max_gap=group_gap_samples) + + for true_start, true_stop in all_ranges: # --- 4. SPECTRAL ANALYSIS (Frequency Detection) --- signal_segment = sample_data[true_start:true_stop] - if len(signal_segment) > 0: - fft_data = np.abs(np.fft.fftshift(np.fft.fft(signal_segment))) - fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate)) - - # Determine frequency bounds where spectral energy is > 15% of segment peak - spectral_thresh = np.max(fft_data) * 0.15 - sig_indices = np.where(fft_data > spectral_thresh)[0] - - # Ensure the signal has some spectral width before annotating - if len(sig_indices) < 5: - continue - - if len(sig_indices) > 0: - f_min, f_max = fft_freqs[sig_indices[0]], fft_freqs[sig_indices[-1]] - else: - # Default to middle half of bandwidth if no clear peaks found - f_min, f_max = -sample_rate / 4, sample_rate / 4 - else: - f_min, f_max = -sample_rate / 4, sample_rate / 4 + f_min, f_max = _estimate_spectral_bounds(signal_segment, sample_rate) # --- 5. ANNOTATION GENERATION --- - if label is None: - label = f"{int(threshold*100)}%" + ann_label = label if label is not None else f"{int(threshold*100)}%" # Pack metadata for the UI/Downstream processing comment_data = { @@ -202,7 +349,7 @@ def threshold_qualifier( sample_count=true_stop - true_start, freq_lower_edge=center_frequency + f_min, freq_upper_edge=center_frequency + f_max, - label=label, + label=ann_label, comment=json.dumps(comment_data), detail={"generator": "hysteresis_qualifier"}, ) diff --git a/src/ria_toolkit_oss_cli/ria_toolkit_oss/view.py b/src/ria_toolkit_oss_cli/ria_toolkit_oss/view.py index 9c67e80..438cc29 100644 --- a/src/ria_toolkit_oss_cli/ria_toolkit_oss/view.py +++ b/src/ria_toolkit_oss_cli/ria_toolkit_oss/view.py @@ -34,7 +34,7 @@ VISUALIZATION_TYPES = { "spines", ], }, - "annotations": { + "annotations": { "function": view_annotations, "description": "Annotation-focused spectrogram view", "options": ["channel", "dark"], @@ -199,7 +199,7 @@ def print_metadata(recording, quiet): @click.option( "--type", "viz_type", - type=click.Choice(list(VISUALIZATION_TYPES.keys())), + type=click.Choice(list(VISUALIZATION_TYPES.keys()) + ["annotate", "annotation"]), default="simple", show_default=True, help="Visualization type", @@ -302,6 +302,9 @@ def view( # Legacy NPY file ria view old_capture.npy --legacy --type simple """ + if viz_type in ["annotate", "annotation"]: + viz_type = "annotations" + # Load config file if specified if config: _ = load_yaml_config(config)