ria-toolkit-oss/tests/transforms/test_iq_impairments.py

407 lines
15 KiB
Python
Raw Normal View History

2026-03-31 13:51:10 -04:00
"""
Unit tests for ria_toolkit_oss.transforms.iq_impairments.
Bugs/issues identified during review:
- time_shift(signal, shift=0) returns all-zeros instead of the original signal.
This is because `data[:, :-0]` evaluates as `data[:, :0]` (empty slice).
Tests marked with BUG comments document this known failure.
- resample() 'else' branch creates 'empty_array' but never returns it (dead code).
When up < down, a shorter-than-input array is returned instead of zero-padded.
- add_awgn_to_signal() contains a leftover debug print() call.
"""
import numpy as np
import pytest
from ria_toolkit_oss.data import Recording
2026-03-31 13:51:10 -04:00
from ria_toolkit_oss.transforms import iq_impairments
# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------
# Metadata dictionary attached to every Recording built by these tests.
SAMPLE_METADATA = dict(source="test", timestamp=1700000000.0)

# One row, four complex samples: k * (1 + 1j) for k = 1..4.
DATA_4 = np.array([[k * (1 + 1j) for k in range(1, 5)]], dtype=np.complex128)

# One row, five complex samples with real parts 1..5 and zero imaginary part.
DATA_5 = np.array([[complex(v) for v in range(1, 6)]], dtype=np.complex128)
# ---------------------------------------------------------------------------
# add_awgn_to_signal
# ---------------------------------------------------------------------------
def test_add_awgn_array_shape() -> None:
    """add_awgn_to_signal keeps the shape of the array it is given."""
    noisy = iq_impairments.add_awgn_to_signal(DATA_4, snr=10)
    expected_shape = DATA_4.shape
    assert noisy.shape == expected_shape
def test_add_awgn_array_is_complex() -> None:
    """The noisy output is a complex-valued array."""
    noisy = iq_impairments.add_awgn_to_signal(DATA_4, snr=10)
    is_complex = np.iscomplexobj(noisy)
    assert is_complex
def test_add_awgn_not_identical_to_input() -> None:
    """Adding noise yields samples that differ from the clean input."""
    # Seed the global NumPy RNG before the call.
    np.random.seed(42)
    noisy = iq_impairments.add_awgn_to_signal(DATA_4, snr=10)
    unchanged = np.array_equal(noisy, DATA_4)
    assert not unchanged
def test_add_awgn_recording_input() -> None:
    """A Recording input yields a Recording that keeps its 'source' entry and data shape."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    noisy = iq_impairments.add_awgn_to_signal(recording, snr=10)

    assert isinstance(noisy, Recording)
    assert noisy.metadata["source"] == "test"
    assert noisy.data.shape == DATA_4.shape
def test_add_awgn_recording_data_changed() -> None:
    """The data of a Recording passed through add_awgn_to_signal differs from the input."""
    # Seed the global NumPy RNG before the call.
    np.random.seed(42)
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    noisy = iq_impairments.add_awgn_to_signal(recording, snr=10)
    unchanged = np.array_equal(noisy.data, DATA_4)
    assert not unchanged
def test_add_awgn_invalid_real_input() -> None:
    """A real-valued (non-complex) array is rejected with ValueError."""
    real_samples = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.add_awgn_to_signal(real_samples)
def test_add_awgn_snr_approximated() -> None:
    """At snr=60 the noisy output stays within 0.01 of the clean signal."""
    np.random.seed(0)
    # A large SNR means the added noise is small relative to the signal.
    clean = np.ones((1, 100_000), dtype=np.complex128)
    noisy = iq_impairments.add_awgn_to_signal(clean, snr=60)
    within_tolerance = np.allclose(noisy, clean, atol=0.01)
    assert within_tolerance
# ---------------------------------------------------------------------------
# time_shift
# ---------------------------------------------------------------------------
def test_time_shift_positive() -> None:
    """shift=2 moves samples two places right and zero-fills the front."""
    shifted = iq_impairments.time_shift(DATA_5, shift=2)
    want = np.array([[0, 0, 1, 2, 3]], dtype=np.complex128)
    assert np.array_equal(shifted, want)
def test_time_shift_negative() -> None:
    """shift=-2 moves samples two places left and zero-fills the tail."""
    shifted = iq_impairments.time_shift(DATA_5, shift=-2)
    want = np.array([[3, 4, 5, 0, 0]], dtype=np.complex128)
    assert np.array_equal(shifted, want)
def test_time_shift_shape_preserved() -> None:
    """A time-shifted signal has the same shape as its input."""
    shifted = iq_impairments.time_shift(DATA_5, shift=1)
    expected_shape = DATA_5.shape
    assert shifted.shape == expected_shape
def test_time_shift_recording_input() -> None:
    """A Recording input yields a Recording with its 'source' entry kept and data shifted."""
    recording = Recording(data=DATA_5.copy(), metadata=SAMPLE_METADATA)
    shifted = iq_impairments.time_shift(recording, shift=2)

    assert isinstance(shifted, Recording)
    assert shifted.metadata["source"] == "test"

    want = np.array([[0, 0, 1, 2, 3]], dtype=np.complex128)
    assert np.array_equal(shifted.data, want)
def test_time_shift_invalid_real_input() -> None:
    """A real-valued array is rejected with ValueError."""
    real_samples = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.time_shift(real_samples)
def test_time_shift_large_shift_warns() -> None:
    """A shift of 100 on the five-sample signal emits a UserWarning."""
    oversized_shift = 100
    with pytest.warns(UserWarning):
        iq_impairments.time_shift(DATA_5, shift=oversized_shift)
def test_time_shift_zero_is_identity() -> None:
    """A zero shift hands back samples equal to the input."""
    shifted = iq_impairments.time_shift(DATA_5, shift=0)
    same = np.array_equal(shifted, DATA_5)
    assert same
2026-03-31 13:51:10 -04:00
# ---------------------------------------------------------------------------
# frequency_shift
# ---------------------------------------------------------------------------
def test_frequency_shift_zero_is_identity() -> None:
    """A frequency shift of 0.0 returns samples close to the input."""
    shifted = iq_impairments.frequency_shift(DATA_4, shift=0.0)
    close = np.allclose(shifted, DATA_4)
    assert close
def test_frequency_shift_shape_preserved() -> None:
    """A frequency-shifted signal has the same shape as its input."""
    shifted = iq_impairments.frequency_shift(DATA_4, shift=0.25)
    expected_shape = DATA_4.shape
    assert shifted.shape == expected_shape
def test_frequency_shift_is_complex() -> None:
    """The frequency-shifted output is a complex-valued array."""
    shifted = iq_impairments.frequency_shift(DATA_4, shift=0.1)
    is_complex = np.iscomplexobj(shifted)
    assert is_complex
def test_frequency_shift_half_nyquist() -> None:
    """A shift of 0.5 matches multiplication by exp(j*2*pi*0.5*n), i.e. (-1)^n."""
    # All-ones signal on the real axis, so the expected output is the rotation itself.
    ones = np.ones((1, 4), dtype=np.complex128)
    shifted = iq_impairments.frequency_shift(ones, shift=0.5)

    sample_index = np.arange(4)
    rotation = np.exp(1j * 2 * np.pi * 0.5 * sample_index)
    assert np.allclose(shifted, ones * rotation)
def test_frequency_shift_recording_input() -> None:
    """A Recording input yields a Recording that keeps its 'source' entry and data shape."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    shifted = iq_impairments.frequency_shift(recording, shift=0.25)

    assert isinstance(shifted, Recording)
    assert shifted.metadata["source"] == "test"
    assert shifted.data.shape == DATA_4.shape
def test_frequency_shift_out_of_range_positive() -> None:
    """A shift of 0.6 (above 0.5) is rejected with ValueError."""
    too_high = 0.6
    with pytest.raises(ValueError):
        iq_impairments.frequency_shift(DATA_4, shift=too_high)
def test_frequency_shift_out_of_range_negative() -> None:
    """A shift of -0.51 (below -0.5) is rejected with ValueError."""
    too_low = -0.51
    with pytest.raises(ValueError):
        iq_impairments.frequency_shift(DATA_4, shift=too_low)
def test_frequency_shift_invalid_real_input() -> None:
    """A real-valued (non-complex) array is rejected with ValueError."""
    real_samples = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.frequency_shift(real_samples, shift=0.1)
def test_frequency_shift_boundary_values() -> None:
    """The edge shifts 0.5 and -0.5 are accepted; the test passes if neither call raises."""
    for edge in (0.5, -0.5):
        iq_impairments.frequency_shift(DATA_4, shift=edge)
# ---------------------------------------------------------------------------
# phase_shift
# ---------------------------------------------------------------------------
def test_phase_shift_zero_is_identity() -> None:
    """A phase shift of 0.0 returns samples close to the input."""
    rotated = iq_impairments.phase_shift(DATA_4, phase=0.0)
    close = np.allclose(rotated, DATA_4)
    assert close
def test_phase_shift_pi_negates() -> None:
    """A phase shift of pi negates every sample, since exp(j*pi) = -1."""
    rotated = iq_impairments.phase_shift(DATA_4, phase=np.pi)
    negated = -DATA_4
    assert np.allclose(rotated, negated)
def test_phase_shift_half_pi() -> None:
    """A phase shift of pi/2 multiplies every sample by j, since exp(j*pi/2) = j."""
    quarter_turn = np.pi / 2
    rotated = iq_impairments.phase_shift(DATA_4, phase=quarter_turn)
    want = DATA_4 * 1j
    assert np.allclose(rotated, want)
def test_phase_shift_shape_preserved() -> None:
    """A phase-shifted signal has the same shape as its input."""
    rotated = iq_impairments.phase_shift(DATA_4, phase=np.pi / 4)
    expected_shape = DATA_4.shape
    assert rotated.shape == expected_shape
def test_phase_shift_recording_input() -> None:
    """A Recording input yields a Recording with its 'source' entry kept and data rotated by pi/2."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    rotated = iq_impairments.phase_shift(recording, phase=np.pi / 2)

    assert isinstance(rotated, Recording)
    assert rotated.metadata["source"] == "test"

    want = DATA_4 * 1j
    assert np.allclose(rotated.data, want)
def test_phase_shift_out_of_range_positive() -> None:
    """A phase slightly above pi is rejected with ValueError."""
    too_high = np.pi + 0.01
    with pytest.raises(ValueError):
        iq_impairments.phase_shift(DATA_4, phase=too_high)
def test_phase_shift_out_of_range_negative() -> None:
    """A phase slightly below -pi is rejected with ValueError."""
    too_low = -np.pi - 0.01
    with pytest.raises(ValueError):
        iq_impairments.phase_shift(DATA_4, phase=too_low)
def test_phase_shift_boundary_values() -> None:
    """The edge phases pi and -pi are accepted; the test passes if neither call raises."""
    for edge in (np.pi, -np.pi):
        iq_impairments.phase_shift(DATA_4, phase=edge)
def test_phase_shift_invalid_real_input() -> None:
    """A real-valued (non-complex) array is rejected with ValueError."""
    real_samples = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.phase_shift(real_samples, phase=0.0)
# ---------------------------------------------------------------------------
# iq_imbalance
# ---------------------------------------------------------------------------
def test_iq_imbalance_basic_shape() -> None:
    """iq_imbalance returns an array with the same shape as its input."""
    settings = dict(amplitude_imbalance=1.0, phase_imbalance=0.1, dc_offset=0.0)
    impaired = iq_impairments.iq_imbalance(DATA_4, **settings)
    assert impaired.shape == DATA_4.shape
def test_iq_imbalance_is_complex() -> None:
    """The iq_imbalance output is a complex-valued array."""
    settings = dict(amplitude_imbalance=1.0, phase_imbalance=0.1, dc_offset=0.0)
    impaired = iq_impairments.iq_imbalance(DATA_4, **settings)
    assert np.iscomplexobj(impaired)
def test_iq_imbalance_changes_signal() -> None:
    """Non-zero imbalance parameters produce output that is not close to the input."""
    settings = dict(amplitude_imbalance=3.0, phase_imbalance=0.5, dc_offset=2.0)
    impaired = iq_impairments.iq_imbalance(DATA_4, **settings)
    unchanged = np.allclose(impaired, DATA_4)
    assert not unchanged
def test_iq_imbalance_recording_input() -> None:
    """A Recording input yields a Recording that keeps its 'source' entry and data shape."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    settings = dict(amplitude_imbalance=1.0, phase_imbalance=0.1, dc_offset=0.0)
    impaired = iq_impairments.iq_imbalance(recording, **settings)

    assert isinstance(impaired, Recording)
    assert impaired.metadata["source"] == "test"
    assert impaired.data.shape == DATA_4.shape
def test_iq_imbalance_phase_out_of_range_positive() -> None:
    """A phase_imbalance slightly above pi is rejected with ValueError."""
    too_high = np.pi + 0.01
    with pytest.raises(ValueError):
        iq_impairments.iq_imbalance(DATA_4, phase_imbalance=too_high)
def test_iq_imbalance_phase_out_of_range_negative() -> None:
    """A phase_imbalance slightly below -pi is rejected with ValueError."""
    too_low = -np.pi - 0.01
    with pytest.raises(ValueError):
        iq_impairments.iq_imbalance(DATA_4, phase_imbalance=too_low)
def test_iq_imbalance_phase_boundary_values() -> None:
    """phase_imbalance of pi and -pi is accepted; the test passes if neither call raises."""
    for edge in (np.pi, -np.pi):
        iq_impairments.iq_imbalance(DATA_4, phase_imbalance=edge)
def test_iq_imbalance_invalid_real_input() -> None:
    """A real-valued (non-complex) array is rejected with ValueError."""
    real_samples = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.iq_imbalance(real_samples)
def test_iq_imbalance_amplitude_symmetry() -> None:
    """Opposite-signed amplitude imbalances produce different outputs.

    With phase_imbalance and dc_offset both 0.0, flipping the sign of
    amplitude_imbalance is expected to swap the I/Q scaling, so the two
    results must not be close to each other.
    """
    pos, neg = (
        iq_impairments.iq_imbalance(DATA_4, amplitude_imbalance=amp, phase_imbalance=0.0, dc_offset=0.0)
        for amp in (3.0, -3.0)
    )
    assert not np.allclose(pos, neg)
def test_iq_imbalance_dc_offset_zero_doubles_signal() -> None:
    """Pin the current behaviour of dc_offset=0 dB, which this test treats as a bug.

    According to the original author's note, the formula
    `data + (10^(dc_offset/20) * real + j * 10^(dc_offset/20) * imag)`
    collapses to `data + data` at dc_offset=0, so the signal is doubled
    rather than offset by a constant DC term. The assertion records that
    observed result so a future fix shows up as a failure of this test.
    """
    # A single purely real sample keeps phase effects out of the picture.
    probe = np.array([[complex(2, 0)]], dtype=np.complex128)
    out = iq_impairments.iq_imbalance(probe, amplitude_imbalance=0.0, phase_imbalance=0.0, dc_offset=0.0)

    # If dc_offset=0 meant "no DC", the real part would stay near 2.0;
    # the pinned behaviour is 2 * signal, i.e. a real part of 4.0.
    failure_hint = (
        "dc_offset=0 currently doubles the signal (adds 1× copy). "
        "If this assertion fails, the dc_offset formula has been fixed — update this test."
    )
    assert np.allclose(out.real, 4.0), failure_hint
# ---------------------------------------------------------------------------
# resample
# ---------------------------------------------------------------------------
def test_resample_upsample_shape() -> None:
    """With up=2, down=1 the output keeps one row and the input's sample count."""
    samples = np.array([[2**k * (1 + 1j) for k in range(4)]], dtype=np.complex128)
    resampled = iq_impairments.resample(samples, up=2, down=1)

    # The result is expected to be cut back to the original length.
    row_count = resampled.shape[0]
    sample_count = resampled.shape[1]
    assert row_count == 1
    assert sample_count == samples.shape[1]
def test_resample_is_complex() -> None:
    """The resampled output is a complex-valued array."""
    resampled = iq_impairments.resample(DATA_4, up=2, down=1)
    is_complex = np.iscomplexobj(resampled)
    assert is_complex
def test_resample_recording_input() -> None:
    """A Recording input yields a Recording that keeps its 'source' entry."""
    recording = Recording(data=DATA_4.copy(), metadata=SAMPLE_METADATA)
    resampled = iq_impairments.resample(recording, up=2, down=1)

    assert isinstance(resampled, Recording)
    assert resampled.metadata["source"] == "test"
def test_resample_unchanged_ratio() -> None:
    """Equal up and down factors (3 and 3) leave the sample count unchanged."""
    resampled = iq_impairments.resample(DATA_4, up=3, down=3)
    input_length = DATA_4.shape[1]
    assert resampled.shape[1] == input_length
def test_resample_invalid_real_input() -> None:
    """A real-valued (non-complex) array is rejected with ValueError."""
    real_samples = np.array([[1.0, 2.0, 3.0]], dtype=np.float64)
    with pytest.raises(ValueError):
        iq_impairments.resample(real_samples)
2026-04-01 11:57:59 -04:00
def test_resample_downsample_returns_same_length() -> None:
    """With up=1, down=2 the output has as many samples as the input.

    The output is expected to be zero-padded back to the input length;
    only the length is asserted here.
    """
    samples = np.array([[k * (1 + 1j) for k in range(1, 7)]], dtype=np.complex128)
    resampled = iq_impairments.resample(samples, up=1, down=2)
    input_length = samples.shape[1]
    assert resampled.shape[1] == input_length