Push Tracker
ria-toolkit-oss/src/ria_toolkit_oss/annotations/signal_isolation.py
G gillian 0a1bef8453 fix: harden annotation pipeline and CLI robustness
- Replace bare metadata["sample_rate"] access with .get() + clear
  ValueError in threshold_qualifier, energy_detector, cusum_annotator,
  parallel_signal_separator, and signal_isolation
- Add --sample-rate option to energy, threshold, cusum, and separate
  CLI commands with a pre-flight error if sample rate is still absent
- Normalize namespaced metadata keys (e.g. BlockGenerator:Foo:sample_rate)
  to standard keys on legacy .npy load
- Cap threshold_qualifier smoothing window at 1% of signal length to
  prevent over-smoothing short recordings into a flat envelope
- Warn when threshold or energy detector returns 0 annotations due to
  constant-envelope signal; point to cusum as the right tool
- Enforce --overwrite before any work begins; error fires before load
  and detection, not after
- Fix qualify_slice off-by-one that silently dropped the last slice
- Surface split failures in parallel_signal_separator via warnings.warn
  instead of swallowing them silently
- Add threshold annotation example image to getting_started docs
2026-04-28 16:31:35 -04:00

105 lines
3.6 KiB
Python

import numpy as np
from scipy.signal import butter, lfilter
from ria_toolkit_oss.data.annotation import Annotation
from ria_toolkit_oss.data.recording import Recording
def isolate_signal(recording: Recording, annotation: Annotation) -> Recording:
    """
    Slice, frequency shift and filter the input recording according to the bounding box defined by the annotation.

    The annotated sample range is clipped to the recording, peak-normalized, mixed so that the centre of the
    annotation's frequency range sits at 0 Hz, and then lowpass filtered to the annotation bandwidth. Only the
    first channel (``recording.data[0]``) is processed, and the input metadata is passed to the output unchanged.

    :param recording: The input Recording to be sliced.
    :type recording: Recording
    :param annotation: The Annotation object defining the area of the recording to isolate.
    :type annotation: Annotation

    :raises ValueError: If the recording metadata does not contain ``sample_rate``, or if the annotation's
        sample range does not overlap the recording.

    :returns: The subsection of the original recording defined by the annotation.
    :rtype: Recording
    """
    # Fail fast on missing metadata, before any slicing or arithmetic is attempted.
    sample_rate = recording.metadata.get("sample_rate")
    if sample_rate is None:
        raise ValueError(
            "Recording metadata does not contain 'sample_rate'. "
            "Set recording.sample_rate before calling isolate_signal."
        )

    # Clip the annotated sample range to the bounds of the recording.
    sample_start = max(0, annotation.sample_start)
    sample_stop = min(len(recording), annotation.sample_start + annotation.sample_count)
    signal_slice = recording.data[0, sample_start:sample_stop]
    if len(signal_slice) == 0:
        # Without this guard np.max() below fails with an opaque "zero-size array" error.
        raise ValueError(
            f"Annotation sample range [{annotation.sample_start}, "
            f"{annotation.sample_start + annotation.sample_count}) does not overlap the recording."
        )

    # Offset of the annotation's centre from the recording's centre frequency (i.e. from 0 Hz at baseband).
    anno_base_center_freq = (annotation.freq_lower_edge + annotation.freq_upper_edge) / 2 - recording.metadata.get(
        "center_frequency", 0
    )
    anno_bw = annotation.freq_upper_edge - annotation.freq_lower_edge

    # Peak-normalize. An all-zero slice is left untouched rather than divided by zero (which would yield NaNs).
    peak = np.max(np.abs(signal_slice))
    if peak != 0:
        signal_slice = signal_slice / peak

    # Frequency shift the centre of the box to zero.
    shifted_signal_slice = frequency_shift_iq_samples(
        iq_samples=signal_slice,
        sample_rate=sample_rate,
        shift_frequency=-1 * anno_base_center_freq,
    )

    # Lowpass filter to the annotation bandwidth. When the annotation spans (essentially) the whole sampled
    # bandwidth there is nothing to reject, so the shifted samples are returned unfiltered.
    if anno_bw < sample_rate - 1:
        filtered_signal = apply_complex_lowpass_filter(
            signal=shifted_signal_slice, cutoff_frequency=anno_bw, sample_rate=sample_rate
        )
    else:
        filtered_signal = shifted_signal_slice

    return Recording(data=[filtered_signal], metadata=recording.metadata)
def frequency_shift_iq_samples(iq_samples, sample_rate, shift_frequency):
    """
    Shift IQ samples in frequency by multiplying them with a complex exponential.

    :param iq_samples: Sequence of complex IQ samples.
    :param sample_rate: Sample rate of ``iq_samples``; used to build the time axis.
    :param shift_frequency: Frequency offset to apply, in the same units as ``sample_rate``.
        Positive values rotate the samples by ``exp(+j*2*pi*f*t)``.

    :returns: The samples multiplied element-wise by the complex exponential.
    """
    # Time of each sample, starting at t = 0.
    sample_times = np.arange(len(iq_samples)) / sample_rate
    # Unit-magnitude rotator at the requested offset frequency.
    rotator = np.exp(1j * 2 * np.pi * shift_frequency * sample_times)
    return iq_samples * rotator
def apply_complex_lowpass_filter(signal, cutoff_frequency, sample_rate, order=5):
    """
    Apply a lowpass Butterworth filter to a complex signal.

    :param signal: Samples to filter.
    :param cutoff_frequency: Cutoff handed to ``design_complex_lowpass_filter``.
    :param sample_rate: Sample rate of ``signal``.
    :param order: Order of the Butterworth filter.

    :returns: The output of ``scipy.signal.lfilter`` for the designed coefficients.
    """
    numerator, denominator = design_complex_lowpass_filter(cutoff_frequency, sample_rate, order)
    return lfilter(numerator, denominator, signal)
def design_complex_lowpass_filter(cutoff_frequency, sample_rate, order=5):
    """
    Design a Butterworth lowpass filter for complex samples.

    ``scipy.signal.butter`` normalizes its critical frequency against half the sample rate. This function
    divides ``cutoff_frequency`` by the full ``sample_rate`` instead, so the resulting one-sided cutoff lies at
    ``cutoff_frequency / 2``; i.e. ``cutoff_frequency`` is the total width of the passband centred on 0 Hz.

    :param cutoff_frequency: Passband width; must satisfy ``0 < cutoff_frequency < sample_rate``.
    :param sample_rate: Sample rate of the signal the filter will be applied to.
    :param order: Order of the Butterworth filter.

    :raises ValueError: If ``cutoff_frequency`` is not strictly between 0 and ``sample_rate``.

    :returns: Tuple ``(b, a)`` of numerator and denominator filter coefficients.
    """
    # Upper limit used for both validation and normalization (see the docstring for why it is the full rate).
    nyquist = sample_rate

    # The bound is exclusive at both ends: a normalized cutoff of exactly 1.0 is rejected by butter().
    # Writing the check as a negated chained comparison also rejects NaN.
    if not 0 < cutoff_frequency < nyquist:
        raise ValueError("Cutoff frequency must be between 0 and the Nyquist frequency.")

    cutoff_normalized = cutoff_frequency / nyquist
    return butter(order, cutoff_normalized, btype="low")