recording-generation/signal_generation.py

171 lines
5.9 KiB
Python
Raw Normal View History

from dataclasses import dataclass
from typing import Optional
import numpy as np
from utils.data.recording import Recording
from utils.signal import block_generator
from utils.signal.basic_signal_generator import complex_sine
@dataclass(frozen=True)
class ModulationInfo:
bps: int
constellation: str
class ModulationRegistry:
_registry = {
"qam16": ModulationInfo(bps=4, constellation="qam"),
"qam32": ModulationInfo(bps=5, constellation="qam"),
"qam64": ModulationInfo(bps=6, constellation="qam"),
"qam256": ModulationInfo(bps=8, constellation="qam"),
"qam1024": ModulationInfo(bps=10, constellation="qam"),
"pam4": ModulationInfo(bps=2, constellation="pam"),
"pam8": ModulationInfo(bps=3, constellation="pam"),
"bpsk": ModulationInfo(bps=1, constellation="psk"),
"qpsk": ModulationInfo(bps=2, constellation="psk"),
"psk8": ModulationInfo(bps=3, constellation="psk"),
"psk16": ModulationInfo(bps=4, constellation="psk"),
"psk32": ModulationInfo(bps=5, constellation="psk"),
}
@classmethod
def get(cls, mod_type: str) -> ModulationInfo:
return cls._registry[mod_type]
def periodic_random(length, divisor=4, seed=256):
np.random.seed(seed)
chunk = np.random.rand(int(length / divisor))
return np.tile(chunk, divisor)
def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
"""Produces a modulated signal Recording."""
mod_info = ModulationRegistry.get(modulation)
if mod_info is None:
raise ValueError(f"Modulation {modulation} not in registry.")
while length % sps != 0 or length % mod_info.bps != 0:
length = length + 1 # needs to be multiple of bits per symbol and sps
num_bits = int(length * sps)
mapper = block_generator.Mapper(
constellation_type=mod_info.constellation,
num_bits_per_symbol=mod_info.bps,
)
upsampler = block_generator.Upsampling(factor=sps)
filter = block_generator.RaisedCosineFilter(
span_in_symbols=10, upsampling_factor=sps, beta=beta
)
bits = [(np.random.rand(num_bits) > 0.5).astype(np.float32)]
long_bits = np.concatenate([bits, bits, bits], axis=1)
symbols = mapper(long_bits)
upsampled_symbols = upsampler([symbols])
filtered_samples = filter([upsampled_symbols])
output_recording = filtered_samples[length : length * 2]
return Recording(data=output_recording)
# Old create_modulated_signal code
# def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
# """Produces a modulated signal Recording."""
# mod_info = ModulationRegistry.get(modulation)
# if mod_info is None:
# raise ValueError(f"Modulation {modulation} not in registry.")
# source_block = block_generator.RandomBinarySource()
# mapper_block = block_generator.Mapper(
# constellation_type=mod_info.constellation,
# num_bits_per_symbol=mod_info.bps,
# )
# upsampler_block = block_generator.Upsampling(factor=sps)
# filter_block = block_generator.RaisedCosineFilter(upsampling_factor=sps, beta=beta)
# mapper_block.connect_input([source_block])
# upsampler_block.connect_input([mapper_block])
# filter_block.connect_input([upsampler_block])
# dividing_factor = sps * mod_info.bps
# while length % dividing_factor != 0:
# length = length + 1
# double_length = length * 2
# recording = filter_block.record(num_samples=double_length)
# return Recording(data=recording.data[:, :length], metadata=recording.metadata)
def create_lfm_recording(
sample_rate: int,
width: Optional[int],
chirp_period: Optional[float],
chirp_type: str,
length: int,
) -> Recording:
"""Produces a Recording of Linear Frequency Modulation (LFM) Jamming."""
lfm_jamming_source = block_generator.LFMJammingSource(
sample_rate=sample_rate,
bandwidth=width,
chirp_period=chirp_period,
chirp_type=chirp_type,
)
return lfm_jamming_source.record(num_samples=length)
def create_noise_recording(
rms_power: float,
length: int,
) -> Recording:
"""Generate a Recording of Additive White Gaussian Noise (AWGN)."""
# 1. Create a repeating pseudo-random envelope
np.random.seed(256)
chunk = np.random.rand(length // 4)
tiled = np.tile(chunk, 4)
amplitude_envelope = np.sqrt(tiled)
# 2. Generate complex Gaussian noise with unit power
real = np.random.normal(0, 1, length)
imag = np.random.normal(0, 1, length)
complex_noise = real + 1j * imag
# 3. Scale noise by desired power and envelope
scaled_noise = complex_noise * amplitude_envelope * np.sqrt(rms_power)
metadata = {"interference": "wb", "signal_type": "noise"}
return Recording(data=scaled_noise, metadata=metadata)
def create_ctnb_recording(length: int) -> Recording:
ones_source = block_generator.ConstantSource()
return ones_source.record(num_samples=length)
def create_birdie_recording(
sample_rate: int, length: int, wave_number: int
) -> Recording:
recording_data = np.zeros(int(length))
for _ in range(wave_number):
frequency = np.random.choice(np.arange(-sample_rate / 2, sample_rate / 2))
recording = complex_sine(
sample_rate=int(sample_rate), length=int(length), frequency=int(frequency)
)
recording_data = recording_data + recording.data
return Recording(data=recording_data, metadata=recording.metadata)
def frequency_shift(
recording: Recording, freq_shift: float, sample_rate: int
) -> Recording:
"""Applies a frequency shift the input recording."""
source = block_generator.RecordingSource(recording=recording)
frequency_shift_block = block_generator.FrequencyShift(
shift_frequency=freq_shift, sampling_rate=sample_rate
)
frequency_shift_block.connect_input([source])
return frequency_shift_block.record(num_samples=len(recording))