added in confusion matrix for the output
All checks were successful
Modulation Recognition Demo / ria-demo (push) Successful in 2m46s

This commit is contained in:
Liyu Xiao 2025-07-09 15:39:20 -04:00
parent 9979d84e29
commit 6d531ae5f3

View File

@ -13,132 +13,162 @@ from helpers.app_settings import get_app_settings
def load_validation_data(): def load_validation_data():
val_dataset = ModulationH5Dataset( val_dataset = ModulationH5Dataset(
"data/dataset/val.h5", label_name="modulation", data_key="validation_data" "data/dataset/val.h5", label_name="modulation", data_key="validation_data"
) )
X = np.stack([x.numpy() for x, _ in val_dataset]) # shape: (N, C, L)
y = np.array([y.item() for _, y in val_dataset]) # shape: (N,)
class_names = list(val_dataset.label_encoder.classes_)
return X, y, class_names x = np.stack([x.numpy() for x, _ in val_dataset]) # shape: (N, C, L)
y = np.array([y.item() for _, y in val_dataset]) # shape: (N,)
class_names = list(val_dataset.label_encoder.classes_)
return x, y, class_names
def build_model_from_ckpt( def build_model_from_ckpt(
ckpt_path: str, in_channels: int, num_classes: int ckpt_path: str, in_channels: int, num_classes: int
) -> torch.nn.Module: ) -> torch.nn.Module:
""" """
Build and return a PyTorch model loaded from a checkpoint. Build and return a PyTorch model loaded from a checkpoint.
""" """
model = RFClassifier( model = RFClassifier(
model=mobilenetv3( model=mobilenetv3(
model_size="mobilenetv3_small_050", model_size="mobilenetv3_small_050",
num_classes=num_classes, num_classes=num_classes,
in_chans=in_channels, in_chans=in_channels,
) )
) )
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
checkpoint = torch.load(ckpt_path, weights_only=True, map_location=device) checkpoint = torch.load(ckpt_path, weights_only=True, map_location=device)
model.load_state_dict(checkpoint["state_dict"]) model.load_state_dict(checkpoint["state_dict"])
model.eval() model.eval()
return model return model
def evaluate_checkpoint(ckpt_path: str): def evaluate_checkpoint(ckpt_path: str):
""" """
Loads the model from checkpoint and evaluates it on a validation set. Loads the model from checkpoint and evaluates it on a validation set.
Prints classification metrics and plots a confusion matrix. Prints classification metrics and plots a confusion matrix.
""" """
# Load validation data
X_val, y_true, class_names = load_validation_data()
num_classes = len(class_names)
in_channels = X_val.shape[1]
# Load model
model = build_model_from_ckpt(
ckpt_path, in_channels=in_channels, num_classes=num_classes
)
# Inference
y_pred = []
with torch.no_grad():
for x in X_val:
x_tensor = torch.tensor(x[np.newaxis, ...], dtype=torch.float32)
logits = model(x_tensor)
pred = torch.argmax(logits, dim=1).item()
y_pred.append(pred)
# Print classification report
print("\nClassification Report:")
print(
classification_report(y_true, y_pred, target_names=class_names, zero_division=0)
)
plot_confusion_matrix_with_counts(
y_true=np.array(y_true),
y_pred=np.array(y_pred),
classes=class_names,
normalize=True,
title="Normalized Confusion Matrix",
)
def plot_confusion_matrix_with_counts( # Load validation data
y_true: np.ndarray, X_val, y_true, class_names = load_validation_data()
y_pred: np.ndarray, num_classes = len(class_names)
classes: list[str], in_channels = X_val.shape[1]
normalize: bool = True,
title: str = "Confusion Matrix (counts and normalized)",
# Load model
model = build_model_from_ckpt(
ckpt_path, in_channels=in_channels, num_classes=num_classes
)
# Inference
y_pred = []
with torch.no_grad():
for x in X_val:
x_tensor = torch.tensor(x[np.newaxis, ...], dtype=torch.float32)
logits = model(x_tensor)
pred = torch.argmax(logits, dim=1).item()
y_pred.append(pred)
# Print classification report
print("\nClassification Report:")
print(
classification_report(y_true, y_pred, target_names=class_names, zero_division=0)
)
print_confusion_matrix(
y_true=np.array(y_true),
y_pred=np.array(y_pred),
classes=class_names,
normalize=True,
title="Normalized Confusion Matrix",
)
def print_confusion_matrix(
y_true: np.ndarray,
y_pred: np.ndarray,
classes: list[str],
normalize: bool = True,
title: str = "Confusion Matrix (counts and normalized)",
) -> None: ) -> None:
"""
Plot a confusion matrix showing both raw counts and (optionally) normalized values.
Args:
y_true: true labels (integers 0..C-1)
y_pred: predicted labels (same shape as y_true)
classes: list of classname strings in index order
normalize: if True, each row is normalized to sum=1
title: title for the plot
"""
# 1) build raw CM
c = len(classes)
cm = np.zeros((c, c), dtype=int)
for t, p in zip(y_true, y_pred):
cm[t, p] += 1
# 2) normalize if requested
if normalize:
with np.errstate(divide="ignore", invalid="ignore"):
cm_norm = cm.astype(float) / cm.sum(axis=1)[:, None]
cm_norm = np.nan_to_num(cm_norm)
print_confusion_matrix_helper(cm_norm, classes)
else:
print_confusion_matrix_helper(cm, classes)
import numpy as np
def print_confusion_matrix_helper(matrix, classes=None, normalize=False, digits=2):
""" """
Plot a confusion matrix showing both raw counts and (optionally) normalized values. Pretty prints a confusion matrix with x/y labels.
Args: Parameters:
y_true: true labels (integers 0..C-1) - matrix: square 2D numpy array
y_pred: predicted labels (same shape as y_true) - labels: list of class labels (default: range(num_classes))
classes: list of classname strings in index order - normalize: whether to normalize rows to sum to 1
normalize: if True, each row is normalized to sum=1 - digits: number of decimal places to show for normalized values
title: title for the plot
""" """
# 1) build raw CM matrix = np.array(matrix)
C = len(classes) num_classes = matrix.shape[0]
cm = np.zeros((C, C), dtype=int) labels = classes or list(range(num_classes))
for t, p in zip(y_true, y_pred):
cm[t, p] += 1
# 2) normalize if requested # Header
if normalize: print(" " * 9 + "Ground Truth →")
with np.errstate(divide="ignore", invalid="ignore"): header = "Pred ↓ | " + " ".join([f"{str(label):>6}" for label in labels])
cm_norm = cm.astype(float) / cm.sum(axis=1)[:, None] print(header)
cm_norm = np.nan_to_num(cm_norm) print("-" * len(header))
else:
cm_norm = cm
# 3) plot # Rows
fig, ax = plt.subplots(figsize=(8, 8)) for i in range(num_classes):
im = ax.imshow(cm_norm, interpolation="nearest") row_vals = matrix[i]
ax.set_title(title) if normalize:
ax.set_xlabel("Predicted label") row_sum = row_vals.sum()
ax.set_ylabel("True label") row_vals = row_vals / row_sum if row_sum != 0 else row_vals
ax.set_xticks(np.arange(C)) row_str = " ".join([f"{val:>6.{digits}f}" for val in row_vals])
ax.set_yticks(np.arange(C)) else:
ax.set_xticklabels(classes, rotation=45, ha="right") row_str = " ".join([f"{int(val):>6}" for val in row_vals])
ax.set_yticklabels(classes) print(f"{str(labels[i]):>7} | {row_str}")
# 4) annotate
for i in range(C):
for j in range(C):
count = cm[i, j]
val = cm_norm[i, j]
txt = f"{count}\n{val:.2f}"
ax.text(j, i, txt, ha="center", va="center")
fig.colorbar(im, ax=ax, label="Normalized value" if normalize else "Count")
plt.tight_layout()
plt.show()
if __name__ == "__main__": if __name__ == "__main__":
settings = get_app_settings() settings = get_app_settings()
evaluate_checkpoint(os.path.join("checkpoint_files", "inference_recognition_model.ckpt")) evaluate_checkpoint(os.path.join("checkpoint_files", "inference_recognition_model.ckpt"))