diff --git a/tests/transforms/test_iq_augmentations.py b/tests/transforms/test_iq_augmentations.py new file mode 100644 index 0000000..e2fb024 --- /dev/null +++ b/tests/transforms/test_iq_augmentations.py @@ -0,0 +1,226 @@ +import numpy as np + +from ria_toolkit_oss.datatypes import Recording +from ria_toolkit_oss.transforms import iq_augmentations + +TEST_DATA1 = [[1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j]] +TEST_DATA2 = [[1 + 42j, 54 - 34j, -234 - 7j]] +TEST_DATA3 = [[1 + 1j, 10 + 3j, -1 + 4j, 7 + 3j]] +TEST_DATA4 = [[1 + 2j, 3 + 4j, 5 + 5j, 4 + 4j, 2 - 6j]] +TEST_METADATA = {"apple": 1, "watermelon": 2, "mango": 3} + + +def test_rms_power_1(): + # Testing rms power calculation + signal = [1, 1, 1, 1, 1, 1] + signal_rms_power = np.sqrt(np.mean(np.abs(signal) ** 2)) + assert np.allclose(signal_rms_power, [1]) + + +def test_rms_power_2(): + # Testing rms power calculation + signal = [1, -1, 1, -1, 1, -1] + signal_rms_power = np.sqrt(np.mean(np.abs(signal) ** 2)) + assert np.allclose(signal_rms_power, [1]) + + +def test_awgn(): + # Testing awgn calculations + rms_power = 0.5 + channels, length = 10, 10000 + variance = rms_power**2 + magnitude = np.random.normal(loc=0, scale=np.sqrt(variance), size=(channels, length)) + phase = np.random.uniform(low=0, high=2 * np.pi, size=(channels, length)) + noise = magnitude * np.exp(1j * phase) + noise_power = np.sqrt(np.mean(np.abs(noise) ** 2)) + assert (rms_power - noise_power) < 0.01 + assert noise.shape == (channels, length) + + +def test_generate_awgn(): + # Testing generate_awgn() with array_like input + length = 1000 + sample_rate = 1000000 + frequency = 1000 + amplitude = 1 + baseband_phase = 0 + rf_phase = 0 + dc_offset = 0 + snr = 0 + + total_time = length / sample_rate + t = np.linspace(0, total_time, length, endpoint=False) + sine_wave = amplitude * np.sin(2 * np.pi * frequency * t + baseband_phase) + dc_offset + complex_sine_wave = sine_wave * np.exp(1j * rf_phase) + signal = complex_sine_wave.reshape(1, complex_sine_wave.shape[0]) + noisy_sine_wave = iq_augmentations.generate_awgn(signal, snr) + assert np.sqrt(np.mean(np.abs(noisy_sine_wave) ** 2)) / np.sqrt(np.mean(np.abs(signal) ** 2)) - 2 < 0.01 + + +def test_time_reversal(): + # Testing time_reversal() with array_like input + transformed_data = iq_augmentations.time_reversal(TEST_DATA1) + assert np.array_equal(transformed_data, np.asarray([[4 + 4j, 3 + 3j, 2 + 2j, 1 + 1j]])) + + +def test_time_reversal_rec(): + # Testing time_reversal() with Recording input + rec = Recording(data=TEST_DATA1, metadata=TEST_METADATA) + transformed_rec = iq_augmentations.time_reversal(rec) + assert np.array_equal(transformed_rec.data, np.asarray([[4 + 4j, 3 + 3j, 2 + 2j, 1 + 1j]])) + assert rec.metadata == transformed_rec.metadata + + +def test_spectral_inversion(): + # Testing spectral_inversion() with array_like input + transformed_data = iq_augmentations.spectral_inversion(TEST_DATA1) + assert np.array_equal(transformed_data, np.asarray([[1 - 1j, 2 - 2j, 3 - 3j, 4 - 4j]])) + + +def test_spectral_inversion_rec(): + # Testing spectral_inversion() with Recording input + rec = Recording(data=TEST_DATA1, metadata=TEST_METADATA) + transformed_rec = iq_augmentations.spectral_inversion(rec) + assert np.array_equal(transformed_rec.data, np.asarray([[1 - 1j, 2 - 2j, 3 - 3j, 4 - 4j]])) + assert rec.metadata == transformed_rec.metadata + + +def test_channel_swap(): + # Testing channel_swap() with array_like input + transformed_data = iq_augmentations.channel_swap(TEST_DATA2) + assert np.array_equal(transformed_data, np.asarray([[42 + 1j, -34 + 54j, -7 - 234j]])) + + +def test_channel_swap_rec(): + # Testing channel_swap() with Recording input + rec = Recording(data=TEST_DATA2, metadata=TEST_METADATA) + transformed_rec = iq_augmentations.channel_swap(rec) + assert np.array_equal(transformed_rec.data, np.asarray([[42 + 1j, -34 + 54j, -7 - 234j]])) + assert rec.metadata == transformed_rec.metadata + + +def test_amplitude_reversal(): + # Testing amplitude_reversal() with array_like input + transformed_data = iq_augmentations.amplitude_reversal(TEST_DATA2) + assert np.array_equal(transformed_data, np.asarray([[-1 - 42j, -54 + 34j, 234 + 7j]])) + + +def test_amplitude_reversal_rec(): + # Testing amplitude_reversal() with array_like input + rec = Recording(data=TEST_DATA2, metadata=TEST_METADATA) + transformed_rec = iq_augmentations.amplitude_reversal(rec) + assert np.array_equal(transformed_rec.data, np.asarray([[-1 - 42j, -54 + 34j, 234 + 7j]])) + assert rec.metadata == transformed_rec.metadata + + +def test_drop_samples_back_fill(): + # Testing drop_samples() when fill_type='back-fill' + np.random.seed(0) + transformed_data = iq_augmentations.drop_samples(TEST_DATA1, max_section_size=2, fill_type="back-fill") + assert np.array_equal(transformed_data, np.asarray([[1 + 1j, 1 + 1j, 3 + 3j, 3 + 3j]])) + + +def test_drop_samples_front_fill(): + # Testing drop_samples() when fill_type='front-fill' + np.random.seed(0) + transformed_data = iq_augmentations.drop_samples(TEST_DATA1, max_section_size=2, fill_type="front-fill") + assert np.array_equal(transformed_data, np.asarray([[3 + 3j, 3 + 3j, 3 + 3j, 4 + 4j]])) + + +def test_drop_samples_mean(): + # Testing drop_samples() when fill_type='mean' + np.random.seed(0) + transformed_data = iq_augmentations.drop_samples(TEST_DATA1, max_section_size=2, fill_type="mean") + assert np.array_equal(transformed_data, np.asarray([[2.5 + 2.5j, 2.5 + 2.5j, 3 + 3j, 2.5 + 2.5j]])) + + +def test_drop_samples_zeros(): + # Testing drop_samples() when fill_type='zeros' + np.random.seed(0) + transformed_data = iq_augmentations.drop_samples(TEST_DATA1, max_section_size=2, fill_type="zeros") + assert np.array_equal(transformed_data, np.asarray([[0, 0, 3 + 3j, 0]])) + + +def test_quantize_tape_floor(): + # Testing quantize_tape() with array_like input when rounding = 'floor' + transformed_data = iq_augmentations.quantize_tape(TEST_DATA3, bin_number=2, rounding_type="floor") + assert np.array_equal(transformed_data, np.asarray([[-1 - 1j, 4.5 - 1j, -1 - 1j, 4.5 - 1j]])) + + +def test_quantize_tape_rec_ceiling(): + # Testing quantize_tape() with Recording input when rounding = 'ceiling' + rec = Recording(data=TEST_DATA3, metadata=TEST_METADATA) + transformed_rec = iq_augmentations.quantize_tape(rec, bin_number=2, rounding_type="ceiling") + assert np.array_equal(transformed_rec.data, np.asarray([[4.5 + 4.5j, 10 + 4.5j, 4.5 + 4.5j, 10 + 4.5j]])) + assert rec.metadata == transformed_rec.metadata + + +def test_quantize_parts_ceiling(): + # Testing quantize_parts() with array_like input when rounding = 'ceiling' + np.random.seed(0) + transformed_data = iq_augmentations.quantize_parts( + TEST_DATA3, max_section_size=2, bin_number=2, rounding_type="ceiling" + ) + assert np.array_equal(transformed_data, np.asarray([[4.5 + 4.5j, 10 + 4.5j, -1 + 4j, 10 + 4.5j]])) + + +def test_quantize_parts_rec_floor(): + # Testing quantize_parts() with Recording input when rounding = 'floor' + np.random.seed(0) + rec = Recording(data=TEST_DATA3, metadata=TEST_METADATA) + transformed_rec = iq_augmentations.quantize_parts(rec, max_section_size=2, bin_number=2, rounding_type="floor") + assert np.array_equal(transformed_rec.data, np.asarray([[-1 - 1j, 4.5 - 1j, -1 + 4j, 4.5 - 1j]])) + assert rec.metadata == transformed_rec.metadata + + +def test_magnitude_rescale(): + # Testing magnitude_rescale with array_like input + np.random.seed(0) + transformed_data = iq_augmentations.magnitude_rescale(TEST_DATA1, starting_bounds=(2, 3), max_magnitude=2) + assert np.allclose( + transformed_data, np.asarray([[1 + 1j, 2 + 2j, 3.55706771 + 3.55706771j, 4.74275695 + 4.74275695j]]) + ) + + +def test_magnitude_rescale_rec(): + # Testing magnitude_rescale with Recording input + np.random.seed(0) + rec = Recording(data=TEST_DATA1, metadata=TEST_METADATA) + transformed_rec = iq_augmentations.magnitude_rescale(rec, starting_bounds=(2, 3), max_magnitude=2) + assert np.allclose( + transformed_rec.data, np.asarray([[1 + 1j, 2 + 2j, 3.55706771 + 3.55706771j, 4.74275695 + 4.74275695j]]) + ) + assert rec.metadata == transformed_rec.metadata + + +def test_cut_out_ones(): + # Testing cut_out() with array_like input when fill_type = 'ones' + np.random.seed(0) + transformed_data = iq_augmentations.cut_out(TEST_DATA1, max_section_size=2, fill_type="ones") + assert np.array_equal(transformed_data, np.asarray([[1 + 1j, 1 + 1j, 3 + 3j, 1 + 1j]])) + + +def test_cut_out_avg_snr_1(): + # Testing cut_out() with array_like input when fill_type = 'avg-snr' + np.random.seed(0) + transformed_data = iq_augmentations.cut_out(TEST_DATA1, max_section_size=2, fill_type="avg-snr") + assert np.allclose( + transformed_data, + np.asarray([[-1.26516288 - 0.36655702j, -2.44693984 + 1.27294267j, 3 + 3j, 4.1583403 - 0.96625365j]]), + ) + + +def test_patch_shuffle(): + # Testing patch_shuffle() with array_like input + np.random.seed(0) + transformed_data = iq_augmentations.patch_shuffle(TEST_DATA4, max_patch_size=3) + assert np.array_equal(transformed_data, np.asarray([[3 + 2j, 1 + 4j, 5 + 5j, 2 - 6j, 4 + 4j]])) + + +def test_patch_shuffle_rec(): + # Testing patch_shuffle() with Recording input + np.random.seed(0) + rec = Recording(data=TEST_DATA4, metadata=TEST_METADATA) + transformed_rec = iq_augmentations.patch_shuffle(rec, max_patch_size=3) + assert np.array_equal(transformed_rec.data, np.asarray([[3 + 2j, 1 + 4j, 5 + 5j, 2 - 6j, 4 + 4j]])) + assert rec.metadata == transformed_rec.metadata