"""
Temporal signal detection and boundary refinement via hysteresis thresholding.

Provides methods to detect signal bursts in the time domain by triggering on
smoothed power peaks and expanding boundaries to capture the full energy
envelope. This module implements a **dual-threshold trigger** to solve the
'chatter' problem in noisy environments, ensuring that signal annotations
encapsulate the entire rise and fall of a burst rather than just the peak.

**Key Design Decisions**:

1. **Hysteresis Logic (Dual-Threshold)**:
   - **Trigger**: High threshold (`threshold * max_power`, where `max_power`
     is the peak of the smoothed power) ensures high confidence in signal
     presence.
   - **Boundary**: Low threshold (`0.5 * trigger`) allows the annotation to
     "crawl" outward, capturing the lower-energy start and end of the burst
     often missed by simple single-threshold detectors.

2. **Temporal Smoothing**: Uses a moving-average window (`window_size`) prior
   to thresholding. This prevents high-frequency noise spikes from causing
   fragmented annotations and provides a more stable estimate of the signal's
   power envelope.

3. **Spectral Profiling**: Once a temporal segment is isolated, the module
   performs an automated FFT analysis. It takes the lowest and highest FFT
   bins whose magnitude exceeds 15% of the segment's spectral peak as the
   frequency boundaries (`f_min`, `f_max`), allowing the detector to work on
   narrowband and wideband signals without manual frequency tuning. Segments
   with fewer than five such bins are discarded.

4. **Baseband/RF Mapping**: Automatically handles the conversion from
   relative FFT bin frequencies to absolute RF frequencies by referencing
   `recording.metadata["center_frequency"]` (treated as 0 when absent).

5. **False Positive Mitigation**: Implements a hard minimum duration check on
   the triggered region (1% of `min(sample_rate, number_of_samples)` samples,
   i.e. 10 ms for recordings of at least one second) to ignore transient
   hardware spikes or noise floor fluctuations that do not constitute a valid
   signal burst.

The module is designed to be the primary "first-pass" detector for pulsed
waveforms (like ADS-B, LoRa, or bursty FSK) before passing them to
classification or demodulation stages.
""" import json from typing import Optional import numpy as np from utils.data import Annotation, Recording def _find_ranges(indices, window_size): """ Groups individual indices into continuous temporal ranges. Args: indices: Array of indices where the signal exceeded a threshold. window_size: Maximum gap allowed between indices to consider them part of the same range. Returns: A list of (start, stop) tuples representing detected signal segments. """ if len(indices) == 0: return [] ranges = [] start = indices[0] in_range = False for i in range(1, len(indices)): # If the gap between current and previous index is within window_size, # keep the range alive. if indices[i] - indices[i - 1] <= window_size: if not in_range: # Start a new range start = indices[i - 1] in_range = True else: # Gap is too large; close the current range if one was active. if in_range: ranges.append((start, indices[i - 1])) in_range = False # Ensure the final segment is captured if the loop ends while in_range. if in_range: ranges.append((start, indices[-1])) return ranges def threshold_qualifier( recording: Recording, threshold: float, window_size: Optional[int] = 1024, label: Optional[str] = None, annotation_type: Optional[str] = "standalone", ) -> Recording: """ Annotate a recording with bounding boxes for regions above a threshold. Threshold is defined as a fraction of the maximum sample magnitude. This algorithm searches for samples above the threshold and combines them into ranges if they are within window_size of each other. Detects and annotates signals using energy thresholding and spectral analysis. The algorithm follows these steps: 1. Smooths power data using a moving average. 2. Identifies 'peak' regions exceeding a high trigger threshold. 3. Uses hysteresis to expand boundaries until power drops below a lower threshold. 4. Performs an FFT on each segment to determine frequency occupancy. Args: recording: The Recording object containing IQ or real signal data. 
threshold: Sensitivity multiplier (0.0 to 1.0) applied to max power. window_size: Size of the smoothing filter and max gap for merging hits. label: Custom string label for annotations. annotation_type: Metadata string for the 'type' field in the annotation. Returns: A new Recording object populated with detected Annotations. """ # Extract signal and metadata sample_data = recording.data[0] sample_rate = recording.metadata["sample_rate"] center_frequency = recording.metadata.get("center_frequency", 0) # --- 1. SIGNAL CONDITIONING --- # Convert to power (Magnitude squared) power_data = np.abs(sample_data) ** 2 smoothing_window = np.ones(window_size) / window_size smoothed_power = np.convolve(power_data, smoothing_window, mode="same") # Define thresholds based on the global peak of the smoothed signal max_power = np.max(smoothed_power) trigger_val = threshold * max_power # High threshold to trigger detection boundary_val = (threshold / 2) * max_power # Low threshold to define signal edges # --- 2. INITIAL DETECTION --- # Identify indices that strictly exceed the high trigger indices = np.where(smoothed_power > trigger_val)[0] initial_ranges = _find_ranges(indices=indices, window_size=window_size) annotations = [] threshold_base = min(sample_rate, len(sample_data)) for start, stop in initial_ranges: if (stop - start) < (threshold_base * 0.01): continue # --- 3. HYSTERESIS (Boundary Expansion) --- # Search backward from 'start' until power drops below the low boundary_val true_start = start while true_start > 0 and smoothed_power[true_start] > boundary_val: true_start -= 1 # Search forward from 'stop' until power drops below the low boundary_val true_stop = stop while true_stop < len(smoothed_power) - 1 and smoothed_power[true_stop] > boundary_val: true_stop += 1 # --- 4. 
SPECTRAL ANALYSIS (Frequency Detection) --- signal_segment = sample_data[true_start:true_stop] if len(signal_segment) > 0: fft_data = np.abs(np.fft.fftshift(np.fft.fft(signal_segment))) fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate)) # Determine frequency bounds where spectral energy is > 15% of segment peak spectral_thresh = np.max(fft_data) * 0.15 sig_indices = np.where(fft_data > spectral_thresh)[0] # Ensure the signal has some spectral width before annotating if len(sig_indices) < 5: continue if len(sig_indices) > 0: f_min, f_max = fft_freqs[sig_indices[0]], fft_freqs[sig_indices[-1]] else: # Default to middle half of bandwidth if no clear peaks found f_min, f_max = -sample_rate / 4, sample_rate / 4 else: f_min, f_max = -sample_rate / 4, sample_rate / 4 # --- 5. ANNOTATION GENERATION --- if label is None: label = f"{int(threshold*100)}%" # Pack metadata for the UI/Downstream processing comment_data = { "type": annotation_type, "generator": "threshold_qualifier", "params": { "threshold": threshold, "window_size": window_size, }, } anno = Annotation( sample_start=true_start, sample_count=true_stop - true_start, freq_lower_edge=center_frequency + f_min, freq_upper_edge=center_frequency + f_max, label=label, comment=json.dumps(comment_data), detail={"generator": "hysteresis_qualifier"}, ) annotations.append(anno) # Return a new Recording object including the new annotations return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations)