import json from typing import Optional import numpy as np from ria_toolkit_oss.datatypes import Annotation, Recording def annotate_with_cusum( recording: Recording, label: Optional[str] = "segment", window_size: Optional[int] = 1, min_duration: Optional[float] = None, tolerance: Optional[int] = None, annotation_type: Optional[str] = "standalone", ): """ Add annotations that divide the recording into distinct time segments. This algorithm computes the cumulative sum of the sample magnitudes and determines break points in the signal. This tool can be used to find points where a signal turns on or off, or changes between a low and high amplitude. :param recording: A ``Recording`` object to annotate. :type recording: ``ria_toolkit_oss.datatypes.Recording`` :param label: Label for the detected segments. :type label: str :param window_size: The length (in samples) of the moving average window. :type window_size: int :param min_duration: The minimum duration (in ms) of a segment. The algorithm will not produce annotations shorter than this length. :type min_duration: float :param tolerance: The minimum length (in samples) of a segment. :type tolerance: int :param annotation_type: Annotation type (standalone, parallel, intersection). :type annotation_type: str """ sample_rate = recording.metadata["sample_rate"] center_frequency = recording.metadata.get("center_frequency", 0) # Create an object of the time segmenter time_segmenter = TimeSegmenter(sample_rate, min_duration, window_size, tolerance) change_points = time_segmenter.apply(recording.data[0]) time_segments_indices = np.append(np.insert(change_points, 0, 0), len(recording.data[0])) annotations = [] for i in range(len(time_segments_indices) - 1): # Build comment JSON with type metadata comment_data = { "type": annotation_type, "generator": "cusum_annotator", "params": { "window_size": window_size, "min_duration": min_duration, "tolerance": tolerance, }, } f_min, f_max = detect_frequency( signal=recording.data[0], start=time_segments_indices[i], stop=time_segments_indices[i + 1], sample_rate=sample_rate, ) annotations.append( Annotation( sample_start=time_segments_indices[i], sample_count=time_segments_indices[i + 1] - time_segments_indices[i], freq_lower_edge=center_frequency + f_min, freq_upper_edge=center_frequency + f_max, label=label, comment=json.dumps(comment_data), detail={"generator": "cusum_annotator"}, ) ) return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations + annotations) def _compute_cusum(_signal, sample_rate: int, tolerance: int = None, min_duration: float = -1): """ This function efficiently computes the cumulative sum of a give list (_signal), with an optional tolerance. Args: - _signal: array of iq samples. - Tolerance: the least acceptable length of a block, Defaults to None. Returns: - cusum (array): Array of the cumulative sum of the given list - sample_rate (int): __description_ - change_points (array): Array of the indices at which a change in the CUSUM direction happens. - min_duration (float): The least acceptable time width of each segment (in ms). Defaults to -1. """ # efficiently calculate the running sum of the signal # cusum = list(itertools.accumulate((_signal - np.mean(_signal)))) x = _signal - np.mean(_signal) cusum = np.cumsum(x) # 'diff' computes the differences between the consecutive values, # then 'sign' determines if it is +ve or -ve. change_indicators = np.sign(np.diff(cusum)) change_points = np.where(np.diff(change_indicators))[0] + 1 # Limit the change_points # Reject those whose number of samples < minimum accepted #n of samples in (min duration) ms. if min_duration is not None and min_duration > 0: min_samples_wide = int(min_duration * sample_rate / 1000) segments_lengths = np.diff(change_points) segments_lengths = np.insert(segments_lengths, 0, change_points[0]) change_points = change_points[np.where(segments_lengths > min_samples_wide)[0]] return cusum, change_points def detect_frequency(signal, start, stop, sample_rate): signal_segment = signal[start:stop] if len(signal_segment) > 0: fft_data = np.abs(np.fft.fftshift(np.fft.fft(signal_segment))) fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate)) # Use a spectral threshold to find the 'height' of the orange block spectral_thresh = np.max(fft_data) * 0.15 sig_indices = np.where(fft_data > spectral_thresh)[0] if len(sig_indices) > 4: return fft_freqs[sig_indices[0]], fft_freqs[sig_indices[-1]] else: return -sample_rate / 4, sample_rate / 4 else: return -sample_rate / 4, sample_rate / 4 class TimeSegmenter: """Time Segmenter class, it creates a segmenter object with certain\ characteristics to easily split an input signal to segments based on\ the cumulative sum of deviations (of the signal mean) """ def __init__( self, sample_rate: int, min_duration: float = 1, moving_average_window: int = 3, tolerance: int = None ): """_summary_ Args: sample_rate (int): _description_ min_duration (float, optional): _description_. Defaults to 1. moving_average_window (int, optional): _description_. Defaults to 3. tolerance (int, optional): _description_. Defaults to None. """ self.sample_rate = sample_rate self.min_duration = min_duration self.moving_average_window = moving_average_window self._moving_avg_filter = self._init_filter() self.tolerance = tolerance def _init_filter(self): """_summary_ Returns: _type_: _description_ """ return np.ones(self.moving_average_window) / self.moving_average_window def _apply_filter(self, iqsignal: np.array): """_summary_ Args: iqsignal (np.array): _description_ Returns: _type_: _description_ """ return np.convolve(abs(iqsignal), self._moving_avg_filter, mode="same") def _create_segments(self, iq_signal: np.array, change_points: np.array): """_summary_ Args: iq_signal (np.array): _description_ change_points (np.array): _description_ Returns: _type_: _description_ """ return np.split(iq_signal, change_points) def apply(self, iq_signal: np.array): """_summary_ Args: iq_signal (np.array): _description_ Returns: _type_: _description_ """ smoothed_signal = self._apply_filter(iq_signal) _, change_points = _compute_cusum(smoothed_signal, self.sample_rate, self.tolerance, self.min_duration) # segments = self._create_segments(iq_signal, change_points) return change_points