io_improvement #7

Merged
madrigal merged 3 commits from io_improvement into main 2025-10-22 11:16:19 -04:00
2 changed files with 38 additions and 38 deletions
Showing only changes of commit ddf445fd4d - Show all commits

View File

@ -1,7 +1,6 @@
from __future__ import annotations from __future__ import annotations
import copy import copy
import datetime
import hashlib import hashlib
import json import json
import os import os
@ -12,7 +11,6 @@ from typing import Any, Iterator, Optional
import numpy as np import numpy as np
from numpy.typing import ArrayLike from numpy.typing import ArrayLike
from quantiphy import Quantity
from ria_toolkit_oss.datatypes.annotation import Annotation from ria_toolkit_oss.datatypes.annotation import Annotation
@ -598,40 +596,6 @@ class Recording:
scaled_data = self.data / np.max(abs(self.data)) scaled_data = self.data / np.max(abs(self.data))
return Recording(data=scaled_data, metadata=self.metadata, annotations=self.annotations) return Recording(data=scaled_data, metadata=self.metadata, annotations=self.annotations)
def generate_filename(self, tag: Optional[str] = "rec"):
"""Generate a filename from metadata.
:param tag: The string at the beginning of the generated filename. Default is "rec".
:type tag: str, optional
:return: A filename without an extension.
:rtype: str
"""
# TODO: This method should be refactored to use the first 7 characters of the 'rec_id' field.
tag = tag + "_"
source = self.metadata.get("source", "")
if source != "":
source = source + "_"
# converts 1000 to 1k for example
center_frequency = str(Quantity(self.metadata.get("center_frequency", 0)))
if center_frequency != "0":
num = center_frequency[:-1]
suffix = center_frequency[-1]
num = int(np.round(float(num)))
else:
num = 0
suffix = ""
center_frequency = str(num) + suffix + "Hz_"
timestamp = int(self.timestamp)
timestamp = datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d_%H-%M-%S") + "_"
# Add first seven characters of rec_id for uniqueness
rec_id = self.rec_id[0:7]
return tag + source + center_frequency + timestamp + rec_id
def __len__(self) -> int: def __len__(self) -> int:
"""The length of a recording is defined by the number of complex samples in each channel of the recording.""" """The length of a recording is defined by the number of complex samples in each channel of the recording."""
return self.shape[1] return self.shape[1]

View File

@ -2,6 +2,7 @@
Utilities for input/output operations on the ria_toolkit_oss.datatypes.Recording object. Utilities for input/output operations on the ria_toolkit_oss.datatypes.Recording object.
""" """
import datetime
import datetime as dt import datetime as dt
import os import os
from datetime import timezone from datetime import timezone
@ -9,6 +10,7 @@ from typing import Optional
import numpy as np import numpy as np
import sigmf import sigmf
from quantiphy import Quantity
from sigmf import SigMFFile, sigmffile from sigmf import SigMFFile, sigmffile
from sigmf.utils import get_data_type_str from sigmf.utils import get_data_type_str
@ -126,7 +128,7 @@ def to_sigmf(
if filename is not None: if filename is not None:
filename, _ = os.path.splitext(filename) filename, _ = os.path.splitext(filename)
else: else:
filename = recording.generate_filename() filename = generate_filename(recording=recording)
if path is None: if path is None:
path = "recordings" path = "recordings"
@ -294,7 +296,7 @@ def to_npy(
if filename is not None: if filename is not None:
filename, _ = os.path.splitext(filename) filename, _ = os.path.splitext(filename)
else: else:
filename = recording.generate_filename() filename = generate_filename(recording=recording)
filename = filename + ".npy" filename = filename + ".npy"
if path is None: if path is None:
@ -351,3 +353,37 @@ def from_npy(file: os.PathLike | str) -> Recording:
recording = Recording(data=data, metadata=metadata, annotations=annotations) recording = Recording(data=data, metadata=metadata, annotations=annotations)
return recording return recording
def generate_filename(recording: Recording, tag: Optional[str] = "rec"):
"""Generate a filename from metadata.
:param tag: The string at the beginning of the generated filename. Default is "rec".
:type tag: str, optional
:return: A filename without an extension.
:rtype: str
"""
tag = tag + "_"
source = recording.metadata.get("source", "")
if source != "":
source = source + "_"
# converts 1000 to 1k for example
center_frequency = str(Quantity(recording.metadata.get("center_frequency", 0)))
if center_frequency != "0":
num = center_frequency[:-1]
suffix = center_frequency[-1]
num = int(np.round(float(num)))
else:
num = 0
suffix = ""
center_frequency = str(num) + suffix + "Hz_"
timestamp = int(recording.timestamp)
timestamp = datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d_%H-%M-%S") + "_"
# Add first seven characters of rec_id for uniqueness
rec_id = recording.rec_id[0:7]
return tag + source + center_frequency + timestamp + rec_id