from dataclasses import dataclass
from typing import Optional

import numpy as np

from utils.data.recording import Recording
from utils.signal import block_generator
from utils.signal.basic_signal_generator import complex_sine


@dataclass(frozen=True)
class ModulationInfo:
    """Immutable description of one modulation scheme.

    Attributes:
        bps: Bits per symbol; passed to the mapper as ``num_bits_per_symbol``.
        constellation: Constellation family name (``"qam"``, ``"pam"`` or
            ``"psk"``); passed to the mapper as ``constellation_type``.
    """

    bps: int
    constellation: str


class UnknownModulationError(KeyError, ValueError):
    """Raised when a modulation name is not present in the registry.

    Derives from both ``KeyError`` (what the registry lookup has always
    raised) and ``ValueError`` (what ``create_modulated_signal`` intended to
    raise), so handlers written against either type keep working.
    """

    def __str__(self) -> str:
        # KeyError.__str__ would wrap the message in quotes (repr); show it
        # as plain text instead.
        return str(self.args[0]) if self.args else ""


class ModulationRegistry:
    """Lookup table from modulation name to its :class:`ModulationInfo`."""

    # Keys are the modulation names accepted by ``create_modulated_signal``.
    _registry = {
        "qam16": ModulationInfo(bps=4, constellation="qam"),
        "qam32": ModulationInfo(bps=5, constellation="qam"),
        "qam64": ModulationInfo(bps=6, constellation="qam"),
        "qam256": ModulationInfo(bps=8, constellation="qam"),
        "qam1024": ModulationInfo(bps=10, constellation="qam"),
        "pam4": ModulationInfo(bps=2, constellation="pam"),
        "pam8": ModulationInfo(bps=3, constellation="pam"),
        "bpsk": ModulationInfo(bps=1, constellation="psk"),
        "qpsk": ModulationInfo(bps=2, constellation="psk"),
        "psk8": ModulationInfo(bps=3, constellation="psk"),
        "psk16": ModulationInfo(bps=4, constellation="psk"),
        "psk32": ModulationInfo(bps=5, constellation="psk"),
    }

    @classmethod
    def get(cls, mod_type: str) -> ModulationInfo:
        """Return the registry entry for ``mod_type``.

        Raises:
            KeyError: If ``mod_type`` is not a registered modulation name.
        """
        return cls._registry[mod_type]


def periodic_random(length, divisor=4, seed=256):
    """Return ``length`` uniform samples in [0, 1) that repeat periodically.

    A chunk of ``ceil(length / divisor)`` values is drawn and tiled
    ``divisor`` times, then truncated to exactly ``length`` samples. When
    ``length`` is a multiple of ``divisor`` the result is one chunk of
    ``length / divisor`` values repeated ``divisor`` times.

    The values come from a private ``RandomState(seed)``, so they are
    reproducible for a given ``seed`` and the call does not reseed or
    advance NumPy's global random generator.

    Args:
        length: Number of samples to return (converted with ``int``).
        divisor: Number of times the random chunk is repeated.
        seed: Seed for the private generator.

    Returns:
        1-D ``numpy.ndarray`` of ``int(length)`` floats.
    """
    total = int(length)
    chunk_len = -(-total // divisor)  # ceiling division
    rng = np.random.RandomState(seed)
    chunk = rng.rand(chunk_len)
    return np.tile(chunk, divisor)[:total]


def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
    """Produce a modulated, pulse-shaped signal Recording.

    Random bits are mapped to symbols, upsampled by ``sps`` and passed through
    a raised-cosine filter spanning 10 symbols.

    Args:
        modulation: Modulation name; must be a key of ``ModulationRegistry``.
        sps: Samples per symbol (upsampling factor).
        beta: Roll-off value forwarded to ``RaisedCosineFilter``.
        length: Requested length; rounded up to the next value divisible by
            both ``sps`` and the modulation's bits per symbol.

    Returns:
        A ``Recording`` wrapping ``filtered_samples[length : 2 * length]``.

    Raises:
        UnknownModulationError: If ``modulation`` is not in the registry.
            This is both a ``KeyError`` and a ``ValueError``.
    """
    try:
        mod_info = ModulationRegistry.get(modulation)
    except KeyError:
        raise UnknownModulationError(
            f"Modulation {modulation} not in registry."
        ) from None

    # needs to be multiple of bits per symbol and sps
    while length % sps != 0 or length % mod_info.bps != 0:
        length += 1

    num_bits = int(length * sps)

    mapper = block_generator.Mapper(
        constellation_type=mod_info.constellation,
        num_bits_per_symbol=mod_info.bps,
    )
    upsampler = block_generator.Upsampling(factor=sps)
    pulse_filter = block_generator.RaisedCosineFilter(
        span_in_symbols=10, upsampling_factor=sps, beta=beta
    )

    # One row of random 0.0/1.0 bits, repeated three times along axis 1.
    bits = [(np.random.rand(num_bits) > 0.5).astype(np.float32)]
    long_bits = np.concatenate([bits, bits, bits], axis=1)

    symbols = mapper(long_bits)
    upsampled_symbols = upsampler([symbols])
    filtered_samples = pulse_filter([upsampled_symbols])

    # NOTE(review): presumably the middle slice is kept to avoid filter edge
    # transients; the output shape of the block_generator blocks is not
    # visible here - confirm that slicing the first axis is intended.
    output_recording = filtered_samples[length : length * 2]
    return Recording(data=output_recording)


# Old create_modulated_signal code
# def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
#     """Produces a modulated signal Recording."""
#     mod_info = ModulationRegistry.get(modulation)
#     if mod_info is None:
#         raise ValueError(f"Modulation {modulation} not in registry.")
#     source_block = block_generator.RandomBinarySource()
#     mapper_block = block_generator.Mapper(
#         constellation_type=mod_info.constellation,
#         num_bits_per_symbol=mod_info.bps,
#     )
#     upsampler_block = block_generator.Upsampling(factor=sps)
#     filter_block = block_generator.RaisedCosineFilter(upsampling_factor=sps, beta=beta)
#     mapper_block.connect_input([source_block])
#     upsampler_block.connect_input([mapper_block])
#     filter_block.connect_input([upsampler_block])
#     dividing_factor = sps * mod_info.bps
#     while length % dividing_factor != 0:
#         length = length + 1
#     double_length = length * 2
#     recording = filter_block.record(num_samples=double_length)
#     return Recording(data=recording.data[:, :length], metadata=recording.metadata)


def create_lfm_recording(
    sample_rate: int,
    width: Optional[int],
    chirp_period: Optional[float],
    chirp_type: str,
    length: int,
) -> Recording:
    """Produce a Recording of Linear Frequency Modulation (LFM) jamming.

    Args:
        sample_rate: Forwarded to ``LFMJammingSource`` as ``sample_rate``.
        width: Forwarded to ``LFMJammingSource`` as ``bandwidth``.
        chirp_period: Forwarded to ``LFMJammingSource`` as ``chirp_period``.
        chirp_type: Forwarded to ``LFMJammingSource`` as ``chirp_type``.
        length: Number of samples to record.

    Returns:
        The result of ``LFMJammingSource.record(num_samples=length)``.
    """
    lfm_jamming_source = block_generator.LFMJammingSource(
        sample_rate=sample_rate,
        bandwidth=width,
        chirp_period=chirp_period,
        chirp_type=chirp_type,
    )
    return lfm_jamming_source.record(num_samples=length)


def create_noise_recording(
    rms_power: float,
    length: int,
) -> Recording:
    """Generate a Recording of envelope-modulated complex Gaussian noise.

    The noise is shaped by a fixed, repeating pseudo-random amplitude
    envelope (seed 256, period ``length / 4``). The envelope is generated
    with a private generator, so this function does not reseed NumPy's
    global RNG; the Gaussian samples follow the global RNG state, and callers
    who need reproducible noise should seed ``np.random`` themselves.

    Args:
        rms_power: Power scale; the noise amplitude is multiplied by
            ``sqrt(rms_power)``.
        length: Number of samples. Does not need to be a multiple of 4.

    Returns:
        A ``Recording`` holding the complex noise, with metadata
        ``{"interference": "wb", "signal_type": "noise"}``.
    """
    # 1. Repeating pseudo-random envelope, exactly `length` samples long so it
    #    always broadcasts against the noise below.
    amplitude_envelope = np.sqrt(periodic_random(length, divisor=4, seed=256))

    # 2. Complex Gaussian noise; real and imaginary parts each have unit
    #    variance (mean power 2 per sample before the envelope is applied).
    real = np.random.normal(0, 1, length)
    imag = np.random.normal(0, 1, length)
    complex_noise = real + 1j * imag

    # 3. Scale noise by desired power and envelope. The squared envelope is
    #    uniform on [0, 1) (mean 0.5), so the expected power is rms_power.
    scaled_noise = complex_noise * amplitude_envelope * np.sqrt(rms_power)

    metadata = {"interference": "wb", "signal_type": "noise"}
    return Recording(data=scaled_noise, metadata=metadata)


def create_ctnb_recording(length: int) -> Recording:
    """Record ``length`` samples from a default-constructed ConstantSource.

    Returns:
        The result of ``ConstantSource().record(num_samples=length)``.
    """
    ones_source = block_generator.ConstantSource()
    return ones_source.record(num_samples=length)


def create_birdie_recording(
    sample_rate: int, length: int, wave_number: int
) -> Recording:
    """Sum ``wave_number`` complex sines at randomly chosen frequencies.

    Each frequency is drawn from ``arange(-sample_rate / 2, sample_rate / 2)``
    and truncated to an integer before being passed to ``complex_sine``.

    Args:
        sample_rate: Sample rate forwarded to ``complex_sine``.
        length: Number of samples in each sine and in the result.
        wave_number: Number of sines to sum; must be at least 1.

    Returns:
        A ``Recording`` whose data is the sum of the sines and whose metadata
        is taken from the last generated sine.

    Raises:
        ValueError: If ``wave_number`` is less than 1.
    """
    if wave_number < 1:
        # Without at least one sine there is no metadata to attach.
        raise ValueError(f"wave_number must be at least 1, got {wave_number}")

    # Loop-invariant: build the candidate frequency grid once.
    candidate_frequencies = np.arange(-sample_rate / 2, sample_rate / 2)

    recording_data = np.zeros(int(length))
    for _ in range(wave_number):
        frequency = np.random.choice(candidate_frequencies)
        recording = complex_sine(
            sample_rate=int(sample_rate), length=int(length), frequency=int(frequency)
        )
        recording_data = recording_data + recording.data

    return Recording(data=recording_data, metadata=recording.metadata)


def frequency_shift(
    recording: Recording, freq_shift: float, sample_rate: int
) -> Recording:
    """Apply a frequency shift to the input recording.

    Args:
        recording: Recording to shift; also determines the output length via
            ``len(recording)``.
        freq_shift: Forwarded to ``FrequencyShift`` as ``shift_frequency``.
        sample_rate: Forwarded to ``FrequencyShift`` as ``sampling_rate``.

    Returns:
        The result of recording ``len(recording)`` samples from the
        ``FrequencyShift`` block fed by ``recording``.
    """
    source = block_generator.RecordingSource(recording=recording)
    frequency_shift_block = block_generator.FrequencyShift(
        shift_frequency=freq_shift, sampling_rate=sample_rate
    )
    frequency_shift_block.connect_input([source])
    return frequency_shift_block.record(num_samples=len(recording))