annotationsfix #19

Merged
madrigal merged 23 commits from annotationsfix into main 2026-04-20 15:57:23 -04:00
19 changed files with 491 additions and 14 deletions
Showing only changes of commit ee2ce3b1f4 - Show all commits

View File

@ -11,15 +11,15 @@ The Radio Dataset Framework provides a software interface to access and manipula
the need for users to interface with the source files directly. Instead, users initialize and interact with a Python
object, while the complexities of efficient data retrieval and source file manipulation are managed behind the scenes.
Utils includes an abstract class called :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset`, which defines common properties and
Ria Toolkit OSS includes an abstract class called :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset`, which defines common properties and
behaviors for all radio datasets. :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset` can be considered a blueprint for all
other radio dataset classes. This class is then subclassed to define more specific blueprints for different types
of radio datasets. For example, :py:obj:`ria_toolkit_oss.datatypes.datasets.IQDataset`, which is tailored for machine learning tasks
involving the processing of signals represented as IQ (In-phase and Quadrature) samples.
Then, in the various project backends, there are concrete dataset classes, which inherit from both Utils and the base
Then, in the various project backends, there are concrete dataset classes, which inherit from both Ria Toolkit OSS and the base
dataset class from the respective backend. For example, the :py:obj:`TorchIQDataset` class extends both
:py:obj:`ria_toolkit_oss.datatypes.datasets.IQDataset` from Utils and :py:obj:`torch.ria_toolkit_oss.datatypes.IterableDataset` from
:py:obj:`ria_toolkit_oss.datatypes.datasets.IQDataset` from Ria Toolkit OSS and :py:obj:`torch.ria_toolkit_oss.datatypes.IterableDataset` from
PyTorch, providing a concrete dataset class tailored for IQ datasets and optimized for the PyTorch backend.
Dataset initialization
@ -130,7 +130,7 @@ Dataset processing and manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
All radio datasets support methods tailored specifically for radio processing. These methods are backend-independent,
inherited from the blueprints in Utils like :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset`.
inherited from the blueprints in Ria Toolkit OSS like :py:obj:`ria_toolkit_oss.datatypes.datasets.RadioDataset`.
For example, we can trim down the length of the examples from 1,024 to 512 samples, and then augment the dataset:

View File

@ -1,3 +1,4 @@
<<<<<<< HEAD
"""
The annotations package contains tools and utilities for creating, managing, and processing annotations.
@ -52,4 +53,10 @@ from .parallel_signal_separator import (
)
from .qualify_slice import qualify_slice_from_annotations
from .signal_isolation import isolate_signal
from .threshold_qualifier import threshold_qualifier
from .threshold_qualifier import threshold_qualifier
=======
from .cusum_annotator import annotate_with_cusum
from .energy_detector import detect_signals_energy
from .parallel_signal_separator import split_recording_annotations
from .threshold_qualifier import threshold_qualifier
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4

View File

@ -1,4 +1,8 @@
<<<<<<< HEAD
from utils.data.annotation import Annotation
=======
from ria_toolkit_oss.datatypes.annotation import Annotation
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
# TODO figure out how to transfer labels in the merge case

View File

@ -3,7 +3,11 @@ from typing import Optional
import numpy as np
<<<<<<< HEAD
from utils.data import Annotation, Recording
=======
from ria_toolkit_oss.datatypes import Annotation, Recording
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
def annotate_with_cusum(
@ -24,7 +28,11 @@ def annotate_with_cusum(
changes between a low and high amplitude.
:param recording: A ``Recording`` object to annotate.
<<<<<<< HEAD
:type recording: ``utils.data.Recording``
=======
:type recording: ``ria_toolkit_oss.datatypes.Recording``
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
:param label: Label for the detected segments.
:type label: str
:param window_size: The length (in samples) of the moving average window.

View File

@ -11,7 +11,11 @@ from typing import Tuple
import numpy as np
from scipy.signal import filtfilt
<<<<<<< HEAD
from utils.data import Annotation, Recording
=======
from ria_toolkit_oss.datatypes import Annotation, Recording
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
def detect_signals_energy(
@ -73,8 +77,13 @@ def detect_signals_energy(
**Example**::
<<<<<<< HEAD
>>> from utils.io import load_recording
>>> from utils.annotations import detect_signals_energy
=======
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import detect_signals_energy
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
>>> recording = load_recording("capture.sigmf")
>>> # Detect with NBW frequency bounds (default, best for real signals)
@ -347,7 +356,11 @@ def annotate_with_obw(
**Example**::
<<<<<<< HEAD
>>> from utils.annotations import annotate_with_obw
=======
>>> from ria_toolkit_oss.annotations import annotate_with_obw
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
>>> annotated = annotate_with_obw(recording, label="signal_obw")
"""
signal = recording.data[0]

View File

@ -38,7 +38,11 @@ sub-annotations.
Example:
Two WiFi channels captured simultaneously:
<<<<<<< HEAD
>>> from utils.annotations import find_spectral_components
=======
>>> from ria_toolkit_oss.annotations import find_spectral_components
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
>>> # Detect the two distinct channels (returns relative frequencies)
>>> components = find_spectral_components(signal, sampling_rate=20e6)
>>> print(f"Found {len(components)} components")
@ -55,7 +59,11 @@ import numpy as np
from scipy import ndimage
from scipy import signal as scipy_signal
<<<<<<< HEAD
from utils.data import Annotation, Recording
=======
from ria_toolkit_oss.datatypes import Annotation, Recording
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
def find_spectral_components(
@ -111,8 +119,13 @@ def find_spectral_components(
**Example**::
<<<<<<< HEAD
>>> from utils.io import load_recording
>>> from utils.annotations import find_spectral_components
=======
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import find_spectral_components
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
>>> recording = load_recording("capture.sigmf")
>>> segment = recording.data[0][start:end]
>>> # Components in relative (baseband) frequency
@ -241,8 +254,13 @@ def split_annotation_by_components(
**Example**::
<<<<<<< HEAD
>>> from utils.io import load_recording
>>> from utils.annotations import split_annotation_by_components
=======
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import split_annotation_by_components
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
>>> recording = load_recording("capture.sigmf")
>>> # Original annotation spans multiple channels
>>> original = recording.annotations[0]
@ -369,8 +387,13 @@ def split_recording_annotations(
**Example**::
<<<<<<< HEAD
>>> from utils.io import load_recording
>>> from utils.annotations import split_recording_annotations
=======
>>> from ria.io import load_recording
>>> from ria_toolkit_oss.annotations import split_recording_annotations
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
>>> recording = load_recording("capture.sigmf")
>>> # Split all annotations
>>> split_rec = split_recording_annotations(recording)

View File

@ -1,6 +1,10 @@
import numpy as np
<<<<<<< HEAD
from utils.data import Recording
=======
from ria_toolkit_oss.datatypes import Recording
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
def qualify_slice_from_annotations(recording: Recording, slice_length: int):

View File

@ -1,8 +1,13 @@
import numpy as np
from scipy.signal import butter, lfilter
<<<<<<< HEAD
from utils.data.annotation import Annotation
from utils.data.recording import Recording
=======
from ria_toolkit_oss.datatypes.annotation import Annotation
from ria_toolkit_oss.datatypes.recording import Recording
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
def isolate_signal(recording: Recording, annotation: Annotation) -> Recording:

View File

@ -46,17 +46,29 @@ from typing import Optional
import numpy as np
<<<<<<< HEAD
from utils.data import Annotation, Recording
def _find_ranges(indices, window_size):
=======
from ria_toolkit_oss.datatypes import Annotation, Recording
def _find_ranges(indices, max_gap):
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
Groups individual indices into continuous temporal ranges.
Args:
indices: Array of indices where the signal exceeded a threshold.
<<<<<<< HEAD
window_size: Maximum gap allowed between indices to consider them part
of the same range.
=======
max_gap: Maximum gap allowed between indices to consider them part
of the same range.
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
Returns:
A list of (start, stop) tuples representing detected signal segments.
@ -65,6 +77,7 @@ def _find_ranges(indices, window_size):
if len(indices) == 0:
return []
<<<<<<< HEAD
ranges = []
start = indices[0]
@ -87,16 +100,138 @@ def _find_ranges(indices, window_size):
# Ensure the final segment is captured if the loop ends while in_range.
if in_range:
ranges.append((start, indices[-1]))
=======
start = indices[0]
prev = indices[0]
ranges = []
for i in range(1, len(indices)):
if indices[i] - prev > max_gap:
ranges.append((start, prev))
start = indices[i]
prev = indices[i]
ranges.append((start, prev))
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
return ranges
<<<<<<< HEAD
def threshold_qualifier(
recording: Recording,
threshold: float,
window_size: Optional[int] = 1024,
label: Optional[str] = None,
annotation_type: Optional[str] = "standalone",
=======
def _expand_and_filter_ranges(
smoothed_power: np.ndarray,
initial_ranges: list[tuple[int, int]],
boundary_val: float,
min_duration_samples: int,
) -> list[tuple[int, int]]:
"""Apply hysteresis expansion and minimum-duration filtering."""
out: list[tuple[int, int]] = []
n = len(smoothed_power)
for start, stop in initial_ranges:
if (stop - start) < min_duration_samples:
continue
true_start = start
while true_start > 0 and smoothed_power[true_start] > boundary_val:
true_start -= 1
true_stop = stop
while true_stop < n - 1 and smoothed_power[true_stop] > boundary_val:
true_stop += 1
if (true_stop - true_start) >= min_duration_samples:
out.append((true_start, true_stop))
return out
def _merge_ranges(ranges: list[tuple[int, int]], max_gap: int) -> list[tuple[int, int]]:
"""Merge overlapping or near-adjacent ranges."""
if not ranges:
return []
ranges = sorted(ranges, key=lambda r: r[0])
merged = [ranges[0]]
for s, e in ranges[1:]:
last_s, last_e = merged[-1]
if s <= last_e + max_gap:
merged[-1] = (last_s, max(last_e, e))
else:
merged.append((s, e))
return merged
def _estimate_noise_floor(power: np.ndarray, quantile: float = 20.0) -> float:
"""Estimate baseline from the quieter portion of the envelope."""
return float(np.percentile(power, quantile))
def _estimate_group_gap(sample_rate: float) -> int:
"""Use a fixed temporal grouping gap instead of reusing the smoothing window."""
return max(1, int(0.001 * sample_rate))
def _estimate_spectral_bounds(signal_segment: np.ndarray, sample_rate: float) -> tuple[float, float]:
"""Estimate occupied bandwidth from a smoothed magnitude spectrum."""
if len(signal_segment) == 0:
return -sample_rate / 4, sample_rate / 4
window = np.hanning(len(signal_segment))
windowed = signal_segment * window
fft_data = np.abs(np.fft.fftshift(np.fft.fft(windowed)))
fft_freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_segment), 1 / sample_rate))
# Smooth the spectrum so noise-like wideband bursts form a contiguous mask
# instead of thousands of tiny isolated runs.
spectral_smooth_bins = max(5, min(257, (len(signal_segment) // 512) | 1))
spectral_kernel = np.ones(spectral_smooth_bins, dtype=np.float64) / spectral_smooth_bins
smoothed_fft = np.convolve(fft_data, spectral_kernel, mode="same")
spectral_floor = float(np.percentile(smoothed_fft, 20))
spectral_peak = float(np.max(smoothed_fft))
spectral_ratio = spectral_peak / max(spectral_floor, 1e-12)
if spectral_ratio < 1.2:
return -sample_rate / 4, sample_rate / 4
spectral_thresh = spectral_floor + 0.1 * (spectral_peak - spectral_floor)
sig_indices = np.where(smoothed_fft > spectral_thresh)[0]
if len(sig_indices) == 0:
peak_idx = int(np.argmax(smoothed_fft))
bin_hz = sample_rate / len(signal_segment)
half_bins = max(1, int(np.ceil(10_000.0 / bin_hz)))
lo_idx = max(0, peak_idx - half_bins)
hi_idx = min(len(smoothed_fft) - 1, peak_idx + half_bins)
else:
runs = _find_ranges(sig_indices, max_gap=max(1, spectral_smooth_bins // 2))
peak_idx = int(np.argmax(smoothed_fft))
lo_idx, hi_idx = min(runs, key=lambda run: 0 if run[0] <= peak_idx <= run[1] else min(abs(run[0] - peak_idx), abs(run[1] - peak_idx)))
# Prevent extremely narrow tone boxes from collapsing to just a few bins.
min_total_bw_hz = 20_000.0
min_half_bins = max(1, int(np.ceil((min_total_bw_hz / 2) / (sample_rate / len(signal_segment)))))
center_idx = int(round((lo_idx + hi_idx) / 2))
lo_idx = max(0, min(lo_idx, center_idx - min_half_bins))
hi_idx = min(len(smoothed_fft) - 1, max(hi_idx, center_idx + min_half_bins))
return float(fft_freqs[lo_idx]), float(fft_freqs[hi_idx])
def threshold_qualifier(
recording: Recording,
threshold: float,
window_size: Optional[int] = None,
label: Optional[str] = None,
annotation_type: Optional[str] = "standalone",
channel: int = 0,
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
) -> Recording:
"""
Annotate a recording with bounding boxes for regions above a threshold.
@ -114,23 +249,41 @@ def threshold_qualifier(
Args:
recording: The Recording object containing IQ or real signal data.
threshold: Sensitivity multiplier (0.0 to 1.0) applied to max power.
<<<<<<< HEAD
window_size: Size of the smoothing filter and max gap for merging hits.
label: Custom string label for annotations.
annotation_type: Metadata string for the 'type' field in the annotation.
=======
window_size: Size of the smoothing filter in samples. Defaults to 1ms worth of samples.
label: Custom string label for annotations.
annotation_type: Metadata string for the 'type' field in the annotation.
channel: Index of the channel to annotate. Defaults to 0.
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
Returns:
A new Recording object populated with detected Annotations.
"""
# Extract signal and metadata
<<<<<<< HEAD
sample_data = recording.data[0]
sample_rate = recording.metadata["sample_rate"]
center_frequency = recording.metadata.get("center_frequency", 0)
=======
sample_data = recording.data[channel]
sample_rate = recording.metadata["sample_rate"]
center_frequency = recording.metadata.get("center_frequency", 0)
if window_size is None:
window_size = max(64, int(sample_rate * 0.001))
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
# --- 1. SIGNAL CONDITIONING ---
# Convert to power (Magnitude squared)
power_data = np.abs(sample_data) ** 2
smoothing_window = np.ones(window_size) / window_size
smoothed_power = np.convolve(power_data, smoothing_window, mode="same")
<<<<<<< HEAD
# Define thresholds based on the global peak of the smoothed signal
max_power = np.max(smoothed_power)
@ -186,6 +339,110 @@ def threshold_qualifier(
# --- 5. ANNOTATION GENERATION ---
if label is None:
label = f"{int(threshold*100)}%"
=======
group_gap_samples = _estimate_group_gap(sample_rate)
# Define thresholds using peak relative to baseline.
max_power = np.max(smoothed_power)
noise_floor = _estimate_noise_floor(smoothed_power)
dynamic_range_ratio = max_power / max(noise_floor, 1e-12)
# Soft early exit: keep a guard for low-contrast noise, but compute it from
# the quieter tail of the envelope so burst-heavy captures are not rejected.
if dynamic_range_ratio < 1.5:
return Recording(data=recording.data, metadata=recording.metadata, annotations=recording.annotations)
trigger_val = noise_floor + threshold * (max_power - noise_floor)
boundary_val = noise_floor + 0.5 * threshold * (max_power - noise_floor)
# --- 2. INITIAL DETECTION ---
# Enforce an explicit minimum duration in seconds; this is stable across
# varying capture lengths and avoids over-fitting to recording length.
min_duration_samples = max(1, int(0.005 * sample_rate))
annotations = []
# Pass 1: Detect stronger bursts.
indices = np.where(smoothed_power > trigger_val)[0]
pass1_initial = _find_ranges(indices=indices, max_gap=group_gap_samples)
pass1_ranges = _expand_and_filter_ranges(
smoothed_power=smoothed_power,
initial_ranges=pass1_initial,
boundary_val=boundary_val,
min_duration_samples=min_duration_samples,
)
# Pass 2: Recover weaker bursts on residual power not already covered.
# This improves recall in mixed-amplitude captures.
mask = np.ones_like(smoothed_power, dtype=np.float32)
for s, e in pass1_ranges:
mask[max(0, s) : min(len(mask), e)] = 0.0
residual_power = smoothed_power * mask
residual_max = float(np.max(residual_power))
residual_ratio = residual_max / max(noise_floor, 1e-12)
pass2_ranges: list[tuple[int, int]] = []
if residual_ratio >= 2.0:
weak_threshold = max(0.3, threshold * 0.7)
weak_trigger = noise_floor + weak_threshold * (residual_max - noise_floor)
weak_boundary = noise_floor + 0.5 * weak_threshold * (residual_max - noise_floor)
weak_indices = np.where(residual_power > weak_trigger)[0]
pass2_initial = _find_ranges(indices=weak_indices, max_gap=group_gap_samples)
pass2_ranges = _expand_and_filter_ranges(
smoothed_power=smoothed_power,
initial_ranges=pass2_initial,
boundary_val=weak_boundary,
min_duration_samples=min_duration_samples,
)
# Pass 3: Detect sustained faint bursts via macro-window averaging.
# Targets bursts whose peak power is near the trigger level but whose
# *average* power is consistently elevated above the noise floor — these
# are missed by peak-based detection because only a few short spikes exceed
# the trigger, all too brief to pass the minimum-duration filter.
#
# The mask is applied to power_data *before* convolving so that bright
# burst energy does not bleed through the long window into adjacent regions,
# which would inflate macro_residual_max and push the trigger above the
# faint burst's average power.
macro_window_size = max(window_size * 16, int(sample_rate * 0.02))
macro_kernel = np.ones(macro_window_size, dtype=np.float64) / macro_window_size
# Expand each annotated range by half the macro window on both sides so that
# the long convolution cannot "see" the leading/trailing edges of already-
# annotated bursts, which would produce spurious short fragments in Pass 3.
macro_expand = macro_window_size * 2
masked_power_for_macro = power_data.copy()
n = len(masked_power_for_macro)
for s, e in pass1_ranges + pass2_ranges:
masked_power_for_macro[max(0, s - macro_expand) : min(n, e + macro_expand)] = 0.0
macro_residual = np.convolve(masked_power_for_macro, macro_kernel, mode="same")
macro_residual_max = float(np.max(macro_residual))
pass3_ranges: list[tuple[int, int]] = []
if macro_residual_max / max(noise_floor, 1e-12) >= 1.3:
macro_trigger = noise_floor + threshold * (macro_residual_max - noise_floor)
macro_boundary = noise_floor + 0.5 * threshold * (macro_residual_max - noise_floor)
macro_indices = np.where(macro_residual > macro_trigger)[0]
macro_initial = _find_ranges(indices=macro_indices, max_gap=group_gap_samples)
pass3_ranges = _expand_and_filter_ranges(
smoothed_power=macro_residual,
initial_ranges=macro_initial,
boundary_val=macro_boundary,
min_duration_samples=min_duration_samples,
)
all_ranges = _merge_ranges(pass1_ranges + pass2_ranges + pass3_ranges, max_gap=group_gap_samples)
for true_start, true_stop in all_ranges:
# --- 4. SPECTRAL ANALYSIS (Frequency Detection) ---
signal_segment = sample_data[true_start:true_stop]
f_min, f_max = _estimate_spectral_bounds(signal_segment, sample_rate)
# --- 5. ANNOTATION GENERATION ---
ann_label = label if label is not None else f"{int(threshold*100)}%"
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
# Pack metadata for the UI/Downstream processing
comment_data = {
@ -202,7 +459,11 @@ def threshold_qualifier(
sample_count=true_stop - true_start,
freq_lower_edge=center_frequency + f_min,
freq_upper_edge=center_frequency + f_max,
<<<<<<< HEAD
label=label,
=======
label=ann_label,
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
comment=json.dumps(comment_data),
detail={"generator": "hysteresis_qualifier"},
)

View File

@ -601,7 +601,7 @@ class Recording:
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.to_wav()
"""
from utils.io.recording import to_wav
from ria_toolkit_oss.io.recording import to_wav
return to_wav(
recording=self,
@ -651,7 +651,7 @@ class Recording:
>>> recording = Recording(data=samples, metadata=metadata)
>>> recording.to_blue()
"""
from utils.io.recording import to_blue
from ria_toolkit_oss.io.recording import to_blue
return to_blue(recording=self, filename=filename, path=path, data_format=data_format, overwrite=overwrite)

View File

@ -134,6 +134,27 @@ def from_npy(file: os.PathLike | str, legacy: bool = False) -> Recording:
annotations = list(np.load(f, allow_pickle=True))
except EOFError:
annotations = []
except ModuleNotFoundError:
# File was pickled with utils.data.Annotation — remap to ria_toolkit_oss
import pickle
import sys
import types
import ria_toolkit_oss.datatypes.annotation as _ann_mod
utils_shim = types.ModuleType("utils")
utils_data = types.ModuleType("utils.data")
utils_data_annotation = types.ModuleType("utils.data.annotation")
utils_data_annotation.Annotation = _ann_mod.Annotation
utils_shim.data = utils_data
utils_data.annotation = utils_data_annotation
sys.modules.setdefault("utils", utils_shim)
sys.modules.setdefault("utils.data", utils_data)
sys.modules.setdefault("utils.data.annotation", utils_data_annotation)
f.seek(0)
np.load(f, allow_pickle=True) # skip data
np.load(f, allow_pickle=True) # skip metadata
annotations = list(np.load(f, allow_pickle=True))
recording = Recording(data=data, metadata=metadata, annotations=annotations)
return recording

View File

@ -4,6 +4,7 @@ import textwrap
from typing import Optional
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import numpy as np
from matplotlib import gridspec
from matplotlib.patches import Patch
@ -57,6 +58,7 @@ def view_annotations(
sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)
annotations = recording.annotations
<<<<<<< HEAD
# 2. Setup Color Mapping (No more hardcoded yellow fallback!)
# available_colors = [
# COLORS.get("magenta", "magenta"),
@ -66,6 +68,17 @@ def view_annotations(
# ]
palette = ["#FF00FF", "#00FF00", "#00FFFF", "#FFFF00", "#FF8000"]
=======
# 2. Setup Color Mapping
available_colors = [
COLORS.get("magenta", "magenta"),
COLORS.get("accent", "cyan"),
COLORS.get("light", "white"),
"lime",
]
palette = ["#2196F3", "#9C27B0", "#64B5F6", "#7B1FA2", "#5C6BC0", "#CE93D8", "#1565C0", "#7C4DFF"]
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
unique_labels = sorted(list(set(ann.label for ann in annotations if ann.label)))
label_to_color = {label: palette[i % len(palette)] for i, label in enumerate(unique_labels)}
@ -74,18 +87,34 @@ def view_annotations(
complex_signal, NFFT=256, Fs=sample_rate, Fc=center_frequency, noverlap=128, cmap="twilight"
)
<<<<<<< HEAD
# 4. Draw Annotations
for annotation in annotations:
# --- DEFINING VARIABLES FIRST ---
=======
# 4. Draw Annotations (highest threshold % first so lower % renders on top)
def _threshold_sort_key(ann):
try:
return int(ann.label.rstrip("%"))
except (ValueError, AttributeError):
return 0
for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True):
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
t_start = annotation.sample_start / sample_rate
t_width = annotation.sample_count / sample_rate
f_start = annotation.freq_lower_edge
f_height = annotation.freq_upper_edge - annotation.freq_lower_edge
<<<<<<< HEAD
# Look up the color for this specific label
ann_color = label_to_color.get(annotation.label, "gray")
# Draw the Rectangle
=======
ann_color = label_to_color.get(annotation.label, "gray")
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
rect = plt.Rectangle(
(t_start, f_start), t_width, f_height, linewidth=1.5, edgecolor=ann_color, facecolor="none", alpha=0.8
)
@ -101,7 +130,11 @@ def view_annotations(
ax.set_title(title, fontsize=title_fontsize, pad=20)
ax.set_xlabel("Time (s)", fontsize=12)
ax.set_ylabel("Frequency (MHz)", fontsize=12)
<<<<<<< HEAD
ax.grid(alpha=0.1) # Add faint grid
=======
ax.grid(alpha=0.1)
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
output_path, _ = set_path(output_path=output_path)
plt.savefig(output_path, dpi=dpi, bbox_inches="tight")

View File

@ -11,8 +11,13 @@ from ria_toolkit_oss.annotations import (
split_recording_annotations,
threshold_qualifier,
)
<<<<<<< HEAD
from ria_toolkit_oss.data import Annotation
from ria_toolkit_oss.data.recording import Recording
=======
from ria_toolkit_oss.datatypes import Annotation
from ria_toolkit_oss.datatypes.recording import Recording
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
from ria_toolkit_oss.io import load_recording, to_blue, to_npy, to_sigmf, to_wav
from ria_toolkit_oss_cli.ria_toolkit_oss.common import format_frequency, format_sample_count
@ -50,6 +55,7 @@ def detect_input_format(filepath):
def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
input_path = Path(input_path)
<<<<<<< HEAD
if output_path:
target = Path(output_path)
@ -57,6 +63,17 @@ def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
else:
annotated_name = f"{input_path.stem}_annotated"
target = input_path.with_name(f"{annotated_name}{input_path.suffix}")
=======
input_is_annotated = input_path.stem.endswith("_annotated")
if output_path:
target = Path(output_path)
elif overwrite and input_is_annotated:
# Write back in-place only when the input is already an _annotated file
target = input_path
else:
target = input_path.with_name(f"{input_path.stem}_annotated{input_path.suffix}")
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
if fmt == "sigmf":
final_path = normalize_sigmf_path(target)
@ -67,8 +84,15 @@ def determine_output_path(input_path, output_path, fmt, quiet, overwrite):
if not quiet:
click.echo(f"Saving to: {final_path}")
<<<<<<< HEAD
if final_path.exists() and not overwrite and final_path != input_path:
click.echo(f"Error: {final_path} already exists. Use --overwrite to replace it.", err=True)
=======
# Always allow writing to _annotated files; guard against overwriting originals
target_is_annotated = final_path.stem.endswith("_annotated")
if final_path.exists() and not target_is_annotated and final_path != input_path:
click.echo(f"Error: {final_path} is not an annotated file and cannot be overwritten.", err=True)
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
return None
return final_path
@ -226,8 +250,13 @@ def list(input, verbose):
\b
Examples:
<<<<<<< HEAD
utils annotate list recording.sigmf-data
utils annotate list signal.npy --verbose
=======
ria annotate list recording.sigmf-data
ria annotate list signal.npy --verbose
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
try:
recording = load_recording(input)
@ -295,8 +324,13 @@ def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_
\b
Examples:
<<<<<<< HEAD
utils annotate add file.npy --start 1000 --count 500 --label wifi
utils annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
=======
ria annotate add file.npy --start 1000 --count 500 --label wifi
ria annotate add signal.sigmf-data --start 0 --count 1000 --label burst --comment "Strong signal"
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
try:
recording = load_recording(input)
@ -378,12 +412,21 @@ def add(input, start, count, label, freq_lower, freq_upper, comment, annotation_
def remove(input, index, output, overwrite, quiet):
"""Remove annotation by index.
<<<<<<< HEAD
Use 'utils annotate list' to see annotation indices.
\b
Examples:
utils annotate remove signal.sigmf-data 2
utils annotate remove file.npy 0
=======
Use 'ria annotate list' to see annotation indices.
\b
Examples:
ria annotate remove signal.sigmf-data 2
ria annotate remove file.npy 0
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
try:
recording = load_recording(input)
@ -432,8 +475,13 @@ def clear(input, output, overwrite, force, quiet):
\b
Examples:
<<<<<<< HEAD
utils annotate clear signal.sigmf-data
utils annotate clear file.npy --force
=======
ria annotate clear signal.sigmf-data
ria annotate clear file.npy --force
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
try:
recording = load_recording(input)
@ -528,10 +576,17 @@ def energy(
\b
Examples:
<<<<<<< HEAD
utils annotate energy capture.sigmf-data --label burst
utils annotate energy signal.npy --threshold 1.5 --min-distance 10000
utils annotate energy signal.sigmf-data --freq-method obw
utils annotate energy signal.sigmf-data --freq-method full-detected
=======
ria annotate energy capture.sigmf-data --label burst
ria annotate energy signal.npy --threshold 1.5 --min-distance 10000
ria annotate energy signal.sigmf-data --freq-method obw
ria annotate energy signal.sigmf-data --freq-method full-detected
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
try:
@ -607,8 +662,13 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
\b
Examples:
<<<<<<< HEAD
utils annotate cusum signal.sigmf-data --min-duration 5.0
utils annotate cusum data.npy --min-duration 10.0 --label state
=======
ria annotate cusum signal.sigmf-data --min-duration 5.0
ria annotate cusum data.npy --min-duration 10.0 --label state
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
try:
recording = load_recording(input)
@ -654,7 +714,11 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
@click.argument("input", type=click.Path(exists=True))
@click.option("--threshold", type=float, required=True, help="Threshold (0.0-1.0, fraction of max magnitude)")
@click.option("--label", type=str, default=None, help="Annotation label")
<<<<<<< HEAD
@click.option("--window-size", type=int, default=1024, help="Smoothing window size")
=======
@click.option("--window-size", type=int, default=None, help="Smoothing window size in samples (default: 1ms at recording sample rate)")
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
@click.option(
"--type",
"annotation_type",
@ -662,10 +726,18 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, o
default="standalone",
help="Annotation type",
)
<<<<<<< HEAD
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--quiet", is_flag=True, help="Quiet mode")
def threshold(input, threshold, label, window_size, annotation_type, output, overwrite, quiet):
=======
@click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)")
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@click.option("--quiet", is_flag=True, help="Quiet mode")
def threshold(input, threshold, label, window_size, annotation_type, channel, output, overwrite, quiet):
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""Auto-detect signals using threshold method.
Detects samples above a percentage of maximum magnitude. Best for simple
@ -673,8 +745,13 @@ def threshold(input, threshold, label, window_size, annotation_type, output, ove
\b
Examples:
<<<<<<< HEAD
utils annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
utils annotate threshold data.npy --threshold 0.5 --window-size 2048
=======
ria annotate threshold signal.sigmf-data --threshold 0.7 --label wifi
ria annotate threshold data.npy --threshold 0.5 --window-size 2048
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
if not (0.0 <= threshold <= 1.0):
raise click.ClickException(f"--threshold must be between 0.0 and 1.0, got {threshold}")
@ -689,7 +766,12 @@ def threshold(input, threshold, label, window_size, annotation_type, output, ove
if not quiet:
click.echo("\nDetecting signals using threshold qualifier...")
click.echo(f" Threshold: {threshold * 100:.1f}% of max magnitude")
<<<<<<< HEAD
click.echo(f" Window size: {window_size} samples")
=======
click.echo(f" Window size: {'auto (1ms)' if window_size is None else f'{window_size} samples'}")
click.echo(f" Channel: {channel}")
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
try:
initial_count = len(recording.annotations)
@ -699,6 +781,10 @@ def threshold(input, threshold, label, window_size, annotation_type, output, ove
window_size=window_size,
label=label,
annotation_type=annotation_type,
<<<<<<< HEAD
=======
channel=channel,
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
)
added = len(recording.annotations) - initial_count
@ -747,10 +833,17 @@ def separate(input, indices, nfft, noise_threshold_db, min_component_bw, output,
\b
Examples:
<<<<<<< HEAD
utils annotate separate capture.sigmf-data
utils annotate separate signal.npy --indices 0,1,2
utils annotate separate data.sigmf-data --noise-threshold-db -70
utils annotate separate signal.npy --min-component-bw 100000
=======
ria annotate separate capture.sigmf-data
ria annotate separate signal.npy --indices 0,1,2
ria annotate separate data.sigmf-data --noise-threshold-db -70
ria annotate separate signal.npy --min-component-bw 100000
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
"""
try:

View File

@ -2,6 +2,10 @@
"""
This module contains all the CLI bindings for the ria package.
"""
<<<<<<< HEAD
=======
>>>>>>> 2bb2d9d5a780dbc17172135a5a1f10eba14b1af4
from .annotate import annotate
from .capture import capture
from .combine import combine

View File

@ -232,8 +232,8 @@ def generate():
\b
Examples:
utils synth chirp -b 1e6 -p 0.01 -s 10e6 -o chirp_basic.sigmf
utils synth fsk -M 2 -r 100e3 -s 2e6 -o fsk2_basic.sigmf
ria synth chirp -b 1e6 -p 0.01 -s 10e6 -o chirp_basic.sigmf
ria synth fsk -M 2 -r 100e3 -s 2e6 -o fsk2_basic.sigmf
"""
pass

View File

@ -264,13 +264,13 @@ def transform():
Examples:\n
\b
# List available augmentations
utils transform augment --list
ria transform augment --list
\b
# Apply channel swap
utils transform augment channel_swap input.npy
ria transform augment channel_swap input.npy
\b
# Apply AWGN impairment
utils transform impair awgn input.npy --snr-db 15
ria transform impair awgn input.npy --snr-db 15
"""
pass

View File

@ -40,6 +40,7 @@ VISUALIZATION_TYPES = {
"options": ["channel", "dark"],
},
"channels": {"function": view_channels, "description": "Multi-channel IQ and spectrogram view", "options": []},
"annotations": {"function": view_annotations, "description": "Annotated spectrogram view", "options": ["channel", "dark"]},
}

View File

@ -1,6 +1,6 @@
# CLI Tests
Comprehensive test suite for the utils CLI commands.
Comprehensive test suite for the ria CLI commands.
## Test Structure

View File

@ -1 +1 @@
"""Tests for utils CLI commands."""
"""Tests for ria CLI commands."""