From 9af035e079f1388c809de9b33fd892002ad9b3e2 Mon Sep 17 00:00:00 2001 From: madrigal Date: Thu, 10 Jul 2025 15:11:40 -0400 Subject: [PATCH] Added signal and recording generation files, and gitignore --- .gitignore | 29 +++++++ recording_generation.py | 112 ++++++++++++++++++++++++++ signal_generation.py | 170 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 311 insertions(+) create mode 100644 .gitignore create mode 100644 recording_generation.py create mode 100644 signal_generation.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..adb409d --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# PyCharm +.idea/ + +# Visual Studio Code +.vscode/ + +# Generated files +*.dot +*.hdf5 +*.npy +*.png +*.sigmf-data +*.sigmf-meta +images/ +recordings/ diff --git a/recording_generation.py b/recording_generation.py new file mode 100644 index 0000000..a01f559 --- /dev/null +++ b/recording_generation.py @@ -0,0 +1,112 @@ +import os +import random + +import numpy as np +from utils.io.recording import from_npy + +from signal_generation import (create_birdie_recording, create_ctnb_recording, + create_lfm_recording, create_modulated_signal, + create_noise_recording) + + +class RecordingGenerator: + def __init__(self, sample_rate): + self.sample_rate = int(sample_rate) + + def generate_collision( + self, + mod_choices: list = ["qam16", "qam64", "qam256"], + roll_offs: list = [0.15, 0.35], + sps_choices: list = [4, 5, 6], + length: int = 8192, + ): + for modulation in mod_choices: + for roll_off in roll_offs: + roll_off_str = str(roll_off)[2:] + sps = random.choice(sps_choices) + length = 8192 + recording = create_modulated_signal( + modulation=modulation, sps=sps, beta=roll_off, length=length + ) + recording.to_npy(filename=f"{modulation}_{roll_off_str}") + print(f"{modulation}_{roll_off_str} file saved.") + + def generate_lfm( + self, period_choices: list = [0.25, 0.3, 0.35], width_choices: list = [10] + ): + for chirp_type in ["up", "down", "up_down"]: + chirp_period = random.choice(period_choices) + width_factor = random.choice(width_choices) + width = self.sample_rate // width_factor + + recording = create_lfm_recording( + sample_rate=self.sample_rate, + width=width, + chirp_type=chirp_type, + chirp_period=chirp_period, + length=int(self.sample_rate * chirp_period), + ) + + print(f"LFM chirp length: {int(self.sample_rate * chirp_period)}") + recording.to_npy(filename=f"{chirp_type}_chirp") + print(f"{chirp_type}_chirp file saved.") + + def generate_wb(self, num: int = 2, length: int = 8192): + for i in range(num): + recording = create_noise_recording( + length=length, + rms_power=0.2, + ) + recording.to_npy(filename=f"wb{i + 1}") + print(f"wb{i + 1} file saved.") + + def generate_ctnb(self, num: int = 2, length: int = 8192): + for i in range(num): + recording = create_ctnb_recording(length=length) + recording.to_npy(filename=f"ctnb{i + 1}") + print(f"ctnb{i + 1} file saved.") + + def generate_birdie(self, num: int = 2, length: int = 8192, wave_num: int = 5): + for i in range(num): + recording = create_birdie_recording( + sample_rate=int(self.sample_rate), + length=length, + wave_number=int(wave_num + i), + ) + recording.to_npy(filename=f"birdie{i + 1}") + print(f"birdie{i + 1} file saved.") + + def convert_to_dat( + self, + source_directory: str = "/recordings", + save_directory: str = "/dat_recordings", + ): + for root, _, files in os.walk(source_directory): + for name in files: + filename = os.path.join(root, name) + savename = save_directory + name[:-4] + ".dat" + + recording = from_npy(file=filename) + data = recording.data[0] + + # Convert complex128 -> float64 -> int16 after scaling + real = np.real(data) + imag = np.imag(data) + + # Scale down the float values to fit in int16 range [-32768, 32767] + # Adjust the scaling factor depending on your signal's dynamic range + scale_factor = 32767 / np.max(np.abs(np.concatenate((real, imag)))) + real_scaled = (real * scale_factor).astype(np.int16) + imag_scaled = (imag * scale_factor).astype(np.int16) + + # Interleave real and imag + interleaved = np.empty((real_scaled.size * 2,), dtype=np.int16) + interleaved[0::2] = real_scaled + interleaved[1::2] = imag_scaled + + interleaved.tofile(savename) + print(f"Saved {savename}") + + +if __name__ == "__main__": + generator = RecordingGenerator() diff --git a/signal_generation.py b/signal_generation.py new file mode 100644 index 0000000..f4c22dd --- /dev/null +++ b/signal_generation.py @@ -0,0 +1,170 @@ +from dataclasses import dataclass +from typing import Optional + +import numpy as np +from utils.data.recording import Recording +from utils.signal import block_generator +from utils.signal.basic_signal_generator import complex_sine + + +@dataclass(frozen=True) +class ModulationInfo: + bps: int + constellation: str + + +class ModulationRegistry: + _registry = { + "qam16": ModulationInfo(bps=4, constellation="qam"), + "qam32": ModulationInfo(bps=5, constellation="qam"), + "qam64": ModulationInfo(bps=6, constellation="qam"), + "qam256": ModulationInfo(bps=8, constellation="qam"), + "qam1024": ModulationInfo(bps=10, constellation="qam"), + "pam4": ModulationInfo(bps=2, constellation="pam"), + "pam8": ModulationInfo(bps=3, constellation="pam"), + "bpsk": ModulationInfo(bps=1, constellation="psk"), + "qpsk": ModulationInfo(bps=2, constellation="psk"), + "psk8": ModulationInfo(bps=3, constellation="psk"), + "psk16": ModulationInfo(bps=4, constellation="psk"), + "psk32": ModulationInfo(bps=5, constellation="psk"), + } + + @classmethod + def get(cls, mod_type: str) -> ModulationInfo: + return cls._registry[mod_type] + + +def periodic_random(length, divisor=4, seed=256): + np.random.seed(seed) + chunk = np.random.rand(int(length / divisor)) + return np.tile(chunk, divisor) + + +def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording: + """Produces a modulated signal Recording.""" + mod_info = ModulationRegistry.get(modulation) + if mod_info is None: + raise ValueError(f"Modulation {modulation} not in registry.") + + while length % sps != 0 or length % mod_info.bps != 0: + length = length + 1 # needs to be multiple of bits per symbol and sps + + num_bits = int(length * sps) + + mapper = block_generator.Mapper( + constellation_type=mod_info.constellation, + num_bits_per_symbol=mod_info.bps, + ) + upsampler = block_generator.Upsampling(factor=sps) + filter = block_generator.RaisedCosineFilter( + span_in_symbols=10, upsampling_factor=sps, beta=beta + ) + + bits = [(np.random.rand(num_bits) > 0.5).astype(np.float32)] + long_bits = np.concatenate([bits, bits, bits], axis=1) + symbols = mapper(long_bits) + + upsampled_symbols = upsampler([symbols]) + filtered_samples = filter([upsampled_symbols]) + output_recording = filtered_samples[length : length * 2] + + return Recording(data=output_recording) + + +# Old create_modulated_signal code +# def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording: +# """Produces a modulated signal Recording.""" +# mod_info = ModulationRegistry.get(modulation) +# if mod_info is None: +# raise ValueError(f"Modulation {modulation} not in registry.") + +# source_block = block_generator.RandomBinarySource() +# mapper_block = block_generator.Mapper( +# constellation_type=mod_info.constellation, +# num_bits_per_symbol=mod_info.bps, +# ) +# upsampler_block = block_generator.Upsampling(factor=sps) +# filter_block = block_generator.RaisedCosineFilter(upsampling_factor=sps, beta=beta) + +# mapper_block.connect_input([source_block]) +# upsampler_block.connect_input([mapper_block]) +# filter_block.connect_input([upsampler_block]) + +# dividing_factor = sps * mod_info.bps +# while length % dividing_factor != 0: +# length = length + 1 +# double_length = length * 2 + +# recording = filter_block.record(num_samples=double_length) +# return Recording(data=recording.data[:, :length], metadata=recording.metadata) + + +def create_lfm_recording( + sample_rate: int, + width: Optional[int], + chirp_period: Optional[float], + chirp_type: str, + length: int, +) -> Recording: + """Produces a Recording of Linear Frequency Modulation (LFM) Jamming.""" + lfm_jamming_source = block_generator.LFMJammingSource( + sample_rate=sample_rate, + bandwidth=width, + chirp_period=chirp_period, + chirp_type=chirp_type, + ) + + return lfm_jamming_source.record(num_samples=length) + + +def create_noise_recording( + rms_power: float, + length: int, +) -> Recording: + """Generate a Recording of Additive White Gaussian Noise (AWGN).""" + # 1. Create a repeating pseudo-random envelope + np.random.seed(256) + chunk = np.random.rand(length // 4) + tiled = np.tile(chunk, 4) + amplitude_envelope = np.sqrt(tiled) + + # 2. Generate complex Gaussian noise with unit power + real = np.random.normal(0, 1, length) + imag = np.random.normal(0, 1, length) + complex_noise = real + 1j * imag + + # 3. Scale noise by desired power and envelope + scaled_noise = complex_noise * amplitude_envelope * np.sqrt(rms_power) + metadata = {"interference": "wb", "signal_type": "noise"} + return Recording(data=scaled_noise, metadata=metadata) + + +def create_ctnb_recording(length: int) -> Recording: + ones_source = block_generator.ConstantSource() + return ones_source.record(num_samples=length) + + +def create_birdie_recording( + sample_rate: int, length: int, wave_number: int +) -> Recording: + recording_data = np.zeros(int(length)) + for _ in range(wave_number): + frequency = np.random.choice(np.arange(-sample_rate / 2, sample_rate / 2)) + recording = complex_sine( + sample_rate=int(sample_rate), length=int(length), frequency=int(frequency) + ) + recording_data = recording_data + recording.data + return Recording(data=recording_data, metadata=recording.metadata) + + +def frequency_shift( + recording: Recording, freq_shift: float, sample_rate: int +) -> Recording: + """Applies a frequency shift the input recording.""" + source = block_generator.RecordingSource(recording=recording) + frequency_shift_block = block_generator.FrequencyShift( + shift_frequency=freq_shift, sampling_rate=sample_rate + ) + frequency_shift_block.connect_input([source]) + + return frequency_shift_block.record(num_samples=len(recording))