"""Unit tests for ``RingBuffer`` from ``rfg_adc_plotter.state.ring_buffer``."""

from __future__ import annotations

import unittest

import numpy as np

from rfg_adc_plotter.processing.fft import compute_fft_mag_row
from rfg_adc_plotter.state.ring_buffer import RingBuffer


def _freq_axis(points: int, dtype=None) -> np.ndarray:
    """Return ``points`` evenly spaced values spanning 3.3 to 14.3 inclusive.

    ``dtype`` is forwarded to ``np.linspace`` unchanged; ``None`` keeps
    NumPy's default.
    """
    return np.linspace(3.3, 14.3, points, dtype=dtype)


class RingBufferTests(unittest.TestCase):
    """Exercises allocation, growth, display getters, FFT modes and reset."""

    # After the very first push every storage array and cached FFT product
    # must exist, and the row count must equal ``max_sweeps``.
    def test_ring_buffer_initializes_on_first_push(self):
        depth = 4
        buf = RingBuffer(max_sweeps=depth)
        samples = np.linspace(-1.0, 1.0, 64, dtype=np.float32)

        buf.push(samples, _freq_axis(64))

        self.assertIsNotNone(buf.ring)
        self.assertIsNotNone(buf.ring_fft)
        self.assertIsNotNone(buf.ring_time)
        self.assertIsNotNone(buf.distance_axis)
        self.assertIsNotNone(buf.get_last_fft_linear())
        self.assertIsNotNone(buf.last_fft_db)
        self.assertEqual(buf.ring.shape[0], depth)
        self.assertEqual(buf.ring_fft.shape, (depth, buf.fft_bins))

    # Pushing a 2048-sample sweep after a 32-sample one must enlarge
    # ``width`` and leave ``ring`` shaped (max_sweeps, width).
    def test_ring_buffer_reallocates_when_sweep_width_grows(self):
        depth = 3
        buf = RingBuffer(max_sweeps=depth)

        buf.push(np.ones(32, dtype=np.float32), _freq_axis(32))
        narrow_width = buf.width
        buf.push(np.ones(2048, dtype=np.float32), _freq_axis(2048))

        self.assertGreater(buf.width, narrow_width)
        self.assertIsNotNone(buf.ring)
        self.assertEqual(buf.ring.shape, (depth, buf.width))

    # With two sweeps pushed into a two-slot buffer, both display arrays
    # must expose two columns and the dB cache must be one FFT row long.
    def test_ring_buffer_tracks_latest_fft_and_display_arrays(self):
        buf = RingBuffer(max_sweeps=2)
        rising = np.linspace(0.0, 1.0, 64, dtype=np.float32)
        falling = np.linspace(1.0, 0.0, 64, dtype=np.float32)
        for samples in (rising, falling):
            buf.push(samples, _freq_axis(64))

        raw_view = buf.get_display_raw()
        fft_view = buf.get_display_fft_linear()

        self.assertEqual(raw_view.shape[1], 2)
        self.assertEqual(fft_view.shape[1], 2)
        self.assertIsNotNone(buf.last_fft_db)
        self.assertEqual(buf.last_fft_db.shape, (buf.fft_bins,))

    # Requesting 256 rows from 4096-sample sweeps must yield a (256, 3)
    # array whose last column keeps the newest sweep's first and last sample.
    def test_ring_buffer_can_return_decimated_display_raw(self):
        buf = RingBuffer(max_sweeps=3)
        # Three consecutive unit ramps: 0..1, 1..2 and 2..3.
        ramps = [
            np.linspace(float(start), float(start + 1), 4096, dtype=np.float32)
            for start in range(3)
        ]
        shared_freqs = _freq_axis(4096, dtype=np.float64)
        for ramp in ramps:
            buf.push(ramp, shared_freqs)
        newest = ramps[-1]

        decimated = buf.get_display_raw_decimated(256)

        self.assertEqual(decimated.shape, (256, 3))
        self.assertAlmostEqual(
            float(decimated[0, -1]), float(newest[0]), places=6
        )
        self.assertAlmostEqual(
            float(decimated[-1, -1]), float(newest[-1]), places=6
        )

    # Disabling the symmetric FFT must report a change, clear the flag, keep
    # the dB row shape, and alter both the dB row and the distance axis.
    def test_ring_buffer_can_switch_fft_mode_and_rebuild_fft_rows(self):
        buf = RingBuffer(max_sweeps=2)
        samples = np.linspace(0.0, 1.0, 64, dtype=np.float32)
        buf.push(samples, _freq_axis(64, dtype=np.float64))
        # Snapshot copies so the later comparison sees the pre-switch values.
        db_snapshot = buf.last_fft_db.copy()
        axis_snapshot = buf.distance_axis.copy()

        was_changed = buf.set_symmetric_fft_enabled(False)

        self.assertTrue(was_changed)
        self.assertFalse(buf.fft_symmetric)
        self.assertEqual(buf.get_display_raw().shape[1], 2)
        self.assertIsNotNone(buf.get_last_fft_linear())
        self.assertEqual(buf.last_fft_db.shape, db_snapshot.shape)
        self.assertFalse(np.allclose(buf.last_fft_db, db_snapshot))
        self.assertFalse(np.allclose(buf.distance_axis, axis_snapshot))

    # Selecting "positive_only" must report a change, be reflected in
    # ``fft_mode``, and leave the dB row and distance axis populated.
    def test_ring_buffer_can_switch_to_positive_only_fft_mode(self):
        buf = RingBuffer(max_sweeps=2)
        samples = np.linspace(0.0, 1.0, 64, dtype=np.float32)
        buf.push(samples, _freq_axis(64, dtype=np.float64))

        was_changed = buf.set_fft_mode("positive_only")

        self.assertTrue(was_changed)
        self.assertEqual(buf.fft_mode, "positive_only")
        self.assertIsNotNone(buf.last_fft_db)
        self.assertEqual(buf.last_fft_db.shape, (buf.fft_bins,))
        self.assertIsNotNone(buf.distance_axis)

    # When a complex ``fft_input`` accompanies the pushed sweep, switching to
    # "direct" mode must produce the row ``compute_fft_mag_row`` gives for
    # that complex input, while the display arrays stay real-valued.
    def test_ring_buffer_rebuilds_fft_from_complex_input(self):
        buf = RingBuffer(max_sweeps=2)
        freqs = _freq_axis(64, dtype=np.float64)
        phase = np.linspace(0.0, 2.0 * np.pi, 64)
        complex_input = np.exp(1j * phase).astype(np.complex64)
        display_sweep = np.abs(complex_input).astype(np.float32)

        buf.push(display_sweep, freqs, fft_input=complex_input)
        buf.set_fft_mode("direct")
        expected = compute_fft_mag_row(
            complex_input, freqs, buf.fft_bins, mode="direct"
        )

        self.assertTrue(np.allclose(buf.get_last_fft_linear(), expected))
        self.assertFalse(np.iscomplexobj(buf.get_display_fft_linear()))
        newest_column = buf.get_display_raw()[: display_sweep.size, -1]
        self.assertTrue(np.allclose(newest_column, display_sweep))

    # ``reset`` must drop the arrays and caches and zero ``width``/``head``.
    def test_ring_buffer_reset_clears_cached_history(self):
        buf = RingBuffer(max_sweeps=2)
        buf.push(
            np.linspace(0.0, 1.0, 64, dtype=np.float32),
            np.linspace(4.0, 10.0, 64),
        )

        buf.reset()

        self.assertIsNone(buf.ring)
        self.assertIsNone(buf.ring_fft)
        self.assertIsNone(buf.distance_axis)
        self.assertIsNone(buf.last_fft_db)
        self.assertEqual(buf.width, 0)
        self.assertEqual(buf.head, 0)


if __name__ == "__main__":
    unittest.main()