modrec-workflow/scripts/dataset_building/data_gen.py
2025-06-16 13:43:59 -04:00

82 lines
3.6 KiB
Python

from utils.data import Recording
import numpy as np
from utils.signal import block_generator
import argparse
import os
from helpers.app_settings import get_app_settings
mods = {
"bpsk": {"num_bits_per_symbol": 1, "constellation_type": "psk"},
"qpsk": {"num_bits_per_symbol": 2, "constellation_type": "psk"},
"qam16": {"num_bits_per_symbol": 4, "constellation_type": "qam"},
"qam64": {"num_bits_per_symbol": 6, "constellation_type": "qam"},
}
def generate_modulated_signals(output_dir):
settings = get_app_settings().dataset
for modulation in settings.modulation_types:
for snr in np.arange(settings.snr_start, settings.snr_end, settings.snr_step):
for i in range(3):
recording_length = settings.recording_length
beta = settings.beta # the rolloff factor, can be changed to add variety
sps = settings.sps # samples per symbol, or the relative bandwidth of the digital signal. Can also be changed.
# blocks don't directly take the string 'qpsk' so we use the dict 'mods' to get parameters
constellation_type = mods[modulation]["constellation_type"]
num_bits_per_symbol = mods[modulation]["num_bits_per_symbol"]
# construct the digital modulation blocks with these parameters
# we have bit source -> mapper -> upsampling -> pulse shaping
bit_source = block_generator.RandomBinarySource()
mapper = block_generator.Mapper(
constellation_type=constellation_type,
num_bits_per_symbol=num_bits_per_symbol,
)
upsampler = block_generator.Upsampling(factor=sps)
pulse_shaping_filter = block_generator.RaisedCosineFilter(
upsampling_factor=sps, beta=beta
)
pulse_shaping_filter.connect_input([upsampler])
upsampler.connect_input([mapper])
mapper.connect_input([bit_source])
modulation_recording = pulse_shaping_filter.record(
num_samples=recording_length
)
# add noise by calculating the power of the modulation recording and generating AWGN from the snr parameter
signal_power = np.mean(np.abs(modulation_recording.data[0] ** 2))
awgn_source = block_generator.AWGNSource(
variance=(signal_power / 2) * (10 ** (((-1 * snr) / 20)))
)
noise = awgn_source.record(num_samples=recording_length)
samples_with_noise = modulation_recording.data + noise.data
output_recording = Recording(data=samples_with_noise)
# add metadata for ML later
output_recording.add_to_metadata(key="modulation", value=modulation)
output_recording.add_to_metadata(key="snr", value=int(snr))
output_recording.add_to_metadata(key="beta", value=beta)
output_recording.add_to_metadata(key="sps", value=sps)
# view if you want
# output_recording.view()
# save to file
output_recording.to_npy(path = output_dir) # optionally add path and filename parameters
if __name__ == "__main__":
p = argparse.ArgumentParser(description="Generate modulated signal .npy files")
p.add_argument(
"--output-dir",
default=".",
help="Folder where .npy files will be saved"
)
args = p.parse_args()
generate_modulated_signals(args.output_dir)
print("generated data to " + args.output_dir)