ria-toolkit-oss/src/ria_toolkit_oss_cli/ria_toolkit_oss/generate.py

1344 lines
40 KiB
Python
Raw Normal View History

"""Generate command - Generate synthetic signals."""
from pathlib import Path
from typing import Optional
import click
import numpy as np
M
2025-12-11 11:13:27 -05:00
import yaml
M
2025-12-11 16:53:26 -05:00
import ria_toolkit_oss.signal.basic_signal_generator as basic_gen
from ria_toolkit_oss.data import Recording
G
2025-12-11 15:12:01 -05:00
from ria_toolkit_oss.signal.block_generator.basic import FrequencyShift
M
2025-12-11 16:53:26 -05:00
from ria_toolkit_oss.signal.block_generator.continuous_modulation.fsk_modulator import (
FSKModulator,
)
G
2025-12-11 15:12:01 -05:00
from ria_toolkit_oss.signal.block_generator.data_types import DataType
from ria_toolkit_oss.signal.block_generator.mapping.apsk_mapper import _APSKMapper
M
2025-12-11 16:53:26 -05:00
from ria_toolkit_oss.signal.block_generator.mapping.cross_qam_mapper import (
_CrossQAMMapper,
)
M
2025-12-11 16:53:26 -05:00
from ria_toolkit_oss.signal.block_generator.mapping.mapper import Mapper
G
2025-12-11 15:12:01 -05:00
from ria_toolkit_oss.signal.block_generator.pulse_shaping import (
RaisedCosineFilter,
RootRaisedCosineFilter,
Upsampling,
)
G
2025-12-11 15:12:01 -05:00
from ria_toolkit_oss.signal.block_generator.source import (
BinarySource,
M
2025-12-11 16:53:26 -05:00
LFMChirpSource,
RecordingSource,
SawtoothSource,
SquareSource,
)
# Block Generator Imports
G
2025-12-11 15:12:01 -05:00
from ria_toolkit_oss.signal.block_generator.source_block import SourceBlock
M
2025-12-11 16:53:26 -05:00
from ria_toolkit_oss.signal.block_generator.symbol_modulation import (
GMSKModulator,
OOKModulator,
OQPSKModulator,
)
M
2026-01-30 17:43:10 -05:00
from ria_toolkit_oss.transforms.iq_impairments import iq_imbalance
M
2025-12-19 11:25:06 -05:00
from ria_toolkit_oss_cli.ria_toolkit_oss.common import (
echo_progress,
echo_verbose,
format_frequency,
format_sample_rate,
parse_metadata_args,
save_recording,
)
from ria_toolkit_oss_cli.ria_toolkit_oss.config import load_user_config
# Extend Mapper to support new types
def _create_extended_mapper(self):
if self.constellation_type.upper() == "APSK":
return _APSKMapper(self.num_bits_per_symbol, self.normalize, self.use_gray_code)
elif self.constellation_type.upper() == "CROSS_QAM":
return _CrossQAMMapper(self.num_bits_per_symbol, self.normalize, self.use_gray_code)
else:
# Original factory
return self._original_create_constellation_mapper()
# Monkey patch Mapper to support new types without modifying original file
Mapper._original_create_constellation_mapper = Mapper._create_constellation_mapper
Mapper._create_constellation_mapper = _create_extended_mapper
def load_config_options(ctx, param, value):
"""Callback to load options from YAML config file."""
if not value:
return None
try:
with open(value, "r") as f:
config = yaml.safe_load(f)
# Store config in context for other commands to access
ctx.default_map = config
return value
except Exception as e:
raise click.BadParameter(f"Error loading config file: {e}")
def apply_user_config_metadata(metadata_tuple):
"""Apply user config metadata and merge with CLI metadata.
Args:
metadata_tuple: Tuple of metadata KEY=VALUE strings from CLI
Returns:
dict: Merged metadata dictionary
"""
# Load user config
user_config = load_user_config()
metadata_dict = {}
# Apply user config metadata (if user config exists)
if user_config:
# Add standard metadata fields from config
for key in ["author", "organization", "project", "location", "testbed"]:
if key in user_config:
metadata_dict[key] = user_config[key]
# Add SigMF fields from config
if "sigmf" in user_config:
sigmf = user_config["sigmf"]
for key in ["license", "hw", "dataset"]:
if key in sigmf:
metadata_dict[key] = sigmf[key]
# CLI metadata overrides everything
if metadata_tuple:
metadata_dict.update(parse_metadata_args(metadata_tuple))
return metadata_dict
def get_output_format(output: Optional[str], format_opt: Optional[str]) -> str:
"""Determine output format from filename or option."""
if format_opt:
return format_opt
if not output:
return "sigmf" # Default to sigmf for better metadata support
ext = Path(output).suffix.lower()
if ext in [".sigmf", ".sigmf-data", ".sigmf-meta"]:
return "sigmf"
elif ext == ".npy":
return "npy"
elif ext == ".wav":
return "wav"
elif ext == ".blue":
return "blue"
else:
return "sigmf"
class FileSourceBlock(SourceBlock):
"""Generates bits from a file or bytes."""
def __init__(self, data: bytes, repeat: bool = True):
self.data = data
self.repeat = repeat
# Convert to bits
bits = np.unpackbits(np.frombuffer(data, dtype=np.uint8))
self.bits = bits.astype(np.float32) # SourceBlock expects float32 bits (0.0, 1.0)
self.idx = 0
M
2025-12-11 16:53:26 -05:00
@property
def input_type(self) -> DataType:
return [DataType.NONE]
@property
def output_type(self) -> DataType:
return DataType.BITS
def __call__(self, num_samples: int) -> np.ndarray:
out = np.zeros(num_samples, dtype=np.float32)
filled = 0
while filled < num_samples:
remaining = num_samples - filled
available = len(self.bits) - self.idx
take = min(remaining, available)
out[filled : filled + take] = self.bits[self.idx : self.idx + take]
self.idx += take
filled += take
if self.idx >= len(self.bits):
if self.repeat:
self.idx = 0
else:
# Pad with zeros if not repeating
break
return out
def apply_post_processing(
recording: Recording, frequency_shift: float, add_noise: str, channel_params: dict, verbose: bool
) -> Recording:
"""Apply frequency shift and channel models to a recording."""
# 1. Frequency Shift (Pre-channel)
if frequency_shift != 0:
echo_verbose(f"Applying frequency shift: {format_frequency(frequency_shift)}", verbose)
# Use simple phase shift if only 1 block? No, basic gen FrequencyShift
# We can use RecordingSource + FrequencyShift + record()
source = RecordingSource(recording)
fs_block = FrequencyShift(shift_frequency=frequency_shift, sampling_rate=recording.sample_rate)
fs_block.input = [source]
num = len(recording.data[0]) if recording.n_chan > 0 else len(recording.data)
# get_samples
processed = fs_block.get_samples(num)
recording = Recording(data=processed, metadata=recording.metadata)
# 2. IQ Imbalance
amp = channel_params.get("iq_amp_imbalance")
phase = channel_params.get("iq_phase_imbalance")
dc = channel_params.get("iq_dc_offset")
if amp or phase or dc:
echo_verbose(f"Applying IQ Imbalance (Amp={amp}dB, Phase={phase}rad, DC={dc})", verbose)
recording = iq_imbalance(
recording,
amplitude_imbalance=(
amp if amp is not None else 0
), # iq_imbalance defaults to 1.5? We want 0 if not set but one of others is set.
phase_imbalance=phase if phase is not None else 0,
dc_offset=dc if dc is not None else 0,
)
# 3. AWGN (Final stage usually)
2026-04-01 11:57:59 -04:00
if add_noise:
npow = channel_params.get("noise_power", 0.1)
echo_verbose(f"Applying AWGN (Power={npow})", verbose)
# Use AWGNChannel block logic directly
noise_std = np.sqrt(npow / 2)
noise = noise_std * (np.random.randn(*recording.data.shape) + 1j * np.random.randn(*recording.data.shape))
recording = Recording(data=recording.data + noise, metadata=recording.metadata)
return recording
@click.group()
def generate():
"""Generate synthetic signals.
M
2025-12-11 11:13:27 -05:00
\b
Examples:
F
Port annotation system from utils and fix ria package imports Annotations package (new): - Add threshold_qualifier with 3-pass hysteresis detector (Pass 1: strong bursts, Pass 2: weak residual bursts, Pass 3: macro-window faint burst detection), auto window_size scaled to 1ms, channel selection, and stable noise_floor baseline throughout - Add energy_detector, cusum_annotator, parallel_signal_separator, qualify_slice, signal_isolation, annotation_transforms - Add __init__.py exporting the four functions used by the CLI - Fix all imports from utils.data → ria_toolkit_oss.datatypes CLI annotate command (new): - Port full annotate CLI from utils including list, add, remove, clear, energy, cusum, threshold, and separate subcommands - Fix imports from utils.* → ria_toolkit_oss.* and utils_cli.* → ria_toolkit_oss_cli.* - Safe overwrite logic: _annotated files always writable, originals protected; --overwrite writes in-place only on _annotated inputs CLI view command: - Add 'annotations' as a valid --type, wiring view_annotations from view_signal view_signal.py: - Add view_annotations function with blue/purple alternating palette and threshold %-sorted drawing order (lower % renders on top) recording.py (datatypes): - Fix lazy imports in to_wav() and to_blue() from utils.io → ria_toolkit_oss.io io/recording.py: - Add compatibility shim in from_npy to remap utils.data.annotation.Annotation to ria_toolkit_oss.datatypes.annotation.Annotation when loading .npy files pickled by the utils package
2026-03-31 13:34:00 -04:00
ria synth chirp -b 1e6 -p 0.01 -s 10e6 -o chirp_basic.sigmf
ria synth fsk -M 2 -r 100e3 -s 2e6 -o fsk2_basic.sigmf
M
2025-12-11 11:13:27 -05:00
"""
pass
def common_options(f):
"""Decorator for common options."""
f = click.option("--sample-rate", "-s", type=float, required=True, help="Sample rate in Hz")(f)
f = click.option("--num-samples", "-n", type=int, help="Number of samples")(f)
f = click.option("--duration", "-t", type=float, help="Duration in seconds (alternative to --num-samples)")(f)
f = click.option("--frequency-shift", type=float, default=0.0, help="Digital frequency shift from baseband (Hz)")(
f
)
f = click.option("--center-frequency", "-fc", type=float, help="Metadata center frequency (Hz)")(f)
f = click.option("--add-noise", is_flag=True, help="Add noise to signal")(f)
f = click.option("--noise-power", type=float, default=0.1, help="Noise power (variance) for AWGN")(f)
f = click.option("--path-gain", type=float, default=0.0, help="Path gain (dB) for Rayleigh")(f)
f = click.option("--output", "-o", required=True, help="Output filename")(f)
f = click.option("--format", "-F", type=click.Choice(["npy", "sigmf", "wav", "blue"]), help="Output format")(f)
# Impairment options
f = click.option("--multipath-paths", type=int, help="Multipath: Number of paths")(f)
f = click.option("--multipath-max-delay", type=float, help="Multipath: Max delay (s)")(f)
f = click.option("--iq-amp-imbalance", type=float, help="IQ Imbalance: Amplitude (dB)")(f)
f = click.option("--iq-phase-imbalance", type=float, help="IQ Imbalance: Phase (rad)")(f)
f = click.option("--iq-dc-offset", type=float, help="IQ Imbalance: DC Offset")(f)
f = click.option(
"--config",
"-c",
callback=load_config_options,
is_eager=True,
expose_value=False,
type=click.Path(exists=True),
help="Load parameters from YAML",
)(f)
f = click.option("--overwrite", "-w", is_flag=True, help="Overwrite existing file")(f)
f = click.option("--metadata", "-m", multiple=True, help="Add metadata KEY=VALUE")(f)
f = click.option("--verbose", "-v", is_flag=True, help="Verbose output")(f)
f = click.option("--quiet", "-q", is_flag=True, help="Suppress output")(f)
return f
def resolve_length(sample_rate, num_samples, duration, symbols=None, sps=None):
"""Resolve generation length."""
if symbols is not None and sps is not None:
# Modulation specific
if num_samples:
# If both provided, check consistency or prefer num_samples?
# We'll treat symbols as the driver if provided.
pass
return int(symbols * sps)
if num_samples:
return int(num_samples)
if duration:
return int(duration * sample_rate)
# Default
return 10000
@generate.command()
@click.option("--frequency", "-f", type=float, default=1000.0, help="Tone frequency relative to carrier (Hz)")
@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude (0.0-1.0)")
@click.option("--phase", "-p", type=float, default=0.0, help="Initial phase in radians")
@common_options
def tone(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
frequency,
amplitude,
phase,
**kwargs,
):
"""Generate a complex tone."""
ns = resolve_length(sample_rate, num_samples, duration)
echo_progress(f"Generating tone: {format_frequency(frequency)} at {format_sample_rate(sample_rate)}", quiet)
# Use basic_gen for core tone
recording = basic_gen.sine(
sample_rate=int(sample_rate), length=ns, frequency=frequency, amplitude=amplitude, baseband_phase=phase
)
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
echo_verbose(f"Center Frequency: {format_frequency(center_frequency)}", verbose)
# Post processing
chan_params = {"noise_power": noise_power, "path_gain": path_gain}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
# User metadata
metadata = apply_user_config_metadata(metadata)
metadata["signal_type"] = "tone"
for key, value in metadata.items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--noise-type", "-T", type=click.Choice(["gaussian", "uniform"]), default="gaussian", help="Noise type")
@click.option("--power", "-p", type=float, default=1.0, help="Signal power/variance")
@common_options
def noise(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
noise_type,
power,
**kwargs,
):
"""Generate random noise."""
ns = resolve_length(sample_rate, num_samples, duration)
echo_progress(f"Generating {noise_type} noise...", quiet)
if noise_type == "gaussian":
# AWGN
rms = np.sqrt(power)
recording = basic_gen.noise(sample_rate=int(sample_rate), length=ns, rms_power=rms)
else:
# Uniform
real = np.random.uniform(-1, 1, ns)
imag = np.random.uniform(-1, 1, ns)
a = np.sqrt(3 * power / 2)
data = a * (real + 1j * imag)
recording = Recording(data=data, metadata={"sample_rate": sample_rate})
recording._metadata["signal_type"] = "noise"
recording._metadata["noise_type"] = noise_type
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
# Post processing
chan_params = {"noise_power": noise_power, "path_gain": path_gain}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
for key, value in apply_user_config_metadata(metadata).items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--bandwidth", "-b", type=float, required=True, help="Chirp bandwidth (Hz)")
@click.option("--period", "-p", type=float, required=True, help="Chirp period (seconds)")
@click.option("--type", "chirp_type", type=click.Choice(["up", "down", "up_down"]), default="up", help="Chirp type")
@common_options
def chirp(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
bandwidth,
period,
chirp_type,
**kwargs,
):
"""Generate LFM Chirp signal."""
ns = resolve_length(sample_rate, num_samples, duration)
echo_progress(f"Generating {chirp_type} chirp ({format_frequency(bandwidth)}, {period}s)...", quiet)
M
2025-12-11 11:13:27 -05:00
source = LFMChirpSource(sample_rate=sample_rate, bandwidth=bandwidth, chirp_period=period, chirp_type=chirp_type)
recording = source.record(ns)
recording._metadata["signal_type"] = "chirp"
recording._metadata["chirp_type"] = chirp_type
recording._metadata["bandwidth"] = bandwidth
recording._metadata["period"] = period
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
# Post processing
chan_params = {"noise_power": noise_power, "path_gain": path_gain}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
for key, value in apply_user_config_metadata(metadata).items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--frequency", "-f", type=float, default=1000.0, help="Frequency (Hz)")
@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude")
@click.option("--duty-cycle", "-d", type=float, default=0.5, help="Duty cycle (0.0-1.0)")
@click.option("--phase", "-p", type=float, default=0.0, help="Phase shift (radians)")
@common_options
def square(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
frequency,
amplitude,
duty_cycle,
phase,
**kwargs,
):
"""Generate Square wave."""
ns = resolve_length(sample_rate, num_samples, duration)
echo_progress(f"Generating square wave: {format_frequency(frequency)}...", quiet)
source = SquareSource(
frequency=frequency, sample_rate=sample_rate, amplitude=amplitude, duty_cycle=duty_cycle, phase_shift=phase
)
recording = source.record(ns)
recording._metadata["signal_type"] = "square"
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
chan_params = {"noise_power": noise_power, "path_gain": path_gain}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
for key, value in apply_user_config_metadata(metadata).items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--frequency", "-f", type=float, default=1000.0, help="Frequency (Hz)")
@click.option("--amplitude", "-a", type=float, default=1.0, help="Amplitude")
@click.option("--phase", "-p", type=float, default=0.0, help="Phase shift (radians)")
@common_options
def sawtooth(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
frequency,
amplitude,
phase,
**kwargs,
):
"""Generate Sawtooth wave."""
ns = resolve_length(sample_rate, num_samples, duration)
echo_progress(f"Generating sawtooth wave: {format_frequency(frequency)}...", quiet)
source = SawtoothSource(frequency=frequency, sample_rate=sample_rate, amplitude=amplitude, phase_shift=phase)
recording = source.record(ns)
recording._metadata["signal_type"] = "sawtooth"
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
chan_params = {"noise_power": noise_power, "path_gain": path_gain}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
for key, value in apply_user_config_metadata(metadata).items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
def load_source(message_source, message_content, num_bits=None):
if num_bits is not None:
if message_source == "random":
M
2025-12-11 16:53:26 -05:00
return BinarySource()((1, num_bits))
elif message_source == "string":
if not message_content:
raise click.BadParameter("Message content required for string source")
return FileSourceBlock(message_content.encode("utf-8"), repeat=True)(num_bits).reshape(1, -1)
elif message_source == "file":
if not message_content:
raise click.BadParameter("File path required for file source")
p = Path(message_content)
if not p.exists():
raise click.BadParameter(f"File not found: {p}")
return FileSourceBlock(p.read_bytes(), repeat=True)(num_bits).reshape(1, -1)
else:
if message_source == "random":
M
2025-12-11 16:53:26 -05:00
return BinarySource() # Infinite source
elif message_source == "string":
if not message_content:
raise click.BadParameter("Message content required for string source")
return FileSourceBlock(message_content.encode("utf-8"), repeat=True)
elif message_source == "file":
if not message_content:
raise click.BadParameter("File path required for file source")
p = Path(message_content)
if not p.exists():
raise click.BadParameter(f"File not found: {p}")
return FileSourceBlock(p.read_bytes(), repeat=True)
def _run_mod_gen(
mod_type,
sample_rate,
symbols,
num_samples,
duration,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
):
# Resolve length
# If symbols provided, it drives.
# If not, use num_samples/duration to calculate symbols
if symbol_rate is None:
# Try to infer? No, required.
raise click.BadParameter("Symbol rate required")
sps = sample_rate / symbol_rate
if not sps.is_integer():
sps_int = int(round(sps))
if sps_int < 1:
sps_int = 1
actual_sr = sps_int * symbol_rate
echo_progress(f"Warning: Non-integer samples per symbol ({sps:.4f}). Rounding to {sps_int}.", quiet)
echo_progress(f"Actual sample rate will be {format_sample_rate(actual_sr)}", quiet)
sps = int(sps_int)
sample_rate = actual_sr
else:
sps = int(sps)
if symbols is None:
# Calc from duration/samples
ns = resolve_length(sample_rate, num_samples, duration)
symbols = int(np.ceil(ns / sps))
echo_progress(f"Generating {mod_type}-{order} ({symbols} symbols)...", quiet)
echo_verbose(f" Sample Rate: {format_sample_rate(sample_rate)} (SPS={sps})", verbose)
bps = int(np.log2(order))
total_samples = symbols * sps
# Source
source = load_source(message_source, message_content, None)
# Mapper and Pulse Shaping
mapper = Mapper(constellation_type=mod_type, num_bits_per_symbol=bps)
upsampler = Upsampling(factor=sps)
# Filter
if filter_type == "rrc":
filter_block = RootRaisedCosineFilter(span_in_symbols=filter_span, upsampling_factor=sps, beta=filter_beta)
elif filter_type == "rc":
filter_block = RaisedCosineFilter(span_in_symbols=filter_span, upsampling_factor=sps, beta=filter_beta)
elif filter_type == "gaussian":
raise click.ClickException("Gaussian filter not supported yet")
else:
filter_block = None
# Generate base signal
mapper.connect_input([source])
upsampler.connect_input([mapper])
if filter_block:
filter_block.connect_input([upsampler])
base_recording = filter_block.record(total_samples)
else:
base_recording = upsampler.record(total_samples)
# Update metadata
for key, value in {
"modulation": mod_type,
"order": order,
"symbol_rate": symbol_rate,
"symbols": symbols,
"filter": filter_type,
}.items():
base_recording.update_metadata(key, value)
if center_frequency:
base_recording.update_metadata("center_frequency", center_frequency)
# Post Processing
chan_params = {"noise_power": noise_power, "path_gain": path_gain}
final_recording = apply_post_processing(base_recording, frequency_shift, add_noise, chan_params, verbose)
# Trim if explicit num_samples was requested and we generated more (due to symbol alignment)
target_ns = resolve_length(sample_rate, num_samples, duration)
if target_ns and len(final_recording.data[0]) > target_ns:
# Only trim if difference is significant?
# User usually wants exact length if specified.
if num_samples or duration: # If explicitly asked for length
final_recording = final_recording.trim(target_ns)
for key, value in apply_user_config_metadata(metadata).items():
final_recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(final_recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--symbols", "-N", type=int, help="Number of symbols")
@click.option("--order", "-M", type=int, required=True, help="QAM Order (4, 16, 32, 64, 128, 256, 1024)")
@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz")
@click.option(
"--filter",
"filter_type",
type=click.Choice(["rrc", "rc", "gaussian", "none"]),
default="rrc",
help="Pulse shaping filter",
)
@click.option("--filter-span", type=int, default=6, help="Filter span in symbols")
@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor")
@click.option(
"--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source"
)
@click.option("--message-content", help="File path or string content")
@common_options
def qam(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
multipath_paths,
multipath_max_delay,
iq_amp_imbalance,
iq_phase_imbalance,
iq_dc_offset,
symbols,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
**kwargs,
):
"""Generate QAM modulated signal."""
# Determine modulation type (Normal QAM vs Cross QAM)
if order in [32, 128]:
mod_type = "CROSS_QAM"
else:
mod_type = "QAM"
_run_mod_gen(
mod_type,
sample_rate,
symbols,
num_samples,
duration,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
)
@generate.command()
@click.option("--symbols", "-N", type=int, help="Number of symbols")
@click.option("--order", "-M", type=int, required=True, help="APSK Order (16, 32, 64, 128, 256)")
@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz")
@click.option(
"--filter",
"filter_type",
type=click.Choice(["rrc", "rc", "gaussian", "none"]),
default="rrc",
help="Pulse shaping filter",
)
@click.option("--filter-span", type=int, default=6, help="Filter span in symbols")
@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor")
@click.option(
"--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source"
)
@click.option("--message-content", help="File path or string content")
@common_options
def apsk(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
multipath_paths,
multipath_max_delay,
iq_amp_imbalance,
iq_phase_imbalance,
iq_dc_offset,
symbols,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
**kwargs,
):
"""Generate APSK modulated signal."""
_run_mod_gen(
"APSK",
sample_rate,
symbols,
num_samples,
duration,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
)
@generate.command()
@click.option("--symbols", "-N", type=int, help="Number of symbols")
@click.option("--order", "-M", type=int, required=True, help="PAM Order (4, 8, 16)")
@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz")
@click.option(
"--filter",
"filter_type",
type=click.Choice(["rrc", "rc", "gaussian", "none"]),
default="rrc",
help="Pulse shaping filter",
)
@click.option("--filter-span", type=int, default=6, help="Filter span in symbols")
@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor")
@click.option(
"--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source"
)
@click.option("--message-content", help="File path or string content")
@common_options
def pam(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
multipath_paths,
multipath_max_delay,
iq_amp_imbalance,
iq_phase_imbalance,
iq_dc_offset,
symbols,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
**kwargs,
):
"""Generate PAM modulated signal."""
_run_mod_gen(
"PAM",
sample_rate,
symbols,
num_samples,
duration,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
)
@generate.command()
@click.option("--symbols", "-N", type=int, help="Number of symbols")
@click.option("--order", "-M", type=int, default=2, help="FSK Order (2, 4, 8)")
@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz")
@click.option("--freq-spacing", type=float, help="Frequency spacing (Hz)")
@click.option("--modulation-index", "-h", type=float, help="Modulation Index (alternative to spacing)")
@click.option(
"--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source"
)
@click.option("--message-content", help="File path or string content")
@common_options
def fsk(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
multipath_paths,
multipath_max_delay,
iq_amp_imbalance,
iq_phase_imbalance,
iq_dc_offset,
symbols,
order,
symbol_rate,
freq_spacing,
modulation_index,
message_source,
message_content,
**kwargs,
):
"""Generate FSK modulated signal."""
# FSK uses FSKModulator which is a standalone Source/Modulator block? No, it's a Modulator.
# Takes bits input.
# Determine spacing
if freq_spacing is None:
if modulation_index is None:
modulation_index = 1.0 # Default
freq_spacing = modulation_index * symbol_rate
# Samples per symbol
sps = sample_rate / symbol_rate # FSKModulator takes sampling_freq and symbol_duration (1/rate)
symbol_duration = 1.0 / symbol_rate
# Resolve length
ns = resolve_length(sample_rate, num_samples, duration, symbols, sps)
if symbols is None:
symbols = int(np.ceil(ns / sps))
echo_progress(f"Generating {order}-FSK (Spacing={format_frequency(freq_spacing)})...", quiet)
# Bits
bps = int(np.log2(order))
num_bits = symbols * bps
# Source
source_bits = load_source(message_source, message_content, num_bits)
# Modulator
mod = FSKModulator(
num_bits_per_symbol=bps,
frequency_spacing=freq_spacing,
symbol_duration=symbol_duration,
sampling_frequency=sample_rate,
)
# Generate
samples = mod(source_bits)
# Flatten
samples = samples.flatten()[:ns]
recording = Recording(data=samples, metadata={"sample_rate": sample_rate})
recording._metadata.update(
{
"modulation": "FSK",
"order": order,
"symbol_rate": symbol_rate,
"freq_spacing": freq_spacing,
"mod_index": modulation_index if modulation_index else freq_spacing / symbol_rate,
}
)
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
chan_params = {
"noise_power": noise_power,
"path_gain": path_gain,
"multipath_paths": multipath_paths,
"multipath_max_delay": multipath_max_delay,
"iq_amp_imbalance": iq_amp_imbalance,
"iq_phase_imbalance": iq_phase_imbalance,
"iq_dc_offset": iq_dc_offset,
}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
for key, value in apply_user_config_metadata(metadata).items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz")
@click.option(
"--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source"
)
@click.option("--message-content", help="File path or string content")
@common_options
def ook(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
multipath_paths,
multipath_max_delay,
iq_amp_imbalance,
iq_phase_imbalance,
iq_dc_offset,
symbol_rate,
message_source,
message_content,
**kwargs,
):
"""Generate On-Off Keying (OOK) signal."""
sps = int(sample_rate / symbol_rate)
ns = resolve_length(sample_rate, num_samples, duration)
echo_progress("Generating OOK...", quiet)
# Source Block
source = load_source(message_source, message_content, None)
# OOK Modulator
mod = OOKModulator(source, samples_per_symbol=sps)
recording = mod.record(ns)
recording._metadata["sample_rate"] = sample_rate
recording._metadata["modulation"] = "OOK"
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
chan_params = {
"noise_power": noise_power,
"path_gain": path_gain,
"multipath_paths": multipath_paths,
"multipath_max_delay": multipath_max_delay,
"iq_amp_imbalance": iq_amp_imbalance,
"iq_phase_imbalance": iq_phase_imbalance,
"iq_dc_offset": iq_dc_offset,
}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
for key, value in apply_user_config_metadata(metadata).items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz")
@click.option(
"--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source"
)
@click.option("--message-content", help="File path or string content")
@common_options
def oqpsk(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
multipath_paths,
multipath_max_delay,
iq_amp_imbalance,
iq_phase_imbalance,
iq_dc_offset,
symbol_rate,
message_source,
message_content,
**kwargs,
):
"""Generate Offset QPSK (OQPSK) signal."""
sps = int(sample_rate / symbol_rate)
ns = resolve_length(sample_rate, num_samples, duration)
echo_progress("Generating OQPSK...", quiet)
# Source Block
source = load_source(message_source, message_content, None)
# OQPSK Modulator
mod = OQPSKModulator(source, samples_per_symbol=sps)
recording = mod.record(ns)
recording._metadata["sample_rate"] = sample_rate
recording._metadata["modulation"] = "OQPSK"
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
chan_params = {
"noise_power": noise_power,
"path_gain": path_gain,
"multipath_paths": multipath_paths,
"multipath_max_delay": multipath_max_delay,
"iq_amp_imbalance": iq_amp_imbalance,
"iq_phase_imbalance": iq_phase_imbalance,
"iq_dc_offset": iq_dc_offset,
}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
for key, value in apply_user_config_metadata(metadata).items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz")
@click.option("--bt", type=float, default=0.3, help="Bandwidth-Time product (e.g., 0.3, 0.5)")
@click.option(
"--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source"
)
@click.option("--message-content", help="File path or string content")
@common_options
def gmsk(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
multipath_paths,
multipath_max_delay,
iq_amp_imbalance,
iq_phase_imbalance,
iq_dc_offset,
symbol_rate,
bt,
message_source,
message_content,
**kwargs,
):
"""Generate GMSK modulated signal."""
sps = int(sample_rate / symbol_rate)
ns = resolve_length(sample_rate, num_samples, duration)
echo_progress(f"Generating GMSK (BT={bt})...", quiet)
# Source Block
source = load_source(message_source, message_content, None)
# GMSK Modulator
mod = GMSKModulator(source, samples_per_symbol=sps, bt=bt)
recording = mod.record(ns)
recording._metadata["sample_rate"] = sample_rate
recording._metadata["modulation"] = "GMSK"
recording._metadata["bt_product"] = bt
if center_frequency:
recording._metadata["center_frequency"] = center_frequency
chan_params = {
"noise_power": noise_power,
"path_gain": path_gain,
"multipath_paths": multipath_paths,
"multipath_max_delay": multipath_max_delay,
"iq_amp_imbalance": iq_amp_imbalance,
"iq_phase_imbalance": iq_phase_imbalance,
"iq_dc_offset": iq_dc_offset,
}
recording = apply_post_processing(recording, frequency_shift, add_noise, chan_params, verbose)
for key, value in apply_user_config_metadata(metadata).items():
recording.update_metadata(key, value)
fmt = get_output_format(output, format)
save_recording(recording, output, fmt, overwrite, verbose)
@generate.command()
@click.option("--symbols", "-N", type=int, help="Number of symbols")
@click.option("--order", "-M", type=int, required=True, help="PSK Order (2, 4, 8)")
@click.option("--symbol-rate", "-r", type=float, required=True, help="Symbol rate in Hz")
@click.option(
"--filter",
"filter_type",
type=click.Choice(["rrc", "rc", "gaussian", "none"]),
default="rrc",
help="Pulse shaping filter",
)
@click.option("--filter-span", type=int, default=6, help="Filter span in symbols")
@click.option("--filter-beta", type=float, default=0.35, help="Filter roll-off factor")
@click.option(
"--message-source", type=click.Choice(["random", "file", "string"]), default="random", help="Data source"
)
@click.option("--message-content", help="File path or string content")
@common_options
def psk(
sample_rate,
num_samples,
duration,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
symbols,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
**kwargs,
):
"""Generate PSK modulated signal."""
_run_mod_gen(
"PSK",
sample_rate,
symbols,
num_samples,
duration,
order,
symbol_rate,
filter_type,
filter_span,
filter_beta,
message_source,
message_content,
frequency_shift,
center_frequency,
add_noise,
noise_power,
path_gain,
output,
format,
overwrite,
metadata,
verbose,
quiet,
)