Compare commits
1 Commits
main
...
signal-vie
| Author | SHA1 | Date | |
|---|---|---|---|
| 07fc871463 |
|
|
@ -45,7 +45,14 @@ def is_annotation_contained(inner: Annotation, outer: Annotation) -> bool:
|
||||||
outer_sample_stop = outer.sample_start + outer.sample_count
|
outer_sample_stop = outer.sample_start + outer.sample_count
|
||||||
|
|
||||||
if inner.sample_start > outer.sample_start and inner_sample_stop < outer_sample_stop:
|
if inner.sample_start > outer.sample_start and inner_sample_stop < outer_sample_stop:
|
||||||
if inner.freq_lower_edge > outer.freq_lower_edge and inner.freq_upper_edge < outer.freq_upper_edge:
|
if (
|
||||||
|
inner.freq_lower_edge is not None
|
||||||
|
and inner.freq_upper_edge is not None
|
||||||
|
and outer.freq_lower_edge is not None
|
||||||
|
and outer.freq_upper_edge is not None
|
||||||
|
and inner.freq_lower_edge > outer.freq_lower_edge
|
||||||
|
and inner.freq_upper_edge < outer.freq_upper_edge
|
||||||
|
):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
|
||||||
|
|
@ -17,10 +17,10 @@ class Annotation:
|
||||||
:type sample_start: int
|
:type sample_start: int
|
||||||
:param sample_count: The index of the ending sample of the annotation, inclusive.
|
:param sample_count: The index of the ending sample of the annotation, inclusive.
|
||||||
:type sample_count: int
|
:type sample_count: int
|
||||||
:param freq_lower_edge: The lower frequency of the annotation.
|
:param freq_lower_edge: The lower frequency of the annotation. Optional; None if not specified in source.
|
||||||
:type freq_lower_edge: float
|
:type freq_lower_edge: float, optional
|
||||||
:param freq_upper_edge: The upper frequency of the annotation.
|
:param freq_upper_edge: The upper frequency of the annotation. Optional; None if not specified in source.
|
||||||
:type freq_upper_edge: float
|
:type freq_upper_edge: float, optional
|
||||||
:param label: The label that will be displayed with the bounding box in compatible viewers including IQEngine.
|
:param label: The label that will be displayed with the bounding box in compatible viewers including IQEngine.
|
||||||
Defaults to an emtpy string.
|
Defaults to an emtpy string.
|
||||||
:type label: str, optional
|
:type label: str, optional
|
||||||
|
|
@ -34,8 +34,8 @@ class Annotation:
|
||||||
self,
|
self,
|
||||||
sample_start: int,
|
sample_start: int,
|
||||||
sample_count: int,
|
sample_count: int,
|
||||||
freq_lower_edge: float,
|
freq_lower_edge: Optional[float] = None,
|
||||||
freq_upper_edge: float,
|
freq_upper_edge: Optional[float] = None,
|
||||||
label: Optional[str] = "",
|
label: Optional[str] = "",
|
||||||
comment: Optional[str] = "",
|
comment: Optional[str] = "",
|
||||||
detail: Optional[dict] = None,
|
detail: Optional[dict] = None,
|
||||||
|
|
@ -43,8 +43,8 @@ class Annotation:
|
||||||
"""Initialize a new Annotation instance."""
|
"""Initialize a new Annotation instance."""
|
||||||
self.sample_start = int(sample_start)
|
self.sample_start = int(sample_start)
|
||||||
self.sample_count = int(sample_count)
|
self.sample_count = int(sample_count)
|
||||||
self.freq_lower_edge = float(freq_lower_edge)
|
self.freq_lower_edge = float(freq_lower_edge) if freq_lower_edge is not None else None
|
||||||
self.freq_upper_edge = float(freq_upper_edge)
|
self.freq_upper_edge = float(freq_upper_edge) if freq_upper_edge is not None else None
|
||||||
self.label = str(label)
|
self.label = str(label)
|
||||||
self.comment = str(comment)
|
self.comment = str(comment)
|
||||||
|
|
||||||
|
|
@ -62,6 +62,8 @@ class Annotation:
|
||||||
:returns: True if valid, False if not.
|
:returns: True if valid, False if not.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if self.freq_lower_edge is None or self.freq_upper_edge is None:
|
||||||
|
return self.sample_count > 0
|
||||||
return self.sample_count > 0 and self.freq_lower_edge < self.freq_upper_edge
|
return self.sample_count > 0 and self.freq_lower_edge < self.freq_upper_edge
|
||||||
|
|
||||||
def overlap(self, other):
|
def overlap(self, other):
|
||||||
|
|
@ -73,6 +75,14 @@ class Annotation:
|
||||||
|
|
||||||
:returns: The area of the overlap in samples*frequency, or 0 if they do not overlap."""
|
:returns: The area of the overlap in samples*frequency, or 0 if they do not overlap."""
|
||||||
|
|
||||||
|
if (
|
||||||
|
self.freq_lower_edge is None
|
||||||
|
or self.freq_upper_edge is None
|
||||||
|
or other.freq_lower_edge is None
|
||||||
|
or other.freq_upper_edge is None
|
||||||
|
):
|
||||||
|
return 0
|
||||||
|
|
||||||
sample_overlap_start = max(self.sample_start, other.sample_start)
|
sample_overlap_start = max(self.sample_start, other.sample_start)
|
||||||
sample_overlap_end = min(self.sample_start + self.sample_count, other.sample_start + other.sample_count)
|
sample_overlap_end = min(self.sample_start + self.sample_count, other.sample_start + other.sample_count)
|
||||||
|
|
||||||
|
|
@ -91,6 +101,8 @@ class Annotation:
|
||||||
|
|
||||||
:returns: sample length multiplied by bandwidth."""
|
:returns: sample length multiplied by bandwidth."""
|
||||||
|
|
||||||
|
if self.freq_lower_edge is None or self.freq_upper_edge is None:
|
||||||
|
return 0
|
||||||
return self.sample_count * (self.freq_upper_edge - self.freq_lower_edge)
|
return self.sample_count * (self.freq_upper_edge - self.freq_lower_edge)
|
||||||
|
|
||||||
def __eq__(self, other: Annotation) -> bool:
|
def __eq__(self, other: Annotation) -> bool:
|
||||||
|
|
@ -103,13 +115,16 @@ class Annotation:
|
||||||
|
|
||||||
annotation_dict = {SigMFFile.START_INDEX_KEY: self.sample_start, SigMFFile.LENGTH_INDEX_KEY: self.sample_count}
|
annotation_dict = {SigMFFile.START_INDEX_KEY: self.sample_start, SigMFFile.LENGTH_INDEX_KEY: self.sample_count}
|
||||||
|
|
||||||
annotation_dict["metadata"] = {
|
metadata = {
|
||||||
SigMFFile.LABEL_KEY: self.label,
|
SigMFFile.LABEL_KEY: self.label,
|
||||||
SigMFFile.COMMENT_KEY: self.comment,
|
SigMFFile.COMMENT_KEY: self.comment,
|
||||||
SigMFFile.FHI_KEY: self.freq_upper_edge,
|
|
||||||
SigMFFile.FLO_KEY: self.freq_lower_edge,
|
|
||||||
"ria:detail": self.detail,
|
"ria:detail": self.detail,
|
||||||
}
|
}
|
||||||
|
if self.freq_upper_edge is not None:
|
||||||
|
metadata[SigMFFile.FHI_KEY] = self.freq_upper_edge
|
||||||
|
if self.freq_lower_edge is not None:
|
||||||
|
metadata[SigMFFile.FLO_KEY] = self.freq_lower_edge
|
||||||
|
annotation_dict["metadata"] = metadata
|
||||||
|
|
||||||
if _is_jsonable(annotation_dict):
|
if _is_jsonable(annotation_dict):
|
||||||
return annotation_dict
|
return annotation_dict
|
||||||
|
|
|
||||||
|
|
@ -81,6 +81,8 @@ def view_annotations(
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True):
|
for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True):
|
||||||
|
if annotation.freq_lower_edge is None or annotation.freq_upper_edge is None:
|
||||||
|
continue
|
||||||
t_start = annotation.sample_start / sample_rate
|
t_start = annotation.sample_start / sample_rate
|
||||||
t_width = annotation.sample_count / sample_rate
|
t_width = annotation.sample_count / sample_rate
|
||||||
f_start = annotation.freq_lower_edge
|
f_start = annotation.freq_lower_edge
|
||||||
|
|
|
||||||
|
|
@ -86,9 +86,7 @@ def save_recording_auto(recording, output_path, input_path, quiet=False, overwri
|
||||||
input_path = Path(input_path)
|
input_path = Path(input_path)
|
||||||
fmt = detect_input_format(input_path)
|
fmt = detect_input_format(input_path)
|
||||||
|
|
||||||
output_path = determine_output_path(
|
output_path = determine_output_path(input_path=input_path, output_path=output_path, fmt=fmt, overwrite=overwrite)
|
||||||
input_path=input_path, output_path=output_path, fmt=fmt, overwrite=overwrite
|
|
||||||
)
|
|
||||||
|
|
||||||
if not quiet:
|
if not quiet:
|
||||||
if fmt == "sigmf":
|
if fmt == "sigmf":
|
||||||
|
|
@ -258,7 +256,11 @@ def list(input, verbose):
|
||||||
user_comment = ann.comment or ""
|
user_comment = ann.comment or ""
|
||||||
|
|
||||||
# Basic info
|
# Basic info
|
||||||
freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
|
freq_range = (
|
||||||
|
f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
|
||||||
|
if ann.freq_lower_edge is not None and ann.freq_upper_edge is not None
|
||||||
|
else "N/A"
|
||||||
|
)
|
||||||
click.echo(
|
click.echo(
|
||||||
f" [{i}] Samples {format_sample_count(ann.sample_start)}-"
|
f" [{i}] Samples {format_sample_count(ann.sample_start)}-"
|
||||||
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {ann.label}"
|
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {ann.label}"
|
||||||
|
|
@ -502,8 +504,7 @@ def clear(input, output, overwrite, force, quiet):
|
||||||
help="Annotation type",
|
help="Annotation type",
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--sample-rate", type=float, default=None,
|
"--sample-rate", type=float, default=None, help="Sample rate in Hz (overrides metadata; required if not in file)"
|
||||||
help="Sample rate in Hz (overrides metadata; required if not in file)"
|
|
||||||
)
|
)
|
||||||
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
|
@ -617,8 +618,7 @@ def energy(
|
||||||
help="Annotation type",
|
help="Annotation type",
|
||||||
)
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--sample-rate", type=float, default=None,
|
"--sample-rate", type=float, default=None, help="Sample rate in Hz (overrides metadata; required if not in file)"
|
||||||
help="Sample rate in Hz (overrides metadata; required if not in file)"
|
|
||||||
)
|
)
|
||||||
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
|
@ -707,8 +707,7 @@ def cusum(input, label, min_duration, window_size, tolerance, annotation_type, s
|
||||||
)
|
)
|
||||||
@click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)")
|
@click.option("--channel", type=int, default=0, help="Channel index to annotate (default: 0)")
|
||||||
@click.option(
|
@click.option(
|
||||||
"--sample-rate", type=float, default=None,
|
"--sample-rate", type=float, default=None, help="Sample rate in Hz (overrides metadata; required if not in file)"
|
||||||
help="Sample rate in Hz (overrides metadata; required if not in file)"
|
|
||||||
)
|
)
|
||||||
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
|
@ -787,8 +786,7 @@ def threshold(input, threshold, label, window_size, annotation_type, channel, sa
|
||||||
@click.option("--noise-threshold-db", type=float, help="Noise floor threshold in dB (auto-estimated if not specified)")
|
@click.option("--noise-threshold-db", type=float, help="Noise floor threshold in dB (auto-estimated if not specified)")
|
||||||
@click.option("--min-component-bw", type=float, default=50e3, help="Min component bandwidth in Hz")
|
@click.option("--min-component-bw", type=float, default=50e3, help="Min component bandwidth in Hz")
|
||||||
@click.option(
|
@click.option(
|
||||||
"--sample-rate", type=float, default=None,
|
"--sample-rate", type=float, default=None, help="Sample rate in Hz (overrides metadata; required if not in file)"
|
||||||
help="Sample rate in Hz (overrides metadata; required if not in file)"
|
|
||||||
)
|
)
|
||||||
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||||
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
@click.option("--overwrite", is_flag=True, help="Overwrite input file (non-SigMF only)")
|
||||||
|
|
@ -809,7 +807,8 @@ def _log_separate_start(quiet, recording, indices_list, nfft, noise_threshold_db
|
||||||
|
|
||||||
|
|
||||||
def separate(
|
def separate(
|
||||||
input, indices, nfft, noise_threshold_db, min_component_bw, sample_rate, output, overwrite, quiet, verbose):
|
input, indices, nfft, noise_threshold_db, min_component_bw, sample_rate, output, overwrite, quiet, verbose
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Auto-detect parallel frequency-offset signals and split into sub-bands.
|
Auto-detect parallel frequency-offset signals and split into sub-bands.
|
||||||
|
|
||||||
|
|
@ -883,7 +882,11 @@ def separate(
|
||||||
click.echo("\n Details:")
|
click.echo("\n Details:")
|
||||||
for i in range(initial_count, final_count):
|
for i in range(initial_count, final_count):
|
||||||
ann = recording.annotations[i]
|
ann = recording.annotations[i]
|
||||||
freq_range = f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
|
freq_range = (
|
||||||
|
f"{format_frequency(ann.freq_lower_edge)} - {format_frequency(ann.freq_upper_edge)}"
|
||||||
|
if ann.freq_lower_edge is not None and ann.freq_upper_edge is not None
|
||||||
|
else "N/A"
|
||||||
|
)
|
||||||
click.echo(
|
click.echo(
|
||||||
f" [{i}] samples {format_sample_count(ann.sample_start)}-"
|
f" [{i}] samples {format_sample_count(ann.sample_start)}-"
|
||||||
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {freq_range}"
|
f"{format_sample_count(ann.sample_start + ann.sample_count)}: {freq_range}"
|
||||||
|
|
|
||||||
|
|
@ -199,3 +199,44 @@ def test_annotation_to_sigmf_format_values():
|
||||||
values = list(result.values())
|
values = list(result.values())
|
||||||
assert 50 in values or ann.sample_start in values
|
assert 50 in values or ann.sample_start in values
|
||||||
assert 100 in values or ann.sample_count in values
|
assert 100 in values or ann.sample_count in values
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# None freq-edge regression tests (SigMF optional fields)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_annotation_no_freq_edges():
|
||||||
|
ann = Annotation(sample_start=0, sample_count=10, label="burst")
|
||||||
|
assert ann.freq_lower_edge is None
|
||||||
|
assert ann.freq_upper_edge is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_annotation_is_valid_no_freq_edges():
|
||||||
|
ann = Annotation(sample_start=0, sample_count=10, label="burst")
|
||||||
|
assert ann.is_valid() is True
|
||||||
|
|
||||||
|
ann_zero = Annotation(sample_start=0, sample_count=0, label="burst")
|
||||||
|
assert ann_zero.is_valid() is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_annotation_overlap_none_edges_returns_zero():
|
||||||
|
ann1 = Annotation(sample_start=0, sample_count=10)
|
||||||
|
ann2 = Annotation(sample_start=0, sample_count=10, freq_lower_edge=0, freq_upper_edge=100)
|
||||||
|
assert ann1.overlap(ann2) == 0
|
||||||
|
assert ann2.overlap(ann1) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_annotation_area_none_edges_returns_zero():
|
||||||
|
ann = Annotation(sample_start=0, sample_count=10, label="burst")
|
||||||
|
assert ann.area() == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_annotation_to_sigmf_omits_freq_keys_when_none():
|
||||||
|
from sigmf import SigMFFile
|
||||||
|
|
||||||
|
ann = Annotation(sample_start=0, sample_count=10, label="burst")
|
||||||
|
result = ann.to_sigmf_format()
|
||||||
|
metadata = result["metadata"]
|
||||||
|
assert SigMFFile.FLO_KEY not in metadata
|
||||||
|
assert SigMFFile.FHI_KEY not in metadata
|
||||||
|
|
|
||||||
|
|
@ -189,3 +189,21 @@ def test_sigmf_3(tmp_path):
|
||||||
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
|
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name)
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
assert str(e) == "File already exists"
|
assert str(e) == "File already exists"
|
||||||
|
|
||||||
|
|
||||||
|
def test_sigmf_annotation_without_freq_edges(tmp_path):
|
||||||
|
# Regression: annotations that omit the optional SigMF freq edge fields must
|
||||||
|
# load without error; edges should be None and the annotation still valid.
|
||||||
|
ann = Annotation(sample_start=0, sample_count=5, label="burst")
|
||||||
|
recording1 = Recording(data=complex_data_1, metadata=sample_metadata, annotations=[ann])
|
||||||
|
|
||||||
|
filename = tmp_path / "test"
|
||||||
|
to_sigmf(recording=recording1, path=tmp_path, filename=filename.name, overwrite=True)
|
||||||
|
recording2 = from_sigmf(filename)
|
||||||
|
|
||||||
|
assert len(recording2.annotations) == 1
|
||||||
|
loaded = recording2.annotations[0]
|
||||||
|
assert loaded.freq_lower_edge is None
|
||||||
|
assert loaded.freq_upper_edge is None
|
||||||
|
assert loaded.is_valid() is True
|
||||||
|
assert loaded.label == "burst"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user