2025-09-02 11:35:41 -04:00
|
|
|
import numpy as np
|
|
|
|
|
import plotly.graph_objects as go
|
|
|
|
|
import scipy.signal as signal
|
|
|
|
|
from plotly.graph_objs import Figure
|
|
|
|
|
from scipy.fft import fft, fftshift
|
|
|
|
|
|
2025-09-04 12:29:54 -04:00
|
|
|
from ria_toolkit_oss.datatypes import Recording
|
2025-09-02 11:35:41 -04:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def spectrogram(rec: Recording, thumbnail: bool = False) -> Figure:
    """Create a spectrogram for the recording.

    The first channel of the recording (``rec.data[0]``) is run through a two-sided, complex-mode
    :func:`scipy.signal.spectrogram`. The magnitude is log10-scaled, min-max normalized to [0, 1],
    shifted so that frequencies run from negative to positive, decimated for rendering performance,
    and drawn as a Plotly heatmap.

    :param rec: Signal to plot.
    :type rec: ria_toolkit_oss.datatypes.Recording
    :param thumbnail: Whether to return a small thumbnail version or full plot.
    :type thumbnail: bool

    :return: Spectrogram, as a Plotly Figure.
    :rtype: plotly.graph_objs.Figure
    """
    complex_signal = rec.data[0]
    sample_rate = int(rec.metadata.get("sample_rate", 1))
    plot_length = len(complex_signal)

    # Determine FFT size
    if plot_length < 2000:
        fft_size = 64
    elif plot_length < 10000:
        fft_size = 256
    elif plot_length < 1000000:
        fft_size = 1024
    else:
        fft_size = 2048

    # A segment can never be longer than the signal itself. Clamping here (and deriving the overlap
    # from the clamped length) keeps very short recordings from tripping SciPy's
    # "noverlap must be less than nperseg" error. The FFT is still zero-padded to fft_size.
    segment_length = min(fft_size, plot_length)

    frequencies, times, Sxx = signal.spectrogram(
        complex_signal,
        fs=sample_rate,
        nfft=fft_size,
        nperseg=segment_length,
        noverlap=segment_length // 8,
        scaling="density",
        mode="complex",
        return_onesided=False,
    )

    # Convert complex values to amplitude and then to log scale for visualization
    Sxx_magnitude = np.abs(Sxx)
    Sxx_log = np.log10(Sxx_magnitude + 1e-6)

    # Normalize spectrogram values between 0 and 1 for plotting
    Sxx_log_shifted = Sxx_log - np.min(Sxx_log)
    peak = np.max(Sxx_log_shifted)
    if peak == 0:
        # Perfectly flat spectrogram: avoid 0 / 0, which would fill the heatmap with NaN.
        Sxx_log_norm = np.zeros_like(Sxx_log_shifted)
    else:
        Sxx_log_norm = Sxx_log_shifted / peak

    # Shift frequency bins and spectrogram rows so frequencies run from negative to positive
    frequencies_shifted = np.fft.fftshift(frequencies)
    Sxx_shifted = np.fft.fftshift(Sxx_log_norm, axes=0)

    # Downsample heatmap for performance (target of roughly 500x500 points). The strides use floor
    # division, so an axis is only decimated once it holds at least twice the target bin count.
    max_freq_bins = 500
    max_time_bins = 500

    freq_step = max(1, len(frequencies_shifted) // max_freq_bins)
    time_step = max(1, len(times) // max_time_bins)

    if freq_step > 1 or time_step > 1:
        Sxx_shifted = Sxx_shifted[::freq_step, ::time_step]
        frequencies_shifted = frequencies_shifted[::freq_step]
        times = times[::time_step]

    fig = go.Figure(
        data=go.Heatmap(
            z=Sxx_shifted,
            # scipy.signal.spectrogram already reports segment times in seconds (fs is in Hz), which
            # is what the "Time [s]" axis title promises, so the values are plotted unscaled.
            x=times,
            y=frequencies_shifted,
            colorscale="Viridis",
            zmin=0,
            zmax=1,
            reversescale=False,
            showscale=False,
        )
    )

    if thumbnail:
        # Thumbnails are bare images: no tick labels, minimal margins, fixed small size.
        fig.update_xaxes(showticklabels=False)
        fig.update_yaxes(showticklabels=False)
        fig.update_layout(
            template="plotly_dark",
            width=200,
            height=100,
            margin=dict(l=5, r=5, t=5, b=5),
            xaxis=dict(scaleanchor=None),
            yaxis=dict(scaleanchor=None),
        )
    else:
        fig.update_layout(
            title="Spectrogram",
            xaxis_title="Time [s]",
            yaxis_title="Frequency [Hz]",
            template="plotly_dark",
            height=300,
            width=800,
        )

    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def iq_time_series(rec: Recording) -> Figure:
    """Create a time series plot of the real and imaginary parts of signal.

    Only the first channel (``rec.data[0]``) is plotted. Long recordings are decimated by a constant
    stride before plotting, and the time axis is derived from the ``sample_rate`` metadata entry
    (defaulting to 1 when it is absent).

    :param rec: Signal to plot.
    :type rec: ria_toolkit_oss.datatypes.Recording

    :return: Time series plot, as a Plotly Figure.
    :rtype: plotly.graph_objs.Figure
    """
    samples = rec.data[0]
    fs = int(rec.metadata.get("sample_rate", 1))

    # Decimate for performance, aiming for about 10,000 plotted points. The stride is a floor
    # division, so it stays at 1 until the recording holds at least twice the target.
    point_budget = 10000
    stride = max(1, len(samples) // point_budget)
    if stride > 1:
        samples = samples[::stride]

    # Each retained sample sits `stride` original samples after the previous one.
    time_axis = np.arange(0, len(samples), 1) * stride / fs

    fig = go.Figure()

    # Scattergl gives WebGL-accelerated rendering; both components share the same trace styling.
    components = (
        ("I (In-phase)", samples.real),
        ("Q (Quadrature)", samples.imag),
    )
    for label, values in components:
        fig.add_trace(go.Scattergl(x=time_axis, y=values, mode="lines", name=label, line=dict(width=0.6)))

    layout = dict(
        title="IQ Time Series",
        xaxis_title="Time [s]",
        yaxis_title="Amplitude",
        template="plotly_dark",
        height=300,
        width=800,
        showlegend=True,
    )
    fig.update_layout(**layout)

    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def frequency_spectrum(rec: Recording) -> Figure:
    """Create a frequency spectrum plot from the recording.

    The FFT magnitude of the first channel (``rec.data[0]``) is log10-scaled and min-max normalized
    to [0, 1]. The frequency axis is centred on the ``center_frequency`` metadata entry (default 0)
    and spans the ``sample_rate`` metadata entry (default 1).

    :param rec: Input signal to plot.
    :type rec: ria_toolkit_oss.datatypes.Recording

    :return: Frequency spectrum, as a Plotly figure.
    :rtype: plotly.graph_objs.Figure
    """
    complex_signal = rec.data[0]
    center_frequency = int(rec.metadata.get("center_frequency", 0))
    sample_rate = int(rec.metadata.get("sample_rate", 1))

    epsilon = 1e-10  # Keeps log10 finite for bins whose magnitude is exactly zero.
    spectrum = np.abs(fftshift(fft(complex_signal)))

    # Use the true FFT bin centres. An endpoint-inclusive linspace over [-fs/2, fs/2] has the wrong
    # bin spacing (fs / (N - 1) instead of fs / N) and does not place the DC bin at zero offset.
    freqs = fftshift(np.fft.fftfreq(len(complex_signal))) * sample_rate + center_frequency

    log_spectrum = np.log10(spectrum + epsilon)
    log_floor = log_spectrum.min()
    log_span = log_spectrum.max() - log_floor
    if log_span == 0:
        # Flat spectrum (e.g. a single sample): avoid 0 / 0, which would plot nothing but NaN.
        scaled_log_spectrum = np.zeros_like(log_spectrum)
    else:
        scaled_log_spectrum = (log_spectrum - log_floor) / log_span

    # Downsample for performance, aiming for about 10,000 points. The stride is a floor division,
    # so up to just under twice that many points can remain.
    max_points = 10000
    if len(freqs) > max_points:
        step = len(freqs) // max_points
        freqs = freqs[::step]
        scaled_log_spectrum = scaled_log_spectrum[::step]

    fig = go.Figure()
    fig.add_trace(go.Scattergl(x=freqs, y=scaled_log_spectrum, mode="lines", name="Spectrum", line=dict(width=0.4)))

    # NOTE(review): the plotted values are already log10-scaled and normalized to [0, 1], yet the
    # y-axis is also logarithmic - confirm that this double scaling is intended.
    fig.update_layout(
        title="Frequency Spectrum",
        xaxis_title="Frequency [Hz]",
        yaxis_title="Magnitude",
        yaxis_type="log",
        template="plotly_dark",
        height=300,
        width=800,
        showlegend=False,
    )

    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def constellation(rec: Recording) -> Figure:
    """Create a constellation plot from the recording.

    The in-phase (real) part of the first channel is plotted against its quadrature (imaginary)
    part, after decimating the samples by a constant stride. Both axes are fixed to [-1.1, 1.1].

    :param rec: Input signal to plot.
    :type rec: ria_toolkit_oss.datatypes.Recording

    :return: Constellation, as a Plotly Figure.
    :rtype: plotly.graph_objs.Figure
    """
    iq_samples = rec.data[0]

    # Thin the IQ samples towards a target point count so the figure stays responsive; the overall
    # shape of the constellation is retained.
    target_number_of_points = 5000
    stride = max(1, len(iq_samples) // target_number_of_points)
    decimated = iq_samples[::stride]

    trace = go.Scattergl(
        x=decimated.real,
        y=decimated.imag,
        mode="lines",
        name="Constellation",
        line=dict(width=0.2),
    )

    fig = go.Figure()
    fig.add_trace(trace)

    # The same fixed range is used on both axes so the plot is square in data units.
    axis_limits = [-1.1, 1.1]
    fig.update_layout(
        title="Constellation",
        xaxis_title="In-phase (I)",
        yaxis_title="Quadrature (Q)",
        template="plotly_dark",
        height=400,
        width=400,
        showlegend=False,
        xaxis=dict(range=list(axis_limits)),
        yaxis=dict(range=list(axis_limits)),
    )

    return fig
|
2025-12-05 11:19:06 -05:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def power_spectral_density(rec: Recording) -> Figure:
    """Create a Power Spectral Density (PSD) plot from the recording.

    The two-sided PSD of the first channel (``rec.data[0]``) is estimated with
    :func:`scipy.signal.welch`, reordered so that frequencies increase monotonically, offset by the
    ``center_frequency`` metadata entry, and converted to decibels.

    :param rec: Input signal to plot.
    :type rec: ria_toolkit_oss.datatypes.Recording

    :return: PSD plot, as a Plotly Figure.
    :rtype: plotly.graph_objs.Figure
    """
    iq_samples = rec.data[0]
    fc = int(rec.metadata.get("center_frequency", 0))
    fs = int(rec.metadata.get("sample_rate", 1))

    # Welch estimate; segments are capped at 1024 samples and never exceed the signal length.
    segment_length = min(1024, len(iq_samples))
    welch_freqs, welch_psd = signal.welch(
        iq_samples,
        fs=fs,
        nperseg=segment_length,
        return_onesided=False,
        scaling="density",
    )

    # Reorder the two-sided output (negative frequencies first) and move it to the RF centre.
    freq_axis = fftshift(welch_freqs) + fc

    # Decibel conversion; the small offset keeps log10 finite for zero-valued bins.
    psd_db = 10 * np.log10(fftshift(welch_psd) + 1e-10)

    psd_trace = go.Scattergl(
        x=freq_axis,
        y=psd_db,
        mode="lines",
        name="PSD",
        line=dict(width=0.8, color="#00D9FF"),
    )

    fig = go.Figure()
    fig.add_trace(psd_trace)

    fig.update_layout(
        title="Power Spectral Density",
        xaxis_title="Frequency [Hz]",
        yaxis_title="Power/Frequency [dB/Hz]",
        template="plotly_dark",
        height=300,
        width=800,
        showlegend=False,
    )

    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fft_plot(rec: Recording) -> Figure:
    """Create an FFT magnitude plot from the recording.

    The FFT of the first channel (``rec.data[0]``) is shifted so that frequencies increase
    monotonically, converted to decibels, and plotted against a frequency axis centred on the
    ``center_frequency`` metadata entry.

    :param rec: Input signal to plot.
    :type rec: ria_toolkit_oss.datatypes.Recording

    :return: FFT plot, as a Plotly Figure.
    :rtype: plotly.graph_objs.Figure
    """
    iq_samples = rec.data[0]
    fc = int(rec.metadata.get("center_frequency", 0))
    fs = int(rec.metadata.get("sample_rate", 1))

    # Spectrum and matching bin frequencies, both reordered to run from negative to positive.
    shifted_spectrum = fftshift(fft(iq_samples))
    freqs = fftshift(np.fft.fftfreq(len(iq_samples), 1 / fs)) + fc

    # Magnitude in dB; the small offset keeps log10 finite for zero-valued bins.
    magnitude_db = 20 * np.log10(np.abs(shifted_spectrum) + 1e-10)

    # Decimate towards about 10,000 points. The stride is a floor division, so it is 1 (a no-op)
    # until the spectrum holds at least twice the target.
    max_points = 10000
    stride = max(1, len(freqs) // max_points)
    freqs = freqs[::stride]
    magnitude_db = magnitude_db[::stride]

    fft_trace = go.Scattergl(
        x=freqs,
        y=magnitude_db,
        mode="lines",
        name="FFT",
        line=dict(width=0.6, color="#FF6B9D"),
    )

    fig = go.Figure()
    fig.add_trace(fft_trace)

    fig.update_layout(
        title="FFT Magnitude",
        xaxis_title="Frequency [Hz]",
        yaxis_title="Magnitude [dB]",
        template="plotly_dark",
        height=300,
        width=800,
        showlegend=False,
    )

    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def spectrogram_3d(rec: Recording) -> Figure:
    """Create a 3D spectrogram plot from the recording.

    The first channel of the recording (``rec.data[0]``) is run through a two-sided, complex-mode
    :func:`scipy.signal.spectrogram`. The result is converted to power in decibels, shifted so that
    frequencies run from negative to positive, decimated, and drawn as a Plotly surface with time
    on the x-axis and frequency on the y-axis.

    :param rec: Input signal to plot.
    :type rec: ria_toolkit_oss.datatypes.Recording

    :return: 3D Spectrogram, as a Plotly Figure.
    :rtype: plotly.graph_objs.Figure
    """
    complex_signal = rec.data[0]
    sample_rate = int(rec.metadata.get("sample_rate", 1))
    plot_length = len(complex_signal)

    # Determine FFT size
    if plot_length < 2000:
        fft_size = 64
    elif plot_length < 10000:
        fft_size = 256
    elif plot_length < 1000000:
        fft_size = 1024
    else:
        fft_size = 2048

    # A segment can never be longer than the signal itself. Clamping here (and deriving the overlap
    # from the clamped length) keeps very short recordings from tripping SciPy's
    # "noverlap must be less than nperseg" error. The FFT is still zero-padded to fft_size.
    segment_length = min(fft_size, plot_length)

    frequencies, times, Sxx = signal.spectrogram(
        complex_signal,
        fs=sample_rate,
        nfft=fft_size,
        nperseg=segment_length,
        noverlap=segment_length // 8,
        scaling="density",
        mode="complex",
        return_onesided=False,
    )

    # mode="complex" yields amplitude-domain values, so power in dB is 20 * log10(|S|)
    # (equivalently 10 * log10(|S|**2)). The small offset keeps log10 finite for empty bins.
    Sxx_magnitude = np.abs(Sxx)
    Sxx_log = 20 * np.log10(Sxx_magnitude + 1e-10)

    # Shift frequency bins for proper visualization
    frequencies_shifted = np.fft.fftshift(frequencies)
    Sxx_shifted = np.fft.fftshift(Sxx_log, axes=0)

    # Downsample to prevent browser memory issues (target of roughly 200x200 points). The strides
    # use floor division, so an axis is only decimated once it holds at least twice the target.
    max_freq_bins = 200
    max_time_bins = 200

    freq_step = max(1, len(frequencies_shifted) // max_freq_bins)
    time_step = max(1, len(times) // max_time_bins)

    if freq_step > 1 or time_step > 1:
        Sxx_shifted = Sxx_shifted[::freq_step, ::time_step]
        frequencies_shifted = frequencies_shifted[::freq_step]
        times = times[::time_step]

    fig = go.Figure(
        data=[
            go.Surface(
                z=Sxx_shifted,
                x=times,
                y=frequencies_shifted,
                colorscale="Viridis",
                showscale=True,
                colorbar=dict(title="Power [dB]"),
            )
        ]
    )

    fig.update_layout(
        title="3D Spectrogram",
        scene=dict(
            xaxis_title="Time [s]",
            yaxis_title="Frequency [Hz]",
            zaxis_title="Power [dB]",
            camera=dict(eye=dict(x=1.5, y=1.5, z=1.3)),
        ),
        template="plotly_dark",
        height=600,
        width=900,
    )

    return fig
|