From e32f9877158aba46f1ab58d264c1f5d30f58c5a5 Mon Sep 17 00:00:00 2001 From: gillian Date: Tue, 9 Dec 2025 14:49:34 -0500 Subject: [PATCH] added generate file back in, will need to change things when signal is added to ria toolkit oss --- .../ria_toolkit_oss/generate.py | 1586 +++++++++++++++++ 1 file changed, 1586 insertions(+) create mode 100644 src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py diff --git a/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py new file mode 100644 index 0000000..9a10d0f --- /dev/null +++ b/src/ria_toolkit_oss/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py @@ -0,0 +1,1586 @@ +"""Generate command - Generate synthetic signals.""" + +from pathlib import Path +from typing import Optional + +import click +import numpy as np +import yaml + +import utils.signal.basic_signal_generator as basic_gen +from utils.data import Recording +from utils.signal.block_gen.continuous_modulation.fsk_modulator import FSKModulator +from utils.signal.block_generator.basic import FrequencyShift +from utils.signal.block_generator.data_types import DataType +from utils.signal.block_generator.mapping.apsk_mapper import _APSKMapper +from utils.signal.block_generator.mapping.cross_qam_mapper import _CrossQAMMapper +from utils.signal.block_generator.mapping.mapper import Mapper +from utils.signal.block_generator.modulation import ( + GMSKModulator, + OOKModulator, + OQPSKModulator, +) +from utils.signal.block_generator.pulse_shaping import ( + RaisedCosineFilter, + RootRaisedCosineFilter, + Upsampling, +) +from utils.signal.block_generator.source import ( + LFMJammingSource, + RandomBinarySource, + RecordingSource, + SawtoothSource, + SquareSource, +) + +# Block Generator Imports +from utils.signal.block_generator.source_block import SourceBlock + +# Transforms for impairments +from utils.transforms.iq_channel_models import ( + complex_multipath_rayleigh_channel, + rician_fading_channel, +) +from utils.transforms.iq_impairments import ( + add_compression, + add_doppler, + add_gain_fluctuation, + add_phase_noise, + iq_imbalance, +) + +# NR 5G Import +try: + from utils.signal.block_gen.nr_5g.nr_5g_generator import NR5GGenerator + + HAS_NR5G = True +except ImportError: + HAS_NR5G = False + +from utils_cli.utils.common import ( + echo_progress, + echo_verbose, + format_frequency, + format_sample_rate, + parse_metadata_args, + save_recording, +) +from utils_cli.utils.config import load_user_config + + +# Extend Mapper to support new types +def _create_extended_mapper(self): + if self.constellation_type.upper() == "APSK": + return _APSKMapper(self.num_bits_per_symbol, self.normalize, self.use_gray_code) + elif self.constellation_type.upper() == "CROSS_QAM": + return _CrossQAMMapper(self.num_bits_per_symbol, self.normalize, self.use_gray_code) + else: + # Original factory + return self._original_create_constellation_mapper() + + +# Monkey patch Mapper to support new types without modifying original file +Mapper._original_create_constellation_mapper = Mapper._create_constellation_mapper +Mapper._create_constellation_mapper = _create_extended_mapper + + +def load_config_options(ctx, param, value): + """Callback to load options from YAML config file.""" + if not value: + return None + + try: + with open(value, "r") as f: + config = yaml.safe_load(f) + + # Store config in context for other commands to access + ctx.default_map = config + return value + except Exception as e: + raise click.BadParameter(f"Error loading config file: {e}") + + +def apply_user_config_metadata(metadata_tuple): + """Apply user config metadata and merge with CLI metadata. + + Args: + metadata_tuple: Tuple of metadata KEY=VALUE strings from CLI + + Returns: + dict: Merged metadata dictionary + """ + # Load user config + user_config = load_user_config() + metadata_dict = {} + + # Apply user config metadata (if user config exists) + if user_config: + # Add standard metadata fields from config + for key in ["author", "organization", "project", "location", "testbed"]: + if key in user_config: + metadata_dict[key] = user_config[key] + + # Add SigMF fields from config + if "sigmf" in user_config: + sigmf = user_config["sigmf"] + for key in ["license", "hw", "dataset"]: + if key in sigmf: + metadata_dict[key] = sigmf[key] + + # CLI metadata overrides everything + if metadata_tuple: + metadata_dict.update(parse_metadata_args(metadata_tuple)) + + return metadata_dict + + +def get_output_format(output: Optional[str], format_opt: Optional[str]) -> str: + """Determine output format from filename or option.""" + if format_opt: + return format_opt + + if not output: + return "sigmf" # Default to sigmf for better metadata support + + ext = Path(output).suffix.lower() + if ext in [".sigmf", ".sigmf-data", ".sigmf-meta"]: + return "sigmf" + elif ext == ".npy": + return "npy" + elif ext == ".wav": + return "wav" + elif ext == ".blue": + return "blue" + else: + return "sigmf" + + +class FileSourceBlock(SourceBlock): + """Generates bits from a file or bytes.""" + + def __init__(self, data: bytes, repeat: bool = True): + self.data = data + self.repeat = repeat + # Convert to bits + bits = np.unpackbits(np.frombuffer(data, dtype=np.uint8)) + self.bits = bits.astype(np.float32) # SourceBlock expects float32 bits (0.0, 1.0) + self.idx = 0 + + @property + def output_type(self) -> DataType: + return DataType.BITS + + def __call__(self, num_samples: int) -> np.ndarray: + out = np.zeros(num_samples, dtype=np.float32) + filled = 0 + while filled < num_samples: + remaining = num_samples - filled + available = len(self.bits) - self.idx + + take = min(remaining, available) + out[filled : filled + take] = self.bits[self.idx : self.idx + take] + + self.idx += take + filled += take + + if self.idx >= len(self.bits): + if self.repeat: + self.idx = 0 + else: + # Pad with zeros if not repeating + break + + return out + + +def apply_post_processing( + recording: Recording, frequency_shift: float, channel_type: str, channel_params: dict, verbose: bool +) -> Recording: + """Apply frequency shift and channel models to a recording.""" + + # 1. Frequency Shift (Pre-channel) + if frequency_shift != 0: + echo_verbose(f"Applying frequency shift: {format_frequency(frequency_shift)}", verbose) + # Use simple phase shift if only 1 block? No, basic gen FrequencyShift + # We can use RecordingSource + FrequencyShift + record() + source = RecordingSource(recording) + fs_block = FrequencyShift(shift_frequency=frequency_shift, sampling_rate=recording.sample_rate) + fs_block.input = [source] + num = len(recording.data[0]) if recording.n_chan > 0 else len(recording.data) + # get_samples + processed = fs_block.get_samples(num) + recording = Recording(data=processed, metadata=recording.metadata) + + # 2. Dynamic Impairments (Transforms) + + # Rician / Rayleigh + if channel_type == "rayleigh": + # Use improved complex multipath if available + echo_verbose("Applying Multipath Rayleigh Channel", verbose) + recording = complex_multipath_rayleigh_channel( + recording, + num_paths=channel_params.get("multipath_paths") or 3, + max_delay=channel_params.get("multipath_max_delay") or 2.6e-6, + sample_rate=recording.sample_rate, + snr_db=None, # We handle noise separately + ) + + elif channel_type == "rician": + echo_verbose(f"Applying Rician Channel (K={channel_params.get('rician_k', 2.0)})", verbose) + recording = rician_fading_channel( + recording, + k_factor=channel_params.get("rician_k", 2.0), + num_paths=channel_params.get("multipath_paths") or 3, + max_delay=channel_params.get("multipath_max_delay") or 1.2e-6, + sample_rate=recording.sample_rate, + snr_db=None, + ) + + # Doppler + doppler_freq = channel_params.get("doppler_freq") + if doppler_freq: + echo_verbose(f"Applying Doppler (Shift={doppler_freq} Hz)", verbose) + # add_doppler expects velocity. Convert freq to velocity assuming 1GHz carrier or pass freq directly? + # dynamic_channel wrapper handles this conversion. + # Or use add_doppler directly if we have velocity. + # User supplied doppler_freq. + # Let's use a simple transform or dynamic_channel + # We need to reuse dynamic_channel logic for freq->velocity conversion or assume carrier. + # Or create add_doppler_freq(signal, freq_shift) + # add_doppler takes satellite_velocity etc. + # dynamic_channel takes doppler_hz. + # We use dynamic_channel logic here but just for Doppler part + c_light = 299792458 + f_carrier = 1e9 # Assumption for conversion + velocity = doppler_freq * c_light / f_carrier + recording = add_doppler( + recording, + satellite_velocity=velocity, + satellite_initial_distance=1000, + frequency=f_carrier, + sample_rate=recording.sample_rate, + ) + + # IQ Imbalance + amp = channel_params.get("iq_amp_imbalance") + phase = channel_params.get("iq_phase_imbalance") + dc = channel_params.get("iq_dc_offset") + if amp or phase or dc: + echo_verbose(f"Applying IQ Imbalance (Amp={amp}dB, Phase={phase}rad, DC={dc})", verbose) + recording = iq_imbalance( + recording, + amplitude_imbalance=( + amp if amp is not None else 0 + ), # iq_imbalance defaults to 1.5? We want 0 if not set but one of others is set. + phase_imbalance=phase if phase is not None else 0, + dc_offset=dc if dc is not None else 0, + ) + + # Phase Noise + pn = channel_params.get("phase_noise") + if pn: + echo_verbose(f"Applying Phase Noise (Var={pn})", verbose) + recording = add_phase_noise(recording, phase_variance=pn) + + # Gain Fluctuation + gf = channel_params.get("gain_fluctuation") + if gf: + echo_verbose(f"Applying Gain Fluctuation (Var={gf})", verbose) + recording = add_gain_fluctuation(recording, amplitude_variance=gf) + + # Compression + comp = channel_params.get("compression") + if comp: + echo_verbose(f"Applying Compression (Gain={comp})", verbose) + recording = add_compression(recording, compression_gain=comp) + + # 3. AWGN (Final stage usually) + if channel_type == "awgn" or channel_params.get("noise_power"): + # If 'awgn' selected OR noise_power explicitly set (default is 0.1, so always set?) + # If channel_type is NOT awgn/rayleigh/rician, and noise_power is default 0.1? + # If user didn't specify noise_power, but did specify channel_type=none, do we add noise? + # Default noise_power is 0.1. + # If channel_type == 'none', we probably shouldn't add noise unless user asked for it. + # But noise_power has default. + # Let's check if channel_type is 'awgn'. + # Or if user provided --noise-power? + # (We can't distinguish default vs user provided easily with click unless we use ctx) + # For now: only add noise if channel_type is set to something, or if noise_power > 0 and user intended it. + # Simpler: If channel_type == 'awgn', definitely add. + # If rayleigh/rician, they might want noise too. + # If 'none', skip noise? + + should_add_noise = False + if channel_type in ["awgn", "rayleigh", "rician"]: + should_add_noise = True + + if should_add_noise: + npow = channel_params.get("noise_power", 0.1) + echo_verbose(f"Applying AWGN (Power={npow})", verbose) + # Convert Power (variance) to SNR? + # add_awgn_to_signal takes SNR. + # AWGNChannel block takes Variance. + # Use AWGNChannel block logic (additive noise with variance) + # or utils.transforms.iq_channel_models.awgn_channel which takes SNR. + # The user CLI says --noise-power (variance). + # We should use a simple additive noise function with variance. + # transforms.iq_augmentations.generate_awgn uses SNR. + # Let's implement simple additive noise here or use AWGNChannel block. + + # Use AWGNChannel block logic directly + noise_std = np.sqrt(npow / 2) + noise = noise_std * (np.random.randn(*recording.data.shape) + 1j * np.random.randn(*recording.data.shape)) + recording = Recording(data=recording.data + noise, metadata=recording.metadata) + + return recording + + +@click.group() +def generate(): + """Generate synthetic signals. + + \b + Examples: + utils synth chirp -b 1e6 -p 0.01 -s 10e6 -o chirp_basic.sigmf + utils synth fsk -M 2 -r 100e3 -s 2e6 -o fsk2_basic.sigmf + + """ + pass + + +def common_options(f): + """Decorator for common options.""" + f = click.option("--sample-rate", "-s", type=float, required=True, help="Sample rate in Hz")(f) + f = click.option("--num-samples", "-n", type=int, help="Number of samples")(f) + f = click.option("--duration", "-t", type=float, help="Duration in seconds (alternative to --num-samples)")(f) + f = click.option("--frequency-shift", type=float, default=0.0, help="Digital frequency shift from baseband (Hz)")( + f + ) + f = click.option("--center-frequency", "-fc", type=float, help="Metadata center frequency (Hz)")(f) + f = click.option( + "--channel-type", type=click.Choice(["none", "awgn", "rayleigh"]), default="none", help="Channel model" + )(f) + f = click.option("--noise-power", type=float, default=0.1, help="Noise power (variance) for AWGN")(f) + f = click.option("--path-gain", type=float, default=0.0, help="Path gain (dB) for Rayleigh")(f) + f = click.option("--output", "-o", required=True, help="Output filename")(f) + f = click.option("--format", "-F", type=click.Choice(["npy", "sigmf", "wav", "blue"]), help="Output format")(f) + + # Impairment options + f = click.option("--rician-k", type=float, help="Rician K-factor")(f) + f = click.option("--multipath-paths", type=int, help="Multipath: Number of paths")(f) + f = click.option("--multipath-max-delay", type=float, help="Multipath: Max delay (s)")(f) + f = click.option("--doppler-freq", type=float, help="Doppler: Frequency shift (Hz)")(f) + f = click.option("--iq-amp-imbalance", type=float, help="IQ Imbalance: Amplitude (dB)")(f) + f = click.option("--iq-phase-imbalance", type=float, help="IQ Imbalance: Phase (rad)")(f) + f = click.option("--iq-dc-offset", type=float, help="IQ Imbalance: DC Offset")(f) + f = click.option("--phase-noise", type=float, help="Phase Noise: Variance")(f) + f = click.option("--gain-fluctuation", type=float, help="Gain Fluctuation: Variance")(f) + f = click.option("--compression", type=float, help="Compression: Gain")(f) + + f = click.option( + "--config", + "-c", + callback=load_config_options, + is_eager=True, + expose_value=False, + type=click.Path(exists=True), + help="Load parameters from YAML", + )(f) + f = click.option("--overwrite", "-w", is_flag=True, help="Overwrite existing file")(f) + f = click.option("--metadata", "-m", multiple=True, help="Add metadata KEY=VALUE")(f) + f = click.option("--verbose", "-v", is_flag=True, help="Verbose output")(f) + f = click.option("--quiet", "-q", is_flag=True, help="Suppress output")(f) + return f + + +def resolve_length(sample_rate, num_samples, duration, symbols=None, sps=None): + """Resolve generation length.""" + if symbols is not None and sps is not None: + # Modulation specific + if num_samples: + # If both provided, check consistency or prefer num_samples? + # We'll treat symbols as the driver if provided. + pass + return int(symbols * sps) + + if num_samples: + return int(num_samples) + + if duration: + return int(duration * sample_rate) + + # Default + return 10000 + + +@generate.command() +@click.option("--frequency", "-f", type=float, default=1000.0, help="Tone frequency relative to carrier (Hz)") +@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude (0.0-1.0)") +@click.option("--phase", "-p", type=float, default=0.0, help="Initial phase in radians") +@common_options +def tone( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + frequency, + amplitude, + phase, + **kwargs, +): + """Generate a complex tone.""" + + ns = resolve_length(sample_rate, num_samples, duration) + + echo_progress(f"Generating tone: {format_frequency(frequency)} at {format_sample_rate(sample_rate)}", quiet) + + # Use basic_gen for core tone + recording = basic_gen.sine( + sample_rate=int(sample_rate), length=ns, frequency=frequency, amplitude=amplitude, baseband_phase=phase + ) + + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + echo_verbose(f"Center Frequency: {format_frequency(center_frequency)}", verbose) + + # Post processing + chan_params = {"noise_power": noise_power, "path_gain": path_gain} + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + # User metadata + metadata = apply_user_config_metadata(metadata) + metadata["signal_type"] = "tone" + for key, value in metadata.items(): + recording.update_metadata(key, value) + + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--noise-type", "-T", type=click.Choice(["gaussian", "uniform"]), default="gaussian", help="Noise type") +@click.option("--power", "-p", type=float, default=1.0, help="Signal power/variance") +@common_options +def noise( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + noise_type, + power, + **kwargs, +): + """Generate random noise.""" + + ns = resolve_length(sample_rate, num_samples, duration) + echo_progress(f"Generating {noise_type} noise...", quiet) + + if noise_type == "gaussian": + # AWGN + rms = np.sqrt(power) + recording = basic_gen.noise(sample_rate=int(sample_rate), length=ns, rms_power=rms) + else: + # Uniform + real = np.random.uniform(-1, 1, ns) + imag = np.random.uniform(-1, 1, ns) + a = np.sqrt(3 * power / 2) + data = a * (real + 1j * imag) + recording = Recording(data=data, metadata={"sample_rate": sample_rate}) + + recording._metadata["signal_type"] = "noise" + recording._metadata["noise_type"] = noise_type + + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + # Post processing + chan_params = {"noise_power": noise_power, "path_gain": path_gain} + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--bandwidth", "-b", type=float, required=True, help="Chirp bandwidth (Hz)") +@click.option("--period", "-p", type=float, required=True, help="Chirp period (seconds)") +@click.option("--type", "chirp_type", type=click.Choice(["up", "down", "up_down"]), default="up", help="Chirp type") +@common_options +def chirp( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + bandwidth, + period, + chirp_type, + **kwargs, +): + """Generate LFM Chirp signal.""" + + ns = resolve_length(sample_rate, num_samples, duration) + + echo_progress(f"Generating {chirp_type} chirp ({format_frequency(bandwidth)}, {period}s)...", quiet) + + source = LFMJammingSource(sample_rate=sample_rate, bandwidth=bandwidth, chirp_period=period, chirp_type=chirp_type) + + recording = source.record(ns) + + recording._metadata["signal_type"] = "chirp" + recording._metadata["chirp_type"] = chirp_type + recording._metadata["bandwidth"] = bandwidth + recording._metadata["period"] = period + + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + # Post processing + chan_params = {"noise_power": noise_power, "path_gain": path_gain} + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--frequency", "-f", type=float, default=1000.0, help="Frequency (Hz)") +@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude") +@click.option("--duty-cycle", "-d", type=float, default=0.5, help="Duty cycle (0.0-1.0)") +@click.option("--phase", "-p", type=float, default=0.0, help="Phase shift (radians)") +@common_options +def square( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + frequency, + amplitude, + duty_cycle, + phase, + **kwargs, +): + """Generate Square wave.""" + + ns = resolve_length(sample_rate, num_samples, duration) + + echo_progress(f"Generating square wave: {format_frequency(frequency)}...", quiet) + + source = SquareSource( + frequency=frequency, sample_rate=sample_rate, amplitude=amplitude, duty_cycle=duty_cycle, phase_shift=phase + ) + + recording = source.record(ns) + + recording._metadata["signal_type"] = "square" + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + chan_params = {"noise_power": noise_power, "path_gain": path_gain} + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--frequency", "-f", type=float, default=1000.0, help="Frequency (Hz)") +@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude") +@click.option("--phase", "-p", type=float, default=0.0, help="Phase shift (radians)") +@common_options +def sawtooth( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + frequency, + amplitude, + phase, + **kwargs, +): + """Generate Sawtooth wave.""" + + ns = resolve_length(sample_rate, num_samples, duration) + + echo_progress(f"Generating sawtooth wave: {format_frequency(frequency)}...", quiet) + + source = SawtoothSource(frequency=frequency, sample_rate=sample_rate, amplitude=amplitude, phase_shift=phase) + + recording = source.record(ns) + + recording._metadata["signal_type"] = "sawtooth" + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + chan_params = {"noise_power": noise_power, "path_gain": path_gain} + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +def load_source(message_source, message_content, num_bits=None): + if num_bits is not None: + if message_source == "random": + return RandomBinarySource()((1, num_bits)) + elif message_source == "string": + if not message_content: + raise click.BadParameter("Message content required for string source") + return FileSourceBlock(message_content.encode("utf-8"), repeat=True)(num_bits).reshape(1, -1) + + elif message_source == "file": + if not message_content: + raise click.BadParameter("File path required for file source") + + p = Path(message_content) + if not p.exists(): + raise click.BadParameter(f"File not found: {p}") + + return FileSourceBlock(p.read_bytes(), repeat=True)(num_bits).reshape(1, -1) + else: + if message_source == "random": + return RandomBinarySource() # Infinite source + + elif message_source == "string": + if not message_content: + raise click.BadParameter("Message content required for string source") + return FileSourceBlock(message_content.encode("utf-8"), repeat=True) + + elif message_source == "file": + if not message_content: + raise click.BadParameter("File path required for file source") + + p = Path(message_content) + if not p.exists(): + raise click.BadParameter(f"File not found: {p}") + + return FileSourceBlock(p.read_bytes(), repeat=True) + + +def _run_mod_gen( + mod_type, + sample_rate, + symbols, + num_samples, + duration, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, +): + + # Resolve length + # If symbols provided, it drives. + # If not, use num_samples/duration to calculate symbols + + if symbol_rate is None: + # Try to infer? No, required. + raise click.BadParameter("Symbol rate required") + + sps = sample_rate / symbol_rate + if not sps.is_integer(): + sps_int = int(round(sps)) + if sps_int < 1: + sps_int = 1 + actual_sr = sps_int * symbol_rate + echo_progress(f"Warning: Non-integer samples per symbol ({sps:.4f}). Rounding to {sps_int}.", quiet) + echo_progress(f"Actual sample rate will be {format_sample_rate(actual_sr)}", quiet) + sps = int(sps_int) + sample_rate = actual_sr + else: + sps = int(sps) + + if symbols is None: + # Calc from duration/samples + ns = resolve_length(sample_rate, num_samples, duration) + symbols = int(np.ceil(ns / sps)) + + echo_progress(f"Generating {mod_type}-{order} ({symbols} symbols)...", quiet) + echo_verbose(f" Sample Rate: {format_sample_rate(sample_rate)} (SPS={sps})", verbose) + + bps = int(np.log2(order)) + total_samples = symbols * sps + + # Source + source = load_source(message_source, message_content, None) + + # Mapper and Pulse Shaping + mapper = Mapper(constellation_type=mod_type, num_bits_per_symbol=bps) + upsampler = Upsampling(factor=sps) + + # Filter + if filter_type == "rrc": + filter_block = RootRaisedCosineFilter(span_in_symbols=filter_span, upsampling_factor=sps, beta=filter_beta) + elif filter_type == "rc": + filter_block = RaisedCosineFilter(span_in_symbols=filter_span, upsampling_factor=sps, beta=filter_beta) + elif filter_type == "gaussian": + raise click.ClickException("Gaussian filter not supported yet") + else: + filter_block = None + + # Generate base signal + mapper.connect_input([source]) + upsampler.connect_input([mapper]) + if filter_block: + filter_block.connect_input([upsampler]) + base_recording = filter_block.record(total_samples) + else: + base_recording = upsampler.record(total_samples) + + # Update metadata + for key, value in { + "modulation": mod_type, + "order": order, + "symbol_rate": symbol_rate, + "symbols": symbols, + "filter": filter_type, + }.items(): + base_recording.update_metadata(key, value) + + if center_frequency: + base_recording.update_metadata("center_frequency", center_frequency) + + # Post Processing + chan_params = {"noise_power": noise_power, "path_gain": path_gain} + final_recording = apply_post_processing(base_recording, frequency_shift, channel_type, chan_params, verbose) + + # Trim if explicit num_samples was requested and we generated more (due to symbol alignment) + target_ns = resolve_length(sample_rate, num_samples, duration) + if target_ns and len(final_recording.data[0]) > target_ns: + # Only trim if difference is significant? + # User usually wants exact length if specified. + if num_samples or duration: # If explicitly asked for length + final_recording = final_recording.trim(target_ns) + + for key, value in apply_user_config_metadata(metadata).items(): + final_recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(final_recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--symbols", "-N", type=int, help="Number of symbols") +@click.option("--order", "-M", type=int, required=True, help="QAM Order (4, 16, 32, 64, 128, 256, 1024)") +@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") +@click.option( + "--filter", + "filter_type", + type=click.Choice(["rrc", "rc", "gaussian", "none"]), + default="rrc", + help="Pulse shaping filter", +) +@click.option("--filter-span", type=int, default=6, help="Filter span in symbols") +@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") +@click.option( + "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" +) +@click.option("--message-content", help="File path or string content") +@common_options +def qam( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + rician_k, + multipath_paths, + multipath_max_delay, + doppler_freq, + iq_amp_imbalance, + iq_phase_imbalance, + iq_dc_offset, + phase_noise, + gain_fluctuation, + compression, + symbols, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + **kwargs, +): + """Generate QAM modulated signal.""" + + # Determine modulation type (Normal QAM vs Cross QAM) + if order in [32, 128]: + mod_type = "CROSS_QAM" + else: + mod_type = "QAM" + + _run_mod_gen( + mod_type, + sample_rate, + symbols, + num_samples, + duration, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + ) + + +@generate.command() +@click.option("--symbols", "-N", type=int, help="Number of symbols") +@click.option("--order", "-M", type=int, required=True, help="APSK Order (16, 32, 64, 128, 256)") +@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") +@click.option( + "--filter", + "filter_type", + type=click.Choice(["rrc", "rc", "gaussian", "none"]), + default="rrc", + help="Pulse shaping filter", +) +@click.option("--filter-span", type=int, default=6, help="Filter span in symbols") +@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") +@click.option( + "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" +) +@click.option("--message-content", help="File path or string content") +@common_options +def apsk( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + rician_k, + multipath_paths, + multipath_max_delay, + doppler_freq, + iq_amp_imbalance, + iq_phase_imbalance, + iq_dc_offset, + phase_noise, + gain_fluctuation, + compression, + symbols, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + **kwargs, +): + """Generate APSK modulated signal.""" + _run_mod_gen( + "APSK", + sample_rate, + symbols, + num_samples, + duration, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + ) + + +@generate.command() +@click.option("--symbols", "-N", type=int, help="Number of symbols") +@click.option("--order", "-M", type=int, required=True, help="PAM Order (4, 8, 16)") +@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") +@click.option( + "--filter", + "filter_type", + type=click.Choice(["rrc", "rc", "gaussian", "none"]), + default="rrc", + help="Pulse shaping filter", +) +@click.option("--filter-span", type=int, default=6, help="Filter span in symbols") +@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") +@click.option( + "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" +) +@click.option("--message-content", help="File path or string content") +@common_options +def pam( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + rician_k, + multipath_paths, + multipath_max_delay, + doppler_freq, + iq_amp_imbalance, + iq_phase_imbalance, + iq_dc_offset, + phase_noise, + gain_fluctuation, + compression, + symbols, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + **kwargs, +): + """Generate PAM modulated signal.""" + _run_mod_gen( + "PAM", + sample_rate, + symbols, + num_samples, + duration, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + ) + + +@generate.command() +@click.option("--symbols", "-N", type=int, help="Number of symbols") +@click.option("--order", "-M", type=int, default=2, help="FSK Order (2, 4, 8)") +@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") +@click.option("--freq-spacing", type=float, help="Frequency spacing (Hz)") +@click.option("--modulation-index", "-h", type=float, help="Modulation Index (alternative to spacing)") +@click.option( + "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" +) +@click.option("--message-content", help="File path or string content") +@common_options +def fsk( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + rician_k, + multipath_paths, + multipath_max_delay, + doppler_freq, + iq_amp_imbalance, + iq_phase_imbalance, + iq_dc_offset, + phase_noise, + gain_fluctuation, + compression, + symbols, + order, + symbol_rate, + freq_spacing, + modulation_index, + message_source, + message_content, + **kwargs, +): + """Generate FSK modulated signal.""" + + # FSK uses FSKModulator which is a standalone Source/Modulator block? No, it's a Modulator. + # Takes bits input. + + # Determine spacing + if freq_spacing is None: + if modulation_index is None: + modulation_index = 1.0 # Default + freq_spacing = modulation_index * symbol_rate + + # Samples per symbol + sps = sample_rate / symbol_rate # FSKModulator takes sampling_freq and symbol_duration (1/rate) + symbol_duration = 1.0 / symbol_rate + + # Resolve length + ns = resolve_length(sample_rate, num_samples, duration, symbols, sps) + if symbols is None: + symbols = int(np.ceil(ns / sps)) + + echo_progress(f"Generating {order}-FSK (Spacing={format_frequency(freq_spacing)})...", quiet) + + # Bits + bps = int(np.log2(order)) + num_bits = symbols * bps + + # Source + source_bits = load_source(message_source, message_content, num_bits) + + # Modulator + mod = FSKModulator( + num_bits_per_symbol=bps, + frequency_spacing=freq_spacing, + symbol_duration=symbol_duration, + sampling_frequency=sample_rate, + ) + + # Generate + samples = mod(source_bits) + # Flatten + samples = samples.flatten()[:ns] + + recording = Recording(data=samples, metadata={"sample_rate": sample_rate}) + recording._metadata.update( + { + "modulation": "FSK", + "order": order, + "symbol_rate": symbol_rate, + "freq_spacing": freq_spacing, + "mod_index": modulation_index if modulation_index else freq_spacing / symbol_rate, + } + ) + + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + chan_params = { + "noise_power": noise_power, + "path_gain": path_gain, + "rician_k": rician_k, + "multipath_paths": multipath_paths, + "multipath_max_delay": multipath_max_delay, + "doppler_freq": doppler_freq, + "iq_amp_imbalance": iq_amp_imbalance, + "iq_phase_imbalance": iq_phase_imbalance, + "iq_dc_offset": iq_dc_offset, + "phase_noise": phase_noise, + "gain_fluctuation": gain_fluctuation, + "compression": compression, + } + + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") +@click.option( + "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" +) +@click.option("--message-content", help="File path or string content") +@common_options +def ook( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + rician_k, + multipath_paths, + multipath_max_delay, + doppler_freq, + iq_amp_imbalance, + iq_phase_imbalance, + iq_dc_offset, + phase_noise, + gain_fluctuation, + compression, + symbol_rate, + message_source, + message_content, + **kwargs, +): + """Generate On-Off Keying (OOK) signal.""" + + sps = int(sample_rate / symbol_rate) + ns = resolve_length(sample_rate, num_samples, duration) + + echo_progress("Generating OOK...", quiet) + + # Source Block + source = load_source(message_source, message_content, None) + + # OOK Modulator + mod = OOKModulator(source, samples_per_symbol=sps) + recording = mod.record(ns) + recording._metadata["sample_rate"] = sample_rate + recording._metadata["modulation"] = "OOK" + + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + chan_params = { + "noise_power": noise_power, + "path_gain": path_gain, + "rician_k": rician_k, + "multipath_paths": multipath_paths, + "multipath_max_delay": multipath_max_delay, + "doppler_freq": doppler_freq, + "iq_amp_imbalance": iq_amp_imbalance, + "iq_phase_imbalance": iq_phase_imbalance, + "iq_dc_offset": iq_dc_offset, + "phase_noise": phase_noise, + "gain_fluctuation": gain_fluctuation, + "compression": compression, + } + + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") +@click.option( + "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" +) +@click.option("--message-content", help="File path or string content") +@common_options +def oqpsk( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + rician_k, + multipath_paths, + multipath_max_delay, + doppler_freq, + iq_amp_imbalance, + iq_phase_imbalance, + iq_dc_offset, + phase_noise, + gain_fluctuation, + compression, + symbol_rate, + message_source, + message_content, + **kwargs, +): + """Generate Offset QPSK (OQPSK) signal.""" + + sps = int(sample_rate / symbol_rate) + ns = resolve_length(sample_rate, num_samples, duration) + + echo_progress("Generating OQPSK...", quiet) + + # Source Block + source = load_source(message_source, message_content, None) + + # OQPSK Modulator + mod = OQPSKModulator(source, samples_per_symbol=sps) + recording = mod.record(ns) + recording._metadata["sample_rate"] = sample_rate + recording._metadata["modulation"] = "OQPSK" + + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + chan_params = { + "noise_power": noise_power, + "path_gain": path_gain, + "rician_k": rician_k, + "multipath_paths": multipath_paths, + "multipath_max_delay": multipath_max_delay, + "doppler_freq": doppler_freq, + "iq_amp_imbalance": iq_amp_imbalance, + "iq_phase_imbalance": iq_phase_imbalance, + "iq_dc_offset": iq_dc_offset, + "phase_noise": phase_noise, + "gain_fluctuation": gain_fluctuation, + "compression": compression, + } + + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") +@click.option("--bt", type=float, default=0.3, help="Bandwidth-Time product (e.g., 0.3, 0.5)") +@click.option( + "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" +) +@click.option("--message-content", help="File path or string content") +@common_options +def gmsk( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + rician_k, + multipath_paths, + multipath_max_delay, + doppler_freq, + iq_amp_imbalance, + iq_phase_imbalance, + iq_dc_offset, + phase_noise, + gain_fluctuation, + compression, + symbol_rate, + bt, + message_source, + message_content, + **kwargs, +): + """Generate GMSK modulated signal.""" + + sps = int(sample_rate / symbol_rate) + ns = resolve_length(sample_rate, num_samples, duration) + + echo_progress(f"Generating GMSK (BT={bt})...", quiet) + + # Source Block + source = load_source(message_source, message_content, None) + + # GMSK Modulator + mod = GMSKModulator(source, samples_per_symbol=sps, bt=bt) + recording = mod.record(ns) + recording._metadata["sample_rate"] = sample_rate + recording._metadata["modulation"] = "GMSK" + recording._metadata["bt_product"] = bt + + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + chan_params = { + "noise_power": noise_power, + "path_gain": path_gain, + "rician_k": rician_k, + "multipath_paths": multipath_paths, + "multipath_max_delay": multipath_max_delay, + "doppler_freq": doppler_freq, + "iq_amp_imbalance": iq_amp_imbalance, + "iq_phase_imbalance": iq_phase_imbalance, + "iq_dc_offset": iq_dc_offset, + "phase_noise": phase_noise, + "gain_fluctuation": gain_fluctuation, + "compression": compression, + } + + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose) + + +@generate.command() +@click.option("--symbols", "-N", type=int, help="Number of symbols") +@click.option("--order", "-M", type=int, required=True, help="PSK Order (2, 4, 8)") +@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz") +@click.option( + "--filter", + "filter_type", + type=click.Choice(["rrc", "rc", "gaussian", "none"]), + default="rrc", + help="Pulse shaping filter", +) +@click.option("--filter-span", type=int, default=6, help="Filter span in symbols") +@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor") +@click.option( + "--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source" +) +@click.option("--message-content", help="File path or string content") +@common_options +def psk( + sample_rate, + num_samples, + duration, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + symbols, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + **kwargs, +): + """Generate PSK modulated signal.""" + _run_mod_gen( + "PSK", + sample_rate, + symbols, + num_samples, + duration, + order, + symbol_rate, + filter_type, + filter_span, + filter_beta, + message_source, + message_content, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + ) + + +@generate.command() +@click.option("--bandwidth", "-b", type=int, required=True, help="Bandwidth in MHz (e.g. 10, 20)") +@click.option("--mu", "-u", type=int, default=1, help="Numerology (0-3)") +@click.option("--frames", type=int, default=1, help="Number of 10ms frames") +@click.option("--ssb/--no-ssb", default=True, help="Enable SSB") +@common_options +def nr5g( + sample_rate, + frequency_shift, + center_frequency, + channel_type, + noise_power, + path_gain, + output, + format, + overwrite, + metadata, + verbose, + quiet, + bandwidth, + mu, + frames, + ssb, + **kwargs, +): + """Generate 5G NR frame.""" + + if not HAS_NR5G: + raise click.ClickException("5G NR Generator not available (missing dependencies or module)") + + echo_progress(f"Generating 5G NR ({bandwidth} MHz, mu={mu}, {frames} frames)...", quiet) + + # NR5GGenerator parameters + # It determines sample rate based on bandwidth/mu/fr? + # nr_ofdm_params(bandwidth_mhz, mu, fr) returns fs. + # We should verify if user supplied sample_rate matches or we should ignore user sample_rate? + # Or we resample? + # The generator has fixed fs for a given BW/mu config usually. + # Let's instantiate it and see its fs. + + gen = NR5GGenerator(bandwidth_mhz=bandwidth, mu=mu, frames_per_recording=frames, ssb=ssb) + + native_fs = gen.fs + if sample_rate and abs(sample_rate - native_fs) > 1.0: + echo_progress( + message=( + f"Warning: Requested sample rate {format_sample_rate(sample_rate)} " + f"differs from native NR rate {format_sample_rate(native_fs)}." + ), + quiet=quiet, + ) + echo_progress("Output will be at native rate.", quiet) + # If we really wanted to support arbitrary rate, we'd need resampling. + # For now, just warn and use native. + + recording = gen.record(batch_size=1) + + recording._metadata["signal_type"] = "nr5g" + + if center_frequency: + recording._metadata["center_frequency"] = center_frequency + + # Post processing + chan_params = {"noise_power": noise_power, "path_gain": path_gain} + recording = apply_post_processing(recording, frequency_shift, channel_type, chan_params, verbose) + + for key, value in apply_user_config_metadata(metadata).items(): + recording.update_metadata(key, value) + fmt = get_output_format(output, format) + save_recording(recording, output, fmt, overwrite, verbose)