From 14539d92693f387a1ac2f2f468da63d409de0174 Mon Sep 17 00:00:00 2001 From: gillian Date: Tue, 9 Dec 2025 14:38:49 -0500 Subject: [PATCH] removed iq_channel_models in transform.py removed view_annotations from view.py --- pyproject.toml | 1 + src/ria_toolkit_oss/datatypes/recording.py | 1 - src/ria_toolkit_oss/io/recording.py | 2 - .../ria_toolkit_oss/capture.py | 3 +- .../ria_toolkit_oss/combine.py | 12 +- .../ria_toolkit_oss/commands.py | 7 +- .../ria_toolkit_oss/common.py | 2 +- .../ria_toolkit_oss/convert.py | 18 +- .../ria_toolkit_oss/generate.py | 1586 ----------------- .../ria_toolkit_oss/split.py | 6 +- .../ria_toolkit_oss/transform.py | 53 +- .../ria_toolkit_oss/transmit.py | 8 +- .../ria_toolkit_oss/view.py | 11 +- src/ria_toolkit_oss/view/view_signal.py | 1 + 14 files changed, 60 insertions(+), 1651 deletions(-) delete mode 100644 src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py diff --git a/pyproject.toml b/pyproject.toml index 1f0308d..032a6de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,7 @@ pylint = "^3.2.6" # For pyreverse, to automate the creation of UML diagrams [tool.poetry.scripts] ria = "ria_toolkit_oss.ria_toolkit_oss_cli.cli:cli" +ria-tools = "ria_toolkit_oss.ria_toolkit_oss_cli.cli:cli" [tool.black] line-length = 119 diff --git a/src/ria_toolkit_oss/datatypes/recording.py b/src/ria_toolkit_oss/datatypes/recording.py index ca5198a..1faec21 100644 --- a/src/ria_toolkit_oss/datatypes/recording.py +++ b/src/ria_toolkit_oss/datatypes/recording.py @@ -655,7 +655,6 @@ class Recording: return to_blue(recording=self, filename=filename, path=path, data_format=data_format, overwrite=overwrite) - def trim(self, num_samples: int, start_sample: Optional[int] = 0) -> Recording: """Trim Recording samples to a desired length, shifting annotations to maintain alignment. diff --git a/src/ria_toolkit_oss/io/recording.py b/src/ria_toolkit_oss/io/recording.py index 9c7af02..6fd4b48 100644 --- a/src/ria_toolkit_oss/io/recording.py +++ b/src/ria_toolkit_oss/io/recording.py @@ -9,7 +9,6 @@ import os import re import struct from datetime import timezone -from typing import Optional from typing import Any, List, Optional import numpy as np @@ -21,7 +20,6 @@ from sigmf.utils import get_data_type_str from ria_toolkit_oss.datatypes import Annotation from ria_toolkit_oss.datatypes.recording import Recording - _BLUE_META_PREFIX = "META_" _BLUE_META_TAG_MAX_LEN = 60 _BLUE_SKIP_METADATA_KEYS = {"blue_data_format", "blue_endian", "blue_keywords"} diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py index b89dc46..c0a2c4c 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/capture.py @@ -238,6 +238,7 @@ def determine_output_format(output, output_format, output_dir): # Main command # ============================================================================ + @click.command() @click.argument("inputs", nargs=-1, required=True, type=click.Path(exists=True)) @click.argument("output", nargs=1, required=True, type=click.Path()) @@ -302,7 +303,7 @@ def capture( Examples: utils capture -d hackrf -s 2e6 -f 2.44e6 -b 2e6 utils capture -d pluto -s 1e6 -f 2e9 -b 2e6 -n 50 - + """ # Load config file if specified diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/combine.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/combine.py index ba1a363..b37f910 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/combine.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/combine.py @@ -6,16 +6,16 @@ from pathlib import Path import click import numpy as np - -from utils.data import Recording -from utils.io import from_npy_legacy, load_recording -from utils_cli.utils.common import ( +from ria_toolkit_oss_cli.ria_toolkit_oss.common import ( echo_progress, echo_verbose, format_sample_count, save_recording, ) +from ria_toolkit_oss.datatypes import Recording +from ria_toolkit_oss.io import from_npy_legacy, load_recording + def load_recording_list(inputs, legacy, verbose, quiet): recordings = [] @@ -395,10 +395,10 @@ def combine( Examples: # Concatenate recordings utils combine chunk1.npy chunk2.npy chunk3.npy full.npy - \b + \b # Add signal and noise utils combine signal.npy noise.npy noisy.npy --mode add\n - \b + \b # Add with center alignment utils combine long.npy short.npy output.npy --mode add --align-mode pad-center\n \b diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py index 2223880..d90c127 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/commands.py @@ -9,7 +9,8 @@ from .convert import convert # Import all command functions from .discover import discover -from .generate import generate + +# from .generate import generate from .init import init from .split import split from .transform import transform @@ -17,9 +18,7 @@ from .transmit import transmit from .view import view # Aliases -synth = generate +# synth = generate # All commands will be automatically registered by cli.py # Commands must be click.Command instances - - diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/common.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/common.py index a55c08c..05a06ce 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/common.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/common.py @@ -8,7 +8,7 @@ import click import yaml from ria_toolkit_oss.datatypes.recording import Recording -from src.ria_toolkit_oss.io.recording import to_blue, to_npy, to_sigmf, to_wav +from ria_toolkit_oss.io.recording import to_blue, to_npy, to_sigmf, to_wav def load_yaml_config(config_file: str) -> Dict[str, Any]: diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/convert.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/convert.py index 5c196ad..aabbe46 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/convert.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/convert.py @@ -4,8 +4,15 @@ import os from pathlib import Path import click +from ria_toolkit_oss_cli.ria_toolkit_oss.common import ( + check_for_overwriting, + detect_file_format, + echo_progress, + echo_verbose, + format_sample_count, +) -from utils.io.recording import ( +from ria_toolkit_oss.io.recording import ( from_npy, load_recording, to_blue, @@ -13,13 +20,6 @@ from utils.io.recording import ( to_sigmf, to_wav, ) -from utils_cli.utils.common import ( - check_for_overwriting, - detect_file_format, - echo_progress, - echo_verbose, - format_sample_count, -) from .config import load_user_config @@ -97,7 +97,7 @@ def convert( # noqa: C901 If OUTPUT is not specified, the input filename is used with a new extension based on the --format option. - + \b Examples: # SigMF to NumPy (explicit output) diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py deleted file mode 100644 index 83fe2e9..0000000 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py +++ /dev/null @@ -1,1586 +0,0 @@ -"""Generate command - Generate synthetic signals.""" - -from pathlib import Path -from typing import Optional - -import click -import numpy as np -import yaml - -import ria_toolkit_oss.signal.basic_signal_generator as basic_gen -from ria_toolkit_oss.datatypes import Recording -from ria_toolkit_oss.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator -from ria_toolkit_oss.signal.block_generator.basic import FrequencyShift -from ria_toolkit_oss.signal.block_generator.data_types import DataType -from ria_toolkit_oss.signal.block_generator.mapping.apsk_mapper import _APSKMapper -from ria_toolkit_oss.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper -from ria_toolkit_oss.signal.block_generator.mapping.mapper import Mapper -from ria_toolkit_oss.signal.block_generator.modulation import ( - GMSKModulator, - OOKModulator, - OQPSKModulator, -) -from utils.signal.block_generator.pulse_shaping import ( - RaisedCosineFilter, - RootRaisedCosineFilter, - Upsampling, -) -from utils.signal.block_generator.source import ( - LFMJammingSource, - RandomBinarySource, - RecordingSource, - SawtoothSource, - SquareSource, -) - -# Block Generator Imports -from utils.signal.block_generator.source_block import SourceBlock - -# Transforms for impairments -from utils.transforms.iq_channel_models import ( - complex_multipath_rayleigh_channel, - rician_fading_channel, -) -from utils.transforms.iq_impairments import ( - add_compression, - add_doppler, - add_gain_fluctuation, - add_phase_noise, - iq_imbalance, -) - -# NR 5G Import -try: - from utils.signal.block_gen.nr_5g.nr_5g_generator import NR5GGenerator - - HAS_NR5G = True -except ImportError: - HAS_NR5G = False - -from utils_cli.utils.common import ( - echo_progress, - echo_verbose, - format_frequency, - format_sample_rate, - parse_metadata_args, - save_recording, -) -from utils_cli.utils.config import load_user_config - - -# Extend Mapper to support new types -def _create_extended_mapper(self): - if self.constellation_type.upper() == "APSK": - return _APSKMapper(self.num_bits_per_symbol, self.normalize, self.use_gray_code) - elif self.constellation_type.upper() == "CROSS_QAM": - return _CrossQAMMapper(self.num_bits_per_symbol, self.normalize, self.use_gray_code) - else: - # Original factory - return self._original_create_constellation_mapper() - - -# Monkey patch Mapper to support new types without modifying original file -Mapper._original_create_constellation_mapper = Mapper._create_constellation_mapper -Mapper._create_constellation_mapper = _create_extended_mapper - - -def load_config_options(ctx, param, value): - """Callback to load options from YAML config file.""" - if not value: - return None - - try: - with open(value, "r") as f: - config = yaml.safe_load(f) - - # Store config in context for other commands to access - ctx.default_map = config - return value - except Exception as e: - raise click.BadParameter(f"Error loading config file: {e}") - - -def apply_user_config_metadata(metadata_tuple): - """Apply user config metadata and merge with CLI metadata. - - Args: - metadata_tuple: Tuple of metadata KEY=VALUE strings from CLI - - Returns: - dict: Merged metadata dictionary - """ - # Load user config - user_config = load_user_config() - metadata_dict = {} - - # Apply user config metadata (if user config exists) - if user_config: - # Add standard metadata fields from config - for key in ["author", "organization", "project", "location", "testbed"]: - if key in user_config: - metadata_dict[key] = user_config[key] - - # Add SigMF fields from config - if "sigmf" in user_config: - sigmf = user_config["sigmf"] - for key in ["license", "hw", "dataset"]: - if key in sigmf: - metadata_dict[key] = sigmf[key] - - # CLI metadata overrides everything - if metadata_tuple: - metadata_dict.update(parse_metadata_args(metadata_tuple)) - - return metadata_dict - - -def get_output_format(output: Optional[str], format_opt: Optional[str]) -> str: - """Determine output format from filename or option.""" - if format_opt: - return format_opt - - if not output: - return "sigmf" # Default to sigmf for better metadata support - - ext = Path(output).suffix.lower() - if ext in [".sigmf", ".sigmf-data", ".sigmf-meta"]: - return "sigmf" - elif ext == ".npy": - return "npy" - elif ext == ".wav": - return "wav" - elif ext == ".blue": - return "blue" - else: - return "sigmf" - - -class FileSourceBlock(SourceBlock): - """Generates bits from a file or bytes.""" - - def __init__(self, data: bytes, repeat: bool = True): - self.data = data - self.repeat = repeat - # Convert to bits - bits = np.unpackbits(np.frombuffer(data, dtype=np.uint8)) - self.bits = bits.astype(np.float32) # SourceBlock expects float32 bits (0.0, 1.0) - self.idx = 0 - - @property - def output_type(self) -> DataType: - return DataType.BITS - - def __call__(self, num_samples: int) -> np.ndarray: - out = np.zeros(num_samples, dtype=np.float32) - filled = 0 - while filled < num_samples: - remaining = num_samples - filled - available = len(self.bits) - self.idx - - take = min(remaining, available) - out[filled : filled + take] = self.bits[self.idx : self.idx + take] - - self.idx += take - filled += take - - if self.idx >= len(self.bits): - if self.repeat: - self.idx = 0 - else: - # Pad with zeros if not repeating - break - - return out - - -def apply_post_processing( - recording: Recording, frequency_shift: float, channel_type: str, channel_params: dict, verbose: bool -) -> Recording: - """Apply frequency shift and channel models to a recording.""" - - # 1. Frequency Shift (Pre-channel) - if frequency_shift != 0: - echo_verbose(f"Applying frequency shift: {format_frequency(frequency_shift)}", verbose) - # Use simple phase shift if only 1 block? No, basic gen FrequencyShift - # We can use RecordingSource + FrequencyShift + record() - source = RecordingSource(recording) - fs_block = FrequencyShift(shift_frequency=frequency_shift, sampling_rate=recording.sample_rate) - fs_block.input = [source] - num = len(recording.data[0]) if recording.n_chan > 0 else len(recording.data) - # get_samples - processed = fs_block.get_samples(num) - recording = Recording(data=processed, metadata=recording.metadata) - - # 2. Dynamic Impairments (Transforms) - - # Rician / Rayleigh - if channel_type == "rayleigh": - # Use improved complex multipath if available - echo_verbose("Applying Multipath Rayleigh Channel", verbose) - recording = complex_multipath_rayleigh_channel( - recording, - num_paths=channel_params.get("multipath_paths") or 3, - max_delay=channel_params.get("multipath_max_delay") or 2.6e-6, - sample_rate=recording.sample_rate, - snr_db=None, # We handle noise separately - ) - - elif channel_type == "rician": - echo_verbose(f"Applying Rician Channel (K={channel_params.get('rician_k', 2.0)})", verbose) - recording = rician_fading_channel( - recording, - k_factor=channel_params.get("rician_k", 2.0), - num_paths=channel_params.get("multipath_paths") or 3, - max_delay=channel_params.get("multipath_max_delay") or 1.2e-6, - sample_rate=recording.sample_rate, - snr_db=None, - ) - - # Doppler - doppler_freq = channel_params.get("doppler_freq") - if doppler_freq: - echo_verbose(f"Applying Doppler (Shift={doppler_freq} Hz)", verbose) - # add_doppler expects velocity. Convert freq to velocity assuming 1GHz carrier or pass freq directly? - # dynamic_channel wrapper handles this conversion. - # Or use add_doppler directly if we have velocity. - # User supplied doppler_freq. - # Let's use a simple transform or dynamic_channel - # We need to reuse dynamic_channel logic for freq->velocity conversion or assume carrier. - # Or create add_doppler_freq(signal, freq_shift) - # add_doppler takes satellite_velocity etc. - # dynamic_channel takes doppler_hz. - # We use dynamic_channel logic here but just for Doppler part - c_light = 299792458 - f_carrier = 1e9 # Assumption for conversion - velocity = doppler_freq * c_light / f_carrier - recording = add_doppler( - recording, - satellite_velocity=velocity, - satellite_initial_distance=1000, - frequency=f_carrier, - sample_rate=recording.sample_rate, - ) - - # IQ Imbalance - amp = channel_params.get("iq_amp_imbalance") - phase = channel_params.get("iq_phase_imbalance") - dc = channel_params.get("iq_dc_offset") - if amp or phase or dc: - echo_verbose(f"Applying IQ Imbalance (Amp={amp}dB, Phase={phase}rad, DC={dc})", verbose) - recording = iq_imbalance( - recording, - amplitude_imbalance=( - amp if amp is not None else 0 - ), # iq_imbalance defaults to 1.5? We want 0 if not set but one of others is set. - phase_imbalance=phase if phase is not None else 0, - dc_offset=dc if dc is not None else 0, - ) - - # Phase Noise - pn = channel_params.get("phase_noise") - if pn: - echo_verbose(f"Applying Phase Noise (Var={pn})", verbose) - recording = add_phase_noise(recording, phase_variance=pn) - - # Gain Fluctuation - gf = channel_params.get("gain_fluctuation") - if gf: - echo_verbose(f"Applying Gain Fluctuation (Var={gf})", verbose) - recording = add_gain_fluctuation(recording, amplitude_variance=gf) - - # Compression - comp = channel_params.get("compression") - if comp: - echo_verbose(f"Applying Compression (Gain={comp})", verbose) - recording = add_compression(recording, compression_gain=comp) - - # 3. AWGN (Final stage usually) - if channel_type == "awgn" or channel_params.get("noise_power"): - # If 'awgn' selected OR noise_power explicitly set (default is 0.1, so always set?) - # If channel_type is NOT awgn/rayleigh/rician, and noise_power is default 0.1? - # If user didn't specify noise_power, but did specify channel_type=none, do we add noise? - # Default noise_power is 0.1. - # If channel_type == 'none', we probably shouldn't add noise unless user asked for it. - # But noise_power has default. - # Let's check if channel_type is 'awgn'. - # Or if user provided --noise-power? - # (We can't distinguish default vs user provided easily with click unless we use ctx) - # For now: only add noise if channel_type is set to something, or if noise_power > 0 and user intended it. - # Simpler: If channel_type == 'awgn', definitely add. - # If rayleigh/rician, they might want noise too. - # If 'none', skip noise? - - should_add_noise = False - if channel_type in ["awgn", "rayleigh", "rician"]: - should_add_noise = True - - if should_add_noise: - npow = channel_params.get("noise_power", 0.1) - echo_verbose(f"Applying AWGN (Power={npow})", verbose) - # Convert Power (variance) to SNR? - # add_awgn_to_signal takes SNR. - # AWGNChannel block takes Variance. - # Use AWGNChannel block logic (additive noise with variance) - # or utils.transforms.iq_channel_models.awgn_channel which takes SNR. - # The user CLI says --noise-power (variance). - # We should use a simple additive noise function with variance. - # transforms.iq_augmentations.generate_awgn uses SNR. - # Let's implement simple additive noise here or use AWGNChannel block. - - # Use AWGNChannel block logic directly - noise_std = np.sqrt(npow / 2) - noise = noise_std * (np.random.randn(*recording.data.shape) + 1j * np.random.randn(*recording.data.shape)) - recording = Recording(data=recording.data + noise, metadata=recording.metadata) - - return recording - - -@click.group() -def generate(): - """Generate synthetic signals. - - \b - Examples: - utils synth chirp -b 1e6 -p 0.01 -s 10e6 -o chirp_basic.sigmf - utils synth fsk -M 2 -r 100e3 -s 2e6 -o fsk2_basic.sigmf - - """ - pass - - -def common_options(f): - """Decorator for common options.""" - f = click.option("--sample-rate", "-s", type=float, required=True, help="Sample rate in Hz")(f) - f = click.option("--num-samples", "-n", type=int, help="Number of samples")(f) - f = click.option("--duration", "-t", type=float, help="Duration in seconds (alternative to --num-samples)")(f) - f = click.option("--frequency-shift", type=float, default=0.0, help="Digital frequency shift from baseband (Hz)")( - f - ) - f = click.option("--center-frequency", "-fc", type=float, help="Metadata center frequency (Hz)")(f) - f = click.option( - "--channel-type", type=click.Choice(["none", "awgn", "rayleigh"]), default="none", help="Channel model" - )(f) - f = click.option("--noise-power", type=float, default=0.1, help="Noise power (variance) for AWGN")(f) - f = click.option("--path-gain", type=float, default=0.0, help="Path gain (dB) for Rayleigh")(f) - f = click.option("--output", "-o", required=True, help="Output filename")(f) - f = click.option("--format", "-F", type=click.Choice(["npy", "sigmf", "wav", "blue"]), help="Output format")(f) - - # Impairment options - f = click.option("--rician-k", type=float, help="Rician K-factor")(f) - f = click.option("--multipath-paths", type=int, help="Multipath: Number of paths")(f) - f = click.option("--multipath-max-delay", type=float, help="Multipath: Max delay (s)")(f) - f = click.option("--doppler-freq", type=float, help="Doppler: Frequency shift (Hz)")(f) - f = click.option("--iq-amp-imbalance", type=float, help="IQ Imbalance: Amplitude (dB)")(f) - f = click.option("--iq-phase-imbalance", type=float, help="IQ Imbalance: Phase (rad)")(f) - f = click.option("--iq-dc-offset", type=float, help="IQ Imbalance: DC Offset")(f) - f = click.option("--phase-noise", type=float, help="Phase Noise: Variance")(f) - f = click.option("--gain-fluctuation", type=float, help="Gain Fluctuation: Variance")(f) - f = click.option("--compression", type=float, help="Compression: Gain")(f) - - f = click.option( - "--config", - "-c", - callback=load_config_options, - is_eager=True, - expose_value=False, - type=click.Path(exists=True), - help="Load parameters from YAML", - )(f) - f = click.option("--overwrite", "-w", is_flag=True, help="Overwrite existing file")(f) - f = click.option("--metadata", "-m", multiple=True, help="Add metadata KEY=VALUE")(f) - f = click.option("--verbose", "-v", is_flag=True, help="Verbose output")(f) - f = click.option("--quiet", "-q", is_flag=True, help="Suppress output")(f) - return f - - -def resolve_length(sample_rate, num_samples, duration, symbols=None, sps=None): - """Resolve generation length.""" - if symbols is not None and sps is not None: - # Modulation specific - if num_samples: - # If both provided, check consistency or prefer num_samples? - # We'll treat symbols as the driver if provided. - pass - return int(symbols * sps) - - if num_samples: - return int(num_samples) - - if duration: - return int(duration * sample_rate) - - # Default - return 10000 - - -@generate.command() -@click.option("--frequency", "-f", type=float, default=1000.0, help="Tone frequency relative to carrier (Hz)") -@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude (0.0-1.0)") -@click.option("--phase", "-p", type=float, default=0.0, help="Initial phase in radians") -@common_options -def tone( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - frequency, - amplitude, - phase, - **kwargs, -): - """Generate a complex tone.""" - - ns = resolve_length(sample_rate, num_samples, duration) - - echo_progress(f"Generating tone: {format_frequency(frequency)} at {format_sample_rate(sample_rate)}", quiet) - - # Use basic_gen for core tone - recording = basic_gen.sine( - sample_rate=int(sample_rate), length=ns, frequency=frequency, amplitude=amplitude, baseband_phase=phase - ) - - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - echo_verbose(f"Center Frequency: {format_frequency(center_frequency)}", verbose) - - # Post processing - chan_params = {"noise_power": noise_power, "path_gain": path_gain} - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - # User metadata - metadata = apply_user_config_metadata(metadata) - metadata["signal_type"] = "tone" - for key, value in metadata.items(): - recording.update_metadata(key, value) - - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--noise-type", "-T", type=click.Choice(["gaussian", "uniform"]), default="gaussian", help="Noise type") -@click.option("--power", "-p", type=float, default=1.0, help="Signal power/variance") -@common_options -def noise( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - noise_type, - power, - **kwargs, -): - """Generate random noise.""" - - ns = resolve_length(sample_rate, num_samples, duration) - echo_progress(f"Generating {noise_type} noise...", quiet) - - if noise_type == "gaussian": - # AWGN - rms = np.sqrt(power) - recording = basic_gen.noise(sample_rate=int(sample_rate), length=ns, rms_power=rms) - else: - # Uniform - real = np.random.uniform(-1, 1, ns) - imag = np.random.uniform(-1, 1, ns) - a = np.sqrt(3 * power / 2) - data = a * (real + 1j * imag) - recording = Recording(data=data, metadata={"sample_rate": sample_rate}) - - recording._metadata["signal_type"] = "noise" - recording._metadata["noise_type"] = noise_type - - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - # Post processing - chan_params = {"noise_power": noise_power, "path_gain": path_gain} - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--bandwidth", "-b", type=float, required=True, help="Chirp bandwidth (Hz)") -@click.option("--period", "-p", type=float, required=True, help="Chirp period (seconds)") -@click.option("--type", "chirp_type", type=click.Choice(["up", "down", "up_down"]), default="up", help="Chirp type") -@common_options -def chirp( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - bandwidth, - period, - chirp_type, - **kwargs, -): - """Generate LFM Chirp signal.""" - - ns = resolve_length(sample_rate, num_samples, duration) - - echo_progress(f"Generating {chirp_type} chirp ({format_frequency(bandwidth)}, {period}s)...", quiet) - - source = LFMJammingSource(sample_rate=sample_rate, bandwidth=bandwidth, chirp_period=period, chirp_type=chirp_type) - - recording = source.record(ns) - - recording._metadata["signal_type"] = "chirp" - recording._metadata["chirp_type"] = chirp_type - recording._metadata["bandwidth"] = bandwidth - recording._metadata["period"] = period - - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - # Post processing - chan_params = {"noise_power": noise_power, "path_gain": path_gain} - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--frequency", "-f", type=float, default=1000.0, help="Frequency (Hz)") -@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude") -@click.option("--duty-cycle", "-d", type=float, default=0.5, help="Duty cycle (0.0-1.0)") -@click.option("--phase", "-p", type=float, default=0.0, help="Phase shift (radians)") -@common_options -def square( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - frequency, - amplitude, - duty_cycle, - phase, - **kwargs, -): - """Generate Square wave.""" - - ns = resolve_length(sample_rate, num_samples, duration) - - echo_progress(f"Generating square wave: {format_frequency(frequency)}...", quiet) - - source = SquareSource( - frequency=frequency, sample_rate=sample_rate, amplitude=amplitude, duty_cycle=duty_cycle, phase_shift=phase - ) - - recording = source.record(ns) - - recording._metadata["signal_type"] = "square" - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - chan_params = {"noise_power": noise_power, "path_gain": path_gain} - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--frequency", "-f", type=float, default=1000.0, help="Frequency (Hz)") -@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude") -@click.option("--phase", "-p", type=float, default=0.0, help="Phase shift (radians)") -@common_options -def sawtooth( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - frequency, - amplitude, - phase, - **kwargs, -): - """Generate Sawtooth wave.""" - - ns = resolve_length(sample_rate, num_samples, duration) - - echo_progress(f"Generating sawtooth wave: {format_frequency(frequency)}...", quiet) - - source = SawtoothSource(frequency=frequency, sample_rate=sample_rate, amplitude=amplitude, phase_shift=phase) - - recording = source.record(ns) - - recording._metadata["signal_type"] = "sawtooth" - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - chan_params = {"noise_power": noise_power, "path_gain": path_gain} - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -def load_source(message_source, message_content, num_bits=None): - if num_bits is not None: - if message_source == "random": - return RandomBinarySource()((1, num_bits)) - elif message_source == "string": - if not message_content: - raise click.BadParameter("Message content required for string source") - return FileSourceBlock(message_content.encode("utf-8"), repeat=True)(num_bits).reshape(1, -1) - - elif message_source == "file": - if not message_content: - raise click.BadParameter("File path required for file source") - - p = Path(message_content) - if not p.exists(): - raise click.BadParameter(f"File not found: {p}") - - return FileSourceBlock(p.read_bytes(), repeat=True)(num_bits).reshape(1, -1) - else: - if message_source == "random": - return RandomBinarySource() # Infinite source - - elif message_source == "string": - if not message_content: - raise click.BadParameter("Message content required for string source") - return FileSourceBlock(message_content.encode("utf-8"), repeat=True) - - elif message_source == "file": - if not message_content: - raise click.BadParameter("File path required for file source") - - p = Path(message_content) - if not p.exists(): - raise click.BadParameter(f"File not found: {p}") - - return FileSourceBlock(p.read_bytes(), repeat=True) - - -def _run_mod_gen( - mod_type, - sample_rate, - symbols, - num_samples, - duration, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, -): - - # Resolve length - # If symbols provided, it drives. - # If not, use num_samples/duration to calculate symbols - - if symbol_rate is None: - # Try to infer? No, required. - raise click.BadParameter("Symbol rate required") - - sps = sample_rate / symbol_rate - if not sps.is_integer(): - sps_int = int(round(sps)) - if sps_int < 1: - sps_int = 1 - actual_sr = sps_int * symbol_rate - echo_progress(f"Warning: Non-integer samples per symbol ({sps:.4f}). Rounding to {sps_int}.", quiet) - echo_progress(f"Actual sample rate will be {format_sample_rate(actual_sr)}", quiet) - sps = int(sps_int) - sample_rate = actual_sr - else: - sps = int(sps) - - if symbols is None: - # Calc from duration/samples - ns = resolve_length(sample_rate, num_samples, duration) - symbols = int(np.ceil(ns / sps)) - - echo_progress(f"Generating {mod_type}-{order} ({symbols} symbols)...", quiet) - echo_verbose(f" Sample Rate: {format_sample_rate(sample_rate)} (SPS={sps})", verbose) - - bps = int(np.log2(order)) - total_samples = symbols * sps - - # Source - source = load_source(message_source, message_content, None) - - # Mapper and Pulse Shaping - mapper = Mapper(constellation_type=mod_type, num_bits_per_symbol=bps) - upsampler = Upsampling(factor=sps) - - # Filter - if filter_type == "rrc": - filter_block = RootRaisedCosineFilter(span_in_symbols=filter_span, upsampling_factor=sps, beta=filter_beta) - elif filter_type == "rc": - filter_block = RaisedCosineFilter(span_in_symbols=filter_span, upsampling_factor=sps, beta=filter_beta) - elif filter_type == "gaussian": - raise click.ClickException("Gaussian filter not supported yet") - else: - filter_block = None - - # Generate base signal - mapper.connect_input([source]) - upsampler.connect_input([mapper]) - if filter_block: - filter_block.connect_input([upsampler]) - base_recording = filter_block.record(total_samples) - else: - base_recording = upsampler.record(total_samples) - - # Update metadata - for key, value in { - "modulation": mod_type, - "order": order, - "symbol_rate": symbol_rate, - "symbols": symbols, - "filter": filter_type, - }.items(): - base_recording.update_metadata(key, value) - - if center_frequency: - base_recording.update_metadata("center_frequency", center_frequency) - - # Post Processing - chan_params = {"noise_power": noise_power, "path_gain": path_gain} - final_recording = apply_post_processing(base_recording, frequency_shift, channel_type, chan_params, verbose) - - # Trim if explicit num_samples was requested and we generated more (due to symbol alignment) - target_ns = resolve_length(sample_rate, num_samples, duration) - if target_ns and len(final_recording.data[0]) > target_ns: - # Only trim if difference is significant? - # User usually wants exact length if specified. - if num_samples or duration: # If explicitly asked for length - final_recording = final_recording.trim(target_ns) - - for key, value in apply_user_config_metadata(metadata).items(): - final_recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(final_recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--symbols", "-N", type=int, help="Number of symbols") -@click.option("--order", "-M", type=int, required=True, help="QAM Order (4, 16, 32, 64, 128, 256, 1024)") -@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") -@click.option( - "--filter", - "filter_type", - type=click.Choice(["rrc", "rc", "gaussian", "none"]), - default="rrc", - help="Pulse shaping filter", -) -@click.option("--filter-span", type=int, default=6, help="Filter span in symbols") -@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") -@click.option( - "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" -) -@click.option("--message-content", help="File path or string content") -@common_options -def qam( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - rician_k, - multipath_paths, - multipath_max_delay, - doppler_freq, - iq_amp_imbalance, - iq_phase_imbalance, - iq_dc_offset, - phase_noise, - gain_fluctuation, - compression, - symbols, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - **kwargs, -): - """Generate QAM modulated signal.""" - - # Determine modulation type (Normal QAM vs Cross QAM) - if order in [32, 128]: - mod_type = "CROSS_QAM" - else: - mod_type = "QAM" - - _run_mod_gen( - mod_type, - sample_rate, - symbols, - num_samples, - duration, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - ) - - -@generate.command() -@click.option("--symbols", "-N", type=int, help="Number of symbols") -@click.option("--order", "-M", type=int, required=True, help="APSK Order (16, 32, 64, 128, 256)") -@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") -@click.option( - "--filter", - "filter_type", - type=click.Choice(["rrc", "rc", "gaussian", "none"]), - default="rrc", - help="Pulse shaping filter", -) -@click.option("--filter-span", type=int, default=6, help="Filter span in symbols") -@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") -@click.option( - "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" -) -@click.option("--message-content", help="File path or string content") -@common_options -def apsk( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - rician_k, - multipath_paths, - multipath_max_delay, - doppler_freq, - iq_amp_imbalance, - iq_phase_imbalance, - iq_dc_offset, - phase_noise, - gain_fluctuation, - compression, - symbols, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - **kwargs, -): - """Generate APSK modulated signal.""" - _run_mod_gen( - "APSK", - sample_rate, - symbols, - num_samples, - duration, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - ) - - -@generate.command() -@click.option("--symbols", "-N", type=int, help="Number of symbols") -@click.option("--order", "-M", type=int, required=True, help="PAM Order (4, 8, 16)") -@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") -@click.option( - "--filter", - "filter_type", - type=click.Choice(["rrc", "rc", "gaussian", "none"]), - default="rrc", - help="Pulse shaping filter", -) -@click.option("--filter-span", type=int, default=6, help="Filter span in symbols") -@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") -@click.option( - "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" -) -@click.option("--message-content", help="File path or string content") -@common_options -def pam( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - rician_k, - multipath_paths, - multipath_max_delay, - doppler_freq, - iq_amp_imbalance, - iq_phase_imbalance, - iq_dc_offset, - phase_noise, - gain_fluctuation, - compression, - symbols, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - **kwargs, -): - """Generate PAM modulated signal.""" - _run_mod_gen( - "PAM", - sample_rate, - symbols, - num_samples, - duration, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - ) - - -@generate.command() -@click.option("--symbols", "-N", type=int, help="Number of symbols") -@click.option("--order", "-M", type=int, default=2, help="FSK Order (2, 4, 8)") -@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") -@click.option("--freq-spacing", type=float, help="Frequency spacing (Hz)") -@click.option("--modulation-index", "-h", type=float, help="Modulation Index (alternative to spacing)") -@click.option( - "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" -) -@click.option("--message-content", help="File path or string content") -@common_options -def fsk( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - rician_k, - multipath_paths, - multipath_max_delay, - doppler_freq, - iq_amp_imbalance, - iq_phase_imbalance, - iq_dc_offset, - phase_noise, - gain_fluctuation, - compression, - symbols, - order, - symbol_rate, - freq_spacing, - modulation_index, - message_source, - message_content, - **kwargs, -): - """Generate FSK modulated signal.""" - - # FSK uses FSKModulator which is a standalone Source/Modulator block? No, it's a Modulator. - # Takes bits input. - - # Determine spacing - if freq_spacing is None: - if modulation_index is None: - modulation_index = 1.0 # Default - freq_spacing = modulation_index * symbol_rate - - # Samples per symbol - sps = sample_rate / symbol_rate # FSKModulator takes sampling_freq and symbol_duration (1/rate) - symbol_duration = 1.0 / symbol_rate - - # Resolve length - ns = resolve_length(sample_rate, num_samples, duration, symbols, sps) - if symbols is None: - symbols = int(np.ceil(ns / sps)) - - echo_progress(f"Generating {order}-FSK (Spacing={format_frequency(freq_spacing)})...", quiet) - - # Bits - bps = int(np.log2(order)) - num_bits = symbols * bps - - # Source - source_bits = load_source(message_source, message_content, num_bits) - - # Modulator - mod = FSKModulator( - num_bits_per_symbol=bps, - frequency_spacing=freq_spacing, - symbol_duration=symbol_duration, - sampling_frequency=sample_rate, - ) - - # Generate - samples = mod(source_bits) - # Flatten - samples = samples.flatten()[:ns] - - recording = Recording(data=samples, metadata={"sample_rate": sample_rate}) - recording._metadata.update( - { - "modulation": "FSK", - "order": order, - "symbol_rate": symbol_rate, - "freq_spacing": freq_spacing, - "mod_index": modulation_index if modulation_index else freq_spacing / symbol_rate, - } - ) - - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - chan_params = { - "noise_power": noise_power, - "path_gain": path_gain, - "rician_k": rician_k, - "multipath_paths": multipath_paths, - "multipath_max_delay": multipath_max_delay, - "doppler_freq": doppler_freq, - "iq_amp_imbalance": iq_amp_imbalance, - "iq_phase_imbalance": iq_phase_imbalance, - "iq_dc_offset": iq_dc_offset, - "phase_noise": phase_noise, - "gain_fluctuation": gain_fluctuation, - "compression": compression, - } - - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") -@click.option( - "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" -) -@click.option("--message-content", help="File path or string content") -@common_options -def ook( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - rician_k, - multipath_paths, - multipath_max_delay, - doppler_freq, - iq_amp_imbalance, - iq_phase_imbalance, - iq_dc_offset, - phase_noise, - gain_fluctuation, - compression, - symbol_rate, - message_source, - message_content, - **kwargs, -): - """Generate On-Off Keying (OOK) signal.""" - - sps = int(sample_rate / symbol_rate) - ns = resolve_length(sample_rate, num_samples, duration) - - echo_progress("Generating OOK...", quiet) - - # Source Block - source = load_source(message_source, message_content, None) - - # OOK Modulator - mod = OOKModulator(source, samples_per_symbol=sps) - recording = mod.record(ns) - recording._metadata["sample_rate"] = sample_rate - recording._metadata["modulation"] = "OOK" - - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - chan_params = { - "noise_power": noise_power, - "path_gain": path_gain, - "rician_k": rician_k, - "multipath_paths": multipath_paths, - "multipath_max_delay": multipath_max_delay, - "doppler_freq": doppler_freq, - "iq_amp_imbalance": iq_amp_imbalance, - "iq_phase_imbalance": iq_phase_imbalance, - "iq_dc_offset": iq_dc_offset, - "phase_noise": phase_noise, - "gain_fluctuation": gain_fluctuation, - "compression": compression, - } - - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") -@click.option( - "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" -) -@click.option("--message-content", help="File path or string content") -@common_options -def oqpsk( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - rician_k, - multipath_paths, - multipath_max_delay, - doppler_freq, - iq_amp_imbalance, - iq_phase_imbalance, - iq_dc_offset, - phase_noise, - gain_fluctuation, - compression, - symbol_rate, - message_source, - message_content, - **kwargs, -): - """Generate Offset QPSK (OQPSK) signal.""" - - sps = int(sample_rate / symbol_rate) - ns = resolve_length(sample_rate, num_samples, duration) - - echo_progress("Generating OQPSK...", quiet) - - # Source Block - source = load_source(message_source, message_content, None) - - # OQPSK Modulator - mod = OQPSKModulator(source, samples_per_symbol=sps) - recording = mod.record(ns) - recording._metadata["sample_rate"] = sample_rate - recording._metadata["modulation"] = "OQPSK" - - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - chan_params = { - "noise_power": noise_power, - "path_gain": path_gain, - "rician_k": rician_k, - "multipath_paths": multipath_paths, - "multipath_max_delay": multipath_max_delay, - "doppler_freq": doppler_freq, - "iq_amp_imbalance": iq_amp_imbalance, - "iq_phase_imbalance": iq_phase_imbalance, - "iq_dc_offset": iq_dc_offset, - "phase_noise": phase_noise, - "gain_fluctuation": gain_fluctuation, - "compression": compression, - } - - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") -@click.option("--bt", type=float, default=0.3, help="Bandwidth-Time product (e.g., 0.3, 0.5)") -@click.option( - "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" -) -@click.option("--message-content", help="File path or string content") -@common_options -def gmsk( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - rician_k, - multipath_paths, - multipath_max_delay, - doppler_freq, - iq_amp_imbalance, - iq_phase_imbalance, - iq_dc_offset, - phase_noise, - gain_fluctuation, - compression, - symbol_rate, - bt, - message_source, - message_content, - **kwargs, -): - """Generate GMSK modulated signal.""" - - sps = int(sample_rate / symbol_rate) - ns = resolve_length(sample_rate, num_samples, duration) - - echo_progress(f"Generating GMSK (BT={bt})...", quiet) - - # Source Block - source = load_source(message_source, message_content, None) - - # GMSK Modulator - mod = GMSKModulator(source, samples_per_symbol=sps, bt=bt) - recording = mod.record(ns) - recording._metadata["sample_rate"] = sample_rate - recording._metadata["modulation"] = "GMSK" - recording._metadata["bt_product"] = bt - - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - chan_params = { - "noise_power": noise_power, - "path_gain": path_gain, - "rician_k": rician_k, - "multipath_paths": multipath_paths, - "multipath_max_delay": multipath_max_delay, - "doppler_freq": doppler_freq, - "iq_amp_imbalance": iq_amp_imbalance, - "iq_phase_imbalance": iq_phase_imbalance, - "iq_dc_offset": iq_dc_offset, - "phase_noise": phase_noise, - "gain_fluctuation": gain_fluctuation, - "compression": compression, - } - - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) - - -@generate.command() -@click.option("--symbols", "-N", type=int, help="Number of symbols") -@click.option("--order", "-M", type=int, required=True, help="PSK Order (2, 4, 8)") -@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") -@click.option( - "--filter", - "filter_type", - type=click.Choice(["rrc", "rc", "gaussian", "none"]), - default="rrc", - help="Pulse shaping filter", -) -@click.option("--filter-span", type=int, default=6, help="Filter span in symbols") -@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") -@click.option( - "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" -) -@click.option("--message-content", help="File path or string content") -@common_options -def psk( - sample_rate, - num_samples, - duration, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - symbols, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - **kwargs, -): - """Generate PSK modulated signal.""" - _run_mod_gen( - "PSK", - sample_rate, - symbols, - num_samples, - duration, - order, - symbol_rate, - filter_type, - filter_span, - filter_beta, - message_source, - message_content, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - ) - - -@generate.command() -@click.option("--bandwidth", "-b", type=int, required=True, help="Bandwidth in MHz (e.g. 10, 20)") -@click.option("--mu", "-u", type=int, default=1, help="Numerology (0-3)") -@click.option("--frames", type=int, default=1, help="Number of 10ms frames") -@click.option("--ssb/--no-ssb", default=True, help="Enable SSB") -@common_options -def nr5g( - sample_rate, - frequency_shift, - center_frequency, - channel_type, - noise_power, - path_gain, - output, - format, - overwrite, - metadata, - verbose, - quiet, - bandwidth, - mu, - frames, - ssb, - **kwargs, -): - """Generate 5G NR frame.""" - - if not HAS_NR5G: - raise click.ClickException("5G NR Generator not available (missing dependencies or module)") - - echo_progress(f"Generating 5G NR ({bandwidth} MHz, mu={mu}, {frames} frames)...", quiet) - - # NR5GGenerator parameters - # It determines sample rate based on bandwidth/mu/fr? - # nr_ofdm_params(bandwidth_mhz, mu, fr) returns fs. - # We should verify if user supplied sample_rate matches or we should ignore user sample_rate? - # Or we resample? - # The generator has fixed fs for a given BW/mu config usually. - # Let's instantiate it and see its fs. - - gen = NR5GGenerator(bandwidth_mhz=bandwidth, mu=mu, frames_per_recording=frames, ssb=ssb) - - native_fs = gen.fs - if sample_rate and abs(sample_rate - native_fs) > 1.0: - echo_progress( - message=( - f"Warning: Requested sample rate {format_sample_rate(sample_rate)} " - f"differs from native NR rate {format_sample_rate(native_fs)}." - ), - quiet=quiet, - ) - echo_progress("Output will be at native rate.", quiet) - # If we really wanted to support arbitrary rate, we'd need resampling. - # For now, just warn and use native. - - recording = gen.record(batch_size=1) - - recording._metadata["signal_type"] = "nr5g" - - if center_frequency: - recording._metadata["center_frequency"] = center_frequency - - # Post processing - chan_params = {"noise_power": noise_power, "path_gain": path_gain} - recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) - - for key, value in apply_user_config_metadata(metadata).items(): - recording.update_metadata(key, value) - fmt = get_output_format(output, format) - save_recording(recording, output, fmt, overwrite, verbose) diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/split.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/split.py index 93974d3..2e0b0a0 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/split.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/split.py @@ -4,9 +4,7 @@ from pathlib import Path import click import numpy as np - -from utils.io import from_npy_legacy, load_recording -from utils_cli.utils.common import ( +from ria_toolkit_oss_cli.ria_toolkit_oss.common import ( detect_file_format, echo_progress, echo_verbose, @@ -14,6 +12,8 @@ from utils_cli.utils.common import ( save_recording, ) +from ria_toolkit_oss.io import from_npy_legacy, load_recording + def get_output_extension(format_name): """Get file extension for format name.""" diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transform.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transform.py index e43d2f4..71ea6ab 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transform.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transform.py @@ -7,17 +7,17 @@ import os from pathlib import Path import click - -from utils.data.recording import Recording -from utils.io.recording import load_recording -from utils.transforms import iq_augmentations, iq_channel_models, iq_impairments -from utils_cli.utils.common import ( +from ria_toolkit_oss_cli.ria_toolkit_oss.common import ( echo_progress, echo_verbose, format_sample_count, save_recording, ) +from ria_toolkit_oss.datatypes.recording import Recording +from ria_toolkit_oss.io.recording import load_recording +from ria_toolkit_oss.transforms import iq_augmentations, iq_impairments + def get_available_transforms(module): """Get list of public transform functions from a module. @@ -89,7 +89,7 @@ def show_transform_help(transform_name, func): def quick_view_transform(recording, output_path, title="Transform Result"): """Create a quick PNG visualization of transformed recording using constellation plot.""" try: - from utils.view.view_signal_simple import view_simple_sig + from ria_toolkit_oss.view.view_signal_simple import view_simple_sig # Create PNG in same directory as output output_dir = Path(output_path).parent @@ -249,6 +249,7 @@ def load_input(input, verbose): echo_verbose(f"Loaded {format_sample_count(recording.data.shape[-1])} samples", verbose) return recording + @click.group() def transform(): """Apply signal transformations to recordings. @@ -260,20 +261,20 @@ def transform(): Each operation is applied independently. Chain multiple transforms by running this command multiple times. - + \b Examples: - # List available augmentations - utils transform augment --list + # List available augmentations + ria_toolkit_oss transform augment --list \b # Apply channel swap - utils transform augment channel_swap input.npy + ria_toolkit_oss transform augment channel_swap input.npy \b # Apply AWGN impairment - utils transform impair awgn input.npy --snr-db 15 + ria_toolkit_oss transform impair awgn input.npy --snr-db 15 \b # Apply Rayleigh fading channel - utils transform apply_channel rayleigh input.npy --num-paths 5 + ria_toolkit_oss transform apply_channel rayleigh input.npy --num-paths 5 """ pass @@ -299,19 +300,19 @@ def augment(augmentation, input, output, list_transforms, help_transform, params # List all augmentations \b - utils transform augment --list + ria_toolkit_oss transform augment --list # Show parameters for an augmentation \b - utils transform augment channel_swap --help-transform + ria_toolkit_oss transform augment channel_swap --help-transform # Apply augmentation \b - utils transform augment channel_swap input.npy + ria_toolkit_oss transform augment channel_swap input.npy # Apply with parameters and save visualization \b - utils transform augment drop_samples input.npy --params max_section_size=5 --view + ria_toolkit_oss transform augment drop_samples input.npy --params max_section_size=5 --view """ available = get_available_transforms(iq_augmentations) @@ -406,19 +407,19 @@ def impair(impairment, input, output, list_transforms, help_transform, params, v # List all impairments \b - utils transform impair --list + ria_toolkit_oss transform impair --list # Show parameters for an impairment \b - utils transform impair add_awgn_to_signal --help-transform + ria_toolkit_oss transform impair add_awgn_to_signal --help-transform # Apply impairment \b - utils transform impair add_awgn_to_signal input.npy --params snr=10 + ria_toolkit_oss transform impair add_awgn_to_signal input.npy --params snr=10 # Apply with visualization \b - utils transform impair add_phase_noise input.npy --params phase_variance=0.001 --view + ria_toolkit_oss transform impair add_phase_noise input.npy --params phase_variance=0.001 --view """ available = get_available_transforms(iq_impairments) @@ -514,10 +515,10 @@ def apply_channel( \b Examples: - utils transform apply_channel rayleigh_fading_channel input.npy --params num_paths=3 snr_db=15 + ria_toolkit_oss transform apply_channel rayleigh_fading_channel input.npy --params num_paths=3 snr_db=15 \b - utils transform apply_channel doppler_channel recordings/input.npy \\ + ria_toolkit_oss transform apply_channel doppler_channel recordings/input.npy \\ --params satellite_velocity=7500 \\ --params satellite_initial_distance=400000 \\ --params frequency=1e9 \\ @@ -637,19 +638,19 @@ def custom( # List all custom transforms in directory \b - utils transform custom --transform-dir ~/my_transforms --list + ria_toolkit_oss transform custom --transform-dir ~/my_transforms --list # Show parameters for a transform \b - utils transform custom my_filter --transform-dir ~/my_transforms --help-transform + ria_toolkit_oss transform custom my_filter --transform-dir ~/my_transforms --help-transform # Apply custom transform \b - utils transform custom my_filter input.npy --transform-dir ~/my_transforms + ria_toolkit_oss transform custom my_filter input.npy --transform-dir ~/my_transforms # With parameters and visualization \b - utils transform custom my_filter input.npy --transform-dir ~/my_transforms \\ + ria_toolkit_oss transform custom my_filter input.npy --transform-dir ~/my_transforms \\ --params cutoff_freq=5000 order=4 --view """ try: diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transmit.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transmit.py index db3c4bb..4ef8e60 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transmit.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/transmit.py @@ -6,8 +6,8 @@ import time import click -from utils.data import Recording -from utils.io import from_npy_legacy, load_recording +from ria_toolkit_oss.datatypes import Recording +from ria_toolkit_oss.io import from_npy_legacy, load_recording from .common import ( echo_progress, @@ -374,12 +374,12 @@ def transmit( quiet, ): """Transmit IQ samples from file using SDR device. - + \b Examples: utils transmit -d hackrf --generate lfm --continuous utils transmit -d pluto -f 2.44G -g -10 -in recordings/rec_HackRF_2MHz_2025-12-01_15-36-21_80fc33f.sigmf-data - + """ # Load config file if specified diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/view.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/view.py index a092aab..ee6dc01 100644 --- a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/view.py +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/view.py @@ -6,9 +6,9 @@ from typing import Optional import click -from utils.io.recording import from_npy, load_recording -from utils.view.view_signal import view_annotations, view_channels, view_sig -from utils.view.view_signal_simple import view_simple_sig +from ria_toolkit_oss.io.recording import from_npy, load_recording +from ria_toolkit_oss.view.view_signal import view_channels, view_sig +from ria_toolkit_oss.view.view_signal_simple import view_simple_sig from .common import echo_progress, echo_verbose, load_yaml_config @@ -34,11 +34,6 @@ VISUALIZATION_TYPES = { "spines", ], }, - "annotations": { - "function": view_annotations, - "description": "Annotation-focused spectrogram view", - "options": ["channel", "dark"], - }, "channels": {"function": view_channels, "description": "Multi-channel IQ and spectrogram view", "options": []}, } diff --git a/src/ria_toolkit_oss/view/view_signal.py b/src/ria_toolkit_oss/view/view_signal.py index 1826962..95c2b44 100644 --- a/src/ria_toolkit_oss/view/view_signal.py +++ b/src/ria_toolkit_oss/view/view_signal.py @@ -38,6 +38,7 @@ def set_spines(ax, spines): ax.spines["bottom"].set_visible(False) ax.spines["left"].set_visible(False) + def view_channels( recording: Recording, output_path: Optional[str] = "images/signal.png",