diff --git a/src/ria_toolkit_oss/datatypes/recording.py b/src/ria_toolkit_oss/datatypes/recording.py index 8932e81..ca5198a 100644 --- a/src/ria_toolkit_oss/datatypes/recording.py +++ b/src/ria_toolkit_oss/datatypes/recording.py @@ -559,6 +559,103 @@ class Recording: to_npy(recording=self, filename=filename, path=path, overwrite=overwrite) + def to_wav( + self, + filename: Optional[str] = None, + path: Optional[os.PathLike | str] = None, + target_sample_rate: Optional[int] = 48000, + bits_per_sample: int = 32, + overwrite: bool = False, + ) -> str: + """Write recording to WAV file with embedded YAML metadata. + + WAV format uses stereo audio with I (in-phase) in left channel and Q (quadrature) in right channel. + Metadata is stored in standard LIST INFO chunks with RF-specific metadata encoded as YAML + in the ICMT (comment) field for human readability. + + :param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename. + :type filename: os.PathLike or str, optional + :param path: The directory path to where the recording is to be saved. Defaults to recordings/. + :type path: os.PathLike or str, optional + :param target_sample_rate: Sample rate stored in the WAV header when no sample_rate metadata + is present. IQ samples are written without decimation or interpolation. Default is 48000 Hz. + :type target_sample_rate: int, optional + :param bits_per_sample: Bits per sample (32 for float32, 16 for int16). Default is 32. + :type bits_per_sample: int, optional + :param overwrite: Whether to overwrite existing files. Default is False. + :type overwrite: bool, optional + + :raises IOError: If there is an issue encountered during the file writing process. + + :return: Path where the file was saved. 
+ :rtype: str + + **Examples:** + + Create a recording and save it to a .wav file: + + >>> import numpy + >>> from utils.data import Recording + >>> samples = numpy.exp(1j * 2 * numpy.pi * 0.1 * numpy.arange(10000)) + >>> metadata = {"sample_rate": 1e6, "center_frequency": 915e6} + >>> recording = Recording(data=samples, metadata=metadata) + >>> recording.to_wav() + """ + from utils.io.recording import to_wav + + return to_wav( + recording=self, + filename=filename, + path=path, + target_sample_rate=target_sample_rate, + bits_per_sample=bits_per_sample, + overwrite=overwrite, + ) + + def to_blue( + self, + filename: Optional[str] = None, + path: Optional[os.PathLike | str] = None, + data_format: str = "CI", + overwrite: bool = False, + ) -> str: + """Write recording to MIDAS Blue file format. + + MIDAS Blue is a legacy RF file format with a 512-byte binary header. + Commonly used with X-Midas and other RF/radar signal processing tools. + + :param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename. + :type filename: os.PathLike or str, optional + :param path: The directory path to where the recording is to be saved. Defaults to recordings/. + :type path: os.PathLike or str, optional + :param data_format: Format code (default 'CI' = complex int16). + Common formats: 'CI' (complex int16), 'CF' (complex float32), 'CD' (complex float64). + Integer formats require the IQ samples to already be scaled within [-1, 1). + :type data_format: str, optional + :param overwrite: Whether to overwrite existing files. Default is False. + :type overwrite: bool, optional + + :raises IOError: If there is an issue encountered during the file writing process. + + :return: Path where the file was saved. 
+ :rtype: str + + **Examples:** + + Create a recording and save it to a .blue file: + + >>> import numpy + >>> from utils.data import Recording + >>> samples = numpy.ones(10000, dtype=numpy.complex64) + >>> metadata = {"sample_rate": 1e6, "center_frequency": 2.44e9} + >>> recording = Recording(data=samples, metadata=metadata) + >>> recording.to_blue() + """ + from utils.io.recording import to_blue + + return to_blue(recording=self, filename=filename, path=path, data_format=data_format, overwrite=overwrite) + + def trim(self, num_samples: int, start_sample: Optional[int] = 0) -> Recording: """Trim Recording samples to a desired length, shifting annotations to maintain alignment. diff --git a/src/ria_toolkit_oss/io/__init__.py b/src/ria_toolkit_oss/io/__init__.py index 1a6e1a0..52dced1 100644 --- a/src/ria_toolkit_oss/io/__init__.py +++ b/src/ria_toolkit_oss/io/__init__.py @@ -2,3 +2,37 @@ The IO package contains utilities for input and output operations, such as loading and saving recordings to and from file. 
""" + +__all__ = [ + # Common: + "exists", + "copy", + "move", + "validate", + # Recording: + "save_recording", + "load_recording", + "to_sigmf", + "from_sigmf", + "to_npy", + "from_npy", + "from_npy_legacy", + "to_wav", + "from_wav", + "to_blue", + "from_blue", +] + +from .common import copy, exists, move, validate +from .recording import ( + from_blue, + from_npy, + from_npy_legacy, + from_sigmf, + from_wav, + load_recording, + to_blue, + to_npy, + to_sigmf, + to_wav, +) diff --git a/src/ria_toolkit_oss/io/recording.py b/src/ria_toolkit_oss/io/recording.py index d1d6105..9c7af02 100644 --- a/src/ria_toolkit_oss/io/recording.py +++ b/src/ria_toolkit_oss/io/recording.py @@ -4,9 +4,13 @@ Utilities for input/output operations on the ria_toolkit_oss.datatypes.Recording import datetime import datetime as dt +import numbers import os +import re +import struct from datetime import timezone from typing import Optional +from typing import Any, List, Optional import numpy as np import sigmf @@ -18,34 +22,16 @@ from ria_toolkit_oss.datatypes import Annotation from ria_toolkit_oss.datatypes.recording import Recording -def load_rec(file: os.PathLike) -> Recording: - """Load a recording from file. - - :param file: The directory path to the file(s) to load, **with** the file extension. - To loading from SigMF, the file extension must be one of *sigmf*, *sigmf-data*, or *sigmf-meta*, - either way both the SigMF data and meta files must be present for a successful read. - :type file: os.PathLike - - :raises IOError: If there is an issue encountered during the file reading process. - - :raises ValueError: If the inferred file extension is not supported. - - :return: The recording, as initialized from file(s). 
- :rtype: ria_toolkit_oss.datatypes.Recording - """ - _, extension = os.path.splitext(file) - extension = extension.lstrip(".") - - if extension.lower() in ["sigmf", "sigmf-data", "sigmf-meta"]: - return from_sigmf(file=file) - - elif extension.lower() == "npy": - return from_npy(file=file) - - else: - raise ValueError(f"File extension {extension} not supported.") - - +_BLUE_META_PREFIX = "META_" +_BLUE_META_TAG_MAX_LEN = 60 +_BLUE_SKIP_METADATA_KEYS = {"blue_data_format", "blue_endian", "blue_keywords"} +_BLUE_NUMERIC_DTYPE = { + "B": "i1", + "I": "i2", + "L": "i4", + "F": "f4", + "D": "f8", +} SIGMF_KEY_CONVERSION = { SigMFFile.AUTHOR_KEY: "author", SigMFFile.COLLECTION_KEY: "sigmf:collection", @@ -69,29 +55,159 @@ SIGMF_KEY_CONVERSION = { } -def convert_to_serializable(obj): +def to_npy( + recording: Recording, + filename: Optional[str] = None, + path: Optional[os.PathLike | str] = None, + overwrite: bool = False, +) -> str: + """Write recording to ``.npy`` binary file. + + :param recording: The recording to be written to file. + :type recording: ria_toolkit_oss.datatypes.Recording + :param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename. + :type filename: os.PathLike or str, optional + :param path: The directory path to where the recording is to be saved. Defaults to recordings/. + :type path: os.PathLike or str, optional + + :raises IOError: If there is an issue encountered during the file writing process. + + :return: Path where the file was saved. + :rtype: str + + **Examples:** + + >>> from ria_toolkit_oss.sdr import Synth + >>> from ria_toolkit_oss.data import Recording + >>> from ria_toolkit_oss.io import to_npy + >>> sdr = Synth() + >>> rec = sdr.record(center_frequency=2.4e9, sample_rate=20e6) + >>> to_npy(recording=rec, file="sample_recording.npy") """ - Recursively convert a JSON-compatible structure into a fully JSON-serializable one. 
- Handles cases like NumPy data types, nested dicts, lists, and sets. + + filename, path, fullpath = generate_fullpath( + recording=recording, filename=filename, path=path, extension=".npy", overwrite=overwrite + ) + + data = np.array(recording.data) + metadata = recording.metadata + annotations = recording.annotations + + with open(file=fullpath, mode="wb") as f: + np.save(f, data) + np.save(f, metadata) + np.save(f, annotations) + + # print(f"Saved recording to {os.getcwd()}/{fullpath}") + return str(fullpath) + + +def from_npy(file: os.PathLike | str, legacy: bool = False) -> Recording: + """Load a recording from a ``.npy`` binary file. + + :param file: The directory path to the recording file, with or without the ``.npy`` file extension. + :type file: str or os.PathLike + :param legacy: If True, load legacy format (iqdata, meta[4], extended_meta dict). + If False, load current format (data, metadata dict, annotations list). + Default is False. + :type legacy: bool, optional + + :raises IOError: If there is an issue encountered during the file reading process. + + :return: The recording, as initialized from the ``.npy`` file. 
+ :rtype: ria_toolkit_oss.datatypes.Recording """ - if isinstance(obj, np.integer): - return int(obj) # Convert NumPy int to Python int - elif isinstance(obj, np.floating): - return float(obj) # Convert NumPy float to Python float - elif isinstance(obj, np.ndarray): - return obj.tolist() # Convert NumPy array to list - elif isinstance(obj, (list, tuple)): - return [convert_to_serializable(item) for item in obj] # Process list or tuple - elif isinstance(obj, dict): - return {key: convert_to_serializable(value) for key, value in obj.items()} # Process dict - elif isinstance(obj, set): - return list(obj) # Convert set to list - elif obj in [float("inf"), float("-inf"), None]: # Handle infinity or None - return None - elif isinstance(obj, (str, int, float, bool)) or obj is None: - return obj # Base case: already serializable - else: - raise TypeError(f"Value of type {type(obj)} is not JSON serializable: {obj}") + + filename, extension = os.path.splitext(file) + if extension != ".npy" and extension != "": + raise ValueError("Cannot use from_npy if file extension is not .npy") + + # Rebuild with .npy extension. + filename = str(filename) + ".npy" + + if legacy: + return from_npy_legacy(filename) + + with open(file=filename, mode="rb") as f: + data = np.load(f, allow_pickle=True) + metadata = np.load(f, allow_pickle=True) + metadata = metadata.tolist() + try: + annotations = list(np.load(f, allow_pickle=True)) + except EOFError: + annotations = [] + + recording = Recording(data=data, metadata=metadata, annotations=annotations) + return recording + + +def from_npy_legacy(file: os.PathLike | str) -> Recording: + """Load a recording from legacy NPY format. + + Legacy format (pre-utils) stores three numpy arrays: + 1. iqdata: shape (2, N) with I and Q as separate rows (float32) + 2. meta: shape (4,) with [center_freq, rec_length, decimation, sample_rate] + 3. 
extended_meta: dict with additional metadata + + :param file: The directory path to the recording file, with or without the ``.npy`` file extension. + :type file: str or os.PathLike + + :raises IOError: If there is an issue encountered during the file reading process. + + :return: The recording, as initialized from the legacy ``.npy`` file. + :rtype: ria_toolkit_oss.datatypes.Recording + + **Examples:** + + Load legacy SRS recordings: + + >>> from ria_toolkit_oss.io import from_npy_legacy + >>> rec = from_npy_legacy("~/sample_recs/srs/example_srs_recordings/bw40M_Youtube_sr46.08/iq3775MHz053601.npy") + >>> print(rec.metadata.get('protocol')) + 5G40 + """ + filename, extension = os.path.splitext(file) + if extension != ".npy" and extension != "": + raise ValueError("Cannot use from_npy_legacy if file extension is not .npy") + + # Rebuild with .npy extension. + filename = str(filename) + ".npy" + + with open(filename, "rb") as f: + # Read IQ data (2, N) format + iqdata = np.load(f) + + # Read basic metadata array [center_freq, rec_length, decimation, sample_rate] + meta = np.load(f) + + # Read extended metadata dict + extended_meta = np.load(f, allow_pickle=True)[0] + + # Convert IQ data from (2, N) to (N,) complex format + i_channel = iqdata[0, :] + q_channel = iqdata[1, :] + complex_data = i_channel + 1j * q_channel + + # Build metadata dictionary + metadata = {} + + # Extract from basic meta array if available + if len(meta) >= 4: + metadata["center_frequency"] = float(meta[0]) + metadata["legacy_rec_length"] = int(meta[1]) + metadata["legacy_decimation"] = int(meta[2]) + metadata["sample_rate"] = float(meta[3]) + + # Merge extended metadata + if isinstance(extended_meta, dict): + for key, value in extended_meta.items(): + # Convert keys to lowercase snake_case if needed + key_lower = key.lower() + # Don't overwrite already set values + if key_lower not in metadata: + metadata[key_lower] = value + + return Recording(data=complex_data, metadata=metadata) def 
to_sigmf( @@ -125,16 +241,9 @@ def to_sigmf( >>> to_sigmf(recording=rec, file="sample_recording") """ - if filename is not None: - filename, _ = os.path.splitext(filename) - else: - filename = generate_filename(recording=recording) - - if path is None: - path = "recordings" - - if not os.path.exists(path): - os.makedirs(path) + filename, path, _ = generate_fullpath( + recording=recording, filename=filename, path=path, extension="", overwrite=True + ) multichannel_samples = recording.data metadata = recording.metadata @@ -219,7 +328,6 @@ def from_sigmf(file: os.PathLike | str) -> Recording: :rtype: ria_toolkit_oss.datatypes.Recording """ - file = str(file) if len(file) > 11: if file[-11:-5] != ".sigmf": file = file + ".sigmf-data" @@ -264,95 +372,539 @@ def from_sigmf(file: os.PathLike | str) -> Recording: return output_recording -def to_npy( +def to_wav( recording: Recording, filename: Optional[str] = None, path: Optional[os.PathLike | str] = None, + target_sample_rate: Optional[int] = 48000, + bits_per_sample: int = 32, overwrite: bool = False, ) -> str: - """Write recording to ``.npy`` binary file. + """Write recording to WAV file with embedded YAML metadata in LIST INFO chunk. + + WAV format uses stereo audio with I (in-phase) in left channel and Q (quadrature) in right channel. + Metadata is stored in standard LIST INFO chunks with RF-specific metadata encoded as YAML + in the ICMT (comment) field for human readability. :param recording: The recording to be written to file. :type recording: ria_toolkit_oss.datatypes.Recording - :param filename: The name of the file where the recording is to be saved. Defaults to auto generated filename. - :type filename: os.PathLike or str, optional - :param path: The directory path to where the recording is to be saved. Defaults to recordings/. + :param filename: The name of the file where the recording is to be saved. + Defaults to auto-generated filename. 
+ :type filename: str, optional + :param path: The directory path to where the recording is to be saved. + Defaults to recordings/. :type path: os.PathLike or str, optional + :param target_sample_rate: Sample rate written to the WAV header when the recording + metadata does not specify one. Defaults to 48 kHz. No decimation is performed— + IQ samples are written sample-for-sample exactly as provided. + :type target_sample_rate: int, optional + :param bits_per_sample: Bits per sample (32 for float32, 16 for int16). + Default is 32 (float32). + :type bits_per_sample: int, optional + :param overwrite: Whether to overwrite existing files. Default is False. + :type overwrite: bool, optional - :raises IOError: If there is an issue encountered during the file writing process. + :raises IOError: If file already exists and overwrite is False. + :raises ValueError: If recording has multiple channels. + :raises ValueError: If bits_per_sample is not 16 or 32. + :raises ValueError: If 16-bit export is requested but samples fall outside [-1, 1). :return: Path where the file was saved. 
:rtype: str - - **Examples:** - - >>> from ria_toolkit_oss.sdr import Synth - >>> from ria_toolkit_oss.data import Recording - >>> from ria_toolkit_oss.io import to_npy - >>> sdr = Synth() - >>> rec = sdr.record(center_frequency=2.4e9, sample_rate=20e6) - >>> to_npy(recording=rec, file="sample_recording.npy") """ - if filename is not None: - filename, _ = os.path.splitext(filename) + import wave + + if recording.n_chan > 1: + raise ValueError("WAV export not supported for multichannel recordings") + + if bits_per_sample not in [16, 32]: + raise ValueError("bits_per_sample must be 16 or 32") + + # Generate filename if not provided + filename, path, fullpath = generate_fullpath( + recording=recording, filename=filename, path=path, extension=".wav", overwrite=overwrite + ) + + # Extract single channel + iq_samples = np.asarray(recording.data[0]) + + # Determine WAV header sample rate (metadata only) + wav_sample_rate = recording.sample_rate or target_sample_rate or 48000 + + # Convert complex to stereo (I and Q channels) + i_channel = np.real(iq_samples) + q_channel = np.imag(iq_samples) + + # Convert to target data type + if bits_per_sample == 32: + # 32-bit float + i_data = np.ascontiguousarray(i_channel, dtype=np.float32) + q_data = np.ascontiguousarray(q_channel, dtype=np.float32) + sample_width = 4 else: - filename = generate_filename(recording=recording) - filename = filename + ".npy" + # 16-bit int + max_mag = np.max(np.abs(np.concatenate([i_channel, q_channel]))) + if max_mag > 1.0: + raise ValueError("16-bit WAV export requires samples within [-1, 1). 
Use float32 or normalize manually.") + scale = np.iinfo(np.int16).max + i_scaled = np.clip(i_channel, -1.0, 1.0 - (1.0 / scale)) + q_scaled = np.clip(q_channel, -1.0, 1.0 - (1.0 / scale)) + i_data = np.ascontiguousarray(np.round(i_scaled * scale).astype(np.int16)) + q_data = np.ascontiguousarray(np.round(q_scaled * scale).astype(np.int16)) + sample_width = 2 - if path is None: - path = "recordings" + # Interleave I and Q + stereo = np.empty(len(iq_samples) * 2, dtype=i_data.dtype) + stereo[0::2] = i_data + stereo[1::2] = q_data - if not os.path.exists(path): - os.makedirs(path) - fullpath = os.path.join(path, filename) + # Write WAV file + with wave.open(fullpath, "wb") as wav: + wav.setnchannels(2) # Stereo (I and Q) + wav.setsampwidth(sample_width) + wav.setframerate(int(wav_sample_rate)) + if bits_per_sample == 32: + wav.setcomptype("NONE", "not compressed") + wav.writeframes(stereo.tobytes()) - if not overwrite: - if os.path.isfile(fullpath): - raise IOError("File already exists") + # Prepare metadata for LIST INFO chunk + rf_metadata = recording.metadata.copy() - data = np.array(recording.data) - metadata = recording.metadata - annotations = recording.annotations + # Record both RF and WAV header sample rates for clarity + if recording.sample_rate: + rf_metadata["rf_sample_rate_hz"] = float(recording.sample_rate) + rf_metadata["wav_sample_rate_hz"] = float(wav_sample_rate) - with open(file=fullpath, mode="wb") as f: - np.save(f, data) - np.save(f, metadata) - np.save(f, annotations) + # Rename common keys to more descriptive versions + if "center_frequency" in rf_metadata: + rf_metadata["center_frequency_hz"] = rf_metadata.pop("center_frequency") + if "sample_rate" in rf_metadata and "rf_sample_rate_hz" not in rf_metadata: + rf_metadata["rf_sample_rate_hz"] = rf_metadata.pop("sample_rate") - # print(f"Saved recording to {os.getcwd()}/{fullpath}") - return str(fullpath) + # Append LIST INFO chunk with metadata + _append_wav_list_info_chunk(fullpath, 
rf_metadata) + + return fullpath -def from_npy(file: os.PathLike | str) -> Recording: - """Load a recording from a ``.npy`` binary file. +def from_wav(file: os.PathLike | str) -> Recording: + """Load recording from WAV file and extract RF metadata. - :param file: The directory path to the recording file, with or without the ``.npy`` file extension. + :param file: The path to the WAV file to load. :type file: str or os.PathLike + :raises IOError: If there is an issue reading the file. + :raises ValueError: If file is not stereo or has unsupported format. + + :return: The recording, as initialized from the WAV file. + :rtype: ria_toolkit_oss.datatypes.Recording + """ + import wave + + filename = str(file) + if not filename.endswith(".wav"): + filename = filename + ".wav" + + # Read audio data + with wave.open(filename, "rb") as wav: + n_channels = wav.getnchannels() + sample_width = wav.getsampwidth() + sample_rate = wav.getframerate() + n_frames = wav.getnframes() + comp_type = wav.getcomptype() + audio_bytes = wav.readframes(n_frames) + + if n_channels != 2: + raise ValueError(f"Expected stereo WAV file, got {n_channels} channels") + + # Determine data type + if sample_width == 4 and comp_type == "NONE": + # 32-bit float + dtype = np.float32 + elif sample_width == 2: + # 16-bit int + dtype = np.int16 + else: + raise ValueError(f"Unsupported WAV format: {sample_width} bytes per sample, comp_type={comp_type}") + + # Convert bytes to stereo array + stereo = np.frombuffer(audio_bytes, dtype=dtype) + + # De-interleave I and Q + i_channel = stereo[0::2] + q_channel = stereo[1::2] + + # Normalize int16 to float + if dtype == np.int16: + i_channel = i_channel.astype(np.float32) / 32767.0 + q_channel = q_channel.astype(np.float32) / 32767.0 + + # Convert to complex + iq_samples = i_channel + 1j * q_channel + + # Extract LIST INFO metadata + metadata = _extract_wav_list_info(filename) + + if metadata is None: + metadata = {} + + # Ensure sample_rate is in metadata + if 
"sample_rate" not in metadata: + # Prefer RF sample rate if available, otherwise fall back to WAV header + if "rf_sample_rate_hz" in metadata: + metadata["sample_rate"] = metadata["rf_sample_rate_hz"] + elif "wav_sample_rate_hz" in metadata: + metadata["sample_rate"] = metadata["wav_sample_rate_hz"] + else: + metadata["sample_rate"] = float(sample_rate) + + # Restore original keys for compatibility + if "center_frequency_hz" in metadata and "center_frequency" not in metadata: + metadata["center_frequency"] = metadata["center_frequency_hz"] + + return Recording(data=iq_samples, metadata=metadata) + + +def to_blue( + recording: Recording, + filename: Optional[str] = None, + path: Optional[os.PathLike | str] = None, + data_format: str = "CI", + overwrite: bool = False, +) -> str: + """ + Write recording to MIDAS Blue file format. + + MIDAS Blue is a legacy RF file format with a 512-byte binary header. + Commonly used with X-Midas and other RF/radar signal processing tools. + + :param recording: The recording to be written to file. + :type recording: ria_toolkit_oss.datatypes.Recording + :param filename: The name of the file where the recording is to be saved. + Defaults to auto-generated filename. + :type filename: str, optional + :param path: The directory path to where the recording is to be saved. + Defaults to recordings/. + :type path: os.PathLike or str, optional + :param data_format: Format code (default 'CI' = complex int16). + Common formats: 'CI' (complex int16), 'CF' (complex float32), 'CD' (complex float64). + :type data_format: str, optional + :param overwrite: Whether to overwrite existing files. Default is False. + :type overwrite: bool, optional + + :raises IOError: If file already exists and overwrite is False. + :raises ValueError: If recording has multiple channels. + :raises ValueError: If data_format is not supported. + :raises ValueError: If integer formats are requested but samples fall outside [-1, 1). 
+ + :return: Path where the file was saved. + :rtype: str + """ + if recording.n_chan > 1: + raise ValueError("MIDAS Blue export not supported for multichannel recordings") + + if recording.sample_rate is None: + raise ValueError("Recording metadata must include 'sample_rate' for MIDAS Blue export.") + + # Generate filename if not provided + filename, path, fullpath = generate_fullpath( + recording=recording, filename=filename, path=path, extension=".blue", overwrite=overwrite + ) + + # Extract single channel + iq_samples = np.asarray(recording.data[0]) + + sample_rate = float(recording.sample_rate) + metadata = recording.metadata or {} + + # Data format + if data_format not in ["CI", "CF", "CD", "SI", "SF", "SD"]: + raise ValueError(f"Unsupported data format: {data_format}. Use CI, CF, CD, SI, SF, or SD") + + # Convert IQ samples to specified format + dtype_map = { + "CI": np.int16, + "CF": np.float32, + "CD": np.float64, + "SI": np.int16, + "SF": np.float32, + "SD": np.float64, + } + dtype = dtype_map[data_format] + + # Separate I and Q for complex formats + if data_format.startswith("C"): + # Convert using requested data type + if np.issubdtype(dtype, np.integer): + i_data = np.real(iq_samples) + q_data = np.imag(iq_samples) + + max_mag = np.max(np.abs(np.concatenate([i_data, q_data]))) + if max_mag > 1.0: + raise ValueError( + "Integer MIDAS Blue export requires samples within [-1, 1). " + "Normalize or export using a float format (CF/CD)." 
+ ) + + max_val = np.iinfo(dtype).max + eps = 1.0 / max_val + i_scaled = np.clip(i_data, -1.0, 1.0 - eps) + q_scaled = np.clip(q_data, -1.0, 1.0 - eps) + + i_converted = np.round(i_scaled * max_val).astype(dtype) + q_converted = np.round(q_scaled * max_val).astype(dtype) + else: + i_converted = np.real(iq_samples).astype(dtype, copy=False) + q_converted = np.imag(iq_samples).astype(dtype, copy=False) + + # Interleave I and Q + interleaved = np.empty(len(iq_samples) * 2, dtype=dtype) + interleaved[0::2] = i_converted + interleaved[1::2] = q_converted + else: + # Real-valued data (use only I channel) + if np.issubdtype(dtype, np.integer): + real_channel = np.real(iq_samples) + + max_mag = np.max(np.abs(real_channel)) + if max_mag >= 1.0: + raise ValueError( + "Integer MIDAS Blue export requires samples within [-1, 1). " + "Normalize or export using a float format (SF/SD)." + ) + + max_val = np.iinfo(dtype).max + eps = 1.0 / max_val + clipped = np.clip(real_channel, -1.0, 1.0 - eps) + interleaved = np.round(clipped * max_val).astype(dtype) + else: + interleaved = np.real(iq_samples).astype(dtype, copy=False) + + # Create 512-byte header + header = bytearray(512) + header[0:4] = b"BLUE" + header[4:8] = b"EEEI" + header[8:12] = b"EEEI" + header[52:54] = data_format.encode("ascii") + struct.pack_into(" Recording: + """ + Load recording from MIDAS Blue file. + + :param file: The path to the MIDAS Blue file to load. + :type file: str or os.PathLike + + :raises IOError: If there is an issue reading the file. + :raises ValueError: If file format is not valid or unsupported. + + :return: The recording, as initialized from the Blue file. 
+ :rtype: ria_toolkit_oss.datatypes.Recording + """ + filename = str(file) + if not filename.endswith(".blue"): + filename = filename + ".blue" + + with open(filename, "rb") as f: + header_bytes = f.read(512) + if len(header_bytes) < 512: + raise ValueError("File too small to be a valid MIDAS Blue file") + + magic = header_bytes[0:4].decode("ascii", errors="ignore") + if magic != "BLUE": + raise ValueError(f"Not a Blue file (magic={magic})") + + header_rep = header_bytes[4:8].decode("ascii", errors="ignore") + data_rep = header_bytes[8:12].decode("ascii", errors="ignore") + header_endian = ">" if header_rep == "IEEE" else "<" + data_endian = ">" if data_rep == "IEEE" else "<" + + ext_start = struct.unpack(f"{header_endian}i", header_bytes[24:28])[0] + ext_size = struct.unpack(f"{header_endian}i", header_bytes[28:32])[0] + data_start_offset = int(struct.unpack(f"{header_endian}d", header_bytes[32:40])[0]) + data_size_bytes = int(struct.unpack(f"{header_endian}d", header_bytes[40:48])[0]) + data_format = header_bytes[52:54].decode("ascii", errors="ignore") + timecode = struct.unpack(f"{header_endian}d", header_bytes[56:64])[0] + time_interval = struct.unpack(f"{header_endian}d", header_bytes[264:272])[0] + sample_rate = 1.0 / time_interval if time_interval > 0 else 0 + + file_size = os.path.getsize(filename) + if data_start_offset <= 0: + data_start_offset = 512 + if data_size_bytes <= 0: + data_end = ext_start * 512 if ext_start > 0 else file_size + data_size_bytes = max(0, data_end - data_start_offset) + + # Map format code to numpy dtype + dtype_map = { + "CB": (np.int8, True), + "CI": (np.int16, True), + "CL": (np.int32, True), + "CF": (np.float32, True), + "CD": (np.float64, True), + "SB": (np.int8, False), + "SI": (np.int16, False), + "SL": (np.int32, False), + "SF": (np.float32, False), + "SD": (np.float64, False), + } + + base_dtype, is_complex = dtype_map.get(data_format, (None, False)) + if base_dtype is None: + raise ValueError(f"Unsupported format: 
{data_format}") + + # Apply endianness + dtype = np.dtype(base_dtype).newbyteorder(data_endian) + + ext_keywords: dict[str, Any] = {} + + with open(filename, "rb") as f: + f.seek(data_start_offset) + num_elements = data_size_bytes // dtype.itemsize if dtype.itemsize else 0 + data = np.fromfile(f, dtype=dtype, count=num_elements) + + if ext_start > 0 and ext_size > 0: + f.seek(ext_start * 512) + ext_bytes = f.read(ext_size) + ext_keywords = _decode_blue_keywords(ext_bytes, header_rep) + + # Convert to complex if needed + if is_complex: + # Interleaved IQ: [I0, Q0, I1, Q1, ...] + i_samples = data[0::2] + q_samples = data[1::2] + + # Normalize integer data + if np.issubdtype(base_dtype, np.integer): + max_val = np.iinfo(base_dtype).max + i_samples = i_samples.astype(np.float32) / max_val + q_samples = q_samples.astype(np.float32) / max_val + + iq_samples = i_samples + 1j * q_samples + else: + # Real data - convert to complex + if np.issubdtype(base_dtype, np.integer): + max_val = np.iinfo(base_dtype).max + real_samples = data.astype(np.float32) / max_val + else: + real_samples = data.astype(np.float32) + iq_samples = real_samples.astype(np.complex64) + + # Create metadata + metadata = { + "sample_rate": float(sample_rate), + "blue_data_format": data_format, + "blue_endian": data_rep, + } + + if ext_keywords: + metadata["blue_keywords"] = ext_keywords + for tag, value in ext_keywords.items(): + meta_key = _meta_key_from_tag(tag) + if meta_key and meta_key not in metadata: + metadata[meta_key] = value + + if isinstance(timecode, numbers.Real) and timecode != 0: + metadata.setdefault("timestamp", timecode) + metadata["timecode"] = timecode + + return Recording(data=iq_samples, metadata=metadata) + + +def load_recording(file: os.PathLike) -> Recording: + """Load a recording from file. + + :param file: The directory path to the file(s) to load, **with** the file extension. 
+ To loading from SigMF, the file extension must be one of *sigmf*, *sigmf-data*, or *sigmf-meta*, + either way both the SigMF data and meta files must be present for a successful read. + :type file: os.PathLike + :raises IOError: If there is an issue encountered during the file reading process. - :return: The recording, as initialized from the ``.npy`` file. + :raises ValueError: If the inferred file extension is not supported. + + :return: The recording, as initialized from file(s). :rtype: ria_toolkit_oss.datatypes.Recording """ + _, extension = os.path.splitext(file) + extension = extension.lstrip(".") - filename, extension = os.path.splitext(file) - if extension != ".npy" and extension != "": - raise ValueError("Cannot use from_npy if file extension is not .npy") + if extension.lower() in ["sigmf", "sigmf-data", "sigmf-meta"]: + return from_sigmf(file=file) - # Rebuild with .npy extension. - filename = str(filename) + ".npy" + elif extension.lower() == "npy": + return from_npy(file=file) - with open(file=filename, mode="rb") as f: - data = np.load(f, allow_pickle=True) - metadata = np.load(f, allow_pickle=True) - metadata = metadata.tolist() - try: - annotations = list(np.load(f, allow_pickle=True)) - except EOFError: - annotations = [] + elif extension.lower() == "wav": + return from_wav(file=file) - recording = Recording(data=data, metadata=metadata, annotations=annotations) - return recording + elif extension.lower() == "blue": + return from_blue(file=file) + + else: + raise ValueError(f"File extension {extension} not supported.") + + +def convert_to_serializable(obj): + """ + Recursively convert a JSON-compatible structure into a fully JSON-serializable one. + Handles cases like NumPy data types, nested dicts, lists, and sets. 
+ """ + if isinstance(obj, np.integer): + return int(obj) # Convert NumPy int to Python int + elif isinstance(obj, np.floating): + return float(obj) # Convert NumPy float to Python float + elif isinstance(obj, np.ndarray): + return obj.tolist() # Convert NumPy array to list + elif isinstance(obj, (list, tuple)): + return [convert_to_serializable(item) for item in obj] # Process list or tuple + elif isinstance(obj, dict): + return {key: convert_to_serializable(value) for key, value in obj.items()} # Process dict + elif isinstance(obj, set): + return list(obj) # Convert set to list + elif obj in [float("inf"), float("-inf"), None]: # Handle infinity or None + return None + elif isinstance(obj, (str, int, float, bool)) or obj is None: + return obj # Base case: already serializable + else: + raise TypeError(f"Value of type {type(obj)} is not JSON serializable: {obj}") def generate_filename(recording: Recording, tag: Optional[str] = "rec"): @@ -387,3 +939,323 @@ def generate_filename(recording: Recording, tag: Optional[str] = "rec"): # Add first seven characters of rec_id for uniqueness rec_id = recording.rec_id[0:7] return tag + source + center_frequency + timestamp + rec_id + + +def generate_fullpath(recording: Recording, filename: str, path: os.PathLike | str, extension: str, overwrite: bool): + """ + Generate the filename, path, and fullpath of the given recording. 
+ """ + # Generate filename if not provided + if filename is not None: + filename, _ = os.path.splitext(filename) + else: + filename = generate_filename(recording=recording) + filename = filename + extension + + if path is None: + path = "recordings" + + if not os.path.exists(path): + os.makedirs(path) + + fullpath = os.path.join(path, filename) + + if not overwrite and os.path.isfile(fullpath): + raise IOError(f"File already exists: {fullpath}") + + return filename, path, fullpath + + +def _append_wav_list_info_chunk(filename: str, rf_metadata: dict) -> None: + """Append LIST INFO chunk to existing WAV file. + + Uses ICMT field for YAML-formatted RF metadata. + + :param filename: Path to WAV file. + :type filename: str + :param rf_metadata: Dictionary of RF metadata to embed. + :type rf_metadata: dict + """ + import yaml + + # Convert metadata to YAML string + yaml_str = "# RF Recording Metadata\n" + yaml_str += yaml.dump(rf_metadata, default_flow_style=False, sort_keys=False) + + # Create LIST INFO chunk data + info_data = b"" + + # Add ICMT (comments) tag with YAML metadata + icmt_value = yaml_str.encode("utf-8", errors="ignore") + icmt_value += b"\x00" # NULL terminator + # Pad to even length (RIFF requirement) + if len(icmt_value) % 2: + icmt_value += b"\x00" + + info_data += b"ICMT" + info_data += len(icmt_value).to_bytes(4, "little") + info_data += icmt_value + + # Add ISFT (software) tag + isft_value = b"riatoolkit oss SDR toolchain\x00" + if len(isft_value) % 2: + isft_value += b"\x00" + info_data += b"ISFT" + info_data += len(isft_value).to_bytes(4, "little") + info_data += isft_value + + # Create LIST chunk + list_chunk = b"LIST" + list_chunk += (4 + len(info_data)).to_bytes(4, "little") # Size includes "INFO" tag + list_chunk += b"INFO" # List type + list_chunk += info_data + + # Append to WAV file + with open(filename, "r+b") as f: + # Read RIFF header + f.seek(0) + riff_header = f.read(4) + if riff_header != b"RIFF": + raise ValueError("Not a valid 
RIFF/WAV file") + + old_size = int.from_bytes(f.read(4), "little") + + # Update RIFF chunk size + f.seek(4) + new_size = old_size + len(list_chunk) + f.write(new_size.to_bytes(4, "little")) + + # Append LIST INFO chunk at end + f.seek(0, 2) # End of file + f.write(list_chunk) + + +def _extract_wav_list_info(filename: str) -> Optional[dict]: + """Extract LIST INFO chunk and parse ICMT field as YAML. + + :param filename: Path to WAV file. + :type filename: str + + :return: Dictionary of metadata from ICMT field, or None if not found. + :rtype: dict or None + """ + with open(filename, "rb") as f: + # Read RIFF header + riff_header = f.read(4) + if riff_header != b"RIFF": + return None + + file_size = int.from_bytes(f.read(4), "little") + wave_header = f.read(4) + if wave_header != b"WAVE": + return None + + # Skip to chunks after header (12 bytes = RIFF + size + WAVE) + f.seek(12) + + while f.tell() < file_size + 8: + chunk_id = f.read(4) + if len(chunk_id) < 4: + break + + chunk_size = int.from_bytes(f.read(4), "little") + + if chunk_id == b"LIST": + list_type = f.read(4) + if list_type == b"INFO": + # Read INFO chunk data + info_data = f.read(chunk_size - 4) + return _parse_wav_info_chunk(info_data) + else: + # Skip this LIST chunk + f.seek(chunk_size - 4, 1) + else: + # Skip chunk (align to even boundary) + skip_size = chunk_size + if chunk_size % 2: + skip_size += 1 + f.seek(skip_size, 1) + + return None + + +def _parse_wav_info_chunk(info_data: bytes) -> Optional[dict]: + """Parse INFO chunk data and extract ICMT field as YAML. + + :param info_data: Raw bytes from INFO chunk. + :type info_data: bytes + + :return: Dictionary parsed from YAML in ICMT field, or None. 
+ :rtype: dict or None
+ """
+ import yaml
+
+ offset = 0
+
# Walk the INFO sub-chunks: each is a 4-byte tag, 4-byte little-endian size,
# then the value; stop as soon as ICMT is found.
+ while offset < len(info_data) - 8:
+ tag = info_data[offset : offset + 4]
+ size = int.from_bytes(info_data[offset + 4 : offset + 8], "little")
+ value_bytes = info_data[offset + 8 : offset + 8 + size]
+
+ if tag == b"ICMT":
+ # Found comments field - decode and parse YAML
+ icmt_str = value_bytes.decode("utf-8", errors="ignore").rstrip("\x00")
+ try:
+ metadata = yaml.safe_load(icmt_str)
+ # If YAML parsing returns a string (no YAML structure), wrap it
+ if isinstance(metadata, str):
+ return {"raw_comment": metadata}
+ return metadata
+ except yaml.YAMLError:
+ # If YAML parsing fails, return as raw comment
+ return {"raw_comment": icmt_str}
+
+ # Move to next tag (aligned to even boundary)
+ offset += 8 + size
+ if size % 2:
+ offset += 1
+
+ return None
+
+
# Sanitize a metadata key into a Blue keyword tag: non-alphanumerics collapse
# to "_", uppercased, truncated to _BLUE_META_TAG_MAX_LEN characters, then
# prefixed with _BLUE_META_PREFIX; returns "" when nothing survives.
+def _blue_meta_tag_from_key(key: str) -> str:
+ base = re.sub(r"[^0-9A-Za-z]+", "_", key).strip("_")
+ if not base:
+ return ""
+ base = base.upper()[:_BLUE_META_TAG_MAX_LEN]
+ return f"{_BLUE_META_PREFIX}{base}"
+
+
# Map a Python/NumPy scalar to a (MIDAS Blue type char, packed bytes) pair;
# returns None for values that cannot be represented.
+def _encode_blue_value(value: Any) -> Optional[tuple[str, bytes]]:
+ if value is None:
+ return None
+
+ if isinstance(value, np.generic):
+ value = value.item()
+
+ if isinstance(value, bool):
+ value = int(value)
+
+ if isinstance(value, numbers.Integral):
+ if -(2**31) <= int(value) < 2**31:
# NOTE(review): extraction corruption — an angle-bracketed span was eaten from
# the next line. It originally continued with a struct.pack("<l", ...)-style
# call, the remaining _encode_blue_value branches (floats/strings), and the
# signature of _encode_blue_keyword up to "-> Optional[bytes]:". Recover the
# missing code from version control before relying on this region.
+ return "L", struct.pack(" Optional[bytes]:
+ encoded = _encode_blue_value(value)
+ if encoded is None:
+ return None
+
+ type_char, value_bytes = encoded
+ tag_bytes = tag.encode("ascii", errors="ignore")
+ ltag = len(tag_bytes)
+ value_length = len(value_bytes)
+ base_length = 8 + value_length + ltag
+ padding = (8 - (base_length % 8)) % 8
+ lkey = base_length + padding
+ lext = 8 + ltag + padding
+
+ parts = [
# NOTE(review): extraction corruption — another angle-bracketed span was eaten
# here. The parts list originally packed the lkey/lext/ltag/type_char record
# header (struct.pack("<i", lkey), ...), appended the value, tag and padding,
# and was followed by the signature of the next function up to "-> bytes:".
# Recover from version control.
+ struct.pack(" bytes:
+ if not metadata:
+ return b""
+
+ keywords: List[bytes] = []
+ for key in sorted(metadata.keys()):
+ if key in _BLUE_SKIP_METADATA_KEYS:
+ continue
+ tag = _blue_meta_tag_from_key(key)
+ if not tag:
+
def _encode_blue_keywords(metadata: dict[str, Any]) -> bytes:
    """Serialize recording metadata into MIDAS Blue extended-header keywords.

    NOTE(review): the original definition was corrupted in extraction (its
    signature and the tail of ``_encode_blue_keyword`` were eaten by a
    bracket-stripper); this body is reconstructed from the surviving loop
    remnant — confirm against version control.

    :param metadata: Recording metadata. Keys listed in
        ``_BLUE_SKIP_METADATA_KEYS`` and keys that sanitize to an empty tag
        are not persisted.
    :return: Concatenated keyword records (possibly empty).
    """
    if not metadata:
        return b""

    keywords: List[bytes] = []
    # Sort keys for a deterministic on-disk keyword order.
    for key in sorted(metadata.keys()):
        if key in _BLUE_SKIP_METADATA_KEYS:
            continue
        tag = _blue_meta_tag_from_key(key)
        if not tag:
            continue
        encoded = _encode_blue_keyword(tag, metadata[key])
        if encoded:
            keywords.append(encoded)

    return b"".join(keywords)


def _decode_blue_keyword_value(type_char: str, value_bytes: bytes, endian: str) -> Any:
    """Decode a single extended-header keyword value.

    :param type_char: Blue keyword format character ('A' = ASCII; other codes
        are looked up in ``_BLUE_NUMERIC_DTYPE``).
    :param value_bytes: Raw value payload.
    :param endian: NumPy byte-order prefix, ``'<'`` or ``'>'``.
    :return: ``str`` for ASCII, a Python scalar for a single numeric value, a
        list for numeric arrays, the raw bytes for unknown type codes, or
        ``None`` when the payload is empty.
    """
    if type_char == "A":
        # ASCII payloads are NUL-padded to the record boundary.
        return value_bytes.decode("utf-8", errors="ignore").rstrip("\x00")

    dtype_code = _BLUE_NUMERIC_DTYPE.get(type_char)
    if dtype_code is None or not value_bytes:
        # Unknown type: hand the raw bytes back (None when empty).
        return value_bytes if value_bytes else None

    values = np.frombuffer(value_bytes, dtype=np.dtype(endian + dtype_code))
    if values.size == 0:
        return None
    if values.size == 1:
        return values[0].item()
    return values.tolist()


def _decode_blue_keywords(data: bytes, endian: str) -> dict[str, Any]:
    """Parse a MIDAS Blue extended-header block into a tag -> value dict.

    Each record is laid out as: int32 ``lkey`` (total record length),
    int16 ``lext``, uint8 ``ltag`` (tag length), uint8 type character, the
    value payload (``lkey - lext`` bytes), then the ASCII tag. Records are
    walked until the buffer is exhausted or a malformed length is seen.

    :param data: Raw extended-header bytes.
    :param endian: Header representation code; ``'EEEI'``, ``'VAX'`` or ``''``
        select little-endian, anything else big-endian.
    :return: Decoded keywords (possibly empty).
    """
    if not data:
        return {}

    prefix = "<" if endian in ["EEEI", "VAX", ""] else ">"
    decoded: dict[str, Any] = {}
    pos = 0

    while pos + 8 <= len(data):
        lkey = struct.unpack_from(f"{prefix}i", data, pos)[0]
        if lkey <= 0 or pos + lkey > len(data):
            break  # malformed record length: stop rather than over-read
        lext = struct.unpack_from(f"{prefix}h", data, pos + 4)[0]
        ltag = data[pos + 6]
        type_char = chr(data[pos + 7])

        value_len = lkey - lext
        value_end = pos + 8 + value_len
        tag_end = value_end + ltag
        if value_len < 0 or tag_end > pos + lkey:
            break  # inconsistent record: bail out

        tag = data[value_end:tag_end].decode("ascii", errors="ignore").strip()
        decoded[tag] = _decode_blue_keyword_value(type_char, data[pos + 8 : value_end], prefix)
        pos += lkey

    return decoded


def _meta_key_from_tag(tag: str) -> str:
    """Map an ``_BLUE_META_PREFIX``-tagged keyword back to a metadata key.

    :param tag: Keyword tag as read from the extended header.
    :return: Lowercased key with runs of underscores collapsed and stripped,
        or ``""`` when the tag does not carry our prefix.
    """
    if not tag.startswith(_BLUE_META_PREFIX):
        return ""
    key = tag[len(_BLUE_META_PREFIX) :].lower()
    return re.sub(r"__+", "_", key).strip("_")