diff --git a/recording_generation.py b/recording_generation.py index f6fdf82..ed686d3 100644 --- a/recording_generation.py +++ b/recording_generation.py @@ -2,11 +2,16 @@ import os import random import numpy as np +from utils.data.recording import Recording from utils.io.recording import from_npy -from signal_generation import (create_birdie_recording, create_ctnb_recording, - create_lfm_recording, create_modulated_signal, - create_noise_recording) +from signal_generation import ( + create_birdie_recording, + create_ctnb_recording, + create_lfm_recording, + create_modulated_signal, + create_noise_recording, +) class RecordingGenerator: @@ -28,7 +33,9 @@ class RecordingGenerator: recording = create_modulated_signal( modulation=modulation, sps=sps, beta=roll_off, length=length ) - recording.to_npy(filename=f"{modulation}_{roll_off_str}") + recording.to_npy( + filename=f"{modulation}_{roll_off_str}", overwrite=True + ) print(f"{modulation}_{roll_off_str} file saved.") def generate_lfm( @@ -48,22 +55,21 @@ class RecordingGenerator: ) print(f"LFM chirp length: {int(self.sample_rate * chirp_period)}") - recording.to_npy(filename=f"{chirp_type}_chirp") + recording.to_npy(filename=f"{chirp_type}_chirp", overwrite=True) print(f"{chirp_type}_chirp file saved.") def generate_wb(self, num: int = 2, length: int = 8192): for i in range(num): recording = create_noise_recording( - length=length, - rms_power=0.2, + length=length, rms_power=0.2, counter=random.choice([2, 4, 8, 16, 32]) ) - recording.to_npy(filename=f"wb{i + 1}") + recording.to_npy(filename=f"wb{i + 1}", overwrite=True) print(f"wb{i + 1} file saved.") def generate_ctnb(self, num: int = 2, length: int = 8192): for i in range(num): recording = create_ctnb_recording(length=length) - recording.to_npy(filename=f"ctnb{i + 1}") + recording.to_npy(filename=f"ctnb{i + 1}", overwrite=True) print(f"ctnb{i + 1} file saved.") def generate_birdie(self, num: int = 2, length: int = 8192, wave_num: int = 5): @@ -73,9 +79,14 @@ class RecordingGenerator: length=length, wave_number=int(wave_num + i), ) - recording.to_npy(filename=f"birdie{i + 1}") + recording.to_npy(filename=f"birdie{i + 1}", overwrite=True) print(f"birdie{i + 1} file saved.") + def generate_zeros(self, length: int = 8192): + data = np.zeros(shape=length, dtype=np.complex64) + recording = Recording(data=data) + recording.to_npy(filename="zero", overwrite=True) + def convert_to_dat( self, source_directory: str = "recordings", diff --git a/signal_generation.py b/signal_generation.py index f4c22dd..deada3c 100644 --- a/signal_generation.py +++ b/signal_generation.py @@ -34,13 +34,9 @@ class ModulationRegistry: return cls._registry[mod_type] -def periodic_random(length, divisor=4, seed=256): - np.random.seed(seed) - chunk = np.random.rand(int(length / divisor)) - return np.tile(chunk, divisor) - - -def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording: +def create_modulated_signal( + modulation: str, sps: int, beta: float, length: int +) -> Recording: """Produces a modulated signal Recording.""" mod_info = ModulationRegistry.get(modulation) if mod_info is None: @@ -56,7 +52,7 @@ def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Rec num_bits_per_symbol=mod_info.bps, ) upsampler = block_generator.Upsampling(factor=sps) - filter = block_generator.RaisedCosineFilter( + filter = block_generator.RootRaisedCosineFilter( span_in_symbols=10, upsampling_factor=sps, beta=beta ) @@ -66,37 +62,20 @@ def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Rec upsampled_symbols = upsampler([symbols]) filtered_samples = filter([upsampled_symbols]) - output_recording = filtered_samples[length : length * 2] + start = (len(filtered_samples) - length) // 2 + end = start + length + output_recording = filtered_samples[start:end] - return Recording(data=output_recording) + metadata = { + "modulation": modulation, + "bits_per_symbol": mod_info.bps, + "constellation": mod_info.constellation, + "sps": sps, + "beta": beta, + "source": "signal.block_generator", + } - -# Old create_modulated_signal code -# def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording: -# """Produces a modulated signal Recording.""" -# mod_info = ModulationRegistry.get(modulation) -# if mod_info is None: -# raise ValueError(f"Modulation {modulation} not in registry.") - -# source_block = block_generator.RandomBinarySource() -# mapper_block = block_generator.Mapper( -# constellation_type=mod_info.constellation, -# num_bits_per_symbol=mod_info.bps, -# ) -# upsampler_block = block_generator.Upsampling(factor=sps) -# filter_block = block_generator.RaisedCosineFilter(upsampling_factor=sps, beta=beta) - -# mapper_block.connect_input([source_block]) -# upsampler_block.connect_input([mapper_block]) -# filter_block.connect_input([upsampler_block]) - -# dividing_factor = sps * mod_info.bps -# while length % dividing_factor != 0: -# length = length + 1 -# double_length = length * 2 - -# recording = filter_block.record(num_samples=double_length) -# return Recording(data=recording.data[:, :length], metadata=recording.metadata) + return Recording(data=output_recording, metadata=metadata) def create_lfm_recording( @@ -120,10 +99,11 @@ def create_lfm_recording( def create_noise_recording( rms_power: float, length: int, + counter: int, ) -> Recording: """Generate a Recording of Additive White Gaussian Noise (AWGN).""" # 1. Create a repeating pseudo-random envelope - np.random.seed(256) + np.random.seed(256 + counter) chunk = np.random.rand(length // 4) tiled = np.tile(chunk, 4) amplitude_envelope = np.sqrt(tiled) @@ -145,11 +125,13 @@ def create_ctnb_recording(length: int) -> Recording: def create_birdie_recording( - sample_rate: int, length: int, wave_number: int + sample_rate: int, length: int, wave_number: int, sps: int = 1 ) -> Recording: recording_data = np.zeros(int(length)) for _ in range(wave_number): - frequency = np.random.choice(np.arange(-sample_rate / 2, sample_rate / 2)) + frequency = np.random.choice( + np.arange(-sample_rate / (2 * sps), sample_rate / (2 * sps)) + ) recording = complex_sine( sample_rate=int(sample_rate), length=int(length), frequency=int(frequency) )