removed data/ from the git ignore

This commit is contained in:
liyuxiao2 2025-05-21 15:52:16 -04:00
parent b49f351a4c
commit 2b2766524c
38 changed files with 4133 additions and 2 deletions

2
.gitignore vendored
View File

@ -2,5 +2,3 @@
# Byte-compiled / optimized / DLL files
__pycache__/
data/

View File

@ -0,0 +1,57 @@
import sys, os
sys.path.insert(0, os.path.abspath("../..")) # or ".." if needed
import numpy as np
import torch
from torch.utils.data import Dataset
import h5py
from helpers.app_settings import get_app_settings
# Load the application settings once, at import time.
settings = get_app_settings()
# Modulation type names taken from the dataset section of the settings.
# NOTE(review): the dataset class in this file re-reads the settings itself and
# does not reference this module-level name -- confirm whether it is still needed.
dataset = settings.dataset.modulation_types
class ModulationH5Dataset(Dataset):
    """Map-style dataset serving normalized samples and integer labels from an HDF5 file.

    The file is expected to hold the samples under ``data_key`` and a structured
    array with a ``"modulation"`` field under ``metadata/metadata``.

    Parameters:
        hdf5_path: Path of the HDF5 file to read.
        label_name: Stored on the instance. NOTE(review): the labels are always
            read from the ``"modulation"`` metadata field; this value is not
            used for the lookup -- confirm the intent.
        data_key: Name of the HDF5 dataset holding the samples.
        label_encoder: Already-fitted encoder exposing ``transform``. When
            omitted, a scikit-learn ``LabelEncoder`` is fitted on the
            modulation types listed in the application settings.
        transform: Optional callable applied to the normalized sample tensor
            before it is returned.
    """

    def __init__(self, hdf5_path, label_name, data_key="training_data", label_encoder=None, transform=None):
        self.hdf5_path = hdf5_path
        self.data_key = data_key
        self.label_name = label_name
        self.label_encoder = label_encoder
        self.transform = transform

        # Only the sample count and the (small) metadata table are read here;
        # the samples themselves are fetched lazily in __getitem__.
        with h5py.File(hdf5_path, 'r') as f:
            self.length = f[data_key].shape[0]
            self.metadata = f["metadata"]["metadata"][:]

        settings = get_app_settings()
        dataset_cfg = settings.dataset
        all_labels = dataset_cfg.modulation_types

        if self.label_encoder is None:
            from sklearn.preprocessing import LabelEncoder

            self.label_encoder = LabelEncoder()
            self.label_encoder.fit(all_labels)

        # Per-sample labels are stored as bytes in the metadata table.
        raw_labels = [row["modulation"].decode("utf-8") for row in self.metadata]
        self.encoded_labels = self.label_encoder.transform(raw_labels)

    def __len__(self):
        """Return the number of samples in the file."""
        return self.length

    @staticmethod
    def _normalize(x):
        """Standardize ``x`` along its last axis (zero mean, unit std, eps-guarded)."""
        mean = np.mean(x, axis=-1, keepdims=True)
        std = np.std(x, axis=-1, keepdims=True)
        return (x - mean) / (std + 1e-6)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for ``idx`` as float32 and long tensors."""
        # The file is reopened on every access rather than kept open on the instance.
        with h5py.File(self.hdf5_path, 'r') as f:
            x = f[self.data_key][idx]

        x = torch.tensor(self._normalize(x), dtype=torch.float32)
        # Previously the transform was stored but silently ignored.
        if self.transform is not None:
            x = self.transform(x)

        label = torch.tensor(self.encoded_labels[idx], dtype=torch.long)
        return x, label

BIN
data/dataset/train.h5 Normal file

Binary file not shown.

BIN
data/dataset/val.h5 Normal file

Binary file not shown.

58
data/models/cm_plotter.py Normal file
View File

@ -0,0 +1,58 @@
import numpy as np
from typing import Optional
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix
def plot_confusion_matrix(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    classes: list,
    normalize: bool = True,
    title: Optional[str] = None,
    text: bool = True,
    rotate_x_text: int = 90,
    figsize: tuple = (16, 9),
    cmap: plt.cm = plt.cm.Blues,
):
    """Plot a confusion matrix and return the matplotlib axes it was drawn on.

    Adapted from
    https://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html

    Parameters:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        classes: Tick labels for both axes, in the order of the matrix rows.
        normalize: When true, each row is divided by its sum. Rows with no
            true samples are left at zero instead of becoming NaN.
        title: Plot title; a default depending on ``normalize`` is used when empty.
        text: When true, every cell is annotated with its value.
        rotate_x_text: Rotation in degrees of the x tick labels.
        figsize: Figure size in inches.
        cmap: Colormap passed to ``imshow``.

    Returns:
        The axes containing the plot.
    """
    if not title:
        if normalize:
            title = "Normalized confusion matrix"
        else:
            title = "Confusion matrix, without normalization"

    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Guard the division: a class that only shows up in the predictions
        # has an all-zero row, which would otherwise produce NaN cells.
        cm = np.divide(
            cm.astype("float"),
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )

    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation="none", cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    ax.set(
        xticks=np.arange(cm.shape[1]),
        yticks=np.arange(cm.shape[0]),
        yticklabels=classes,
        title=title,
        ylabel="True label",
        xlabel="Predicted label",
    )
    ax.set_xticklabels(classes, rotation=rotate_x_text)
    ax.figure.set_size_inches(figsize)

    # Annotate every cell, switching text colour on dark backgrounds.
    if text:
        fmt = ".2f" if normalize else "d"
        thresh = cm.max() / 2.0
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(
                    j,
                    i,
                    format(cm[i, j], fmt),
                    ha="center",
                    va="center",
                    color="white" if cm[i, j] > thresh else "black",
                )

    if len(classes) == 2:
        # Fix the axis limits explicitly for the binary case.
        plt.axis([-0.5, 1.5, 1.5, -0.5])
    fig.tight_layout()
    return ax

File diff suppressed because it is too large Load Diff

206
data/models/mobilenetv3.py Normal file
View File

@ -0,0 +1,206 @@
import numpy as np
import torch
import timm
from torch import nn
# MobileNetV3 variant names. Presumably the timm model identifiers accepted as
# `model_size` by `mobilenetv3()` in this file (its default value appears here);
# the list itself is not referenced elsewhere in the visible code.
sizes = [
'mobilenetv3_large_075',
'mobilenetv3_large_100',
'mobilenetv3_rw',
'mobilenetv3_small_050',
'mobilenetv3_small_075',
'mobilenetv3_small_100',
'tf_mobilenetv3_large_075',
'tf_mobilenetv3_large_100',
'tf_mobilenetv3_large_minimal_100',
'tf_mobilenetv3_small_075',
'tf_mobilenetv3_small_100',
'tf_mobilenetv3_small_minimal_100'
]
class SqueezeExcite(nn.Module):
    """Squeeze-and-excitation gate for ``(batch, channels, length)`` inputs.

    The input is averaged over its last axis, passed through a 1x1 reduce
    convolution, an activation and a 1x1 expand convolution, and the gated
    result rescales the input channel-wise.

    Parameters:
        in_chs: Number of input (and output) channels.
        se_ratio: Reduction ratio used to derive the bottleneck width when
            ``reduced_base_chs`` is not given.
        reduced_base_chs: Bottleneck width. When provided it is used as-is.
        act_layer: Activation class; must accept ``inplace=True``.
        gate_fn: Function mapping the expanded tensor to the gate values.
        divisor: The derived bottleneck width is rounded to a multiple of this
            value (only used when ``reduced_base_chs`` is not given).
    """

    def __init__(
        self,
        in_chs,
        se_ratio=0.25,
        reduced_base_chs=None,
        act_layer=nn.SiLU,
        gate_fn=torch.sigmoid,
        divisor=1,
        **_,
    ):
        super().__init__()
        if reduced_base_chs is None:
            # Previously None was handed straight to nn.Conv1d and crashed;
            # derive the width from the ratio instead, never going below `divisor`.
            scaled = in_chs * se_ratio
            reduced_chs = max(divisor, int(scaled + divisor / 2) // divisor * divisor)
        else:
            reduced_chs = reduced_base_chs
        self.conv_reduce = nn.Conv1d(in_chs, reduced_chs, 1, bias=True)
        self.act1 = act_layer(inplace=True)
        self.conv_expand = nn.Conv1d(reduced_chs, in_chs, 1, bias=True)
        self.gate_fn = gate_fn

    def forward(self, x):
        """Return ``x`` rescaled per channel by the learned gate."""
        x_se = x.mean((2,), keepdim=True)
        x_se = self.conv_reduce(x_se)
        x_se = self.act1(x_se)
        x_se = self.conv_expand(x_se)
        return x * self.gate_fn(x_se)
class FastGlobalAvgPool1d(nn.Module):
    """Global average pooling over everything past the channel axis.

    With ``flatten=True`` the result has shape ``(batch, channels)``;
    otherwise a trailing singleton axis is kept: ``(batch, channels, 1)``.
    """

    def __init__(self, flatten=False):
        super().__init__()
        self.flatten = flatten

    def forward(self, x):
        """Average ``x`` over all trailing positions of each channel."""
        batch, channels = x.size(0), x.size(1)
        pooled = x.view(batch, channels, -1).mean(dim=2)
        if self.flatten:
            return pooled
        return pooled.view(batch, channels, 1)
class GBN(torch.nn.Module):
    """
    Ghost Batch Normalization
    https://arxiv.org/abs/1705.08741

    As written, the forward pass applies a single ``BatchNorm1d`` over the
    whole batch followed by the given activation and dropout modules;
    ``virtual_batch_size`` is stored but the batch is not chunked.
    """

    def __init__(self, input_dim, drop, act, virtual_batch_size=32, momentum=0.1):
        super().__init__()
        self.input_dim = input_dim
        self.virtual_batch_size = virtual_batch_size
        self.bn = nn.BatchNorm1d(self.input_dim, momentum=momentum)
        self.drop = drop
        self.act = act

    def forward(self, x):
        """Apply batch norm, then the activation, then dropout."""
        normalized = self.bn(x)
        activated = self.act(normalized)
        return self.drop(activated)
def replace_bn(parent):
    """Recursively swap every timm ``BatchNormAct2d`` under ``parent`` for a ``GBN``.

    The replacement is built from the original layer's ``num_features``,
    ``drop`` and ``act`` attributes; the module tree is modified in place.
    Only exact ``BatchNormAct2d`` instances are replaced (subclasses are
    recursed into instead).
    """
    for child_name, child in parent.named_children():
        if type(child) is not timm.layers.norm_act.BatchNormAct2d:
            # Not a target layer: keep looking further down the tree.
            replace_bn(child)
            continue
        replacement = GBN(child.num_features, child.drop, child.act)
        setattr(parent, child_name, replacement)
def replace_se(parent):
    """Recursively swap timm squeeze-excite blocks under ``parent`` for the local 1-D version.

    The replacement keeps the channel counts of the original block's reduce
    convolution; the module tree is modified in place. Only exact instances
    of timm's ``SqueezeExcite`` are replaced.
    """
    for child_name, child in parent.named_children():
        if type(child) is not timm.models._efficientnet_blocks.SqueezeExcite:
            # Not a target block: keep looking further down the tree.
            replace_se(child)
            continue
        reduce_conv = child.conv_reduce
        replacement = SqueezeExcite(
            reduce_conv.in_channels,
            reduced_base_chs=reduce_conv.out_channels,
        )
        setattr(parent, child_name, replacement)
def replace_conv(parent, ds_rate):
    """Recursively replace every ``nn.Conv2d`` under ``parent`` with an ``nn.Conv1d``.

    The module tree is modified in place; the new layers are freshly
    initialized (no weights are copied).

    Parameters:
        parent: Module whose children are rewritten.
        ds_rate: Downsampling rate. With ``ds_rate == 2`` the first-axis
            kernel size, stride and padding of the original layer are reused.
            Otherwise every non-1 kernel becomes 5, every non-1 stride becomes
            ``ds_rate`` and every non-zero padding becomes 2.

    The ``bias`` flag now mirrors the source layer. It used to receive the
    kernel size, which is always truthy, so a bias was added to every
    converted layer. NOTE(review): checkpoints saved from models built with
    the old behaviour contain extra bias tensors -- confirm before loading
    them strictly.
    """
    for child_name, child in parent.named_children():
        if type(child) is not nn.Conv2d:
            replace_conv(child, ds_rate)
            continue

        kernel_size = child.kernel_size[0]
        stride = child.stride[0]
        padding = child.padding[0]
        if ds_rate != 2:
            # Widen spatial kernels and retarget strided layers to the
            # requested downsampling rate; pointwise/unstrided layers stay as is.
            if kernel_size != 1:
                kernel_size = 5
            if stride != 1:
                stride = ds_rate
            if padding != 0:
                padding = 2

        setattr(
            parent,
            child_name,
            nn.Conv1d(
                child.in_channels,
                child.out_channels,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding,
                bias=child.bias is not None,
                groups=child.groups,
            ),
        )
def create_mobilenetv3(network, ds_rate=2, in_chans=2):
    """Convert a MobileNetV3 ``network`` to operate on 1-D inputs, in place.

    Squeeze-excite blocks, batch-norm/activation layers and convolutions are
    swapped for their 1-D counterparts, the global pooling layer is replaced
    and the stem convolution is rebuilt for ``in_chans`` input channels.

    Parameters:
        network: Model exposing ``conv_stem`` and ``global_pool`` attributes.
        ds_rate: Downsampling rate forwarded to ``replace_conv``.
        in_chans: Number of input channels of the rebuilt stem.

    Returns:
        The same ``network`` object, modified in place.
    """
    replace_se(network)
    replace_bn(network)
    replace_conv(network, ds_rate)
    network.global_pool = FastGlobalAvgPool1d()

    # Rebuild the stem with the requested number of input channels, reusing the
    # geometry of the current stem layer.
    stem = network.conv_stem
    network.conv_stem = nn.Conv1d(
        in_channels=in_chans,
        out_channels=stem.out_channels,
        kernel_size=stem.kernel_size,
        stride=stem.stride,
        padding=stem.padding,
        # Mirror the current stem; the kernel-size tuple used before was
        # always truthy and therefore always enabled the bias.
        bias=stem.bias is not None,
        groups=stem.groups,
    )
    return network
def mobilenetv3(
    model_size = 'mobilenetv3_small_050',
    num_classes: int = 10,
    drop_rate: float = 0,
    drop_path_rate: float = 0,
    in_chans=2,
):
    """Build a timm MobileNetV3 and convert it with ``create_mobilenetv3``.

    Parameters:
        model_size: Model name passed to ``timm.create_model``.
        num_classes: Number of output classes.
        drop_rate: Dropout rate forwarded to timm.
        drop_path_rate: Drop-path rate forwarded to timm.
        in_chans: Number of input channels.

    Returns:
        The converted model.
    """
    backbone = timm.create_model(
        model_size,
        num_classes=num_classes,
        in_chans=in_chans,
        drop_path_rate=drop_path_rate,
        drop_rate=drop_rate,
        exportable=True,
    )
    return create_mobilenetv3(backbone, in_chans=in_chans)
import torch.nn as nn
class Simple1DCNN(nn.Module):
    """Small two-layer 1-D convolutional classifier.

    Two conv/ReLU stages (with a max-pool after the first), global average
    pooling and a linear layer producing ``num_classes`` logits.
    """

    def __init__(self, in_chans=2, num_classes=4):
        super().__init__()
        feature_layers = [
            nn.Conv1d(in_chans, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        ]
        head_layers = [
            nn.Flatten(),
            nn.Linear(64, num_classes),
        ]
        self.net = nn.Sequential(*feature_layers, *head_layers)

    def forward(self, x):
        """Return class logits for ``x`` of shape ``[B, in_chans, L]`` (e.g. ``[B, 2, 128]``)."""
        logits = self.net(x)
        return logits
def simple_cnn(in_chans=2, num_classes=4):
    """Factory returning a ``Simple1DCNN`` with the given channel and class counts."""
    model = Simple1DCNN(in_chans, num_classes)
    return model

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_0264b4a.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_0b3b80f.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_1effc4c.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_37a73db.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_3d557a9.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_442fcb9.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_491c457.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_4fff84f.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_6676600.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_6d35ff9.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_6d85f3e.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_85a8c83.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_940988e.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_9f88dc2.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_a4a6ba6.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_a60964b.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_ad350fe.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_ae5224a.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_b68f080.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_c00477b.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_cca57ca.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_db8a5b4.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_dd021f7.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_e0cc41d.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_e61d9bf.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_f024082.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_f2013fa.npy (Stored with Git LFS) Normal file

Binary file not shown.

BIN
data/recordings/rec_0Hz_2025-05-15_09-45-10_f2ae593.npy (Stored with Git LFS) Normal file

Binary file not shown.

69
data/scripts/data_gen.py Normal file
View File

@ -0,0 +1,69 @@
from utils.data import Recording
import numpy as np
from utils.signal import block_generator
# Mapper parameters for each supported modulation name. The signal blocks do
# not accept a name such as "qpsk" directly, so the generator looks up the
# constellation type and the number of bits per symbol here.
mods = {
"bpsk": {"num_bits_per_symbol": 1, "constellation_type": "psk"},
"qpsk": {"num_bits_per_symbol": 2, "constellation_type": "psk"},
"qam16": {"num_bits_per_symbol": 4, "constellation_type": "qam"},
"qam64": {"num_bits_per_symbol": 6, "constellation_type": "qam"},
}
def generate_modulated_signals():
    """Generate one noisy, pulse-shaped recording per modulation/SNR pair and save it.

    For each of the four modulations and each SNR value in ``-6, -3, ..., 12``
    a bit source -> mapper -> upsampler -> raised-cosine filter chain is
    recorded, additive noise is summed onto it, the generation parameters are
    attached as metadata and the recording is written with ``to_npy()``.
    """
    recording_length = 1024
    beta = 0.3  # roll-off factor; can be changed to add variety
    sps = 4  # samples per symbol (relative bandwidth of the signal); can also be changed

    for modulation in ["bpsk", "qpsk", "qam16", "qam64"]:
        for snr in np.arange(-6, 13, 3):
            # The blocks do not take the modulation name, so translate it
            # into mapper parameters via the `mods` table.
            mod_params = mods[modulation]
            constellation_type = mod_params["constellation_type"]
            num_bits_per_symbol = mod_params["num_bits_per_symbol"]

            # Build the chain: bit source -> mapper -> upsampling -> pulse shaping.
            bit_source = block_generator.RandomBinarySource()
            mapper = block_generator.Mapper(
                constellation_type=constellation_type,
                num_bits_per_symbol=num_bits_per_symbol,
            )
            upsampler = block_generator.Upsampling(factor=sps)
            pulse_shaping_filter = block_generator.RaisedCosineFilter(
                upsampling_factor=sps, beta=beta
            )

            # Wire the blocks together, from the output end back to the source.
            pulse_shaping_filter.connect_input([upsampler])
            upsampler.connect_input([mapper])
            mapper.connect_input([bit_source])

            clean = pulse_shaping_filter.record(num_samples=recording_length)

            # Size the noise from the measured signal power and the SNR value.
            # NOTE(review): the exponent divides the dB value by 20 although
            # the result is passed as a variance (a power quantity, which is
            # normally converted with /10) -- confirm against AWGNSource
            # before relying on the stored "snr" metadata.
            signal_power = np.mean(np.abs(clean.data[0] ** 2))
            noise_scale = 10 ** (((-1 * snr) / 20))
            awgn_source = block_generator.AWGNSource(
                variance=(signal_power / 2) * noise_scale
            )
            noise = awgn_source.record(num_samples=recording_length)
            output_recording = Recording(data=clean.data + noise.data)

            # Attach the generation parameters as metadata for ML later.
            output_recording.add_to_metadata(key="modulation", value=modulation)
            output_recording.add_to_metadata(key="snr", value=int(snr))
            output_recording.add_to_metadata(key="beta", value=beta)
            output_recording.add_to_metadata(key="sps", value=sps)

            # output_recording.view() can be used here to inspect the result.

            # Save to file; path and filename parameters are optional.
            output_recording.to_npy()


if __name__ == "__main__":
    generate_modulated_signals()

View File

@ -0,0 +1,152 @@
import os, h5py, numpy as np
from utils.io import from_npy
from split_dataset import split, split_recording
from helpers.app_settings import get_app_settings
# Structured dtype of one per-snippet metadata row stored in the HDF5 file.
# String fields are fixed-width byte strings ("S256" / "S32").
meta_dtype = np.dtype(
[
("rec_id", "S256"),
("snippet_idx", np.int32),
("modulation", "S32"),
("snr", np.int32),
("beta", np.float32),
("sps", np.int32),
]
)
# Structured dtype of the single-row dataset summary stored next to the metadata.
info_dtype = np.dtype(
[
("num_records", np.int32),
("dataset_name", "S64"), # up to 64 bytes of UTF-8 encoded text
("creator", "S64"),
]
)
def write_hdf5_file(records, output_path, dataset_name="data"):
    """
    Writes a list of records to an HDF5 file.

    The samples are stored gzip-compressed under ``dataset_name``; the
    per-record metadata and a one-row dataset summary are stored in the
    ``metadata`` group.

    Parameters:
        records (list): List of ``(data, metadata)`` tuples. All ``data``
            arrays are expected to share the shape and dtype of the first one.
            Each metadata dict must provide ``rec_id``, ``snippet_idx``,
            ``modulation``, ``snr``, ``beta`` and ``sps``.
        output_path (str): Path to the output HDF5 file
        dataset_name (str): Name of the dataset in the HDF5 file (default: "data")

    Returns:
        str: Path to the created HDF5 file

    Raises:
        ValueError: If ``records`` is empty (there is no sample to take the
            shape and dtype from).
    """
    if not records:
        # Fail before touching the file system instead of with an IndexError later.
        raise ValueError(f"No records to write to {output_path!r}.")

    meta_arr = np.empty(len(records), dtype=meta_dtype)
    for i, (_, md) in enumerate(records):
        meta_arr[i] = (
            md["rec_id"].encode("utf-8"),
            md["snippet_idx"],
            md["modulation"].encode("utf-8"),
            int(md["snr"]),
            float(md["beta"]),
            int(md["sps"]),
        )

    # The first record defines the per-sample shape and dtype of the dataset.
    sample, _ = records[0]
    shape, dtype = sample.shape, sample.dtype

    with h5py.File(output_path, "w") as hf:
        dset = hf.create_dataset(
            dataset_name, shape=(len(records),) + shape, dtype=dtype, compression="gzip"
        )
        for idx, (snip, _) in enumerate(records):
            dset[idx, ...] = snip

        mg = hf.create_group("metadata")
        mg.create_dataset("metadata", data=meta_arr, compression="gzip")
        print(dset.shape, f"snippets created in {dataset_name}")

        info_arr = np.array(
            [
                (
                    len(records),
                    dataset_name.encode("utf-8"),
                    b"generate_dataset.py",  # already bytes
                )
            ],
            dtype=info_dtype,
        )
        mg.create_dataset("dataset_info", data=info_arr)

    return output_path
def complex_to_channel(data):
    """
    Convert complex-valued IQ data of shape (1, N) to 2-channel real array of shape (2, N).

    Only the first row of ``data`` is used. The result is ``float32`` with the
    real parts in row 0 and the imaginary parts in row 1.

    Raises:
        TypeError: If ``data`` is not complex-valued. This is an explicit
            check rather than an ``assert`` so it also runs under ``python -O``.
    """
    if not np.iscomplexobj(data):
        raise TypeError("complex_to_channel expects complex-valued (a+bi) data.")

    iq = data[0]  # (N,)
    stacked = np.stack([np.real(iq), np.imag(iq)], axis=0)  # shape (2, N)
    return stacked.astype(np.float32)
def generate_datasets(cfg):
    """
    Builds the training and validation HDF5 files from a folder of .npy recordings.

    Every recording is converted to a 2-channel real array, cut into
    ``cfg.num_slices`` snippets and assigned to the training or validation
    set; the two sets are written to ``train.h5`` and ``val.h5`` inside
    ``cfg.output_dir``.

    Parameters:
        cfg: Dataset settings providing ``input_dir``, ``output_dir``,
            ``num_slices``, ``train_split`` and ``seed``.

    Returns:
        tuple: ``(train_path, val_path)`` of the files that were written.

    Raises:
        ValueError: If ``cfg.input_dir`` contains no .npy files.
    """
    # Always make sure the output directory exists. The previous check only
    # created it when the path had no parent component, so nested output
    # directories were never created.
    os.makedirs(cfg.output_dir, exist_ok=True)

    # Only .npy recordings are loaded; sorting makes the recording indices, and
    # with them the seeded split, independent of the directory listing order.
    files = sorted(
        fname for fname in os.listdir(cfg.input_dir) if fname.endswith(".npy")
    )
    if not files:
        raise ValueError("No .npy files found in the specified directory.")

    records = []
    for fname in files:
        rec = from_npy(os.path.join(cfg.input_dir, fname))

        data = complex_to_channel(rec.data)  # (1, N) complex -> (2, N) real

        md = rec.metadata  # pull metadata from the recording
        # Index used to group snippets per recording when splitting.
        # NOTE(review): this key is "recid" while the HDF5 writer reads
        # "rec_id" -- presumably the recording metadata already carries
        # "rec_id"; confirm.
        md.setdefault("recid", len(records))

        records.append((data, md))

    # split each recording into <num_slices> snippets each
    records = split_recording(records, cfg.num_slices)

    train_records, val_records = split(records, cfg.train_split, cfg.seed)

    train_path = os.path.join(cfg.output_dir, "train.h5")
    val_path = os.path.join(cfg.output_dir, "val.h5")

    write_hdf5_file(train_records, train_path, "training_data")
    write_hdf5_file(val_records, val_path, "validation_data")

    return train_path, val_path
def main():
    """Generate the datasets described by the application settings and report the paths."""
    dataset_cfg = get_app_settings().dataset
    train_path, val_path = generate_datasets(dataset_cfg)
    print(f"✅ Train: {train_path}\n✅ Val: {val_path}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,79 @@
import random
from collections import defaultdict
def split(dataset, train_frac=0.8, seed=42, label_key = "modulation"):
    """
    Splits a dataset into training and validation sets, recording by recording.

    All snippets of one recording (same ``md["recid"]``) end up in the same
    set. The split is stratified by label: for every label,
    ``int(n_recordings * train_frac)`` recordings go to the training set and
    the rest to the validation set. A label with very few recordings can
    therefore be missing from the training set.

    Parameters:
        dataset (list): List of ``(data, metadata)`` tuples; every metadata
            dict must contain ``"recid"`` and ``label_key``.
        train_frac (float): Fraction of each label's recordings used for training.
        seed (int): Seed of the shuffle. A private generator is used, so the
            global ``random`` state is left untouched.
        label_key (str): Metadata key holding the label (str or UTF-8 bytes).

    Returns:
        tuple: ``(train_dataset, val_dataset)`` lists of ``(data, metadata)`` tuples.
    """
    # Group the snippets by the recording they came from.
    rec_buckets = defaultdict(list)
    for data, md in dataset:
        rec_buckets[md["recid"]].append((data, md))

    # Group the recording ids by the label of their first snippet.
    label_rec_ids = defaultdict(list)
    for rec_id, group in rec_buckets.items():
        label = group[0][1][label_key]
        if isinstance(label, bytes):
            label = label.decode("utf-8")
        label_rec_ids[label].append(rec_id)

    # Same shuffle sequence as seeding the module-level generator, without the
    # side effect of reseeding it for the rest of the program.
    rng = random.Random(seed)
    train_recs = set()
    for rec_ids in label_rec_ids.values():
        rng.shuffle(rec_ids)
        split_idx = int(len(rec_ids) * train_frac)
        train_recs.update(rec_ids[:split_idx])

    # Every recording not picked for training belongs to the validation set.
    train_dataset, val_dataset = [], []
    for rec_id, group in rec_buckets.items():
        if rec_id in train_recs:
            train_dataset.extend(group)
        else:
            val_dataset.extend(group)

    return train_dataset, val_dataset
def split_recording(recording_list, num_snippets):
    """
    Splits every recording into ``num_snippets`` equal-length snippets.

    Each recording's data is a 2-D array ``(channels, samples)``. The snippet
    length is ``samples // num_snippets``; trailing samples that do not fill a
    whole snippet are dropped. Each snippet gets a shallow copy of the
    recording's metadata with ``"snippet_idx"`` added.

    Parameters:
        recording_list (list): List of ``(data, metadata)`` tuples to be split
        num_snippets (int): Number of snippets to cut from each recording

    Returns:
        list: List of ``(snippet, metadata)`` tuples, in recording order

    Raises:
        ValueError: If ``num_snippets`` is smaller than 1, or a recording has
            fewer samples than ``num_snippets`` (which would otherwise yield
            zero-length snippets).
    """
    if num_snippets < 1:
        raise ValueError(f"num_snippets must be at least 1, got {num_snippets}.")

    snippet_list = []
    for data, md in recording_list:
        _, n_samples = data.shape
        snippet_len = n_samples // num_snippets
        if snippet_len == 0:
            raise ValueError(
                f"Recording with {n_samples} samples cannot be split into "
                f"{num_snippets} snippets."
            )

        for i in range(num_snippets):
            snippet = data[:, i * snippet_len:(i + 1) * snippet_len]

            # copy the metadata, adding a snippet index
            snippet_md = md.copy()
            snippet_md["snippet_idx"] = i

            snippet_list.append((snippet, snippet_md))

    return snippet_list