177 lines
7.5 KiB
Python
177 lines
7.5 KiB
Python
from __future__ import annotations
|
|
|
|
import numpy as np
|
|
import unittest
|
|
import warnings
|
|
from unittest.mock import patch
|
|
|
|
from rfg_adc_plotter.processing.fft import compute_fft_mag_row
|
|
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
|
|
|
|
|
class RingBufferTests(unittest.TestCase):
    """Checks of ``RingBuffer``: allocation, FFT mode switching, reset and NaN FFT rows."""

    # Dotted path under which the ring-buffer module looks up the FFT helper;
    # the NaN tests replace it with a stub returning an all-NaN row.
    _FFT_ROW_TARGET = "rfg_adc_plotter.state.ring_buffer.compute_fft_mag_row"

    @staticmethod
    def _freq_axis(count: int, start: float = 3.3, stop: float = 14.3) -> np.ndarray:
        """Return ``count`` evenly spaced float64 values from ``start`` to ``stop``."""
        return np.linspace(start, stop, count, dtype=np.float64)

    @staticmethod
    def _ramp(first: float, last: float, count: int) -> np.ndarray:
        """Return a float32 linear ramp of ``count`` samples used as a synthetic sweep."""
        return np.linspace(first, last, count, dtype=np.float32)

    def _assert_mode_switch_keeps_fft_state(self, mode: str) -> None:
        """Push one sweep, switch to ``mode`` and verify the FFT caches stay populated."""
        buf = RingBuffer(max_sweeps=2)
        samples = self._ramp(0.0, 1.0, 64)
        freqs = self._freq_axis(64)
        buf.push(samples, freqs)

        was_changed = buf.set_fft_mode(mode)

        self.assertTrue(was_changed)
        self.assertEqual(buf.fft_mode, mode)
        self.assertIsNotNone(buf.last_fft_db)
        self.assertEqual(buf.last_fft_db.shape, (buf.fft_bins,))
        self.assertIsNotNone(buf.distance_axis)

    def test_ring_buffer_initializes_on_first_push(self) -> None:
        """After the first push every buffer and FFT cache must be allocated."""
        buf = RingBuffer(max_sweeps=4)
        samples = self._ramp(-1.0, 1.0, 64)
        buf.push(samples, self._freq_axis(64))

        # Attributes are read lazily, one per assertion, in the original order.
        for attr_name in ("ring", "ring_fft", "ring_time", "distance_axis"):
            self.assertIsNotNone(getattr(buf, attr_name))
        self.assertIsNotNone(buf.get_last_fft_linear())
        self.assertIsNotNone(buf.last_fft_db)

        self.assertEqual(buf.ring.shape[0], 4)
        self.assertEqual(buf.ring_fft.shape, (4, buf.fft_bins))

    def test_ring_buffer_reallocates_when_sweep_width_grows(self) -> None:
        """Pushing a wider sweep must enlarge ``width`` and the backing array."""
        buf = RingBuffer(max_sweeps=3)
        buf.push(np.ones((32,), dtype=np.float32), self._freq_axis(32))
        narrow_width = buf.width

        buf.push(np.ones((2048,), dtype=np.float32), self._freq_axis(2048))

        self.assertGreater(buf.width, narrow_width)
        self.assertIsNotNone(buf.ring)
        self.assertEqual(buf.ring.shape, (3, buf.width))

    def test_ring_buffer_tracks_latest_fft_and_display_arrays(self) -> None:
        """Display arrays expose one column per slot and the last FFT row has ``fft_bins`` items."""
        buf = RingBuffer(max_sweeps=2)
        buf.push(self._ramp(0.0, 1.0, 64), self._freq_axis(64))
        buf.push(self._ramp(1.0, 0.0, 64), self._freq_axis(64))

        raw_view = buf.get_display_raw()
        fft_view = buf.get_display_fft_linear()

        self.assertEqual(raw_view.shape[1], 2)
        self.assertEqual(fft_view.shape[1], 2)
        self.assertIsNotNone(buf.last_fft_db)
        self.assertEqual(buf.last_fft_db.shape, (buf.fft_bins,))

    def test_ring_buffer_can_return_decimated_display_raw(self) -> None:
        """Decimating to 256 rows must keep the end points of the most recent sweep."""
        buf = RingBuffer(max_sweeps=3)
        sweeps = [self._ramp(lower, lower + 1.0, 4096) for lower in (0.0, 1.0, 2.0)]
        freqs = self._freq_axis(4096)
        for samples in sweeps:
            buf.push(samples, freqs)

        decimated = buf.get_display_raw_decimated(256)

        newest = sweeps[-1]
        self.assertEqual(decimated.shape, (256, 3))
        self.assertAlmostEqual(float(decimated[0, -1]), float(newest[0]), places=6)
        self.assertAlmostEqual(float(decimated[-1, -1]), float(newest[-1]), places=6)

    def test_ring_buffer_can_switch_fft_mode_and_rebuild_fft_rows(self) -> None:
        """Disabling the symmetric FFT must change both the FFT row and the distance axis."""
        buf = RingBuffer(max_sweeps=2)
        samples = self._ramp(0.0, 1.0, 64)
        freqs = self._freq_axis(64)
        buf.push(samples, freqs)
        previous_fft = buf.last_fft_db.copy()
        previous_axis = buf.distance_axis.copy()

        was_changed = buf.set_symmetric_fft_enabled(False)

        self.assertTrue(was_changed)
        self.assertFalse(buf.fft_symmetric)
        self.assertEqual(buf.get_display_raw().shape[1], 2)
        self.assertIsNotNone(buf.get_last_fft_linear())
        self.assertEqual(buf.last_fft_db.shape, previous_fft.shape)
        self.assertFalse(np.allclose(buf.last_fft_db, previous_fft))
        self.assertFalse(np.allclose(buf.distance_axis, previous_axis))

    def test_ring_buffer_can_switch_to_positive_only_fft_mode(self) -> None:
        """Switching to ``positive_only`` reports a change and keeps the FFT caches populated."""
        self._assert_mode_switch_keeps_fft_state("positive_only")

    def test_ring_buffer_can_switch_to_positive_only_exact_fft_mode(self) -> None:
        """Switching to ``positive_only_exact`` reports a change and keeps the FFT caches populated."""
        self._assert_mode_switch_keeps_fft_state("positive_only_exact")

    def test_ring_buffer_rebuilds_fft_from_complex_input(self) -> None:
        """A mode switch must recompute the FFT from the complex ``fft_input``, not the display sweep."""
        buf = RingBuffer(max_sweeps=2)
        freqs = self._freq_axis(64)
        phase = np.linspace(0.0, 2.0 * np.pi, 64)
        complex_sweep = np.exp(1j * phase).astype(np.complex64)
        magnitude = np.abs(complex_sweep).astype(np.float32)
        buf.push(magnitude, freqs, fft_input=complex_sweep)

        buf.set_fft_mode("direct")

        reference = compute_fft_mag_row(complex_sweep, freqs, buf.fft_bins, mode="direct")
        self.assertTrue(np.allclose(buf.get_last_fft_linear(), reference))
        self.assertFalse(np.iscomplexobj(buf.get_display_fft_linear()))
        last_column = buf.get_display_raw()[: magnitude.size, -1]
        self.assertTrue(np.allclose(last_column, magnitude))

    def test_ring_buffer_reset_clears_cached_history(self) -> None:
        """``reset`` must drop all arrays and zero ``width`` and ``head``."""
        buf = RingBuffer(max_sweeps=2)
        buf.push(self._ramp(0.0, 1.0, 64), self._freq_axis(64, 4.0, 10.0))

        buf.reset()

        for attr_name in ("ring", "ring_fft", "distance_axis", "last_fft_db"):
            self.assertIsNone(getattr(buf, attr_name))
        self.assertEqual(buf.width, 0)
        self.assertEqual(buf.head, 0)

    def test_ring_buffer_push_ignores_all_nan_fft_without_runtime_warning(self) -> None:
        """An all-NaN FFT row on push is flagged invalid and leaves cached FFT and y-limits as-is."""
        buf = RingBuffer(max_sweeps=2)
        freqs = self._freq_axis(64)
        buf.push(self._ramp(0.0, 1.0, 64), freqs)
        previous_fft = buf.last_fft_db.copy()
        previous_y_min = buf.y_min_fft
        previous_y_max = buf.y_max_fft

        with warnings.catch_warnings():
            # Any RuntimeWarning emitted while pushing now raises and fails the test.
            warnings.simplefilter("error", RuntimeWarning)
            nan_row = np.full((buf.fft_bins,), np.nan, dtype=np.float32)
            with patch(self._FFT_ROW_TARGET, return_value=nan_row):
                buf.push(self._ramp(1.0, 2.0, 64), freqs)

        self.assertFalse(buf.last_push_fft_valid)
        self.assertTrue(np.allclose(buf.last_fft_db, previous_fft))
        self.assertEqual(buf.y_min_fft, previous_y_min)
        self.assertEqual(buf.y_max_fft, previous_y_max)

    def test_ring_buffer_set_fft_mode_ignores_all_nan_rebuild_without_runtime_warning(self) -> None:
        """An all-NaN rebuild on mode switch still records the mode but keeps the cached FFT row."""
        buf = RingBuffer(max_sweeps=2)
        freqs = self._freq_axis(64)
        buf.push(self._ramp(0.0, 1.0, 64), freqs)
        previous_fft = buf.last_fft_db.copy()

        with warnings.catch_warnings():
            # Any RuntimeWarning emitted during the rebuild now raises and fails the test.
            warnings.simplefilter("error", RuntimeWarning)
            nan_row = np.full((buf.fft_bins,), np.nan, dtype=np.float32)
            with patch(self._FFT_ROW_TARGET, return_value=nan_row):
                buf.set_fft_mode("direct")

        self.assertFalse(buf.last_push_fft_valid)
        self.assertTrue(np.allclose(buf.last_fft_db, previous_fft))
        self.assertEqual(buf.fft_mode, "direct")
|
|
|
|
|
|
# Run the suite when this module is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()
|