M
2025-07-10 15:11:40 -04:00
|
|
|
from dataclasses import dataclass
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
from utils.data.recording import Recording
|
|
|
|
|
from utils.signal import block_generator
|
|
|
|
|
from utils.signal.basic_signal_generator import complex_sine
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
|
|
class ModulationInfo:
|
|
|
|
|
bps: int
|
|
|
|
|
constellation: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModulationRegistry:
|
|
|
|
|
_registry = {
|
|
|
|
|
"qam16": ModulationInfo(bps=4, constellation="qam"),
|
|
|
|
|
"qam32": ModulationInfo(bps=5, constellation="qam"),
|
|
|
|
|
"qam64": ModulationInfo(bps=6, constellation="qam"),
|
|
|
|
|
"qam256": ModulationInfo(bps=8, constellation="qam"),
|
|
|
|
|
"qam1024": ModulationInfo(bps=10, constellation="qam"),
|
|
|
|
|
"pam4": ModulationInfo(bps=2, constellation="pam"),
|
|
|
|
|
"pam8": ModulationInfo(bps=3, constellation="pam"),
|
|
|
|
|
"bpsk": ModulationInfo(bps=1, constellation="psk"),
|
|
|
|
|
"qpsk": ModulationInfo(bps=2, constellation="psk"),
|
|
|
|
|
"psk8": ModulationInfo(bps=3, constellation="psk"),
|
|
|
|
|
"psk16": ModulationInfo(bps=4, constellation="psk"),
|
|
|
|
|
"psk32": ModulationInfo(bps=5, constellation="psk"),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def get(cls, mod_type: str) -> ModulationInfo:
|
|
|
|
|
return cls._registry[mod_type]
|
|
|
|
|
|
|
|
|
|
|
M
2025-11-25 13:40:42 -05:00
|
|
|
def create_modulated_signal(
|
|
|
|
|
modulation: str, sps: int, beta: float, length: int
|
|
|
|
|
) -> Recording:
|
M
2025-07-10 15:11:40 -04:00
|
|
|
"""Produces a modulated signal Recording."""
|
|
|
|
|
mod_info = ModulationRegistry.get(modulation)
|
|
|
|
|
if mod_info is None:
|
|
|
|
|
raise ValueError(f"Modulation {modulation} not in registry.")
|
|
|
|
|
|
|
|
|
|
while length % sps != 0 or length % mod_info.bps != 0:
|
|
|
|
|
length = length + 1 # needs to be multiple of bits per symbol and sps
|
|
|
|
|
|
|
|
|
|
num_bits = int(length * sps)
|
|
|
|
|
|
|
|
|
|
mapper = block_generator.Mapper(
|
|
|
|
|
constellation_type=mod_info.constellation,
|
|
|
|
|
num_bits_per_symbol=mod_info.bps,
|
|
|
|
|
)
|
|
|
|
|
upsampler = block_generator.Upsampling(factor=sps)
|
M
2025-11-25 13:40:42 -05:00
|
|
|
filter = block_generator.RootRaisedCosineFilter(
|
M
2025-07-10 15:11:40 -04:00
|
|
|
span_in_symbols=10, upsampling_factor=sps, beta=beta
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
bits = [(np.random.rand(num_bits) > 0.5).astype(np.float32)]
|
|
|
|
|
long_bits = np.concatenate([bits, bits, bits], axis=1)
|
|
|
|
|
symbols = mapper(long_bits)
|
|
|
|
|
|
|
|
|
|
upsampled_symbols = upsampler([symbols])
|
|
|
|
|
filtered_samples = filter([upsampled_symbols])
|
M
2025-11-25 13:40:42 -05:00
|
|
|
start = (len(filtered_samples) - length) // 2
|
|
|
|
|
end = start + length
|
|
|
|
|
output_recording = filtered_samples[start:end]
|
|
|
|
|
|
|
|
|
|
metadata = {
|
|
|
|
|
"modulation": modulation,
|
|
|
|
|
"bits_per_symbol": mod_info.bps,
|
|
|
|
|
"constellation": mod_info.constellation,
|
|
|
|
|
"sps": sps,
|
|
|
|
|
"beta": beta,
|
|
|
|
|
"source": "signal.block_generator",
|
|
|
|
|
}
|
M
2025-07-10 15:11:40 -04:00
|
|
|
|
M
2025-11-25 13:40:42 -05:00
|
|
|
return Recording(data=output_recording, metadata=metadata)
|
M
2025-07-10 15:11:40 -04:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_lfm_recording(
|
|
|
|
|
sample_rate: int,
|
|
|
|
|
width: Optional[int],
|
|
|
|
|
chirp_period: Optional[float],
|
|
|
|
|
chirp_type: str,
|
|
|
|
|
length: int,
|
|
|
|
|
) -> Recording:
|
|
|
|
|
"""Produces a Recording of Linear Frequency Modulation (LFM) Jamming."""
|
|
|
|
|
lfm_jamming_source = block_generator.LFMJammingSource(
|
|
|
|
|
sample_rate=sample_rate,
|
|
|
|
|
bandwidth=width,
|
|
|
|
|
chirp_period=chirp_period,
|
|
|
|
|
chirp_type=chirp_type,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return lfm_jamming_source.record(num_samples=length)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_noise_recording(
|
|
|
|
|
rms_power: float,
|
|
|
|
|
length: int,
|
M
2025-11-25 14:26:31 -05:00
|
|
|
seed: int | None = None
|
M
2025-07-10 15:11:40 -04:00
|
|
|
) -> Recording:
|
|
|
|
|
"""Generate a Recording of Additive White Gaussian Noise (AWGN)."""
|
|
|
|
|
# 1. Create a repeating pseudo-random envelope
|
M
2025-11-25 14:26:31 -05:00
|
|
|
if seed is not None:
|
|
|
|
|
np.random.seed(seed)
|
|
|
|
|
|
|
|
|
|
# 2. Sigma for complex AWGN:
|
|
|
|
|
sigma = np.sqrt(rms_power / 2)
|
|
|
|
|
|
|
|
|
|
# 3. Generate complex Gaussian noise with correct power
|
|
|
|
|
real = np.random.normal(0, sigma, length)
|
|
|
|
|
imag = np.random.normal(0, sigma, length)
|
M
2025-07-10 15:11:40 -04:00
|
|
|
complex_noise = real + 1j * imag
|
|
|
|
|
|
M
2025-11-25 14:26:31 -05:00
|
|
|
metadata = {"interference": "wb", "signal_type": "noise", "rms_power": rms_power}
|
|
|
|
|
return Recording(data=complex_noise, metadata=metadata)
|
M
2025-07-10 15:11:40 -04:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_ctnb_recording(length: int) -> Recording:
|
|
|
|
|
ones_source = block_generator.ConstantSource()
|
|
|
|
|
return ones_source.record(num_samples=length)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_birdie_recording(
|
M
2025-11-25 13:40:42 -05:00
|
|
|
sample_rate: int, length: int, wave_number: int, sps: int = 1
|
M
2025-07-10 15:11:40 -04:00
|
|
|
) -> Recording:
|
|
|
|
|
recording_data = np.zeros(int(length))
|
|
|
|
|
for _ in range(wave_number):
|
M
2025-11-25 13:40:42 -05:00
|
|
|
frequency = np.random.choice(
|
|
|
|
|
np.arange(-sample_rate / (2 * sps), sample_rate / (2 * sps))
|
|
|
|
|
)
|
M
2025-07-10 15:11:40 -04:00
|
|
|
recording = complex_sine(
|
|
|
|
|
sample_rate=int(sample_rate), length=int(length), frequency=int(frequency)
|
|
|
|
|
)
|
|
|
|
|
recording_data = recording_data + recording.data
|
|
|
|
|
return Recording(data=recording_data, metadata=recording.metadata)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def frequency_shift(
|
|
|
|
|
recording: Recording, freq_shift: float, sample_rate: int
|
|
|
|
|
) -> Recording:
|
|
|
|
|
"""Applies a frequency shift the input recording."""
|
|
|
|
|
source = block_generator.RecordingSource(recording=recording)
|
|
|
|
|
frequency_shift_block = block_generator.FrequencyShift(
|
|
|
|
|
shift_frequency=freq_shift, sampling_rate=sample_rate
|
|
|
|
|
)
|
|
|
|
|
frequency_shift_block.connect_input([source])
|
|
|
|
|
|
|
|
|
|
return frequency_shift_block.record(num_samples=len(recording))
|