Push Tracker
ria-toolkit-oss/src/ria_toolkit_oss/annotations/parallel_signal_separator.py

445 lines
18 KiB
Python
Raw Normal View History

M
2026-02-23 14:12:34 -05:00
"""
Parallel signal separation for multi-component frequency-offset signals.
Provides methods to detect and separate overlapping frequency-domain signals
that occupy the same time window but different frequency bands.
This module implements **spectral peak detection** to identify distinct frequency
components and split single time-domain annotations into frequency-specific
sub-annotations.
**Key Design Decisions** (per Codex review):
1. **Complex IQ Support**: Uses `scipy.signal.welch` with `return_onesided=False`
for proper complex signal handling. Window length automatically adapts to
signal length via `nperseg=min(nfft, len(signal))` to handle bursts <nfft.
2. **Frequency Representation**: Components are detected in **relative** frequency
(baseband, centered at 0 Hz). Caller must add RF center_frequency_hz when
writing to SigMF annotations. This separation of concerns avoids the frequency
context bug where absolute Hz would be meaningless for baseband processing.
3. **Bandwidth Estimation**: Dual strategy avoids -3dB limitations:
- Primary: -3dB rolloff for typical narrowband signals
- Fallback: Cumulative power (99% like OBW) for wide/OFDM signals
- Auto-fallback when -3dB bandwidth is anomalous
4. **Noise Floor**: Auto-estimated via `np.percentile(psd_db, 10)` from data
to adapt across hardware (Pluto vs. ThinkRF). User can override if needed.
5. **Filter Sizing (Optional FIR extraction)**: When extracting components,
uses Kaiser window FIR with proper stopband specification. Auto-sizes
numtaps based on desired transition bandwidth. Includes downsampling
guidance for long captures.
6. **CLI Surface**: Single `separate` subcommand for all separation operations.
Can be chained after any detector or used standalone on existing annotations.
Example:
Two WiFi channels captured simultaneously:
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
>>> from ria_toolkit_oss.annotations import find_spectral_components
M
2026-02-23 14:12:34 -05:00
>>> # Detect the two distinct channels (returns relative frequencies)
>>> components = find_spectral_components(signal, sampling_rate=20e6)
>>> print(f"Found {len(components)} components")
Found 2 components
The module is designed to work with detected time-domain annotations,
allowing splitting of overlapping signals into separate training samples.
"""
import json
import warnings
M
2026-02-23 14:12:34 -05:00
from typing import List, Optional, Tuple
import numpy as np
from scipy import ndimage
from scipy import signal as scipy_signal
from ria_toolkit_oss.data import Annotation, Recording
M
2026-02-23 14:12:34 -05:00
def find_spectral_components(
signal_data: np.ndarray,
sampling_rate: float,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
time_percentile: float = 70.0,
) -> List[Tuple[float, float, float]]:
"""
Find distinct frequency components using spectral peak detection.
Identifies separate frequency components in a signal by analyzing the power
spectral density and finding peaks corresponding to distinct signals. This is
useful for separating parallel signals that occupy different frequency bands.
**Frequency Representation**: Returns frequencies in **baseband/relative** Hz
(centered at 0). To get absolute RF frequencies, add center_frequency_hz from
recording metadata to all returned values.
Algorithm:
1. Compute power spectral density using Welch (properly handles complex IQ)
2. Auto-estimate noise floor from data if not specified
3. Smooth PSD to reduce spurious peaks
4. Find local maxima above noise floor
5. Estimate bandwidth per peak using -3dB (fallback: cumulative power)
6. Filter components below minimum bandwidth threshold
:param signal_data: Complex IQ signal samples (np.complex64/128)
:type signal_data: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param nfft: FFT size / window length for Welch. Automatically capped at
signal length to handle bursts (default: 65536)
:type nfft: int
:param noise_threshold_db: Minimum SNR threshold in dB. If None (default),
auto-estimates as np.percentile(psd_db, 10).
Adapt this across hardware (Pluto: ~-100, ThinkRF: ~-60).
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz)
:type min_component_bw: float
:param power_threshold: Cumulative power threshold for fallback bandwidth
estimation (default: 0.99 = 99% power, like OBW)
:type power_threshold: float
:returns: List of (center_freq_hz, lower_freq_hz, upper_freq_hz) tuples.
**All frequencies are relative (baseband, 0-centered).**
Add recording metadata['center_frequency'] to get absolute RF frequencies.
:rtype: List[Tuple[float, float, float]]
:raises ValueError: If signal has fewer than 256 samples
**Example**::
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import find_spectral_components
M
2026-02-23 14:12:34 -05:00
>>> recording = load_recording("capture.sigmf")
>>> segment = recording.data[0][start:end]
>>> # Components in relative (baseband) frequency
>>> components = find_spectral_components(segment, sampling_rate=20e6)
>>> for center_rel, lower_rel, upper_rel in components:
... # Convert to absolute RF frequency
... center_abs = recording.metadata['center_frequency'] + center_rel
... print(f"Component @ {center_abs/1e9:.3f} GHz")
"""
# Validate input
min_samples = 256
if len(signal_data) < min_samples:
raise ValueError(f"Signal too short: need at least {min_samples} samples, " f"got {len(signal_data)}.")
# Compute PSD using Welch method for complex IQ signals
# CRITICAL: return_onesided=False for proper complex signal handling
nperseg = min(nfft, len(signal_data))
noverlap = nperseg // 2
# --- STFT ---
freqs, times, Zxx = scipy_signal.stft(
signal_data,
fs=sampling_rate,
window="blackman",
nperseg=nperseg,
noverlap=noverlap,
return_onesided=False,
boundary=None,
)
# Shift zero freq to center
Zxx = np.fft.fftshift(Zxx, axes=0)
freqs = np.fft.fftshift(freqs)
# Power spectrogram
power = np.abs(Zxx) ** 2
power_db = 10 * np.log10(power + 1e-12)
# --- Aggregate across time robustly ---
# Using percentile instead of mean prevents short signals from being diluted
freq_profile_db = np.percentile(power_db, time_percentile, axis=1)
# --- Noise floor estimation ---
if noise_threshold_db is None:
noise_threshold_db = np.percentile(freq_profile_db, 20)
threshold = noise_threshold_db + 3 # 3 dB above noise floor
# --- Smooth lightly (avoid merging nearby signals) ---
freq_profile_db = ndimage.gaussian_filter1d(freq_profile_db, sigma=1.5)
# --- Binary mask of significant frequencies ---
mask = freq_profile_db > threshold
# --- Find contiguous frequency regions ---
labeled, num_features = ndimage.label(mask)
components = []
for region_label in range(1, num_features + 1):
region_indices = np.where(labeled == region_label)[0]
if len(region_indices) == 0:
continue
lower_idx = region_indices[0]
upper_idx = region_indices[-1]
lower_freq = freqs[lower_idx]
upper_freq = freqs[upper_idx]
bw = upper_freq - lower_freq
if bw < min_component_bw:
continue
center_freq = (lower_freq + upper_freq) / 2
components.append((center_freq, lower_freq, upper_freq))
return components
def split_annotation_by_components(
annotation: Annotation,
signal: np.ndarray,
sampling_rate: float,
center_frequency_hz: float = 0.0,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
) -> List[Annotation]:
"""
Split an annotation into multiple annotations by detected frequency components.
Takes an existing annotation spanning multiple frequency components and
analyzes the frequency content to create separate sub-annotations for
each distinct frequency component.
**Use case**: Energy detection found a time window with 2-3 parallel WiFi
channels. This function splits it into separate annotations per channel.
**Frequency Handling**: `find_spectral_components` returns relative (baseband)
frequencies. This function adds `center_frequency_hz` to convert to absolute
RF frequencies for SigMF annotation bounds. This ensures correct frequency
context across baseband and RF domains.
:param annotation: Original annotation to split
:type annotation: Annotation
:param signal: Full signal array (complex IQ)
:type signal: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param center_frequency_hz: RF center frequency to add to relative frequencies
from peak detection (default: 0.0 = baseband)
:type center_frequency_hz: float
:param nfft: FFT size for analysis (default: 65536, auto-capped at signal length)
:type nfft: int
:param noise_threshold_db: Noise floor threshold in dB. If None (default),
auto-estimates from data.
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz)
:type min_component_bw: float
:returns: List of new annotations (one per detected component).
Returns empty list if no components found or segment too short.
:rtype: List[Annotation]
**Example**::
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import split_annotation_by_components
M
2026-02-23 14:12:34 -05:00
>>> recording = load_recording("capture.sigmf")
>>> # Original annotation spans multiple channels
>>> original = recording.annotations[0]
>>> # Split using RF center frequency from metadata
>>> components = split_annotation_by_components(
... original,
... recording.data[0],
... recording.metadata['sample_rate'],
... center_frequency_hz=recording.metadata.get('center_frequency', 0.0)
... )
>>> print(f"Split into {len(components)} components")
Split into 2 components
**Algorithm**:
1. Extract segment corresponding to annotation time bounds
2. Find frequency components in that segment (returns relative frequencies)
3. Add center_frequency_hz to get absolute RF frequencies
4. Create new annotation for each component
5. Preserve original metadata (label, type, etc.)
6. Add component info to comment JSON
**Notes**:
- Original annotation is not modified
- Returns empty list if segment too short (<256 samples)
- Segments <nfft get auto-downsampled to nfft (see find_spectral_components)
- Each component inherits label from original
- Component frequencies in comment JSON are absolute (RF) frequencies
"""
# Extract segment corresponding to annotation time bounds
start_sample = annotation.sample_start
end_sample = min(start_sample + annotation.sample_count, len(signal))
segment = signal[start_sample:end_sample]
# Validate segment length is enough for spectral analysis
if len(segment) < 256:
return []
# Find components in this segment (returns relative/baseband frequencies)
try:
components = find_spectral_components(segment, sampling_rate, nfft, noise_threshold_db, min_component_bw)
except ValueError:
# Spectral analysis failed (e.g., not complex IQ)
return []
if not components:
# No components found
return []
# Create annotations for each component
new_annotations = []
for center_freq_rel, lower_freq_rel, upper_freq_rel in components:
# Convert relative (baseband) frequencies to absolute (RF) frequencies
center_freq_abs = center_frequency_hz + center_freq_rel
lower_freq_abs = center_frequency_hz + lower_freq_rel
upper_freq_abs = center_frequency_hz + upper_freq_rel
# Parse original annotation metadata
try:
comment_data = json.loads(annotation.comment)
except (json.JSONDecodeError, TypeError):
comment_data = {"type": "standalone"}
# Add component information (with absolute RF frequencies)
comment_data["split_from_annotation"] = True
comment_data["original_freq_bounds"] = {
"lower": float(annotation.freq_lower_edge),
"upper": float(annotation.freq_upper_edge),
}
comment_data["component_freq_bounds_rf"] = {
"center": float(center_freq_abs),
"lower": float(lower_freq_abs),
"upper": float(upper_freq_abs),
}
# Create new annotation with absolute RF frequency bounds
new_anno = Annotation(
sample_start=annotation.sample_start,
sample_count=annotation.sample_count,
freq_lower_edge=lower_freq_abs,
freq_upper_edge=upper_freq_abs,
label=annotation.label,
comment=json.dumps(comment_data),
detail={
"generator": "parallel_signal_separator",
"center_freq_hz": float(center_freq_abs),
},
)
new_annotations.append(new_anno)
return new_annotations
def split_recording_annotations(
recording: Recording,
indices: Optional[List[int]] = None,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
) -> Recording:
"""
Split multiple annotations in a recording by frequency components.
Processes specified annotations (or all if indices=None), replacing each
with its frequency-separated components. Uses RF center_frequency from
recording metadata for proper absolute frequency conversion.
:param recording: Recording to process
:type recording: Recording
:param indices: Annotation indices to split (None = all, default: None).
Use indices=[] to skip splitting (returns unchanged recording).
:type indices: Optional[List[int]]
:param nfft: FFT size for spectral analysis (default: 65536,
auto-capped at signal segment length)
:type nfft: int
:param noise_threshold_db: Noise floor threshold in dB. If None (default),
auto-estimates from each segment.
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz).
Components narrower than this are filtered out.
:type min_component_bw: float
:returns: New Recording with split annotations
:rtype: Recording
**Example**::
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import split_recording_annotations
M
2026-02-23 14:12:34 -05:00
>>> recording = load_recording("capture.sigmf")
>>> # Split all annotations
>>> split_rec = split_recording_annotations(recording)
>>> print(f"Original: {len(recording.annotations)} annotations")
>>> print(f"Split: {len(split_rec.annotations)} annotations")
Original: 5 annotations
Split: 9 annotations
**Algorithm**:
1. For each annotation in indices (or all if None):
2. Call split_annotation_by_components with RF center_frequency
3. If components found, replace annotation with components
4. If no components found, keep original annotation
5. Annotations not in indices are kept unchanged
**Notes**:
- Original recording is not modified
- Returns empty Recording.annotations if recording has no annotations
- RF center_frequency from metadata ensures correct absolute frequencies
- If an annotation can't be split (too short, wrong format), original kept
"""
if indices is None:
# Split all annotations
indices = list(range(len(recording.annotations)))
if not recording.annotations:
# No annotations to split
return recording
signal = recording.data[0]
sample_rate = recording.metadata.get("sample_rate")
if sample_rate is None:
raise ValueError(
"Recording metadata does not contain 'sample_rate'. "
"Supply it with --sample-rate when using the CLI, or set recording.sample_rate before calling this function."
)
M
2026-02-23 14:12:34 -05:00
center_frequency = recording.metadata.get("center_frequency", 0.0)
# Build new annotation list
new_annotations = []
for i, anno in enumerate(recording.annotations):
if i in indices:
# Attempt to split this annotation
try:
components = split_annotation_by_components(
anno,
signal,
sample_rate,
center_frequency_hz=center_frequency,
nfft=nfft,
noise_threshold_db=noise_threshold_db,
min_component_bw=min_component_bw,
)
if components:
# Split successful, use components
new_annotations.extend(components)
else:
# No components found, keep original
new_annotations.append(anno)
except Exception as e:
warnings.warn(
f"split_recording_annotations: failed to split annotation at index {i} ({e}); keeping original.",
stacklevel=2,
)
M
2026-02-23 14:12:34 -05:00
new_annotations.append(anno)
else:
# Not in split list, keep as-is
new_annotations.append(anno)
return Recording(data=recording.data, metadata=recording.metadata, annotations=new_annotations)