M
muq
93ae08bc91
The OSS threshold_qualifier was last synced from utils on Feb 23 2026, before the major robustness improvements landed in utils on Mar 19 2026. This commit brings it fully up to date. Changes ported from utils: - Multi-pass detection (Pass 1 strong burst, Pass 2 weak residual, Pass 3 sustained faint burst via macro-window averaging) - Noise floor estimation via percentile instead of simple max*threshold - Dynamic range ratio guard (early exit on low-contrast captures) - Improved _find_ranges, _expand_and_filter_ranges, _merge_ranges helpers - Spectral smoothing in _estimate_spectral_bounds for wideband bursts - Minimum duration filter expressed in absolute time (5ms) not sample count Also includes the Pass 2 hysteresis spillover fix: - Pass 2 expansion now runs against residual_power (masked) instead of smoothed_power, preventing it from walking into Pass 1 territory - Pass 2 mask now has a window_size guard band around Pass 1 ranges, matching the guard already used in Pass 3 Only change from utils: import swapped to ria_toolkit_oss.datatypes.
439 lines
16 KiB
Python
439 lines
16 KiB
Python
"""
|
|
Energy-based signal detection and bandwidth analysis.
|
|
|
|
Provides automatic annotation generation using energy-based signal detection
|
|
and occupied bandwidth calculation following ITU-R SM.328 standard.
|
|
"""
|
|
|
|
import json
|
|
from typing import Tuple
|
|
|
|
import numpy as np
|
|
from scipy.signal import filtfilt
|
|
|
|
from ria_toolkit_oss.datatypes import Annotation, Recording
|
|
|
|
|
|
def detect_signals_energy(
|
|
recording: Recording,
|
|
k: int = 10,
|
|
threshold_factor: float = 1.2,
|
|
window_size: int = 200,
|
|
min_distance: int = 5000,
|
|
label: str = "signal",
|
|
annotation_type: str = "standalone",
|
|
freq_method: str = "nbw",
|
|
nfft: int = None,
|
|
obw_power: float = 0.99,
|
|
) -> Recording:
|
|
"""
|
|
Detect signal bursts using energy-based method with adaptive noise floor estimation.
|
|
|
|
This algorithm smooths the signal with a moving average filter, estimates the noise
|
|
floor from k segments, applies a threshold to detect regions above noise, and merges
|
|
nearby detections. Detected time boundaries are then assigned frequency bounds based
|
|
on the selected frequency method.
|
|
|
|
Time Detection Algorithm:
|
|
1. Smooth signal using moving average (envelope detection)
|
|
2. Divide smoothed signal into k segments
|
|
3. Estimate noise floor as median of segment mean powers
|
|
4. Detect regions where power exceeds threshold_factor * noise_floor
|
|
5. Merge regions closer than min_distance samples
|
|
|
|
Frequency Bounding (freq_method):
|
|
- 'nbw': Nominal bandwidth (OBW + center frequency) - DEFAULT
|
|
- 'obw': Occupied bandwidth (99.99% power, includes siedelobes)
|
|
- 'full-detected': Lowest to highest spectral component
|
|
- 'full-bandwidth': Entire Nyquist span (center_freq ± sample_rate/2)
|
|
|
|
:param recording: Recording to analyze
|
|
:type recording: Recording
|
|
:param k: Number of segments for noise floor estimation (default: 10)
|
|
:type k: int
|
|
:param threshold_factor: Threshold multiplier above noise floor (typical: 1.2-2.0, default: 1.2)
|
|
:type threshold_factor: float
|
|
:param window_size: Moving average window size in samples (default: 200)
|
|
:type window_size: int
|
|
:param min_distance: Minimum distance between separate signals in samples (default: 5000)
|
|
:type min_distance: int
|
|
:param label: Label for detected annotations (default: "signal")
|
|
:type label: str
|
|
:param annotation_type: Annotation type (standalone, parallel, intersection, default: standalone)
|
|
:type annotation_type: str
|
|
:param freq_method: How to calculate frequency bounds (default: 'nbw')
|
|
:type freq_method: str
|
|
:param nfft: FFT size for frequency calculations (default: None)
|
|
:type nfft: int
|
|
:param obw_power: Power percentage for OBW (0.9999 = 99.99%, default: 0.99)
|
|
:type obw_power: float
|
|
|
|
:returns: New Recording with added annotations
|
|
:rtype: Recording
|
|
|
|
**Example**::
|
|
|
|
>>> from utils.io import load_recording
|
|
>>> from utils.annotations import detect_signals_energy
|
|
>>> recording = load_recording("capture.sigmf")
|
|
|
|
>>> # Detect with NBW frequency bounds (default, best for real signals)
|
|
>>> annotated = detect_signals_energy(recording, label="burst")
|
|
|
|
>>> # Detect with OBW (more conservative, includes siedelobes)
|
|
>>> annotated = detect_signals_energy(
|
|
... recording, label="burst", freq_method="obw"
|
|
... )
|
|
|
|
>>> # Detect with full detected range (captures all spectral components)
|
|
>>> annotated = detect_signals_energy(
|
|
... recording, label="burst", freq_method="full-detected"
|
|
... )
|
|
"""
|
|
# Extract signal data (use first channel only)
|
|
signal = recording.data[0]
|
|
|
|
# Calculate smoothed signal power
|
|
kernel = np.ones(window_size) / window_size
|
|
smoothed_power = filtfilt(kernel, [1], np.abs(signal) ** 2)
|
|
|
|
# Estimate noise floor using segment-based median (robust to signal presence)
|
|
segments = np.array_split(smoothed_power, k)
|
|
noise_floor = np.median([np.mean(s) for s in segments])
|
|
|
|
# Detect signal boundaries (regions above threshold)
|
|
enter = noise_floor * threshold_factor
|
|
exit = enter * 0.8
|
|
boundaries = []
|
|
start = None
|
|
active = False
|
|
|
|
for i, p in enumerate(smoothed_power):
|
|
if not active and p > enter:
|
|
start = i
|
|
active = True
|
|
elif active and p < exit:
|
|
boundaries.append((start, i - start))
|
|
active = False
|
|
|
|
if active:
|
|
boundaries.append((start, len(smoothed_power) - start))
|
|
|
|
# Merge boundaries that are closer than min_distance
|
|
merged_boundaries = []
|
|
if boundaries:
|
|
start, length = boundaries[0]
|
|
for next_start, next_length in boundaries[1:]:
|
|
if next_start - (start + length) < min_distance:
|
|
# Merge with current boundary
|
|
length = next_start + next_length - start
|
|
else:
|
|
# Save current and start new boundary
|
|
merged_boundaries.append((start, length))
|
|
start, length = next_start, next_length
|
|
# Add final boundary
|
|
merged_boundaries.append((start, length))
|
|
|
|
# Create annotations from detected boundaries
|
|
sample_rate = recording.metadata["sample_rate"]
|
|
center_frequency = recording.metadata.get("center_frequency", 0)
|
|
|
|
# Validate frequency method
|
|
valid_freq_methods = ["nbw", "obw", "full-detected", "full-bandwidth"]
|
|
if freq_method not in valid_freq_methods:
|
|
raise ValueError(f"Invalid freq_method '{freq_method}'. " f"Must be one of: {', '.join(valid_freq_methods)}")
|
|
|
|
annotations = []
|
|
for start_sample, sample_count in merged_boundaries:
|
|
# Calculate frequency bounds based on method
|
|
freq_lower, freq_upper = calculate_frequency_bounds(
|
|
freq_method, center_frequency, sample_rate, nfft, signal, start_sample, sample_count, obw_power
|
|
)
|
|
# Build comment JSON with type metadata
|
|
comment_data = {
|
|
"type": annotation_type,
|
|
"generator": "energy_detector",
|
|
"freq_method": freq_method,
|
|
"params": {
|
|
"threshold_factor": threshold_factor,
|
|
"window_size": window_size,
|
|
"noise_floor": float(noise_floor),
|
|
"threshold": float(enter),
|
|
},
|
|
}
|
|
|
|
anno = Annotation(
|
|
sample_start=start_sample,
|
|
sample_count=sample_count,
|
|
freq_lower_edge=freq_lower,
|
|
freq_upper_edge=freq_upper,
|
|
label=label,
|
|
comment=json.dumps(comment_data),
|
|
detail={"generator": "energy_detector", "freq_method": freq_method},
|
|
)
|
|
annotations.append(anno)
|
|
|
|
return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations)
|
|
|
|
|
|
def calculate_occupied_bandwidth(
|
|
signal: np.ndarray,
|
|
sampling_rate: float,
|
|
nfft: int = None,
|
|
power_percentage: float = 0.99,
|
|
):
|
|
if nfft is None:
|
|
nfft = max(65536, 2 ** int(np.floor(np.log2(len(signal)))))
|
|
|
|
window = np.blackman(len(signal))
|
|
spec = np.fft.fftshift(np.fft.fft(signal * window, n=nfft))
|
|
|
|
psd = np.abs(spec) ** 2
|
|
psd = psd / psd.sum() # normalize
|
|
|
|
freqs = np.fft.fftshift(np.fft.fftfreq(nfft, 1 / sampling_rate))
|
|
|
|
cdf = np.cumsum(psd)
|
|
|
|
tail = (1 - power_percentage) / 2
|
|
|
|
lower_idx = np.searchsorted(cdf, tail)
|
|
upper_idx = np.searchsorted(cdf, 1 - tail)
|
|
|
|
return freqs[upper_idx] - freqs[lower_idx], freqs[lower_idx], freqs[upper_idx]
|
|
|
|
|
|
def calculate_nominal_bandwidth(
|
|
signal: np.ndarray,
|
|
sampling_rate: float,
|
|
nfft: int = None,
|
|
power_percentage: float = 0.99,
|
|
) -> Tuple[float, float]:
|
|
"""
|
|
Calculate nominal bandwidth and center frequency.
|
|
|
|
Nominal bandwidth (NBW) is the occupied bandwidth along with the center
|
|
frequency of the signal's spectral occupancy. Useful for characterizing
|
|
signals with unknown or drifting center frequencies.
|
|
|
|
:param signal: Complex IQ signal samples
|
|
:type signal: np.ndarray
|
|
:param sampling_rate: Sample rate in Hz
|
|
:type sampling_rate: float
|
|
:param nfft: FFT size
|
|
:type nfft: int
|
|
:param power_percentage: Fraction of power to contain
|
|
:type power_percentage: float
|
|
|
|
:returns: Tuple of (nominal_bandwidth_hz, center_frequency_hz)
|
|
:rtype: Tuple[float, float]
|
|
|
|
**Example**::
|
|
|
|
>>> from utils.annotations import calculate_nominal_bandwidth
|
|
>>> nbw, center = calculate_nominal_bandwidth(signal, sampling_rate=10e6)
|
|
>>> print(f"NBW: {nbw/1e6:.3f} MHz, Center: {center/1e6:.3f} MHz")
|
|
"""
|
|
bw, lower_freq, upper_freq = calculate_occupied_bandwidth(signal, sampling_rate, nfft, power_percentage)
|
|
|
|
# Center frequency is midpoint of occupied band
|
|
center_freq = (lower_freq + upper_freq) / 2
|
|
|
|
return lower_freq, upper_freq, center_freq
|
|
|
|
|
|
def calculate_full_detected_bandwidth(
|
|
signal: np.ndarray,
|
|
sampling_rate: float,
|
|
nfft: int = None,
|
|
start_offset: int = 1000,
|
|
) -> Tuple[float, float, float]:
|
|
"""
|
|
Calculate frequency range from lowest to highest spectral component.
|
|
|
|
Unlike OBW/NBW which define a power-based bandwidth, this calculates
|
|
the absolute frequency span from the lowest non-zero spectral component
|
|
to the highest non-zero component.
|
|
|
|
Useful for:
|
|
- Signals with spectral gaps
|
|
- Multiple parallel signals (captures all of them)
|
|
- Understanding total occupied spectrum vs. actual bandwidth
|
|
|
|
:param signal: Complex IQ signal samples
|
|
:type signal: np.ndarray
|
|
:param sampling_rate: Sample rate in Hz
|
|
:type sampling_rate: float
|
|
:param nfft: FFT size
|
|
:type nfft: int
|
|
:param start_offset: Skip samples at start
|
|
:type start_offset: int
|
|
|
|
:returns: Tuple of (bandwidth_hz, lower_freq_hz, upper_freq_hz)
|
|
:rtype: Tuple[float, float, float]
|
|
|
|
**Example**::
|
|
|
|
>>> # Signal with two components at different frequencies
|
|
>>> bw, f_low, f_high = calculate_full_detected_bandwidth(
|
|
... signal, sampling_rate=10e6, nfft=65536
|
|
... )
|
|
>>> print(f"Full span: {f_low/1e6:.3f} to {f_high/1e6:.3f} MHz")
|
|
"""
|
|
# Validate input
|
|
if len(signal) < nfft + start_offset:
|
|
raise ValueError(
|
|
f"Signal too short: need {nfft + start_offset} samples, "
|
|
f"got {len(signal)}. Reduce nfft or start_offset."
|
|
)
|
|
|
|
# Extract segment
|
|
signal_segment = signal[start_offset : nfft + start_offset]
|
|
|
|
# Compute FFT and power spectral density
|
|
freq_spectrum = np.fft.fft(signal_segment, n=nfft)
|
|
psd = np.abs(freq_spectrum) ** 2
|
|
|
|
# Shift to center DC
|
|
psd_shifted = np.fft.fftshift(psd)
|
|
freq_bins = np.fft.fftshift(np.fft.fftfreq(nfft, 1 / sampling_rate))
|
|
|
|
# Find noise floor (mean of lowest 10% of bins) and all bins above noise floor
|
|
noise_floor = np.mean(np.sort(psd_shifted)[: int(len(psd_shifted) * 0.1)])
|
|
above_noise = np.where(psd_shifted > noise_floor * 1.5)[0]
|
|
|
|
if len(above_noise) == 0:
|
|
# No signal above noise, return zero bandwidth
|
|
return 0.0, 0.0, 0.0
|
|
|
|
# Get frequency range of signal components
|
|
lower_idx = above_noise[0]
|
|
upper_idx = above_noise[-1]
|
|
|
|
lower_freq = freq_bins[lower_idx]
|
|
upper_freq = freq_bins[upper_idx]
|
|
|
|
bandwidth = upper_freq - lower_freq
|
|
|
|
return bandwidth, lower_freq, upper_freq
|
|
|
|
|
|
def annotate_with_obw(
|
|
recording: Recording,
|
|
label: str = "signal",
|
|
annotation_type: str = "standalone",
|
|
nfft: int = None,
|
|
power_percentage: float = 0.99,
|
|
) -> Recording:
|
|
"""
|
|
Create a single annotation spanning the occupied bandwidth of the entire recording.
|
|
|
|
Analyzes the full recording to find its occupied bandwidth and creates an annotation
|
|
covering that frequency range for the entire time duration.
|
|
|
|
:param recording: Recording to analyze
|
|
:type recording: Recording
|
|
:param label: Annotation label
|
|
:type label: str
|
|
:param annotation_type: Annotation type
|
|
:type annotation_type: str
|
|
:param nfft: FFT size
|
|
:type nfft: int
|
|
:param power_percentage: Power percentage for OBW calculation
|
|
:type power_percentage: float
|
|
|
|
:returns: Recording with OBW annotation added
|
|
:rtype: Recording
|
|
|
|
**Example**::
|
|
|
|
>>> from utils.annotations import annotate_with_obw
|
|
>>> annotated = annotate_with_obw(recording, label="signal_obw")
|
|
"""
|
|
signal = recording.data[0]
|
|
sample_rate = recording.metadata["sample_rate"]
|
|
center_freq = recording.metadata.get("center_frequency", 0)
|
|
|
|
# Calculate OBW
|
|
obw, lower_offset, upper_offset = calculate_occupied_bandwidth(signal, sample_rate, nfft, power_percentage)
|
|
|
|
# Convert baseband offsets to absolute frequencies
|
|
freq_lower = center_freq + lower_offset
|
|
freq_upper = center_freq + upper_offset
|
|
|
|
# Create comment JSON
|
|
comment_data = {
|
|
"type": annotation_type,
|
|
"generator": "obw_annotator",
|
|
"obw_hz": float(obw),
|
|
"power_percentage": power_percentage,
|
|
"params": {"nfft": nfft},
|
|
}
|
|
|
|
# Create annotation spanning entire recording
|
|
anno = Annotation(
|
|
sample_start=0,
|
|
sample_count=len(signal),
|
|
freq_lower_edge=freq_lower,
|
|
freq_upper_edge=freq_upper,
|
|
label=label,
|
|
comment=json.dumps(comment_data),
|
|
detail={"generator": "obw_annotator", "obw_hz": float(obw)},
|
|
)
|
|
|
|
return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + [anno])
|
|
|
|
|
|
def calculate_frequency_bounds(
|
|
freq_method, center_frequency, sample_rate, nfft, signal, start_sample, sample_count, obw_power
|
|
):
|
|
if freq_method == "full-bandwidth":
|
|
# Full Nyquist span
|
|
freq_lower = center_frequency - (sample_rate / 2)
|
|
freq_upper = center_frequency + (sample_rate / 2)
|
|
else:
|
|
# Extract segment for frequency analysis
|
|
segment_start = start_sample
|
|
segment_end = min(start_sample + sample_count, len(signal))
|
|
segment = signal[segment_start:segment_end]
|
|
|
|
if nfft is None or len(segment) >= nfft:
|
|
if freq_method == "nbw":
|
|
# Nominal bandwidth (OBW + center frequency)
|
|
try:
|
|
lower_freq, upper_freq, _ = calculate_nominal_bandwidth(segment, sample_rate, nfft, obw_power)
|
|
freq_lower = center_frequency + lower_freq
|
|
freq_upper = center_frequency + upper_freq
|
|
except (ValueError, IndexError):
|
|
# Fallback if calculation fails
|
|
freq_lower = center_frequency - (sample_rate / 2)
|
|
freq_upper = center_frequency + (sample_rate / 2)
|
|
|
|
elif freq_method == "obw":
|
|
# Occupied bandwidth
|
|
try:
|
|
_, f_lower, f_upper = calculate_occupied_bandwidth(segment, sample_rate, nfft, obw_power)
|
|
freq_lower = center_frequency + f_lower
|
|
freq_upper = center_frequency + f_upper
|
|
except (ValueError, IndexError):
|
|
# Fallback if calculation fails
|
|
freq_lower = center_frequency - (sample_rate / 2)
|
|
freq_upper = center_frequency + (sample_rate / 2)
|
|
|
|
elif freq_method == "full-detected":
|
|
# Full detected range (lowest to highest component)
|
|
try:
|
|
_, f_lower, f_upper = calculate_full_detected_bandwidth(segment, sample_rate, nfft)
|
|
freq_lower = center_frequency + f_lower
|
|
freq_upper = center_frequency + f_upper
|
|
except (ValueError, IndexError):
|
|
# Fallback if calculation fails
|
|
freq_lower = center_frequency - (sample_rate / 2)
|
|
freq_upper = center_frequency + (sample_rate / 2)
|
|
else:
|
|
# Segment too short for FFT, use full bandwidth
|
|
freq_lower = center_frequency - (sample_rate / 2)
|
|
freq_upper = center_frequency + (sample_rate / 2)
|
|
|
|
return freq_lower, freq_upper
|