Added signal and recording generation files, and gitignore
This commit is contained in:
commit
9af035e079
29
.gitignore
vendored
Normal file
29
.gitignore
vendored
Normal file
|
@ -0,0 +1,29 @@
|
|||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# PyCharm
|
||||
.idea/
|
||||
|
||||
# Visual Studio Code
|
||||
.vscode/
|
||||
|
||||
# Generated files
|
||||
*.dot
|
||||
*.hdf5
|
||||
*.npy
|
||||
*.png
|
||||
*.sigmf-data
|
||||
*.sigmf-meta
|
||||
images/
|
||||
recordings/
|
112
recording_generation.py
Normal file
112
recording_generation.py
Normal file
|
@ -0,0 +1,112 @@
|
|||
import os
|
||||
import random
|
||||
|
||||
import numpy as np
|
||||
from utils.io.recording import from_npy
|
||||
|
||||
from signal_generation import (create_birdie_recording, create_ctnb_recording,
|
||||
create_lfm_recording, create_modulated_signal,
|
||||
create_noise_recording)
|
||||
|
||||
|
||||
class RecordingGenerator:
|
||||
def __init__(self, sample_rate):
|
||||
self.sample_rate = int(sample_rate)
|
||||
|
||||
def generate_collision(
|
||||
self,
|
||||
mod_choices: list = ["qam16", "qam64", "qam256"],
|
||||
roll_offs: list = [0.15, 0.35],
|
||||
sps_choices: list = [4, 5, 6],
|
||||
length: int = 8192,
|
||||
):
|
||||
for modulation in mod_choices:
|
||||
for roll_off in roll_offs:
|
||||
roll_off_str = str(roll_off)[2:]
|
||||
sps = random.choice(sps_choices)
|
||||
length = 8192
|
||||
recording = create_modulated_signal(
|
||||
modulation=modulation, sps=sps, beta=roll_off, length=length
|
||||
)
|
||||
recording.to_npy(filename=f"{modulation}_{roll_off_str}")
|
||||
print(f"{modulation}_{roll_off_str} file saved.")
|
||||
|
||||
def generate_lfm(
|
||||
self, period_choices: list = [0.25, 0.3, 0.35], width_choices: list = [10]
|
||||
):
|
||||
for chirp_type in ["up", "down", "up_down"]:
|
||||
chirp_period = random.choice(period_choices)
|
||||
width_factor = random.choice(width_choices)
|
||||
width = self.sample_rate // width_factor
|
||||
|
||||
recording = create_lfm_recording(
|
||||
sample_rate=self.sample_rate,
|
||||
width=width,
|
||||
chirp_type=chirp_type,
|
||||
chirp_period=chirp_period,
|
||||
length=int(self.sample_rate * chirp_period),
|
||||
)
|
||||
|
||||
print(f"LFM chirp length: {int(self.sample_rate * chirp_period)}")
|
||||
recording.to_npy(filename=f"{chirp_type}_chirp")
|
||||
print(f"{chirp_type}_chirp file saved.")
|
||||
|
||||
def generate_wb(self, num: int = 2, length: int = 8192):
|
||||
for i in range(num):
|
||||
recording = create_noise_recording(
|
||||
length=length,
|
||||
rms_power=0.2,
|
||||
)
|
||||
recording.to_npy(filename=f"wb{i + 1}")
|
||||
print(f"wb{i + 1} file saved.")
|
||||
|
||||
def generate_ctnb(self, num: int = 2, length: int = 8192):
|
||||
for i in range(num):
|
||||
recording = create_ctnb_recording(length=length)
|
||||
recording.to_npy(filename=f"ctnb{i + 1}")
|
||||
print(f"ctnb{i + 1} file saved.")
|
||||
|
||||
def generate_birdie(self, num: int = 2, length: int = 8192, wave_num: int = 5):
|
||||
for i in range(num):
|
||||
recording = create_birdie_recording(
|
||||
sample_rate=int(self.sample_rate),
|
||||
length=length,
|
||||
wave_number=int(wave_num + i),
|
||||
)
|
||||
recording.to_npy(filename=f"birdie{i + 1}")
|
||||
print(f"birdie{i + 1} file saved.")
|
||||
|
||||
def convert_to_dat(
|
||||
self,
|
||||
source_directory: str = "/recordings",
|
||||
save_directory: str = "/dat_recordings",
|
||||
):
|
||||
for root, _, files in os.walk(source_directory):
|
||||
for name in files:
|
||||
filename = os.path.join(root, name)
|
||||
savename = save_directory + name[:-4] + ".dat"
|
||||
|
||||
recording = from_npy(file=filename)
|
||||
data = recording.data[0]
|
||||
|
||||
# Convert complex128 -> float64 -> int16 after scaling
|
||||
real = np.real(data)
|
||||
imag = np.imag(data)
|
||||
|
||||
# Scale down the float values to fit in int16 range [-32768, 32767]
|
||||
# Adjust the scaling factor depending on your signal's dynamic range
|
||||
scale_factor = 32767 / np.max(np.abs(np.concatenate((real, imag))))
|
||||
real_scaled = (real * scale_factor).astype(np.int16)
|
||||
imag_scaled = (imag * scale_factor).astype(np.int16)
|
||||
|
||||
# Interleave real and imag
|
||||
interleaved = np.empty((real_scaled.size * 2,), dtype=np.int16)
|
||||
interleaved[0::2] = real_scaled
|
||||
interleaved[1::2] = imag_scaled
|
||||
|
||||
interleaved.tofile(savename)
|
||||
print(f"Saved {savename}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
generator = RecordingGenerator()
|
170
signal_generation.py
Normal file
170
signal_generation.py
Normal file
|
@ -0,0 +1,170 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
from utils.data.recording import Recording
|
||||
from utils.signal import block_generator
|
||||
from utils.signal.basic_signal_generator import complex_sine
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ModulationInfo:
|
||||
bps: int
|
||||
constellation: str
|
||||
|
||||
|
||||
class ModulationRegistry:
|
||||
_registry = {
|
||||
"qam16": ModulationInfo(bps=4, constellation="qam"),
|
||||
"qam32": ModulationInfo(bps=5, constellation="qam"),
|
||||
"qam64": ModulationInfo(bps=6, constellation="qam"),
|
||||
"qam256": ModulationInfo(bps=8, constellation="qam"),
|
||||
"qam1024": ModulationInfo(bps=10, constellation="qam"),
|
||||
"pam4": ModulationInfo(bps=2, constellation="pam"),
|
||||
"pam8": ModulationInfo(bps=3, constellation="pam"),
|
||||
"bpsk": ModulationInfo(bps=1, constellation="psk"),
|
||||
"qpsk": ModulationInfo(bps=2, constellation="psk"),
|
||||
"psk8": ModulationInfo(bps=3, constellation="psk"),
|
||||
"psk16": ModulationInfo(bps=4, constellation="psk"),
|
||||
"psk32": ModulationInfo(bps=5, constellation="psk"),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get(cls, mod_type: str) -> ModulationInfo:
|
||||
return cls._registry[mod_type]
|
||||
|
||||
|
||||
def periodic_random(length, divisor=4, seed=256):
|
||||
np.random.seed(seed)
|
||||
chunk = np.random.rand(int(length / divisor))
|
||||
return np.tile(chunk, divisor)
|
||||
|
||||
|
||||
def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
|
||||
"""Produces a modulated signal Recording."""
|
||||
mod_info = ModulationRegistry.get(modulation)
|
||||
if mod_info is None:
|
||||
raise ValueError(f"Modulation {modulation} not in registry.")
|
||||
|
||||
while length % sps != 0 or length % mod_info.bps != 0:
|
||||
length = length + 1 # needs to be multiple of bits per symbol and sps
|
||||
|
||||
num_bits = int(length * sps)
|
||||
|
||||
mapper = block_generator.Mapper(
|
||||
constellation_type=mod_info.constellation,
|
||||
num_bits_per_symbol=mod_info.bps,
|
||||
)
|
||||
upsampler = block_generator.Upsampling(factor=sps)
|
||||
filter = block_generator.RaisedCosineFilter(
|
||||
span_in_symbols=10, upsampling_factor=sps, beta=beta
|
||||
)
|
||||
|
||||
bits = [(np.random.rand(num_bits) > 0.5).astype(np.float32)]
|
||||
long_bits = np.concatenate([bits, bits, bits], axis=1)
|
||||
symbols = mapper(long_bits)
|
||||
|
||||
upsampled_symbols = upsampler([symbols])
|
||||
filtered_samples = filter([upsampled_symbols])
|
||||
output_recording = filtered_samples[length : length * 2]
|
||||
|
||||
return Recording(data=output_recording)
|
||||
|
||||
|
||||
# Old create_modulated_signal code
|
||||
# def create_modulated_signal(modulation: str, sps: int, beta, length: int) -> Recording:
|
||||
# """Produces a modulated signal Recording."""
|
||||
# mod_info = ModulationRegistry.get(modulation)
|
||||
# if mod_info is None:
|
||||
# raise ValueError(f"Modulation {modulation} not in registry.")
|
||||
|
||||
# source_block = block_generator.RandomBinarySource()
|
||||
# mapper_block = block_generator.Mapper(
|
||||
# constellation_type=mod_info.constellation,
|
||||
# num_bits_per_symbol=mod_info.bps,
|
||||
# )
|
||||
# upsampler_block = block_generator.Upsampling(factor=sps)
|
||||
# filter_block = block_generator.RaisedCosineFilter(upsampling_factor=sps, beta=beta)
|
||||
|
||||
# mapper_block.connect_input([source_block])
|
||||
# upsampler_block.connect_input([mapper_block])
|
||||
# filter_block.connect_input([upsampler_block])
|
||||
|
||||
# dividing_factor = sps * mod_info.bps
|
||||
# while length % dividing_factor != 0:
|
||||
# length = length + 1
|
||||
# double_length = length * 2
|
||||
|
||||
# recording = filter_block.record(num_samples=double_length)
|
||||
# return Recording(data=recording.data[:, :length], metadata=recording.metadata)
|
||||
|
||||
|
||||
def create_lfm_recording(
|
||||
sample_rate: int,
|
||||
width: Optional[int],
|
||||
chirp_period: Optional[float],
|
||||
chirp_type: str,
|
||||
length: int,
|
||||
) -> Recording:
|
||||
"""Produces a Recording of Linear Frequency Modulation (LFM) Jamming."""
|
||||
lfm_jamming_source = block_generator.LFMJammingSource(
|
||||
sample_rate=sample_rate,
|
||||
bandwidth=width,
|
||||
chirp_period=chirp_period,
|
||||
chirp_type=chirp_type,
|
||||
)
|
||||
|
||||
return lfm_jamming_source.record(num_samples=length)
|
||||
|
||||
|
||||
def create_noise_recording(
|
||||
rms_power: float,
|
||||
length: int,
|
||||
) -> Recording:
|
||||
"""Generate a Recording of Additive White Gaussian Noise (AWGN)."""
|
||||
# 1. Create a repeating pseudo-random envelope
|
||||
np.random.seed(256)
|
||||
chunk = np.random.rand(length // 4)
|
||||
tiled = np.tile(chunk, 4)
|
||||
amplitude_envelope = np.sqrt(tiled)
|
||||
|
||||
# 2. Generate complex Gaussian noise with unit power
|
||||
real = np.random.normal(0, 1, length)
|
||||
imag = np.random.normal(0, 1, length)
|
||||
complex_noise = real + 1j * imag
|
||||
|
||||
# 3. Scale noise by desired power and envelope
|
||||
scaled_noise = complex_noise * amplitude_envelope * np.sqrt(rms_power)
|
||||
metadata = {"interference": "wb", "signal_type": "noise"}
|
||||
return Recording(data=scaled_noise, metadata=metadata)
|
||||
|
||||
|
||||
def create_ctnb_recording(length: int) -> Recording:
|
||||
ones_source = block_generator.ConstantSource()
|
||||
return ones_source.record(num_samples=length)
|
||||
|
||||
|
||||
def create_birdie_recording(
|
||||
sample_rate: int, length: int, wave_number: int
|
||||
) -> Recording:
|
||||
recording_data = np.zeros(int(length))
|
||||
for _ in range(wave_number):
|
||||
frequency = np.random.choice(np.arange(-sample_rate / 2, sample_rate / 2))
|
||||
recording = complex_sine(
|
||||
sample_rate=int(sample_rate), length=int(length), frequency=int(frequency)
|
||||
)
|
||||
recording_data = recording_data + recording.data
|
||||
return Recording(data=recording_data, metadata=recording.metadata)
|
||||
|
||||
|
||||
def frequency_shift(
|
||||
recording: Recording, freq_shift: float, sample_rate: int
|
||||
) -> Recording:
|
||||
"""Applies a frequency shift the input recording."""
|
||||
source = block_generator.RecordingSource(recording=recording)
|
||||
frequency_shift_block = block_generator.FrequencyShift(
|
||||
shift_frequency=freq_shift, sampling_rate=sample_rate
|
||||
)
|
||||
frequency_shift_block.connect_input([source])
|
||||
|
||||
return frequency_shift_block.record(num_samples=len(recording))
|
Loading…
Reference in New Issue
Block a user