Compare commits

...

1 Commits

Author SHA1 Message Date
ben
07fc871463 signal viewing tools 2026-05-27 11:24:28 -04:00
6 changed files with 112 additions and 26 deletions

View File

@ -45,7 +45,14 @@ def is_annotation_contained(inner: Annotation, outer: Annotation) -> bool:
outer_sample_stop = outer.sample_start + outer.sample_count outer_sample_stop = outer.sample_start + outer.sample_count
if inner.sample_start > outer.sample_start and inner_sample_stop < outer_sample_stop: if inner.sample_start > outer.sample_start and inner_sample_stop < outer_sample_stop:
if inner.freq_lower_edge > outer.freq_lower_edge and inner.freq_upper_edge < outer.freq_upper_edge: if (
inner.freq_lower_edge is not None
and inner.freq_upper_edge is not None
and outer.freq_lower_edge is not None
and outer.freq_upper_edge is not None
and inner.freq_lower_edge > outer.freq_lower_edge
and inner.freq_upper_edge < outer.freq_upper_edge
):
return True return True
return False return False

View File

@ -17,10 +17,10 @@ class Annotation:
:type sample_start: int :type sample_start: int
:param sample_count: The index of the ending sample of the annotation, inclusive. :param sample_count: The index of the ending sample of the annotation, inclusive.
:type sample_count: int :type sample_count: int
:param freq_lower_edge: The lower frequency of the annotation. :param freq_lower_edge: The lower frequency of the annotation. Optional; None if not specified in source.
:type freq_lower_edge: float :type freq_lower_edge: float, optional
:param freq_upper_edge: The upper frequency of the annotation. :param freq_upper_edge: The upper frequency of the annotation. Optional; None if not specified in source.
:type freq_upper_edge: float :type freq_upper_edge: float, optional
:param label: The label that will be displayed with the bounding box in compatible viewers including IQEngine. :param label: The label that will be displayed with the bounding box in compatible viewers including IQEngine.
Defaults to an emtpy string. Defaults to an emtpy string.
:type label: str, optional :type label: str, optional
@ -34,8 +34,8 @@ class Annotation:
self, self,
sample_start: int, sample_start: int,
sample_count: int, sample_count: int,
freq_lower_edge: float, freq_lower_edge: Optional[float] = None,
freq_upper_edge: float, freq_upper_edge: Optional[float] = None,
label: Optional[str] = "", label: Optional[str] = "",
comment: Optional[str] = "", comment: Optional[str] = "",
detail: Optional[dict] = None, detail: Optional[dict] = None,
@ -43,8 +43,8 @@ class Annotation:
"""Initialize a new Annotation instance.""" """Initialize a new Annotation instance."""
self.sample_start = int(sample_start) self.sample_start = int(sample_start)
self.sample_count = int(sample_count) self.sample_count = int(sample_count)
self.freq_lower_edge = float(freq_lower_edge) self.freq_lower_edge = float(freq_lower_edge) if freq_lower_edge is not None else None
self.freq_upper_edge = float(freq_upper_edge) self.freq_upper_edge = float(freq_upper_edge) if freq_upper_edge is not None else None
self.label = str(label) self.label = str(label)
self.comment = str(comment) self.comment = str(comment)
@ -62,6 +62,8 @@ class Annotation:
:returns: True if valid, False if not. :returns: True if valid, False if not.
""" """
if self.freq_lower_edge is None or self.freq_upper_edge is None:
return self.sample_count > 0
return self.sample_count > 0 and self.freq_lower_edge < self.freq_upper_edge return self.sample_count > 0 and self.freq_lower_edge < self.freq_upper_edge
def overlap(self, other): def overlap(self, other):
@ -73,6 +75,14 @@ class Annotation:
:returns: The area of the overlap in samples*frequency, or 0 if they do not overlap.""" :returns: The area of the overlap in samples*frequency, or 0 if they do not overlap."""
if (
self.freq_lower_edge is None
or self.freq_upper_edge is None
or other.freq_lower_edge is None
or other.freq_upper_edge is None
):
return 0
sample_overlap_start = max(self.sample_start, other.sample_start) sample_overlap_start = max(self.sample_start, other.sample_start)
sample_overlap_end = min(self.sample_start + self.sample_count, other.sample_start + other.sample_count) sample_overlap_end = min(self.sample_start + self.sample_count, other.sample_start + other.sample_count)
@ -91,6 +101,8 @@ class Annotation:
:returns: sample length multiplied by bandwidth.""" :returns: sample length multiplied by bandwidth."""
if self.freq_lower_edge is None or self.freq_upper_edge is None:
return 0
return self.sample_count * (self.freq_upper_edge - self.freq_lower_edge) return self.sample_count * (self.freq_upper_edge - self.freq_lower_edge)
def __eq__(self, other: Annotation) -> bool: def __eq__(self, other: Annotation) -> bool:
@ -103,13 +115,16 @@ class Annotation:
annotation_dict = {SigMFFile.START_INDEX_KEY: self.sample_start, SigMFFile.LENGTH_INDEX_KEY: self.sample_count} annotation_dict = {SigMFFile.START_INDEX_KEY: self.sample_start, SigMFFile.LENGTH_INDEX_KEY: self.sample_count}
annotation_dict["metadata"] = { metadata = {
SigMFFile.LABEL_KEY: self.label, SigMFFile.LABEL_KEY: self.label,
SigMFFile.COMMENT_KEY: self.comment, SigMFFile.COMMENT_KEY: self.comment,
SigMFFile.FHI_KEY: self.freq_upper_edge,
SigMFFile.FLO_KEY: self.freq_lower_edge,
"ria:detail": self.detail, "ria:detail": self.detail,
} }
if self.freq_upper_edge is not None:
metadata[SigMFFile.FHI_KEY] = self.freq_upper_edge
if self.freq_lower_edge is not None:
metadata[SigMFFile.FLO_KEY] = self.freq_lower_edge
annotation_dict["metadata"] = metadata
if _is_jsonable(annotation_dict): if _is_jsonable(annotation_dict):
return annotation_dict return annotation_dict

View File

@ -81,6 +81,8 @@ def view_annotations(
return 0 return 0
for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True): for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True):
if annotation.freq_lower_edge is None or annotation.freq_upper_edge is None:
continue
t_start = annotation.sample_start / sample_rate t_start = annotation.sample_start / sample_rate
t_width = annotation.sample_count / sample_rate t_width = annotation.sample_count / sample_rate
f_start = annotation.freq_lower_edge f_start = annotation.freq_lower_edge

View File

@ -86,9 +86,7 @@ def save_recording_auto(recording, output_path, input_path, quiet=False, overwri
input_path = Path(input_path) input_path = Path(input_path)
fmt = detect_input_format(input_path) fmt = detect_input_format(input_path)
output_path = determine_output_path( output_path = determine_output_path(input_path=input_path, output_path=output_path, fmt=fmt, overwrite=overwrite)
input_path=input_path, output_path=output_path, fmt=fmt, overwrite=overwrite
)
if not quiet: if not quiet:
if fmt == "sigmf": if fmt == "sigmf":
@ -258,7 +256,11 @@ def list(input, verbose):
user_comment = ann.comment or "" user_comment = ann.comment or ""
# Basic info # Basic info
freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}" freq_range = (
f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
if ann.freq_lower_edge is not None and ann.freq_upper_edge is not None
else "N/A"
)
click.echo( click.echo(
f" [{i}] Samples {format_sample_count(ann.sample_start)}-" f" [{i}] Samples {format_sample_count(ann.sample_start)}-"
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {ann.label}" f"{format_sample_count(ann.sample_start + ann.sample_count)}: {ann.label}"
@ -502,8 +504,7 @@ def clear(input, output, overwrite, force, quiet):
help="Annotation type", help="Annotation type",
) )
@click.option( @click.option(
"--sample-rate", type=float, default=None, "--sample-rate", type=float, default=None, help="Sample rate in Hz (overrides metadata; required if not in file)"
help="Sample rate in Hz (overrides metadata; required if not in file)"
) )
@click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@ -617,8 +618,7 @@ def energy(
help="Annotation type", help="Annotation type",
) )
@click.option( @click.option(
"--sample-rate", type=float, default=None, "--sample-rate", type=float, default=None, help="Sample rate in Hz (overrides metadata; required if not in file)"
help="Sample rate in Hz (overrides metadata; required if not in file)"
) )
@click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@ -707,8 +707,7 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, s
) )
@click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)") @click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)")
@click.option( @click.option(
"--sample-rate", type=float, default=None, "--sample-rate", type=float, default=None, help="Sample rate in Hz (overrides metadata; required if not in file)"
help="Sample rate in Hz (overrides metadata; required if not in file)"
) )
@click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@ -787,8 +786,7 @@ def threshold(input, threshold, label, window_size, annotation_type, channel, sa
@click.option("--noise-threshold-db", type=float, help="Noise floor threshold in dB (auto-estimated if not specified)") @click.option("--noise-threshold-db", type=float, help="Noise floor threshold in dB (auto-estimated if not specified)")
@click.option("--min-component-bw", type=float, default=50e3, help="Min component bandwidth in Hz") @click.option("--min-component-bw", type=float, default=50e3, help="Min component bandwidth in Hz")
@click.option( @click.option(
"--sample-rate", type=float, default=None, "--sample-rate", type=float, default=None, help="Sample rate in Hz (overrides metadata; required if not in file)"
help="Sample rate in Hz (overrides metadata; required if not in file)"
) )
@click.option("--output", "-o", type=click.Path(), help="Output file path") @click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)") @click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
@ -809,7 +807,8 @@ def _log_separate_start(quiet, recording, indices_list, nfft, noise_threshold_db
def separate( def separate(
input, indices, nfft, noise_threshold_db, min_component_bw, sample_rate, output, overwrite, quiet, verbose): input, indices, nfft, noise_threshold_db, min_component_bw, sample_rate, output, overwrite, quiet, verbose
):
""" """
Auto-detect parallel frequency-offset signals and split into sub-bands. Auto-detect parallel frequency-offset signals and split into sub-bands.
@ -883,7 +882,11 @@ def separate(
click.echo("\n Details:") click.echo("\n Details:")
for i in range(initial_count, final_count): for i in range(initial_count, final_count):
ann = recording.annotations[i] ann = recording.annotations[i]
freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}" freq_range = (
f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
if ann.freq_lower_edge is not None and ann.freq_upper_edge is not None
else "N/A"
)
click.echo( click.echo(
f" [{i}] samples {format_sample_count(ann.sample_start)}-" f" [{i}] samples {format_sample_count(ann.sample_start)}-"
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {freq_range}" f"{format_sample_count(ann.sample_start + ann.sample_count)}: {freq_range}"

View File

@ -199,3 +199,44 @@ def test_annotation_to_sigmf_format_values():
values = list(result.values()) values = list(result.values())
assert 50 in values or ann.sample_start in values assert 50 in values or ann.sample_start in values
assert 100 in values or ann.sample_count in values assert 100 in values or ann.sample_count in values
# ---------------------------------------------------------------------------
# None freq-edge regression tests (SigMF optional fields)
# ---------------------------------------------------------------------------
def test_annotation_no_freq_edges():
ann = Annotation(sample_start=0, sample_count=10, label="burst")
assert ann.freq_lower_edge is None
assert ann.freq_upper_edge is None
def test_annotation_is_valid_no_freq_edges():
ann = Annotation(sample_start=0, sample_count=10, label="burst")
assert ann.is_valid() is True
ann_zero = Annotation(sample_start=0, sample_count=0, label="burst")
assert ann_zero.is_valid() is False
def test_annotation_overlap_none_edges_returns_zero():
ann1 = Annotation(sample_start=0, sample_count=10)
ann2 = Annotation(sample_start=0, sample_count=10, freq_lower_edge=0, freq_upper_edge=100)
assert ann1.overlap(ann2) == 0
assert ann2.overlap(ann1) == 0
def test_annotation_area_none_edges_returns_zero():
ann = Annotation(sample_start=0, sample_count=10, label="burst")
assert ann.area() == 0
def test_annotation_to_sigmf_omits_freq_keys_when_none():
from sigmf import SigMFFile
ann = Annotation(sample_start=0, sample_count=10, label="burst")
result = ann.to_sigmf_format()
metadata = result["metadata"]
assert SigMFFile.FLO_KEY not in metadata
assert SigMFFile.FHI_KEY not in metadata

View File

@ -189,3 +189,21 @@ def test_sigmf_3(tmp_path):
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name) to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
except IOError as e: except IOError as e:
assert str(e) == "File already exists" assert str(e) == "File already exists"
def test_sigmf_annotation_without_freq_edges(tmp_path):
# Regression: annotations that omit the optional SigMF freq edge fields must
# load without error; edges should be None and the annotation still valid.
ann = Annotation(sample_start=0, sample_count=5, label="burst")
recording1 = Recording(data=complex_data_1, metadata=sample_metadata, annotations=[ann])
filename = tmp_path / "test"
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name, overwrite=True)
recording2 = from_sigmf(filename)
assert len(recording2.annotations) == 1
loaded = recording2.annotations[0]
assert loaded.freq_lower_edge is None
assert loaded.freq_upper_edge is None
assert loaded.is_valid() is True
assert loaded.label == "burst"