ria-toolkit-oss/src/ria_toolkit_oss/annotations/parallel_signal_separator.py

436 lines
17 KiB
Python
Raw Normal View History

M
2026-02-23 14:12:34 -05:00
"""
Parallel signal separation for multi-component frequency-offset signals.
Provides methods to detect and separate overlapping frequency-domain signals
that occupy the same time window but different frequency bands.
This module implements **spectral peak detection** to identify distinct frequency
components and split single time-domain annotations into frequency-specific
sub-annotations.
**Key Design Decisions** (per Codex review):
1. **Complex IQ Support**: Uses `scipy.signal.welch` with `return_onesided=False`
for proper complex signal handling. Window length automatically adapts to
signal length via `nperseg=min(nfft, len(signal))` to handle bursts <nfft.
2. **Frequency Representation**: Components are detected in **relative** frequency
(baseband, centered at 0 Hz). Caller must add RF center_frequency_hz when
writing to SigMF annotations. This separation of concerns avoids the frequency
context bug where absolute Hz would be meaningless for baseband processing.
3. **Bandwidth Estimation**: Dual strategy avoids -3dB limitations:
- Primary: -3dB rolloff for typical narrowband signals
- Fallback: Cumulative power (99% like OBW) for wide/OFDM signals
- Auto-fallback when -3dB bandwidth is anomalous
4. **Noise Floor**: Auto-estimated via `np.percentile(psd_db, 10)` from data
to adapt across hardware (Pluto vs. ThinkRF). User can override if needed.
5. **Filter Sizing (Optional FIR extraction)**: When extracting components,
uses Kaiser window FIR with proper stopband specification. Auto-sizes
numtaps based on desired transition bandwidth. Includes downsampling
guidance for long captures.
6. **CLI Surface**: Single `separate` subcommand for all separation operations.
Can be chained after any detector or used standalone on existing annotations.
Example:
Two WiFi channels captured simultaneously:
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
>>> from ria_toolkit_oss.annotations import find_spectral_components
M
2026-02-23 14:12:34 -05:00
>>> # Detect the two distinct channels (returns relative frequencies)
>>> components = find_spectral_components(signal, sampling_rate=20e6)
>>> print(f"Found {len(components)} components")
Found 2 components
The module is designed to work with detected time-domain annotations,
allowing splitting of overlapping signals into separate training samples.
"""
import json
from typing import List, Optional, Tuple
import numpy as np
from scipy import ndimage
from scipy import signal as scipy_signal
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
from ria_toolkit_oss.datatypes import Annotation, Recording
M
2026-02-23 14:12:34 -05:00
def find_spectral_components(
signal_data: np.ndarray,
sampling_rate: float,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
time_percentile: float = 70.0,
) -> List[Tuple[float, float, float]]:
"""
Find distinct frequency components using spectral peak detection.
Identifies separate frequency components in a signal by analyzing the power
spectral density and finding peaks corresponding to distinct signals. This is
useful for separating parallel signals that occupy different frequency bands.
**Frequency Representation**: Returns frequencies in **baseband/relative** Hz
(centered at 0). To get absolute RF frequencies, add center_frequency_hz from
recording metadata to all returned values.
Algorithm:
1. Compute power spectral density using Welch (properly handles complex IQ)
2. Auto-estimate noise floor from data if not specified
3. Smooth PSD to reduce spurious peaks
4. Find local maxima above noise floor
5. Estimate bandwidth per peak using -3dB (fallback: cumulative power)
6. Filter components below minimum bandwidth threshold
:param signal_data: Complex IQ signal samples (np.complex64/128)
:type signal_data: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param nfft: FFT size / window length for Welch. Automatically capped at
signal length to handle bursts (default: 65536)
:type nfft: int
:param noise_threshold_db: Minimum SNR threshold in dB. If None (default),
auto-estimates as np.percentile(psd_db, 10).
Adapt this across hardware (Pluto: ~-100, ThinkRF: ~-60).
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz)
:type min_component_bw: float
:param power_threshold: Cumulative power threshold for fallback bandwidth
estimation (default: 0.99 = 99% power, like OBW)
:type power_threshold: float
:returns: List of (center_freq_hz, lower_freq_hz, upper_freq_hz) tuples.
**All frequencies are relative (baseband, 0-centered).**
Add recording metadata['center_frequency'] to get absolute RF frequencies.
:rtype: List[Tuple[float, float, float]]
:raises ValueError: If signal has fewer than 256 samples
**Example**::
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import find_spectral_components
M
2026-02-23 14:12:34 -05:00
>>> recording = load_recording("capture.sigmf")
>>> segment = recording.data[0][start:end]
>>> # Components in relative (baseband) frequency
>>> components = find_spectral_components(segment, sampling_rate=20e6)
>>> for center_rel, lower_rel, upper_rel in components:
... # Convert to absolute RF frequency
... center_abs = recording.metadata['center_frequency'] + center_rel
... print(f"Component @ {center_abs/1e9:.3f} GHz")
"""
# Validate input
min_samples = 256
if len(signal_data) < min_samples:
raise ValueError(f"Signal too short: need at least {min_samples} samples, " f"got {len(signal_data)}.")
# Compute PSD using Welch method for complex IQ signals
# CRITICAL: return_onesided=False for proper complex signal handling
nperseg = min(nfft, len(signal_data))
noverlap = nperseg // 2
# --- STFT ---
freqs, times, Zxx = scipy_signal.stft(
signal_data,
fs=sampling_rate,
window="blackman",
nperseg=nperseg,
noverlap=noverlap,
return_onesided=False,
boundary=None,
)
# Shift zero freq to center
Zxx = np.fft.fftshift(Zxx, axes=0)
freqs = np.fft.fftshift(freqs)
# Power spectrogram
power = np.abs(Zxx) ** 2
power_db = 10 * np.log10(power + 1e-12)
# --- Aggregate across time robustly ---
# Using percentile instead of mean prevents short signals from being diluted
freq_profile_db = np.percentile(power_db, time_percentile, axis=1)
# --- Noise floor estimation ---
if noise_threshold_db is None:
noise_threshold_db = np.percentile(freq_profile_db, 20)
threshold = noise_threshold_db + 3 # 3 dB above noise floor
# --- Smooth lightly (avoid merging nearby signals) ---
freq_profile_db = ndimage.gaussian_filter1d(freq_profile_db, sigma=1.5)
# --- Binary mask of significant frequencies ---
mask = freq_profile_db > threshold
# --- Find contiguous frequency regions ---
labeled, num_features = ndimage.label(mask)
components = []
for region_label in range(1, num_features + 1):
region_indices = np.where(labeled == region_label)[0]
if len(region_indices) == 0:
continue
lower_idx = region_indices[0]
upper_idx = region_indices[-1]
lower_freq = freqs[lower_idx]
upper_freq = freqs[upper_idx]
bw = upper_freq - lower_freq
if bw < min_component_bw:
continue
center_freq = (lower_freq + upper_freq) / 2
components.append((center_freq, lower_freq, upper_freq))
return components
def split_annotation_by_components(
annotation: Annotation,
signal: np.ndarray,
sampling_rate: float,
center_frequency_hz: float = 0.0,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
) -> List[Annotation]:
"""
Split an annotation into multiple annotations by detected frequency components.
Takes an existing annotation spanning multiple frequency components and
analyzes the frequency content to create separate sub-annotations for
each distinct frequency component.
**Use case**: Energy detection found a time window with 2-3 parallel WiFi
channels. This function splits it into separate annotations per channel.
**Frequency Handling**: `find_spectral_components` returns relative (baseband)
frequencies. This function adds `center_frequency_hz` to convert to absolute
RF frequencies for SigMF annotation bounds. This ensures correct frequency
context across baseband and RF domains.
:param annotation: Original annotation to split
:type annotation: Annotation
:param signal: Full signal array (complex IQ)
:type signal: np.ndarray
:param sampling_rate: Sample rate in Hz
:type sampling_rate: float
:param center_frequency_hz: RF center frequency to add to relative frequencies
from peak detection (default: 0.0 = baseband)
:type center_frequency_hz: float
:param nfft: FFT size for analysis (default: 65536, auto-capped at signal length)
:type nfft: int
:param noise_threshold_db: Noise floor threshold in dB. If None (default),
auto-estimates from data.
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz)
:type min_component_bw: float
:returns: List of new annotations (one per detected component).
Returns empty list if no components found or segment too short.
:rtype: List[Annotation]
**Example**::
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import split_annotation_by_components
M
2026-02-23 14:12:34 -05:00
>>> recording = load_recording("capture.sigmf")
>>> # Original annotation spans multiple channels
>>> original = recording.annotations[0]
>>> # Split using RF center frequency from metadata
>>> components = split_annotation_by_components(
... original,
... recording.data[0],
... recording.metadata['sample_rate'],
... center_frequency_hz=recording.metadata.get('center_frequency', 0.0)
... )
>>> print(f"Split into {len(components)} components")
Split into 2 components
**Algorithm**:
1. Extract segment corresponding to annotation time bounds
2. Find frequency components in that segment (returns relative frequencies)
3. Add center_frequency_hz to get absolute RF frequencies
4. Create new annotation for each component
5. Preserve original metadata (label, type, etc.)
6. Add component info to comment JSON
**Notes**:
- Original annotation is not modified
- Returns empty list if segment too short (<256 samples)
- Segments <nfft get auto-downsampled to nfft (see find_spectral_components)
- Each component inherits label from original
- Component frequencies in comment JSON are absolute (RF) frequencies
"""
# Extract segment corresponding to annotation time bounds
start_sample = annotation.sample_start
end_sample = min(start_sample + annotation.sample_count, len(signal))
segment = signal[start_sample:end_sample]
# Validate segment length is enough for spectral analysis
if len(segment) < 256:
return []
# Find components in this segment (returns relative/baseband frequencies)
try:
components = find_spectral_components(segment, sampling_rate, nfft, noise_threshold_db, min_component_bw)
except ValueError:
# Spectral analysis failed (e.g., not complex IQ)
return []
if not components:
# No components found
return []
# Create annotations for each component
new_annotations = []
for center_freq_rel, lower_freq_rel, upper_freq_rel in components:
# Convert relative (baseband) frequencies to absolute (RF) frequencies
center_freq_abs = center_frequency_hz + center_freq_rel
lower_freq_abs = center_frequency_hz + lower_freq_rel
upper_freq_abs = center_frequency_hz + upper_freq_rel
# Parse original annotation metadata
try:
comment_data = json.loads(annotation.comment)
except (json.JSONDecodeError, TypeError):
comment_data = {"type": "standalone"}
# Add component information (with absolute RF frequencies)
comment_data["split_from_annotation"] = True
comment_data["original_freq_bounds"] = {
"lower": float(annotation.freq_lower_edge),
"upper": float(annotation.freq_upper_edge),
}
comment_data["component_freq_bounds_rf"] = {
"center": float(center_freq_abs),
"lower": float(lower_freq_abs),
"upper": float(upper_freq_abs),
}
# Create new annotation with absolute RF frequency bounds
new_anno = Annotation(
sample_start=annotation.sample_start,
sample_count=annotation.sample_count,
freq_lower_edge=lower_freq_abs,
freq_upper_edge=upper_freq_abs,
label=annotation.label,
comment=json.dumps(comment_data),
detail={
"generator": "parallel_signal_separator",
"center_freq_hz": float(center_freq_abs),
},
)
new_annotations.append(new_anno)
return new_annotations
def split_recording_annotations(
recording: Recording,
indices: Optional[List[int]] = None,
nfft: int = 65536,
noise_threshold_db: Optional[float] = None,
min_component_bw: float = 50e3,
) -> Recording:
"""
Split multiple annotations in a recording by frequency components.
Processes specified annotations (or all if indices=None), replacing each
with its frequency-separated components. Uses RF center_frequency from
recording metadata for proper absolute frequency conversion.
:param recording: Recording to process
:type recording: Recording
:param indices: Annotation indices to split (None = all, default: None).
Use indices=[] to skip splitting (returns unchanged recording).
:type indices: Optional[List[int]]
:param nfft: FFT size for spectral analysis (default: 65536,
auto-capped at signal segment length)
:type nfft: int
:param noise_threshold_db: Noise floor threshold in dB. If None (default),
auto-estimates from each segment.
:type noise_threshold_db: Optional[float]
:param min_component_bw: Minimum component bandwidth in Hz (default: 50 kHz).
Components narrower than this are filtered out.
:type min_component_bw: float
:returns: New Recording with split annotations
:rtype: Recording
**Example**::
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import split_recording_annotations
M
2026-02-23 14:12:34 -05:00
>>> recording = load_recording("capture.sigmf")
>>> # Split all annotations
>>> split_rec = split_recording_annotations(recording)
>>> print(f"Original: {len(recording.annotations)} annotations")
>>> print(f"Split: {len(split_rec.annotations)} annotations")
Original: 5 annotations
Split: 9 annotations
**Algorithm**:
1. For each annotation in indices (or all if None):
2. Call split_annotation_by_components with RF center_frequency
3. If components found, replace annotation with components
4. If no components found, keep original annotation
5. Annotations not in indices are kept unchanged
**Notes**:
- Original recording is not modified
- Returns empty Recording.annotations if recording has no annotations
- RF center_frequency from metadata ensures correct absolute frequencies
- If an annotation can't be split (too short, wrong format), original kept
"""
if indices is None:
# Split all annotations
indices = list(range(len(recording.annotations)))
if not recording.annotations:
# No annotations to split
return recording
signal = recording.data[0]
sample_rate = recording.metadata["sample_rate"]
center_frequency = recording.metadata.get("center_frequency", 0.0)
# Build new annotation list
new_annotations = []
for i, anno in enumerate(recording.annotations):
if i in indices:
# Attempt to split this annotation
try:
components = split_annotation_by_components(
anno,
signal,
sample_rate,
center_frequency_hz=center_frequency,
nfft=nfft,
noise_threshold_db=noise_threshold_db,
min_component_bw=min_component_bw,
)
if components:
# Split successful, use components
new_annotations.extend(components)
else:
# No components found, keep original
new_annotations.append(anno)
except Exception:
# Split failed for any reason, keep original
new_annotations.append(anno)
else:
# Not in split list, keep as-is
new_annotations.append(anno)
return Recording(data=recording.data, metadata=recording.metadata, annotations=new_annotations)