From 93ae08bc91402f8387583d2bd4a93b75657d2969 Mon Sep 17 00:00:00 2001 From: muq Date: Mon, 20 Apr 2026 12:08:43 -0400 Subject: [PATCH] Port threshold_qualifier improvements and Pass 2 spillover fix from utils The OSS threshold_qualifier was last synced from utils on Feb 23 2026, before the major robustness improvements landed in utils on Mar 19 2026. This commit brings it fully up to date. Changes ported from utils: - Multi-pass detection (Pass 1 strong burst, Pass 2 weak residual, Pass 3 sustained faint burst via macro-window averaging) - Noise floor estimation via percentile instead of simple max*threshold - Dynamic range ratio guard (early exit on low-contrast captures) - Improved _find_ranges, _expand_and_filter_ranges, _merge_ranges helpers - Spectral smoothing in _estimate_spectral_bounds for wideband bursts - Minimum duration filter expressed in absolute time (5ms) not sample count Also includes the Pass 2 hysteresis spillover fix: - Pass 2 expansion now runs against residual_power (masked) instead of smoothed_power, preventing it from walking into Pass 1 territory - Pass 2 mask now has a window_size guard band around Pass 1 ranges, matching the guard already used in Pass 3 Only change from utils: import swapped to ria_toolkit_oss.datatypes. --- .../annotations/annotation_transforms.py | 2 +- .../annotations/cusum_annotator.py | 2 +- .../annotations/energy_detector.py | 2 +- .../annotations/parallel_signal_separator.py | 2 +- .../annotations/qualify_slice.py | 2 +- .../annotations/signal_isolation.py | 4 +- .../annotations/threshold_qualifier.py | 281 +++++++++++++----- .../ria_toolkit_oss/view.py | 7 +- 8 files changed, 226 insertions(+), 76 deletions(-) diff --git a/src/ria_toolkit_oss/annotations/annotation_transforms.py b/src/ria_toolkit_oss/annotations/annotation_transforms.py index af48465..47300c1 100644 --- a/src/ria_toolkit_oss/annotations/annotation_transforms.py +++ b/src/ria_toolkit_oss/annotations/annotation_transforms.py @@ -1,4 +1,4 @@ -from utils.data.annotation import Annotation +from ria_toolkit_oss.datatypes.annotation import Annotation # TODO figure out how to transfer labels in the merge case diff --git a/src/ria_toolkit_oss/annotations/cusum_annotator.py b/src/ria_toolkit_oss/annotations/cusum_annotator.py index a32162b..afd92cf 100644 --- a/src/ria_toolkit_oss/annotations/cusum_annotator.py +++ b/src/ria_toolkit_oss/annotations/cusum_annotator.py @@ -3,7 +3,7 @@ from typing import Optional import numpy as np -from utils.data import Annotation, Recording +from ria_toolkit_oss.datatypes import Annotation, Recording def annotate_with_cusum( diff --git a/src/ria_toolkit_oss/annotations/energy_detector.py b/src/ria_toolkit_oss/annotations/energy_detector.py index 6cc2466..0f8cc3e 100644 --- a/src/ria_toolkit_oss/annotations/energy_detector.py +++ b/src/ria_toolkit_oss/annotations/energy_detector.py @@ -11,7 +11,7 @@ from typing import Tuple import numpy as np from scipy.signal import filtfilt -from utils.data import Annotation, Recording +from ria_toolkit_oss.datatypes import Annotation, Recording def detect_signals_energy( diff --git a/src/ria_toolkit_oss/annotations/parallel_signal_separator.py b/src/ria_toolkit_oss/annotations/parallel_signal_separator.py index b75a28f..21b1956 100644 --- a/src/ria_toolkit_oss/annotations/parallel_signal_separator.py +++ b/src/ria_toolkit_oss/annotations/parallel_signal_separator.py @@ -55,7 +55,7 @@ import numpy as np from scipy import ndimage from scipy import signal as scipy_signal -from utils.data import Annotation, Recording +from ria_toolkit_oss.datatypes import Annotation, Recording def find_spectral_components( diff --git a/src/ria_toolkit_oss/annotations/qualify_slice.py b/src/ria_toolkit_oss/annotations/qualify_slice.py index 10ff369..2336fe5 100644 --- a/src/ria_toolkit_oss/annotations/qualify_slice.py +++ b/src/ria_toolkit_oss/annotations/qualify_slice.py @@ -1,6 +1,6 @@ import numpy as np -from utils.data import Recording +from ria_toolkit_oss.datatypes import Recording def qualify_slice_from_annotations(recording: Recording, slice_length: int): diff --git a/src/ria_toolkit_oss/annotations/signal_isolation.py b/src/ria_toolkit_oss/annotations/signal_isolation.py index 8d6c9ac..47852ae 100644 --- a/src/ria_toolkit_oss/annotations/signal_isolation.py +++ b/src/ria_toolkit_oss/annotations/signal_isolation.py @@ -1,8 +1,8 @@ import numpy as np from scipy.signal import butter, lfilter -from utils.data.annotation import Annotation -from utils.data.recording import Recording +from ria_toolkit_oss.datatypes.annotation import Annotation +from ria_toolkit_oss.datatypes.recording import Recording def isolate_signal(recording: Recording, annotation: Annotation) -> Recording: diff --git a/src/ria_toolkit_oss/annotations/threshold_qualifier.py b/src/ria_toolkit_oss/annotations/threshold_qualifier.py index 200c9e8..804c5e1 100644 --- a/src/ria_toolkit_oss/annotations/threshold_qualifier.py +++ b/src/ria_toolkit_oss/annotations/threshold_qualifier.py @@ -46,17 +46,17 @@ from typing import Optional import numpy as np -from utils.data import Annotation, Recording +from ria_toolkit_oss.datatypes import Annotation, Recording -def _find_ranges(indices, window_size): +def _find_ranges(indices, max_gap): """ Groups individual indices into continuous temporal ranges. Args: indices: Array of indices where the signal exceeded a threshold. - window_size: Maximum gap allowed between indices to consider them part - of the same range. + max_gap: Maximum gap allowed between indices to consider them part + of the same range. Returns: A list of (start, stop) tuples representing detected signal segments. @@ -65,38 +65,130 @@ def _find_ranges(indices, window_size): if len(indices) == 0: return [] + start = indices[0] + prev = indices[0] ranges = [] - start = indices[0] - in_range = False - for i in range(1, len(indices)): - # If the gap between current and previous index is within window_size, - # keep the range alive. - if indices[i] - indices[i - 1] <= window_size: - if not in_range: - # Start a new range - start = indices[i - 1] - in_range = True - else: - # Gap is too large; close the current range if one was active. - if in_range: - ranges.append((start, indices[i - 1])) - in_range = False + if indices[i] - prev > max_gap: + ranges.append((start, prev)) + start = indices[i] + prev = indices[i] - # Ensure the final segment is captured if the loop ends while in_range. - if in_range: - ranges.append((start, indices[-1])) + ranges.append((start, prev)) return ranges +def _expand_and_filter_ranges( + smoothed_power: np.ndarray, + initial_ranges: list[tuple[int, int]], + boundary_val: float, + min_duration_samples: int, +) -> list[tuple[int, int]]: + """Apply hysteresis expansion and minimum-duration filtering.""" + out: list[tuple[int, int]] = [] + n = len(smoothed_power) + for start, stop in initial_ranges: + if (stop - start) < min_duration_samples: + continue + + true_start = start + while true_start > 0 and smoothed_power[true_start] > boundary_val: + true_start -= 1 + + true_stop = stop + while true_stop < n - 1 and smoothed_power[true_stop] > boundary_val: + true_stop += 1 + + if (true_stop - true_start) >= min_duration_samples: + out.append((true_start, true_stop)) + return out + + +def _merge_ranges(ranges: list[tuple[int, int]], max_gap: int) -> list[tuple[int, int]]: + """Merge overlapping or near-adjacent ranges.""" + if not ranges: + return [] + ranges = sorted(ranges, key=lambda r: r[0]) + merged = [ranges[0]] + for s, e in ranges[1:]: + last_s, last_e = merged[-1] + if s <= last_e + max_gap: + merged[-1] = (last_s, max(last_e, e)) + else: + merged.append((s, e)) + return merged + + +def _estimate_noise_floor(power: np.ndarray, quantile: float = 20.0) -> float: + """Estimate baseline from the quieter portion of the envelope.""" + return float(np.percentile(power, quantile)) + + +def _estimate_group_gap(sample_rate: float) -> int: + """Use a fixed temporal grouping gap instead of reusing the smoothing window.""" + return max(1, int(0.001 * sample_rate)) + + +def _estimate_spectral_bounds(signal_segment: np.ndarray, sample_rate: float) -> tuple[float, float]: + """Estimate occupied bandwidth from a smoothed magnitude spectrum.""" + if len(signal_segment) == 0: + return -sample_rate / 4, sample_rate / 4 + + window = np.hanning(len(signal_segment)) + windowed = signal_segment * window + + fft_data = np.abs(np.fft.fftshift(np.fft.fft(windowed))) + fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate)) + + # Smooth the spectrum so noise-like wideband bursts form a contiguous mask + # instead of thousands of tiny isolated runs. + spectral_smooth_bins = max(5, min(257, (len(signal_segment) // 512) | 1)) + spectral_kernel = np.ones(spectral_smooth_bins, dtype=np.float64) / spectral_smooth_bins + smoothed_fft = np.convolve(fft_data, spectral_kernel, mode="same") + + spectral_floor = float(np.percentile(smoothed_fft, 20)) + spectral_peak = float(np.max(smoothed_fft)) + spectral_ratio = spectral_peak / max(spectral_floor, 1e-12) + + if spectral_ratio < 1.2: + return -sample_rate / 4, sample_rate / 4 + + spectral_thresh = spectral_floor + 0.1 * (spectral_peak - spectral_floor) + sig_indices = np.where(smoothed_fft > spectral_thresh)[0] + + if len(sig_indices) == 0: + peak_idx = int(np.argmax(smoothed_fft)) + bin_hz = sample_rate / len(signal_segment) + half_bins = max(1, int(np.ceil(10_000.0 / bin_hz))) + lo_idx = max(0, peak_idx - half_bins) + hi_idx = min(len(smoothed_fft) - 1, peak_idx + half_bins) + else: + runs = _find_ranges(sig_indices, max_gap=max(1, spectral_smooth_bins // 2)) + peak_idx = int(np.argmax(smoothed_fft)) + lo_idx, hi_idx = min( + runs, + key=lambda run: 0 if run[0] <= peak_idx <= run[1] else min(abs(run[0] - peak_idx), abs(run[1] - peak_idx)), + ) + + # Prevent extremely narrow tone boxes from collapsing to just a few bins. + min_total_bw_hz = 20_000.0 + min_half_bins = max(1, int(np.ceil((min_total_bw_hz / 2) / (sample_rate / len(signal_segment))))) + center_idx = int(round((lo_idx + hi_idx) / 2)) + lo_idx = max(0, min(lo_idx, center_idx - min_half_bins)) + hi_idx = min(len(smoothed_fft) - 1, max(hi_idx, center_idx + min_half_bins)) + + return float(fft_freqs[lo_idx]), float(fft_freqs[hi_idx]) + + def threshold_qualifier( recording: Recording, threshold: float, - window_size: Optional[int] = 1024, + window_size: Optional[int] = None, label: Optional[str] = None, annotation_type: Optional[str] = "standalone", + channel: int = 0, ) -> Recording: """ Annotate a recording with bounding boxes for regions above a threshold. @@ -114,78 +206,133 @@ def threshold_qualifier( Args: recording: The Recording object containing IQ or real signal data. threshold: Sensitivity multiplier (0.0 to 1.0) applied to max power. - window_size: Size of the smoothing filter and max gap for merging hits. + window_size: Size of the smoothing filter in samples. Defaults to 1ms worth of samples. label: Custom string label for annotations. annotation_type: Metadata string for the 'type' field in the annotation. + channel: Index of the channel to annotate. Defaults to 0. Returns: A new Recording object populated with detected Annotations. """ # Extract signal and metadata - sample_data = recording.data[0] + sample_data = recording.data[channel] sample_rate = recording.metadata["sample_rate"] center_frequency = recording.metadata.get("center_frequency", 0) + if window_size is None: + window_size = max(64, int(sample_rate * 0.001)) + # --- 1. SIGNAL CONDITIONING --- # Convert to power (Magnitude squared) power_data = np.abs(sample_data) ** 2 smoothing_window = np.ones(window_size) / window_size smoothed_power = np.convolve(power_data, smoothing_window, mode="same") + group_gap_samples = _estimate_group_gap(sample_rate) - # Define thresholds based on the global peak of the smoothed signal + # Define thresholds using peak relative to baseline. max_power = np.max(smoothed_power) - trigger_val = threshold * max_power # High threshold to trigger detection - boundary_val = (threshold / 2) * max_power # Low threshold to define signal edges + noise_floor = _estimate_noise_floor(smoothed_power) + dynamic_range_ratio = max_power / max(noise_floor, 1e-12) + + # Soft early exit: keep a guard for low-contrast noise, but compute it from + # the quieter tail of the envelope so burst-heavy captures are not rejected. + if dynamic_range_ratio < 1.5: + return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations) + + trigger_val = noise_floor + threshold * (max_power - noise_floor) + boundary_val = noise_floor + 0.5 * threshold * (max_power - noise_floor) # --- 2. INITIAL DETECTION --- - # Identify indices that strictly exceed the high trigger - indices = np.where(smoothed_power > trigger_val)[0] - initial_ranges = _find_ranges(indices=indices, window_size=window_size) - + # Enforce an explicit minimum duration in seconds; this is stable across + # varying capture lengths and avoids over-fitting to recording length. + min_duration_samples = max(1, int(0.005 * sample_rate)) annotations = [] - threshold_base = min(sample_rate, len(sample_data)) + # Pass 1: Detect stronger bursts. + indices = np.where(smoothed_power > trigger_val)[0] + pass1_initial = _find_ranges(indices=indices, max_gap=group_gap_samples) + pass1_ranges = _expand_and_filter_ranges( + smoothed_power=smoothed_power, + initial_ranges=pass1_initial, + boundary_val=boundary_val, + min_duration_samples=min_duration_samples, + ) - for start, stop in initial_ranges: - if (stop - start) < (threshold_base * 0.01): - continue + # Pass 2: Recover weaker bursts on residual power not already covered. + # This improves recall in mixed-amplitude captures. + # Expand each Pass-1 range by the smoothing window on both sides so the + # smoothing skirts of a strong burst are not re-detected as a weak burst + # immediately adjacent to it (mirrors the guard used in Pass 3). + mask = np.ones_like(smoothed_power, dtype=np.float32) + pass2_mask_expand = window_size + for s, e in pass1_ranges: + mask[max(0, s - pass2_mask_expand) : min(len(mask), e + pass2_mask_expand)] = 0.0 + residual_power = smoothed_power * mask - # --- 3. HYSTERESIS (Boundary Expansion) --- - # Search backward from 'start' until power drops below the low boundary_val - true_start = start - while true_start > 0 and smoothed_power[true_start] > boundary_val: - true_start -= 1 + residual_max = float(np.max(residual_power)) + residual_ratio = residual_max / max(noise_floor, 1e-12) - # Search forward from 'stop' until power drops below the low boundary_val - true_stop = stop - while true_stop < len(smoothed_power) - 1 and smoothed_power[true_stop] > boundary_val: - true_stop += 1 + pass2_ranges: list[tuple[int, int]] = [] + if residual_ratio >= 2.0: + weak_threshold = max(0.3, threshold * 0.7) + weak_trigger = noise_floor + weak_threshold * (residual_max - noise_floor) + weak_boundary = noise_floor + 0.5 * weak_threshold * (residual_max - noise_floor) + weak_indices = np.where(residual_power > weak_trigger)[0] + pass2_initial = _find_ranges(indices=weak_indices, max_gap=group_gap_samples) + pass2_ranges = _expand_and_filter_ranges( + smoothed_power=residual_power, + initial_ranges=pass2_initial, + boundary_val=weak_boundary, + min_duration_samples=min_duration_samples, + ) + + # Pass 3: Detect sustained faint bursts via macro-window averaging. + # Targets bursts whose peak power is near the trigger level but whose + # *average* power is consistently elevated above the noise floor — these + # are missed by peak-based detection because only a few short spikes exceed + # the trigger, all too brief to pass the minimum-duration filter. + # + # The mask is applied to power_data *before* convolving so that bright + # burst energy does not bleed through the long window into adjacent regions, + # which would inflate macro_residual_max and push the trigger above the + # faint burst's average power. + macro_window_size = max(window_size * 16, int(sample_rate * 0.02)) + macro_kernel = np.ones(macro_window_size, dtype=np.float64) / macro_window_size + # Expand each annotated range by half the macro window on both sides so that + # the long convolution cannot "see" the leading/trailing edges of already- + # annotated bursts, which would produce spurious short fragments in Pass 3. + macro_expand = macro_window_size * 2 + masked_power_for_macro = power_data.copy() + n = len(masked_power_for_macro) + for s, e in pass1_ranges + pass2_ranges: + masked_power_for_macro[max(0, s - macro_expand) : min(n, e + macro_expand)] = 0.0 + macro_residual = np.convolve(masked_power_for_macro, macro_kernel, mode="same") + + macro_residual_max = float(np.max(macro_residual)) + + pass3_ranges: list[tuple[int, int]] = [] + if macro_residual_max / max(noise_floor, 1e-12) >= 1.3: + macro_trigger = noise_floor + threshold * (macro_residual_max - noise_floor) + macro_boundary = noise_floor + 0.5 * threshold * (macro_residual_max - noise_floor) + macro_indices = np.where(macro_residual > macro_trigger)[0] + macro_initial = _find_ranges(indices=macro_indices, max_gap=group_gap_samples) + pass3_ranges = _expand_and_filter_ranges( + smoothed_power=macro_residual, + initial_ranges=macro_initial, + boundary_val=macro_boundary, + min_duration_samples=min_duration_samples, + ) + + all_ranges = _merge_ranges(pass1_ranges + pass2_ranges + pass3_ranges, max_gap=group_gap_samples) + + for true_start, true_stop in all_ranges: # --- 4. SPECTRAL ANALYSIS (Frequency Detection) --- signal_segment = sample_data[true_start:true_stop] - if len(signal_segment) > 0: - fft_data = np.abs(np.fft.fftshift(np.fft.fft(signal_segment))) - fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate)) - - # Determine frequency bounds where spectral energy is > 15% of segment peak - spectral_thresh = np.max(fft_data) * 0.15 - sig_indices = np.where(fft_data > spectral_thresh)[0] - - # Ensure the signal has some spectral width before annotating - if len(sig_indices) < 5: - continue - - if len(sig_indices) > 0: - f_min, f_max = fft_freqs[sig_indices[0]], fft_freqs[sig_indices[-1]] - else: - # Default to middle half of bandwidth if no clear peaks found - f_min, f_max = -sample_rate / 4, sample_rate / 4 - else: - f_min, f_max = -sample_rate / 4, sample_rate / 4 + f_min, f_max = _estimate_spectral_bounds(signal_segment, sample_rate) # --- 5. ANNOTATION GENERATION --- - if label is None: - label = f"{int(threshold*100)}%" + ann_label = label if label is not None else f"{int(threshold*100)}%" # Pack metadata for the UI/Downstream processing comment_data = { @@ -202,7 +349,7 @@ def threshold_qualifier( sample_count=true_stop - true_start, freq_lower_edge=center_frequency + f_min, freq_upper_edge=center_frequency + f_max, - label=label, + label=ann_label, comment=json.dumps(comment_data), detail={"generator": "hysteresis_qualifier"}, ) diff --git a/src/ria_toolkit_oss_cli/ria_toolkit_oss/view.py b/src/ria_toolkit_oss_cli/ria_toolkit_oss/view.py index 9c67e80..438cc29 100644 --- a/src/ria_toolkit_oss_cli/ria_toolkit_oss/view.py +++ b/src/ria_toolkit_oss_cli/ria_toolkit_oss/view.py @@ -34,7 +34,7 @@ VISUALIZATION_TYPES = { "spines", ], }, - "annotations": { + "annotations": { "function": view_annotations, "description": "Annotation-focused spectrogram view", "options": ["channel", "dark"], @@ -199,7 +199,7 @@ def print_metadata(recording, quiet): @click.option( "--type", "viz_type", - type=click.Choice(list(VISUALIZATION_TYPES.keys())), + type=click.Choice(list(VISUALIZATION_TYPES.keys()) + ["annotate", "annotation"]), default="simple", show_default=True, help="Visualization type", @@ -302,6 +302,9 @@ def view( # Legacy NPY file ria view old_capture.npy --legacy --type simple """ + if viz_type in ["annotate", "annotation"]: + viz_type = "annotations" + # Load config file if specified if config: _ = load_yaml_config(config)