diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 165564d..45d03a5 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -65,6 +65,48 @@ def _visible_levels_pyqtgraph(data: np.ndarray, plot_item) -> Optional[Tuple[flo return (vmin, vmax) +def apply_working_range( + freqs: Optional[np.ndarray], + sweep: Optional[np.ndarray], + range_min_ghz: float, + range_max_ghz: float, +) -> Tuple[np.ndarray, np.ndarray]: + """Crop sweep data to the active working frequency range.""" + if freqs is None or sweep is None: + return ( + np.zeros((0,), dtype=np.float64), + np.zeros((0,), dtype=np.float32), + ) + + freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1) + sweep_arr = np.asarray(sweep, dtype=np.float32).reshape(-1) + width = min(freq_arr.size, sweep_arr.size) + if width <= 0: + return ( + np.zeros((0,), dtype=np.float64), + np.zeros((0,), dtype=np.float32), + ) + + freq_arr = freq_arr[:width] + sweep_arr = sweep_arr[:width] + valid = ( + np.isfinite(freq_arr) + & np.isfinite(sweep_arr) + & (freq_arr >= float(range_min_ghz)) + & (freq_arr <= float(range_max_ghz)) + ) + if not np.any(valid): + return ( + np.zeros((0,), dtype=np.float64), + np.zeros((0,), dtype=np.float32), + ) + + return ( + freq_arr[valid].astype(np.float64, copy=False), + sweep_arr[valid].astype(np.float32, copy=False), + ) + + def run_pyqtgraph(args) -> None: """Start the PyQtGraph GUI.""" peak_calibrate_mode = bool(getattr(args, "calibrate", False)) @@ -98,7 +140,11 @@ def run_pyqtgraph(args) -> None: fft_bins = FFT_LEN // 2 + 1 spec_clip = parse_spec_clip(getattr(args, "spec_clip", None)) spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) - runtime = RuntimeState(ring=RingBuffer(max_sweeps)) + runtime = RuntimeState( + ring=RingBuffer(max_sweeps), + range_min_ghz=float(SWEEP_FREQ_MIN_GHZ), + range_max_ghz=float(SWEEP_FREQ_MAX_GHZ), + ) pg.setConfigOptions( useOpenGL=not peak_calibrate_mode, @@ -205,6 +251,24 @@ def run_pyqtgraph(args) -> None: bg_compute_cb = QtWidgets.QCheckBox("расчет фона") bg_subtract_cb = QtWidgets.QCheckBox("вычет фона") fft_bg_subtract_cb = QtWidgets.QCheckBox("FFT вычет фона") + range_group = QtWidgets.QGroupBox("Рабочий диапазон") + range_group_layout = QtWidgets.QFormLayout(range_group) + range_group_layout.setContentsMargins(6, 6, 6, 6) + range_group_layout.setSpacing(6) + range_min_spin = QtWidgets.QDoubleSpinBox() + range_max_spin = QtWidgets.QDoubleSpinBox() + for spin in (range_min_spin, range_max_spin): + spin.setDecimals(6) + spin.setRange(0.0, 100.0) + spin.setSingleStep(0.1) + try: + spin.setSuffix(" GHz") + except Exception: + pass + range_min_spin.setValue(runtime.range_min_ghz) + range_max_spin.setValue(runtime.range_max_ghz) + range_group_layout.addRow("f min", range_min_spin) + range_group_layout.addRow("f max", range_max_spin) fft_mode_label = QtWidgets.QLabel("IFFT режим") fft_mode_combo = QtWidgets.QComboBox() fft_mode_combo.addItem("Обычный", "direct") @@ -240,6 +304,7 @@ def run_pyqtgraph(args) -> None: settings_layout.addWidget(QtWidgets.QLabel("Настройки")) except Exception: pass + settings_layout.addWidget(range_group) settings_layout.addWidget(calib_group) settings_layout.addWidget(bg_compute_cb) settings_layout.addWidget(bg_subtract_cb) @@ -258,6 +323,7 @@ def run_pyqtgraph(args) -> None: fft_mode = "symmetric" status_note = "" status_dirty = True + range_change_in_progress = False fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: try: @@ -272,14 +338,21 @@ def run_pyqtgraph(args) -> None: changed = runtime.ring.ensure_init(sweep_width) if not changed: return + f_min = float(runtime.range_min_ghz) + f_max = float(runtime.range_max_ghz) + if runtime.current_freqs is not None and runtime.current_freqs.size > 0: + finite_f = runtime.current_freqs[np.isfinite(runtime.current_freqs)] + if finite_f.size > 0: + f_min = float(np.min(finite_f)) + f_max = float(np.max(finite_f)) img.setImage(runtime.ring.get_display_raw(), autoLevels=False) - img.setRect(0, SWEEP_FREQ_MIN_GHZ, max_sweeps, SWEEP_FREQ_MAX_GHZ - SWEEP_FREQ_MIN_GHZ) + img.setRect(0, f_min, max_sweeps, max(1e-9, f_max - f_min)) p_img.setRange( xRange=(0, max_sweeps - 1), - yRange=(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ), + yRange=(f_min, f_max), padding=0, ) - p_line.setXRange(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, padding=0) + p_line.setXRange(f_min, f_max, padding=0) img_fft.setImage(runtime.ring.get_display_fft_linear(), autoLevels=False) img_fft.setRect(0, 0.0, max_sweeps, 1.0) p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0.0, 1.0), padding=0) @@ -295,6 +368,15 @@ def run_pyqtgraph(args) -> None: f_max = f_min + 1.0 img.setRect(0, f_min, max_sweeps, f_max - f_min) p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0) + p_line.setXRange(f_min, f_max, padding=0) + else: + f_min = float(runtime.range_min_ghz) + f_max = float(runtime.range_max_ghz) + if f_max <= f_min: + f_max = f_min + 1.0 + img.setRect(0, f_min, max_sweeps, f_max - f_min) + p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0) + p_line.setXRange(f_min, f_max, padding=0) distance_axis = runtime.ring.distance_axis if distance_axis is not None and distance_axis.size > 0: @@ -317,6 +399,13 @@ def run_pyqtgraph(args) -> None: size, dtype=np.float64, ) + if runtime.range_max_ghz > runtime.range_min_ghz: + return np.linspace( + float(runtime.range_min_ghz), + float(runtime.range_max_ghz), + size, + dtype=np.float64, + ) if runtime.ring.x_shared is not None and size <= runtime.ring.x_shared.size: return runtime.ring.x_shared[:size] return np.arange(size, dtype=np.float32) @@ -333,14 +422,69 @@ def run_pyqtgraph(args) -> None: path = "" return path or "calibration_envelope.npy" + def reset_ring_buffers() -> None: + runtime.ring.reset() + runtime.current_distances = None + runtime.current_fft_db = None + runtime.bg_spec_cache = None + runtime.current_peak_width = None + runtime.current_peak_amplitude = None + runtime.peak_candidates = [] + img.setImage(runtime.ring.get_display_raw(), autoLevels=False) + img_fft.setImage(runtime.ring.get_display_fft_linear(), autoLevels=False) + update_physical_axes() + + def refresh_current_window(push_to_ring: bool = False, *, reset_ring: bool = False) -> None: + if reset_ring: + reset_ring_buffers() + + if runtime.full_current_freqs is None or runtime.full_current_sweep_raw is None: + runtime.current_freqs = None + runtime.current_sweep_raw = None + runtime.current_sweep_norm = None + runtime.current_fft_db = None + runtime.current_distances = runtime.ring.distance_axis + return + + current_freqs, current_sweep = apply_working_range( + runtime.full_current_freqs, + runtime.full_current_sweep_raw, + runtime.range_min_ghz, + runtime.range_max_ghz, + ) + runtime.current_freqs = current_freqs + runtime.current_sweep_raw = current_sweep + + if runtime.current_sweep_raw.size == 0: + if push_to_ring: + reset_ring_buffers() + runtime.current_freqs = None + runtime.current_sweep_raw = None + runtime.current_sweep_norm = None + runtime.current_fft_db = None + runtime.current_distances = None + set_status_note("диапазон: нет точек в выбранном окне") + return + + recompute_current_processed_sweep(push_to_ring=push_to_ring) + def recompute_current_processed_sweep(push_to_ring: bool = False) -> None: - if runtime.current_sweep_raw is not None and calib_enabled and runtime.calib_envelope is not None: + if ( + runtime.current_sweep_raw is not None + and runtime.current_sweep_raw.size > 0 + and calib_enabled + and runtime.calib_envelope is not None + ): runtime.current_sweep_norm = normalize_by_envelope(runtime.current_sweep_raw, runtime.calib_envelope) else: runtime.current_sweep_norm = None runtime.current_fft_db = None - if not push_to_ring or runtime.current_sweep_raw is None: + if ( + not push_to_ring + or runtime.current_sweep_raw is None + or runtime.current_sweep_raw.size == 0 + ): return sweep_for_processing = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw @@ -360,6 +504,36 @@ def run_pyqtgraph(args) -> None: recompute_current_processed_sweep(push_to_ring=False) runtime.mark_dirty() + def restore_range_controls() -> None: + nonlocal range_change_in_progress + range_change_in_progress = True + try: + range_min_spin.setValue(float(runtime.range_min_ghz)) + range_max_spin.setValue(float(runtime.range_max_ghz)) + finally: + range_change_in_progress = False + + def set_working_range() -> None: + nonlocal range_change_in_progress + if range_change_in_progress: + return + try: + new_min = float(range_min_spin.value()) + new_max = float(range_max_spin.value()) + except Exception: + restore_range_controls() + return + if (not np.isfinite(new_min)) or (not np.isfinite(new_max)) or new_min >= new_max: + set_status_note("диапазон: f_min должен быть меньше f_max") + restore_range_controls() + runtime.mark_dirty() + return + runtime.range_min_ghz = new_min + runtime.range_max_ghz = new_max + refresh_current_window(push_to_ring=True, reset_ring=True) + set_status_note(f"диапазон: {new_min:.6g}..{new_max:.6g} GHz") + runtime.mark_dirty() + def pick_calib_file() -> None: start_path = get_calib_file_path() try: @@ -472,10 +646,13 @@ def run_pyqtgraph(args) -> None: fft_mode_combo.setCurrentIndex(1) except Exception: pass + restore_range_controls() set_bg_compute_enabled() set_fft_mode() try: + range_min_spin.valueChanged.connect(lambda _v: set_working_range()) + range_max_spin.valueChanged.connect(lambda _v: set_working_range()) calib_cb.stateChanged.connect(lambda _v: set_calib_enabled()) calib_pick_btn.clicked.connect(lambda _checked=False: pick_calib_file()) calib_save_btn.clicked.connect(lambda _checked=False: save_current_calibration()) @@ -691,11 +868,11 @@ def run_pyqtgraph(args) -> None: "I": sweep, } ) - runtime.current_freqs = calibrated["F"] - runtime.current_sweep_raw = calibrated["I"] + runtime.full_current_freqs = np.asarray(calibrated["F"], dtype=np.float64) + runtime.full_current_sweep_raw = np.asarray(calibrated["I"], dtype=np.float32) runtime.current_aux_curves = aux_curves runtime.current_info = info - recompute_current_processed_sweep(push_to_ring=True) + refresh_current_window(push_to_ring=True) if drained > 0: update_physical_axes() return drained @@ -727,9 +904,11 @@ def run_pyqtgraph(args) -> None: except Exception: bg_fft_for_line = None - if redraw_needed and (runtime.current_sweep_raw is not None or runtime.calib_envelope is not None): + if redraw_needed: xs = resolve_curve_xs( - runtime.current_sweep_raw.size if runtime.current_sweep_raw is not None else runtime.calib_envelope.size + runtime.current_sweep_raw.size + if runtime.current_sweep_raw is not None + else (runtime.calib_envelope.size if runtime.calib_envelope is not None else 0) ) displayed_calib = None @@ -871,8 +1050,16 @@ def run_pyqtgraph(args) -> None: runtime.current_peak_amplitude = None else: curve_fft_ref.setVisible(False) + curve_fft.setData([], []) for box in fft_peak_boxes: box.setVisible(False) + fft_bg_line.setVisible(False) + fft_left_line.setVisible(False) + fft_right_line.setVisible(False) + spec_left_line.setVisible(False) + spec_right_line.setVisible(False) + runtime.current_peak_width = None + runtime.current_peak_amplitude = None runtime.peak_candidates = [] refresh_peak_params_label([]) runtime.plot_dirty = False diff --git a/rfg_adc_plotter/processing/fft.py b/rfg_adc_plotter/processing/fft.py index 7cd8c1b..d91e31c 100644 --- a/rfg_adc_plotter/processing/fft.py +++ b/rfg_adc_plotter/processing/fft.py @@ -9,6 +9,21 @@ import numpy as np from rfg_adc_plotter.constants import C_M_S, FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ +def _finite_freq_bounds(freqs: Optional[np.ndarray]) -> Optional[Tuple[float, float]]: + """Return finite frequency bounds for the current working segment.""" + if freqs is None: + return None + freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1) + finite = freq_arr[np.isfinite(freq_arr)] + if finite.size < 2: + return None + f_min = float(np.min(finite)) + f_max = float(np.max(finite)) + if not np.isfinite(f_min) or not np.isfinite(f_max) or f_max <= f_min: + return None + return f_min, f_max + + def prepare_fft_segment( sweep: np.ndarray, freqs: Optional[np.ndarray], @@ -57,9 +72,16 @@ def build_symmetric_ifft_spectrum( if fft_len <= 0: return None - freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, int(fft_len), dtype=np.float64) - neg_idx_all = np.flatnonzero(freq_axis <= (-SWEEP_FREQ_MIN_GHZ)) - pos_idx_all = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ) + bounds = _finite_freq_bounds(freqs) + if bounds is None: + f_min = float(SWEEP_FREQ_MIN_GHZ) + f_max = float(SWEEP_FREQ_MAX_GHZ) + else: + f_min, f_max = bounds + + freq_axis = np.linspace(-f_max, f_max, int(fft_len), dtype=np.float64) + neg_idx_all = np.flatnonzero(freq_axis <= (-f_min)) + pos_idx_all = np.flatnonzero(freq_axis >= f_min) band_len = int(min(neg_idx_all.size, pos_idx_all.size)) if band_len <= 1: return None @@ -96,8 +118,15 @@ def build_positive_only_centered_ifft_spectrum( if fft_len <= 0: return None - freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, int(fft_len), dtype=np.float64) - pos_idx = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ) + bounds = _finite_freq_bounds(freqs) + if bounds is None: + f_min = float(SWEEP_FREQ_MIN_GHZ) + f_max = float(SWEEP_FREQ_MAX_GHZ) + else: + f_min, f_max = bounds + + freq_axis = np.linspace(-f_max, f_max, int(fft_len), dtype=np.float64) + pos_idx = np.flatnonzero(freq_axis >= f_min) band_len = int(pos_idx.size) if band_len <= 1: return None @@ -216,7 +245,12 @@ def compute_distance_axis( return np.zeros((0,), dtype=np.float64) fft_mode = _normalize_fft_mode(mode, symmetric) if fft_mode in {"symmetric", "positive_only"}: - df_ghz = (2.0 * float(SWEEP_FREQ_MAX_GHZ)) / max(1, FFT_LEN - 1) + bounds = _finite_freq_bounds(freqs) + if bounds is None: + f_max = float(SWEEP_FREQ_MAX_GHZ) + else: + _, f_max = bounds + df_ghz = (2.0 * f_max) / max(1, FFT_LEN - 1) else: if freqs is None: return np.arange(bins, dtype=np.float64) diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py index 24f1d6f..59ee18d 100644 --- a/rfg_adc_plotter/state/ring_buffer.py +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -38,6 +38,20 @@ class RingBuffer: def fft_symmetric(self) -> bool: return self.fft_mode == "symmetric" + def reset(self) -> None: + """Drop all buffered sweeps and derived FFT state.""" + self.width = 0 + self.head = 0 + self.ring = None + self.ring_time = None + self.ring_fft = None + self.x_shared = None + self.distance_axis = None + self.last_fft_db = None + self.last_freqs = None + self.y_min_fft = None + self.y_max_fft = None + def ensure_init(self, sweep_width: int) -> bool: """Allocate or resize buffers. Returns True when geometry changed.""" target_width = max(int(sweep_width), int(WF_WIDTH)) diff --git a/rfg_adc_plotter/state/runtime_state.py b/rfg_adc_plotter/state/runtime_state.py index 67e37c9..787071f 100644 --- a/rfg_adc_plotter/state/runtime_state.py +++ b/rfg_adc_plotter/state/runtime_state.py @@ -14,6 +14,10 @@ from rfg_adc_plotter.types import SweepAuxCurves, SweepInfo @dataclass class RuntimeState: ring: RingBuffer + range_min_ghz: float = 0.0 + range_max_ghz: float = 0.0 + full_current_freqs: Optional[np.ndarray] = None + full_current_sweep_raw: Optional[np.ndarray] = None current_freqs: Optional[np.ndarray] = None current_distances: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None diff --git a/tests/test_processing.py b/tests/test_processing.py index 102c989..19afa11 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -6,6 +6,7 @@ import numpy as np import unittest from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ +from rfg_adc_plotter.gui.pyqtgraph_backend import apply_working_range from rfg_adc_plotter.processing.calibration import ( build_calib_envelope, calibrate_freqs, @@ -100,6 +101,24 @@ class ProcessingTests(unittest.TestCase): with self.assertRaises(ValueError): load_calib_envelope(path) + def test_apply_working_range_crops_sweep_to_selected_band(self): + freqs = np.linspace(3.3, 14.3, 12, dtype=np.float64) + sweep = np.arange(12, dtype=np.float32) + cropped_freqs, cropped_sweep = apply_working_range(freqs, sweep, 5.0, 9.0) + + self.assertGreater(cropped_freqs.size, 0) + self.assertEqual(cropped_freqs.shape, cropped_sweep.shape) + self.assertGreaterEqual(float(np.min(cropped_freqs)), 5.0) + self.assertLessEqual(float(np.max(cropped_freqs)), 9.0) + + def test_apply_working_range_returns_empty_when_no_points_match(self): + freqs = np.linspace(3.3, 14.3, 12, dtype=np.float64) + sweep = np.arange(12, dtype=np.float32) + cropped_freqs, cropped_sweep = apply_working_range(freqs, sweep, 20.0, 21.0) + + self.assertEqual(cropped_freqs.shape, (0,)) + self.assertEqual(cropped_sweep.shape, (0,)) + def test_fft_helpers_return_expected_shapes(self): sweep = np.sin(np.linspace(0.0, 4.0 * np.pi, 128)).astype(np.float32) freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64) @@ -113,34 +132,44 @@ class ProcessingTests(unittest.TestCase): def test_symmetric_ifft_spectrum_has_zero_gap_and_mirrored_band(self): sweep = np.linspace(1.0, 2.0, 128, dtype=np.float32) - freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, 128, dtype=np.float64) + freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64) spectrum = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) self.assertIsNotNone(spectrum) - freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, FFT_LEN, dtype=np.float64) - neg_idx_all = np.flatnonzero(freq_axis <= (-SWEEP_FREQ_MIN_GHZ)) - pos_idx_all = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ) + freq_axis = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64) + neg_idx_all = np.flatnonzero(freq_axis <= (-4.0)) + pos_idx_all = np.flatnonzero(freq_axis >= 4.0) band_len = int(min(neg_idx_all.size, pos_idx_all.size)) neg_idx = neg_idx_all[:band_len] pos_idx = pos_idx_all[-band_len:] - zero_mask = (freq_axis > (-SWEEP_FREQ_MIN_GHZ)) & (freq_axis < SWEEP_FREQ_MIN_GHZ) + zero_mask = (freq_axis > (-4.0)) & (freq_axis < 4.0) self.assertTrue(np.allclose(spectrum[zero_mask], 0.0)) self.assertTrue(np.allclose(spectrum[neg_idx], spectrum[pos_idx][::-1])) def test_positive_only_centered_spectrum_keeps_zeros_until_positive_min(self): sweep = np.linspace(1.0, 2.0, 128, dtype=np.float32) - freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, 128, dtype=np.float64) + freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64) spectrum = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) self.assertIsNotNone(spectrum) - freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, FFT_LEN, dtype=np.float64) - zero_mask = freq_axis < SWEEP_FREQ_MIN_GHZ - pos_idx = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ) + freq_axis = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64) + zero_mask = freq_axis < 4.0 + pos_idx = np.flatnonzero(freq_axis >= 4.0) self.assertTrue(np.allclose(spectrum[zero_mask], 0.0)) self.assertTrue(np.any(np.abs(spectrum[pos_idx]) > 0.0)) + def test_symmetric_distance_axis_uses_windowed_frequency_bounds(self): + freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64) + axis = compute_distance_axis(freqs, 513, mode="symmetric") + df_hz = (2.0 * 10.0 / max(1, FFT_LEN - 1)) * 1e9 + expected_step = 299_792_458.0 / (2.0 * FFT_LEN * df_hz) + + self.assertEqual(axis.shape, (513,)) + self.assertTrue(np.all(np.diff(axis) >= 0.0)) + self.assertAlmostEqual(float(axis[1] - axis[0]), expected_step, places=15) + def test_peak_helpers_find_reference_and_peak_boxes(self): xs = np.linspace(0.0, 10.0, 200) ys = np.exp(-((xs - 5.0) ** 2) / 0.4) * 10.0 + 1.0 diff --git a/tests/test_ring_buffer.py b/tests/test_ring_buffer.py index 19bfa09..e1e0405 100644 --- a/tests/test_ring_buffer.py +++ b/tests/test_ring_buffer.py @@ -70,6 +70,19 @@ class RingBufferTests(unittest.TestCase): self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,)) self.assertIsNotNone(ring.distance_axis) + def test_ring_buffer_reset_clears_cached_history(self): + ring = RingBuffer(max_sweeps=2) + ring.push(np.linspace(0.0, 1.0, 64, dtype=np.float32), np.linspace(4.0, 10.0, 64)) + + ring.reset() + + self.assertIsNone(ring.ring) + self.assertIsNone(ring.ring_fft) + self.assertIsNone(ring.distance_axis) + self.assertIsNone(ring.last_fft_db) + self.assertEqual(ring.width, 0) + self.assertEqual(ring.head, 0) + if __name__ == "__main__": unittest.main()