import gc import os import textwrap from typing import Optional import matplotlib.pyplot as plt import numpy as np from matplotlib import gridspec from matplotlib.patches import Patch from PIL import Image from scipy.fft import fft, fftshift from scipy.signal import spectrogram from scipy.signal.windows import hann from ria_toolkit_oss.datatypes.recording import Recording from ria_toolkit_oss.view.tools import ( COLORS, decimate, extract_metadata_fields, set_path, ) def get_fft_size(plot_length): if plot_length < 2000: return int(64) elif plot_length < 10000: return int(256) elif plot_length < 1000000: return int(1024) else: return int(2048) def set_spines(ax, spines): if not spines: ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) ax.spines["bottom"].set_visible(False) ax.spines["left"].set_visible(False) def view_annotations( recording: Recording, channel: Optional[int] = 0, output_path: Optional[str] = "images/annotations.png", title: Optional[str] = "Annotated Spectrogram", dpi: Optional[int] = 300, title_fontsize: Optional[int] = 15, dark: Optional[bool] = True, ) -> None: # 1. Setup Plotting Environment plt.close("all") if dark: plt.style.use("dark_background") else: plt.style.use("default") fig, ax = plt.subplots(figsize=(12, 8)) complex_signal = recording.data[channel] sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata) annotations = recording.annotations # 2. Setup Color Mapping palette = ["#2196F3", "#9C27B0", "#64B5F6", "#7B1FA2", "#5C6BC0", "#CE93D8", "#1565C0", "#7C4DFF"] unique_labels = sorted(list(set(ann.label for ann in annotations if ann.label))) label_to_color = {label: palette[i % len(palette)] for i, label in enumerate(unique_labels)} # 3. Generate Spectrogram Pxx, freqs, times, im = ax.specgram( complex_signal, NFFT=256, Fs=sample_rate, Fc=center_frequency, noverlap=128, cmap="twilight" ) # 4. Draw Annotations (highest threshold % first so lower % renders on top) def _threshold_sort_key(ann): try: return int(ann.label.rstrip("%")) except (ValueError, AttributeError): return 0 for annotation in sorted(annotations, key=_threshold_sort_key, reverse=True): t_start = annotation.sample_start / sample_rate t_width = annotation.sample_count / sample_rate f_start = annotation.freq_lower_edge f_height = annotation.freq_upper_edge - annotation.freq_lower_edge ann_color = label_to_color.get(annotation.label, "gray") rect = plt.Rectangle( (t_start, f_start), t_width, f_height, linewidth=1.5, edgecolor=ann_color, facecolor="none", alpha=0.8 ) ax.add_patch(rect) if unique_labels: legend_elements = [ Patch(facecolor=label_to_color[label], alpha=0.3, edgecolor=label_to_color[label], label=label) for label in unique_labels ] ax.legend(handles=legend_elements, loc="upper right", framealpha=0.2) ax.set_title(title, fontsize=title_fontsize, pad=20) ax.set_xlabel("Time (s)", fontsize=12) ax.set_ylabel("Frequency (MHz)", fontsize=12) ax.grid(alpha=0.1) output_path, _ = set_path(output_path=output_path) plt.savefig(output_path, dpi=dpi, bbox_inches="tight") plt.close(fig) print(f"Professional annotation plot saved to {output_path}") def view_channels( recording: Recording, output_path: Optional[str] = "images/signal.png", title: Optional[str] = "Multichannel Signal Plot", ) -> None: """Create a PNG of the recording samples, spectrogram, and constellation plot. Plot is automatically saved to file at output_path. :param recording: The recording object to plot :type recording: Recording :param output_path: The path to save the image. Defaults to "images/signal.png". :type output_path: str, optional :param title: The plot title. Defaults to "Multichannel Signal Plot". :type title: str, optional :return: None **Examples:** .. todo:: Usage examples coming soon. """ num_channels = recording.data.shape[0] fig, axes = plt.subplots(nrows=num_channels, ncols=2) fig.subplots_adjust(wspace=0.5, hspace=0.5) plt.style.use("dark_background") fig.suptitle(title, fontsize=16) axes[0, 0].set_title("IQ Signal", color=COLORS["light"]) axes[0, 1].set_title("Spectrogram", color=COLORS["light"]) linewidth = 0.5 tick_fontsize = 4 center_frequency = recording.metadata.get("center_frequency", 0) sample_rate = recording.metadata.get("sample_rate", 1) sample_indexes = np.arange(0, len(recording.data[0]), 1) t = sample_indexes / sample_rate for i in range(num_channels): axes[i, 0].plot(t, np.real(recording.data[i]), linewidth=linewidth) axes[i, 0].plot(t, np.imag(recording.data[i]), linewidth=linewidth) axes[i, 1].specgram(recording.data[i], Fs=sample_rate, Fc=center_frequency) axes[i, 0].tick_params(labelsize=tick_fontsize, colors=COLORS["light"]) axes[i, 1].tick_params(labelsize=tick_fontsize, colors=COLORS["light"]) axes[i, 0].set_ylabel("Amplitude", fontsize=6, color=COLORS["light"]) axes[i, 1].set_ylabel("Freq (Hz)", fontsize=6, color=COLORS["light"]) if i != num_channels - 1: axes[i, 0].set_xticks([]) axes[i, 1].set_xticks([]) else: axes[i, 0].set_xlabel("Time (s)", fontsize=6, color=COLORS["light"]) axes[i, 1].set_xlabel("Time (s)", fontsize=6, color=COLORS["light"]) output_path, _ = set_path(output_path=output_path) plt.savefig(output_path, dpi=1000) print(f"Saved signal plot to {output_path}") def view_sig( recording: Recording, output_path: Optional[str] = "images/signal.png", title: Optional[str] = "Signal Plot", dpi: Optional[int] = 250, plot_length: Optional[int] = None, plot_spectrogram: Optional[bool] = True, iq: Optional[bool] = True, frequency: Optional[bool] = True, constellation: Optional[bool] = True, metadata: Optional[bool] = True, logo: Optional[bool] = True, dark: Optional[bool] = True, spines: Optional[bool] = False, title_fontsize: Optional[int] = 35, subtitle_fontsize: Optional[int] = 15, ) -> None: """ Create a plot of various signal visualizations as a png or svg image. :param recording: The recording object to plot. :type recording: Recording :param output_path: The output image path. Defaults to "images/signal.png" :type output_path: str, optional :param title: The display title. Defaults to "Signal Plot" :type title: str, optional :param dpi: The dots per inch resolution. Defaults to 250 :type dpi: int, optional :param plot_length: The number of samples to plot, default is the whole recording. Defaults to None :type plot_length: int, optional :param plot_spectrogram: Display the spectrogram. Defaults to True :type plot_spectrogram: bool, optional :param iq: Display the iq sample plot. Defaults to True :type iq: bool, optional :param frequency: Display the fft of the recording. Defaults to True :type frequency: bool, optional :param constellation: Display the constellation plot. Defaults to True :type constellation: bool, optional :param metadata: Display the metadata text. Defaults to True :type metadata: bool, optional :param logo: Display the Qoherent logo. Defaults to True :type logo: bool, optional :param dark: Use dark mode. Defaults to True :type dark: bool, optional :param spines: Display spines (bounding lines) around plots. Defaults to False :type spines: bool, optional :param title_fontsize: The font size of the main title text. Defaults to 40 :type title_fontsize: int, optional :param subtitle_fontsize: The fontsize of the subplot titles. Defaults to 20 :type subtitle_fontsize: int, optional **Examples:** .. todo:: Usage examples coming soon. """ complex_signal = recording.data[0] sample_rate, center_frequency, _ = extract_metadata_fields(recording.metadata) subplot_height = 2 * (plot_spectrogram + iq + frequency) + 3 * (constellation or metadata or logo) subplot_width = max((constellation + metadata or 1), logo * 3) if dark: plt.style.use("dark_background") logo_path = os.path.dirname(__file__) + "/graphics/Qoherent-logo-white-transparent.png" else: plt.style.use("default") logo_path = os.path.dirname(__file__) + "/graphics/Qoherent-logo-black-transparent.png" if plot_length is None: plot_length = len(recording.data[0]) # Plot preparation fig = plt.figure(figsize=(16, 14)) fig.suptitle(title, fontsize=title_fontsize) gs = gridspec.GridSpec(subplot_height, subplot_width) plot_y_indx = 0 plot_x_indx = 0 if plot_spectrogram: spec_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :]) plot_y_indx = plot_y_indx + 2 fft_size = get_fft_size(plot_length=plot_length) _, t_spec, Sxx = spectrogram( complex_signal[:plot_length], fs=sample_rate, nperseg=fft_size, noverlap=fft_size // 8, mode="magnitude", return_onesided=False, ) # shift frequencies so zero is centered f_bins = np.fft.fftfreq(fft_size, d=1.0 / sample_rate) f_bins = np.fft.fftshift(f_bins) f_bins = f_bins + center_frequency Sxx = np.fft.fftshift(Sxx, axes=0) spec_ax.imshow( 10 * np.log10(Sxx + 1e-12), aspect="auto", origin="lower", extent=[t_spec[0], t_spec[-1], f_bins[0], f_bins[-1]], cmap="twilight", ) set_spines(spec_ax, spines) spec_ax.set_title("Spectrogram", loc="center", fontsize=subtitle_fontsize) if iq: iq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :]) plot_y_indx = plot_y_indx + 2 plot_iq = decimate(complex_signal[:plot_length]) t = np.arange(len(plot_iq)) / sample_rate * (len(complex_signal[:plot_length]) / len(plot_iq)) iq_ax.plot(t, plot_iq.real, color=COLORS["purple"], linewidth=0.6, alpha=0.8, label="I") iq_ax.plot(t, plot_iq.imag, color=COLORS["magenta"], linewidth=0.6, alpha=0.8, label="Q") iq_ax.grid(False) iq_ax.set_ylabel("Amplitude") iq_ax.set_xlim([min(t), max(t)]) iq_ax.set_xlabel("Time (s)") iq_ax.set_title("IQ Sample Plot", fontsize=subtitle_fontsize) set_spines(iq_ax, spines) if frequency: freq_ax = plt.subplot(gs[plot_y_indx : plot_y_indx + 2, :]) plot_y_indx = plot_y_indx + 2 # Apply window to reduce spectral leakage window = hann(len(complex_signal[:plot_length])) spectrum = np.abs(fftshift(fft(complex_signal[:plot_length] * window))) # Convert to dB spectrum_db = 20 * np.log10(spectrum + 1e-12) # 20*log for magnitude freqs = np.linspace(-sample_rate / 2, sample_rate / 2, len(complex_signal[:plot_length])) + center_frequency freq_ax.plot(freqs, spectrum_db, color=COLORS["accent"], linewidth=0.8) freq_ax.set_ylabel("Magnitude (dB)") freq_ax.set_title("Frequency Spectrum (Windowed FFT)", fontsize=subtitle_fontsize) set_spines(freq_ax, spines) if constellation: const_ax = plt.subplot(gs[plot_y_indx:, plot_x_indx]) plot_x_indx = plot_x_indx + 1 plot_const = decimate(complex_signal[:plot_length], 50_000) const_ax.scatter(plot_const.real, plot_const.imag, c=COLORS["purple"], s=1, linewidths=0.1) dimension = max(abs(complex_signal)) * 1.1 const_ax.set_xlim([-1 * dimension, dimension]) const_ax.set_ylim([-1 * dimension, dimension]) const_ax.set_xlabel("In-phase (I)") const_ax.set_ylabel("Quadrature (Q)") const_ax.set_title("Constellation", fontsize=subtitle_fontsize) const_ax.set_aspect("equal") if not spines: const_ax.spines["top"].set_visible(False) const_ax.spines["right"].set_visible(False) const_ax.spines["bottom"].set_visible(False) const_ax.spines["left"].set_visible(False) # metadata text box if metadata: meta_ax = plt.subplot(gs[plot_y_indx:, plot_x_indx]) plot_x_indx = plot_x_indx + 1 metadata_text = "\n".join( [ f"{key}: {textwrap.shorten(str(value), width=80, placeholder='...')}" for key, value in recording.metadata.items() ] ) meta_ax.text( 0.05, 0.95, metadata_text, fontsize=10, va="top", ha="left", bbox=dict(facecolor="none", alpha=0.5, edgecolor="none"), ) meta_ax.set_title("Metadata", fontsize=subtitle_fontsize) # Remove the tick labels meta_ax.xaxis.set_ticklabels([]) # Remove x-axis tick labels meta_ax.yaxis.set_ticklabels([]) # Remove y-axis tick labels meta_ax.set_xticks([]) meta_ax.set_yticks([]) set_spines(meta_ax, spines) if logo and os.path.isfile(logo_path): # logo_ax = plt.subplot(gs[plot_y_indx:, 2]) logo_pos = [0.75, 0.05, 0.2, 0.08] logo_ax = fig.add_axes(logo_pos, anchor="SE", zorder=10) plot_x_indx = plot_x_indx + 1 logo_ax.axis("off") try: image = Image.open(logo_path) # Open the PNG image using PIL logo_ax.imshow(image) except FileNotFoundError: print(f"Warning, {logo_path} not found.") fig.subplots_adjust( left=0.1, # Left margin right=0.9, # Right margin top=0.9, # Top margin bottom=0.1, # Bottom margin wspace=0.4, # Horizontal space between subplots hspace=2.5, # Vertical space between subplots ) output_path, _ = set_path(output_path=output_path) plt.savefig(output_path, dpi=dpi) print(f"Saved signal plot to {output_path}") # Garbage collection and clean up to prevent memory overloading plt.close("all") gc.collect()