# Source: ria-toolkit-oss/src/ria_toolkit_oss/view/view_signal.py
# (repository web-view listing: 326 lines, 12 KiB, Python)

import gc
import os
import textwrap
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import gridspec
from PIL import Image
from scipy.fft import fft, fftshift
from scipy.signal import spectrogram
from scipy.signal.windows import hann
from ria_toolkit_oss.datatypes.recording import Recording
from ria_toolkit_oss.view.tools import (
COLORS,
decimate,
extract_metadata_fields,
set_path,
)
def get_fft_size(plot_length):
    """Choose an FFT size for a plot covering ``plot_length`` samples.

    :param plot_length: The number of samples that will be plotted.
    :type plot_length: int
    :return: 64 below 2,000 samples, 256 below 10,000, 1024 below 1,000,000,
        and 2048 otherwise.
    :rtype: int
    """
    # (exclusive upper bound on plot_length, FFT size), in ascending order.
    size_by_limit = (
        (2000, 64),
        (10000, 256),
        (1000000, 1024),
    )
    for upper_bound, fft_size in size_by_limit:
        if plot_length < upper_bound:
            return fft_size
    return 2048
def set_spines(ax, spines):
    """Hide all four spines of ``ax`` unless ``spines`` is truthy.

    :param ax: The axes whose ``spines`` mapping is updated.
    :param spines: When truthy the axes is left untouched; when falsy the
        top, right, bottom and left spines are made invisible.
    :return: None
    """
    if spines:
        return
    for side in ("top", "right", "bottom", "left"):
        ax.spines[side].set_visible(False)
# (revision marker from the repository web view: G, 2025-12-09 14:12:11 -05:00)
def view_channels(
    recording: Recording,
    output_path: Optional[str] = "images/signal.png",
    title: Optional[str] = "Multichannel Signal Plot",
) -> None:
    """Create a PNG with one row per channel: IQ samples and a spectrogram.

    The left column plots the real and imaginary parts of each channel against
    time; the right column shows the channel's spectrogram. The plot is
    automatically saved to file at output_path and the figure is closed
    afterwards.

    :param recording: The recording object to plot. ``recording.data`` is
        indexed as ``[channel]`` and ``recording.metadata`` is queried for
        ``"center_frequency"`` (default 0) and ``"sample_rate"`` (default 1).
    :type recording: Recording
    :param output_path: The path to save the image. Defaults to "images/signal.png".
    :type output_path: str, optional
    :param title: The plot title. Defaults to "Multichannel Signal Plot".
    :type title: str, optional

    :return: None

    **Examples:**

    .. todo:: Usage examples coming soon.
    """
    num_channels = recording.data.shape[0]
    center_frequency = recording.metadata.get("center_frequency", 0)
    sample_rate = recording.metadata.get("sample_rate", 1)
    t = np.arange(len(recording.data[0])) / sample_rate

    linewidth = 0.5
    tick_fontsize = 4
    label_fontsize = 6
    light = COLORS["light"]

    # The style must be active before the figure exists; rcParams are read
    # when the figure and axes are created, not when they are saved.
    plt.style.use("dark_background")

    # squeeze=False keeps `axes` two-dimensional even for a single channel,
    # so the axes[row, col] indexing below always works.
    fig, axes = plt.subplots(nrows=num_channels, ncols=2, squeeze=False)
    try:
        fig.subplots_adjust(wspace=0.5, hspace=0.5)
        fig.suptitle(title, fontsize=16)
        axes[0, 0].set_title("IQ Signal", color=light)
        axes[0, 1].set_title("Spectrogram", color=light)

        last_row = num_channels - 1
        for i in range(num_channels):
            iq_ax, spec_ax = axes[i, 0], axes[i, 1]
            channel = recording.data[i]

            iq_ax.plot(t, np.real(channel), linewidth=linewidth)
            iq_ax.plot(t, np.imag(channel), linewidth=linewidth)
            spec_ax.specgram(channel, Fs=sample_rate, Fc=center_frequency)

            iq_ax.tick_params(labelsize=tick_fontsize, colors=light)
            spec_ax.tick_params(labelsize=tick_fontsize, colors=light)
            iq_ax.set_ylabel("Amplitude", fontsize=label_fontsize, color=light)
            spec_ax.set_ylabel("Freq (Hz)", fontsize=label_fontsize, color=light)

            # Only the bottom row carries the time axis ticks and label.
            if i != last_row:
                iq_ax.set_xticks([])
                spec_ax.set_xticks([])
            else:
                iq_ax.set_xlabel("Time (s)", fontsize=label_fontsize, color=light)
                spec_ax.set_xlabel("Time (s)", fontsize=label_fontsize, color=light)

        output_path, _ = set_path(output_path=output_path)
        fig.savefig(output_path, dpi=1000)
    finally:
        # Release the figure even if plotting or saving fails; at dpi=1000
        # an unclosed figure holds a lot of memory.
        plt.close(fig)

    print(f"Saved signal plot to {output_path}")
def view_sig(
    recording: Recording,
    output_path: Optional[str] = "images/signal.png",
    title: Optional[str] = "Signal Plot",
    dpi: Optional[int] = 250,
    plot_length: Optional[int] = None,
    plot_spectrogram: Optional[bool] = True,
    iq: Optional[bool] = True,
    frequency: Optional[bool] = True,
    constellation: Optional[bool] = True,
    metadata: Optional[bool] = True,
    logo: Optional[bool] = True,
    dark: Optional[bool] = True,
    spines: Optional[bool] = False,
    title_fontsize: Optional[int] = 40,
    subtitle_fontsize: Optional[int] = 20,
) -> None:
    """
    Create a plot of various signal visualizations as a png or svg image.

    Only the first channel (``recording.data[0]``) is plotted. The enabled
    full-width panels (spectrogram, IQ, frequency) are stacked from the top;
    the constellation, metadata and logo panels share a bottom row. All open
    matplotlib figures are closed before returning.

    :param recording: The recording object to plot.
    :type recording: Recording
    :param output_path: The output image path. Defaults to "images/signal.png"
    :type output_path: str, optional
    :param title: The display title. Defaults to "Signal Plot"
    :type title: str, optional
    :param dpi: The dots per inch resolution. Defaults to 250
    :type dpi: int, optional
    :param plot_length: The number of samples to plot, default is the whole recording. Defaults to None
    :type plot_length: int, optional
    :param plot_spectrogram: Display the spectrogram. Defaults to True
    :type plot_spectrogram: bool, optional
    :param iq: Display the iq sample plot. Defaults to True
    :type iq: bool, optional
    :param frequency: Display the fft of the recording. Defaults to True
    :type frequency: bool, optional
    :param constellation: Display the constellation plot. Defaults to True
    :type constellation: bool, optional
    :param metadata: Display the metadata text. Defaults to True
    :type metadata: bool, optional
    :param logo: Display the Qoherent logo. Skipped when the logo file is not found. Defaults to True
    :type logo: bool, optional
    :param dark: Use dark mode. Defaults to True
    :type dark: bool, optional
    :param spines: Display spines (bounding lines) around plots. Defaults to False
    :type spines: bool, optional
    :param title_fontsize: The font size of the main title text. Defaults to 40
    :type title_fontsize: int, optional
    :param subtitle_fontsize: The fontsize of the subplot titles. Defaults to 20
    :type subtitle_fontsize: int, optional

    :raises ValueError: If every panel is disabled, so there is nothing to plot.

    :return: None

    **Examples:**

    .. todo:: Usage examples coming soon.
    """
    complex_signal = recording.data[0]
    sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata)

    if plot_length is None:
        plot_length = len(complex_signal)
    # The slice is what actually gets plotted; it may be shorter than the
    # requested plot_length when the recording itself is shorter.
    segment = complex_signal[:plot_length]
    num_samples = len(segment)

    # Grid layout: each stacked panel is 2 rows tall, the bottom row is 3 rows
    # tall. The logo always sits in column 2, so it forces a 3-column grid.
    stacked_panels = sum(bool(flag) for flag in (plot_spectrogram, iq, frequency))
    bottom_panels = bool(constellation) + bool(metadata)
    has_bottom_row = bool(constellation or metadata or logo)
    subplot_height = 2 * stacked_panels + (3 if has_bottom_row else 0)
    subplot_width = max(bottom_panels or 1, 3 if logo else 0)
    if subplot_height == 0:
        raise ValueError("view_sig requires at least one panel to be enabled.")

    graphics_dir = os.path.join(os.path.dirname(__file__), "graphics")
    if dark:
        plt.style.use("dark_background")
        logo_path = os.path.join(graphics_dir, "Qoherent-logo-white-transparent.png")
    else:
        plt.style.use("default")
        logo_path = os.path.join(graphics_dir, "Qoherent-logo-black-transparent.png")

    # Plot preparation
    fig = plt.figure(figsize=(14, 12))
    try:
        fig.suptitle(title, fontsize=title_fontsize)
        gs = gridspec.GridSpec(subplot_height, subplot_width)
        row = 0
        col = 0

        if plot_spectrogram:
            spec_ax = fig.add_subplot(gs[row : row + 2, :])
            row += 2

            fft_size = get_fft_size(plot_length=num_samples)
            # Never ask for a segment longer than the data; scipy would shrink
            # it anyway (with a warning) and could then reject the overlap.
            nperseg = min(fft_size, num_samples)
            noverlap = max(min(fft_size // 8, nperseg - 1), 0)
            f_spec, t_spec, Sxx = spectrogram(
                segment,
                fs=sample_rate,
                nperseg=nperseg,
                noverlap=noverlap,
                mode="magnitude",
                return_onesided=False,
            )

            # shift frequencies so zero is centered; use the bins scipy
            # actually produced so the axis extent always matches Sxx
            f_bins = np.fft.fftshift(f_spec) + center_frequency
            Sxx = np.fft.fftshift(Sxx, axes=0)

            spec_ax.imshow(
                10 * np.log10(Sxx + 1e-12),
                aspect="auto",
                origin="lower",
                extent=[t_spec[0], t_spec[-1], f_bins[0], f_bins[-1]],
                cmap="twilight",
            )
            set_spines(spec_ax, spines)
            spec_ax.set_title("Spectrogram", loc="center", fontsize=subtitle_fontsize)
            spec_ax.set_ylabel("Frequency (Hz)")
            spec_ax.set_xlabel("Time (s)")

        if iq:
            iq_ax = fig.add_subplot(gs[row : row + 2, :])
            row += 2

            plot_iq = decimate(segment)
            # Stretch the decimated sample index back onto the original time
            # span: each kept sample stands for num_samples / len(plot_iq)
            # original samples.
            t = np.arange(len(plot_iq)) / sample_rate * (num_samples / len(plot_iq))
            iq_ax.plot(t, plot_iq.real, color=COLORS["purple"], linewidth=0.6, alpha=0.8, label="I")
            iq_ax.plot(t, plot_iq.imag, color=COLORS["magenta"], linewidth=0.6, alpha=0.8, label="Q")
            iq_ax.grid(False)
            iq_ax.set_ylabel("Amplitude")
            # t is monotonically increasing, so its ends are its min and max.
            iq_ax.set_xlim([t[0], t[-1]])
            iq_ax.set_xlabel("Time (s)")
            iq_ax.set_title("IQ Sample Plot", fontsize=subtitle_fontsize)
            set_spines(iq_ax, spines)

        if frequency:
            freq_ax = fig.add_subplot(gs[row : row + 2, :])
            row += 2

            # Apply window to reduce spectral leakage
            window = hann(num_samples)
            spectrum = np.abs(fftshift(fft(segment * window)))

            # Convert to dB
            spectrum_db = 20 * np.log10(spectrum + 1e-12)  # 20*log for magnitude

            # Exact FFT bin frequencies (spacing sample_rate / N), shifted the
            # same way as the spectrum so each value lines up with its bin.
            freqs = np.fft.fftshift(np.fft.fftfreq(num_samples, d=1.0 / sample_rate)) + center_frequency

            freq_ax.plot(freqs, spectrum_db, color=COLORS["accent"], linewidth=0.8)
            freq_ax.set_ylabel("Magnitude (dB)")
            freq_ax.set_title("Frequency Spectrum (Windowed FFT)", fontsize=subtitle_fontsize)
            set_spines(freq_ax, spines)

        if constellation:
            const_ax = fig.add_subplot(gs[row:, col])
            col += 1

            plot_const = decimate(segment, 50_000)
            const_ax.scatter(plot_const.real, plot_const.imag, c=COLORS["purple"], s=1, linewidths=0.1)
            # Axis limits come from the peak magnitude of the whole first
            # channel (not only the plotted segment), with a 10% margin.
            dimension = np.max(np.abs(complex_signal)) * 1.1
            const_ax.set_xlim([-1 * dimension, dimension])
            const_ax.set_ylim([-1 * dimension, dimension])
            const_ax.set_xlabel("In-phase (I)")
            const_ax.set_ylabel("Quadrature (Q)")
            const_ax.set_title("Constellation", fontsize=subtitle_fontsize)
            const_ax.set_aspect("equal")
            set_spines(const_ax, spines)

        # metadata text box
        if metadata:
            meta_ax = fig.add_subplot(gs[row:, col])
            col += 1

            # One "key: value" line per metadata entry; long values are
            # shortened so a single entry cannot overflow the panel.
            metadata_text = "\n".join(
                f"{key}: {textwrap.shorten(str(value), width=80, placeholder='...')}"
                for key, value in recording.metadata.items()
            )
            meta_ax.text(
                0.05,
                0.95,
                metadata_text,
                fontsize=10,
                va="top",
                ha="left",
                bbox=dict(facecolor="none", alpha=0.5, edgecolor="none"),
            )
            meta_ax.set_title("Metadata", fontsize=subtitle_fontsize)

            # Remove the tick labels
            meta_ax.xaxis.set_ticklabels([])  # Remove x-axis tick labels
            meta_ax.yaxis.set_ticklabels([])  # Remove y-axis tick labels
            meta_ax.set_xticks([])
            meta_ax.set_yticks([])
            set_spines(meta_ax, spines)

        if logo and os.path.isfile(logo_path):
            # The logo is pinned to the third column regardless of how many
            # other bottom-row panels are enabled.
            logo_ax = fig.add_subplot(gs[row:, 2])
            logo_ax.axis("off")
            try:
                # The context manager closes the file handle once the pixel
                # data has been handed to matplotlib.
                with Image.open(logo_path) as image:
                    logo_ax.imshow(image)
            except FileNotFoundError:
                print(f"Warning, {logo_path} not found.")

        fig.subplots_adjust(
            left=0.1,  # Left margin
            right=0.9,  # Right margin
            top=0.9,  # Top margin
            bottom=0.1,  # Bottom margin
            wspace=0.4,  # Horizontal space between subplots
            hspace=2.5,  # Vertical space between subplots
        )

        # save path handling
        output_path, _ = set_path(output_path=output_path)
        fig.savefig(output_path, dpi=dpi)
        print(f"Saved signal plot to {output_path}")
    finally:
        # Garbage collection and clean up to prevent memory overloading,
        # also when plotting or saving fails part-way through.
        plt.close("all")
        gc.collect()