From 5054f8d3d732679288772addcd47ae81cf76f5ad Mon Sep 17 00:00:00 2001 From: awe Date: Thu, 12 Mar 2026 18:09:44 +0300 Subject: [PATCH] new fft --- rfg_adc_plotter/gui/pyqtgraph_backend.py | 36 ++++++++---- rfg_adc_plotter/processing/fft.py | 75 +++++++++++++++++++++--- rfg_adc_plotter/state/ring_buffer.py | 34 ++++++++--- tests/test_processing.py | 14 +++++ tests/test_ring_buffer.py | 14 +++++ 5 files changed, 144 insertions(+), 29 deletions(-) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index f82769f..165564d 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -205,7 +205,11 @@ def run_pyqtgraph(args) -> None: bg_compute_cb = QtWidgets.QCheckBox("расчет фона") bg_subtract_cb = QtWidgets.QCheckBox("вычет фона") fft_bg_subtract_cb = QtWidgets.QCheckBox("FFT вычет фона") - fft_symmetric_cb = QtWidgets.QCheckBox("симм. IFFT") + fft_mode_label = QtWidgets.QLabel("IFFT режим") + fft_mode_combo = QtWidgets.QComboBox() + fft_mode_combo.addItem("Обычный", "direct") + fft_mode_combo.addItem("Симметричный", "symmetric") + fft_mode_combo.addItem("Нули [-max,+min]", "positive_only") peak_search_cb = QtWidgets.QCheckBox("поиск пиков") calib_group = QtWidgets.QGroupBox("Калибровка") calib_group_layout = QtWidgets.QVBoxLayout(calib_group) @@ -240,7 +244,8 @@ def run_pyqtgraph(args) -> None: settings_layout.addWidget(bg_compute_cb) settings_layout.addWidget(bg_subtract_cb) settings_layout.addWidget(fft_bg_subtract_cb) - settings_layout.addWidget(fft_symmetric_cb) + settings_layout.addWidget(fft_mode_label) + settings_layout.addWidget(fft_mode_combo) settings_layout.addWidget(peak_search_cb) status = pg.LabelItem(justify="left") @@ -250,7 +255,7 @@ def run_pyqtgraph(args) -> None: bg_compute_enabled = True bg_subtract_enabled = False fft_bg_subtract_enabled = False - fft_symmetric_enabled = True + fft_mode = "symmetric" status_note = "" status_dirty = True fixed_ylim: Optional[Tuple[float, float]] = None @@ -444,26 +449,31 @@ def run_pyqtgraph(args) -> None: fft_bg_subtract_enabled = False runtime.mark_dirty() - def set_fft_symmetric_enabled() -> None: - nonlocal fft_symmetric_enabled + def set_fft_mode() -> None: + nonlocal fft_mode try: - fft_symmetric_enabled = bool(fft_symmetric_cb.isChecked()) + fft_mode = str(fft_mode_combo.currentData() or "symmetric") except Exception: - fft_symmetric_enabled = True - runtime.ring.set_symmetric_fft_enabled(fft_symmetric_enabled) + fft_mode = "symmetric" + runtime.ring.set_fft_mode(fft_mode) runtime.current_distances = runtime.ring.distance_axis runtime.current_fft_db = None - set_status_note("IFFT: симметричный" if fft_symmetric_enabled else "IFFT: обычный") + mode_label = { + "direct": "IFFT: обычный", + "symmetric": "IFFT: симметричный", + "positive_only": "IFFT: нули [-max,+min]", + }.get(fft_mode, f"IFFT: {fft_mode}") + set_status_note(mode_label) update_physical_axes() runtime.mark_dirty() try: bg_compute_cb.setChecked(True) - fft_symmetric_cb.setChecked(True) + fft_mode_combo.setCurrentIndex(1) except Exception: pass set_bg_compute_enabled() - set_fft_symmetric_enabled() + set_fft_mode() try: calib_cb.stateChanged.connect(lambda _v: set_calib_enabled()) @@ -473,7 +483,7 @@ def run_pyqtgraph(args) -> None: bg_compute_cb.stateChanged.connect(lambda _v: set_bg_compute_enabled()) bg_subtract_cb.stateChanged.connect(lambda _v: set_bg_subtract_enabled()) fft_bg_subtract_cb.stateChanged.connect(lambda _v: set_fft_bg_subtract_enabled()) - fft_symmetric_cb.stateChanged.connect(lambda _v: set_fft_symmetric_enabled()) + fft_mode_combo.currentIndexChanged.connect(lambda _v: set_fft_mode()) except Exception: pass @@ -768,7 +778,7 @@ def run_pyqtgraph(args) -> None: sweep_for_fft, runtime.current_freqs, distance_axis.size, - symmetric=fft_symmetric_enabled, + mode=fft_mode, ) fft_vals = runtime.current_fft_db xs_fft = distance_axis[: fft_vals.size] diff --git a/rfg_adc_plotter/processing/fft.py b/rfg_adc_plotter/processing/fft.py index 76884df..7cd8c1b 100644 --- a/rfg_adc_plotter/processing/fft.py +++ b/rfg_adc_plotter/processing/fft.py @@ -87,6 +87,41 @@ def build_symmetric_ifft_spectrum( return spectrum +def build_positive_only_centered_ifft_spectrum( + sweep: np.ndarray, + freqs: Optional[np.ndarray], + fft_len: int = FFT_LEN, +) -> Optional[np.ndarray]: + """Build a centered spectrum with zeros from -f_max to +f_min.""" + if fft_len <= 0: + return None + + freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, int(fft_len), dtype=np.float64) + pos_idx = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ) + band_len = int(pos_idx.size) + if band_len <= 1: + return None + + prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len) + if prepared is None: + return None + + fft_seg, take_fft = prepared + if take_fft != band_len: + fft_seg = np.asarray(fft_seg[:band_len], dtype=np.float32) + if fft_seg.size < band_len: + padded = np.zeros((band_len,), dtype=np.float32) + padded[: fft_seg.size] = fft_seg + fft_seg = padded + + window = np.hanning(band_len).astype(np.float32) + band = np.nan_to_num(fft_seg, nan=0.0).astype(np.float32, copy=False) * window + + spectrum = np.zeros((int(fft_len),), dtype=np.float32) + spectrum[pos_idx] = band + return spectrum + + def fft_mag_to_db(mag: np.ndarray) -> np.ndarray: """Convert magnitude to dB with safe zero handling.""" mag_arr = np.asarray(mag, dtype=np.float32) @@ -114,21 +149,39 @@ def _compute_fft_mag_row_direct( return mag +def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str: + if symmetric is not None: + return "symmetric" if symmetric else "direct" + normalized = str(mode or "symmetric").strip().lower() + if normalized in {"direct", "ordinary", "normal"}: + return "direct" + if normalized in {"symmetric", "sym", "mirror"}: + return "symmetric" + if normalized in {"positive_only", "positive-centered", "positive_centered", "zero_left"}: + return "positive_only" + raise ValueError(f"Unsupported FFT mode: {mode!r}") + + def compute_fft_mag_row( sweep: np.ndarray, freqs: Optional[np.ndarray], bins: int, *, - symmetric: bool = True, + mode: str = "symmetric", + symmetric: Optional[bool] = None, ) -> np.ndarray: """Compute a linear FFT magnitude row.""" if bins <= 0: return np.zeros((0,), dtype=np.float32) - if not symmetric: + fft_mode = _normalize_fft_mode(mode, symmetric) + if fft_mode == "direct": return _compute_fft_mag_row_direct(sweep, freqs, bins) - spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + if fft_mode == "positive_only": + spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + else: + spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) if spectrum_centered is None: return np.full((bins,), np.nan, dtype=np.float32) @@ -144,17 +197,25 @@ def compute_fft_row( freqs: Optional[np.ndarray], bins: int, *, - symmetric: bool = True, + mode: str = "symmetric", + symmetric: Optional[bool] = None, ) -> np.ndarray: """Compute a dB FFT row.""" - return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, symmetric=symmetric)) + return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, mode=mode, symmetric=symmetric)) -def compute_distance_axis(freqs: Optional[np.ndarray], bins: int, *, symmetric: bool = True) -> np.ndarray: +def compute_distance_axis( + freqs: Optional[np.ndarray], + bins: int, + *, + mode: str = "symmetric", + symmetric: Optional[bool] = None, +) -> np.ndarray: """Compute the one-way distance axis for IFFT output.""" if bins <= 0: return np.zeros((0,), dtype=np.float64) - if symmetric: + fft_mode = _normalize_fft_mode(mode, symmetric) + if fft_mode in {"symmetric", "positive_only"}: df_ghz = (2.0 * float(SWEEP_FREQ_MAX_GHZ)) / max(1, FFT_LEN - 1) else: if freqs is None: diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py index faaa22a..24f1d6f 100644 --- a/rfg_adc_plotter/state/ring_buffer.py +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -17,7 +17,7 @@ class RingBuffer: def __init__(self, max_sweeps: int): self.max_sweeps = int(max_sweeps) self.fft_bins = FFT_LEN // 2 + 1 - self.fft_symmetric = True + self.fft_mode = "symmetric" self.width = 0 self.head = 0 self.ring: Optional[np.ndarray] = None @@ -34,6 +34,10 @@ class RingBuffer: def is_ready(self) -> bool: return self.ring is not None and self.ring_fft is not None + @property + def fft_symmetric(self) -> bool: + return self.fft_mode == "symmetric" + def ensure_init(self, sweep_width: int) -> bool: """Allocate or resize buffers. Returns True when geometry changed.""" target_width = max(int(sweep_width), int(WF_WIDTH)) @@ -63,13 +67,21 @@ class RingBuffer: changed = True return changed - def set_symmetric_fft_enabled(self, enabled: bool) -> bool: + def set_fft_mode(self, mode: str) -> bool: """Switch FFT mode and rebuild cached FFT rows from stored sweeps.""" - enabled_bool = bool(enabled) - if enabled_bool == self.fft_symmetric: + normalized_mode = str(mode).strip().lower() + if normalized_mode in {"ordinary", "normal"}: + normalized_mode = "direct" + if normalized_mode in {"sym", "mirror"}: + normalized_mode = "symmetric" + if normalized_mode in {"positive-centered", "positive_centered", "zero_left"}: + normalized_mode = "positive_only" + if normalized_mode not in {"direct", "symmetric", "positive_only"}: + raise ValueError(f"Unsupported FFT mode: {mode!r}") + if normalized_mode == self.fft_mode: return False - self.fft_symmetric = enabled_bool + self.fft_mode = normalized_mode self.y_min_fft = None self.y_max_fft = None @@ -85,7 +97,7 @@ class RingBuffer: sweep_row, self.last_freqs, self.fft_bins, - symmetric=self.fft_symmetric, + mode=self.fft_mode, ) self.ring_fft[row_idx, :] = fft_mag @@ -93,7 +105,7 @@ class RingBuffer: self.distance_axis = compute_distance_axis( self.last_freqs, self.fft_bins, - symmetric=self.fft_symmetric, + mode=self.fft_mode, ) last_idx = (self.head - 1) % self.max_sweeps @@ -107,6 +119,10 @@ class RingBuffer: self.y_max_fft = float(np.nanmax(finite_db)) return True + def set_symmetric_fft_enabled(self, enabled: bool) -> bool: + """Backward-compatible wrapper for the old two-state FFT switch.""" + return self.set_fft_mode("symmetric" if enabled else "direct") + def push(self, sweep: np.ndarray, freqs: Optional[np.ndarray] = None) -> None: """Push a processed sweep and refresh raw/FFT buffers.""" if sweep is None or sweep.size == 0: @@ -123,7 +139,7 @@ class RingBuffer: if freqs is not None: self.last_freqs = np.asarray(freqs, dtype=np.float64).copy() - fft_mag = compute_fft_mag_row(sweep, freqs, self.fft_bins, symmetric=self.fft_symmetric) + fft_mag = compute_fft_mag_row(sweep, freqs, self.fft_bins, mode=self.fft_mode) self.ring_fft[self.head, :] = fft_mag self.last_fft_db = fft_mag_to_db(fft_mag) @@ -133,7 +149,7 @@ class RingBuffer: self.y_min_fft = fr_min if self.y_min_fft is None else min(self.y_min_fft, fr_min) self.y_max_fft = fr_max if self.y_max_fft is None else max(self.y_max_fft, fr_max) - self.distance_axis = compute_distance_axis(freqs, self.fft_bins, symmetric=self.fft_symmetric) + self.distance_axis = compute_distance_axis(freqs, self.fft_bins, mode=self.fft_mode) self.head = (self.head + 1) % self.max_sweeps def get_display_raw(self) -> np.ndarray: diff --git a/tests/test_processing.py b/tests/test_processing.py index 52391dd..102c989 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -14,6 +14,7 @@ from rfg_adc_plotter.processing.calibration import ( save_calib_envelope, ) from rfg_adc_plotter.processing.fft import ( + build_positive_only_centered_ifft_spectrum, build_symmetric_ifft_spectrum, compute_distance_axis, compute_fft_mag_row, @@ -127,6 +128,19 @@ class ProcessingTests(unittest.TestCase): self.assertTrue(np.allclose(spectrum[zero_mask], 0.0)) self.assertTrue(np.allclose(spectrum[neg_idx], spectrum[pos_idx][::-1])) + def test_positive_only_centered_spectrum_keeps_zeros_until_positive_min(self): + sweep = np.linspace(1.0, 2.0, 128, dtype=np.float32) + freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, 128, dtype=np.float64) + spectrum = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + + self.assertIsNotNone(spectrum) + freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, FFT_LEN, dtype=np.float64) + zero_mask = freq_axis < SWEEP_FREQ_MIN_GHZ + pos_idx = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ) + + self.assertTrue(np.allclose(spectrum[zero_mask], 0.0)) + self.assertTrue(np.any(np.abs(spectrum[pos_idx]) > 0.0)) + def test_peak_helpers_find_reference_and_peak_boxes(self): xs = np.linspace(0.0, 10.0, 200) ys = np.exp(-((xs - 5.0) ** 2) / 0.4) * 10.0 + 1.0 diff --git a/tests/test_ring_buffer.py b/tests/test_ring_buffer.py index 0cbeff2..19bfa09 100644 --- a/tests/test_ring_buffer.py +++ b/tests/test_ring_buffer.py @@ -56,6 +56,20 @@ class RingBufferTests(unittest.TestCase): self.assertFalse(np.allclose(ring.last_fft_db, fft_before)) self.assertFalse(np.allclose(ring.distance_axis, axis_before)) + def test_ring_buffer_can_switch_to_positive_only_fft_mode(self): + ring = RingBuffer(max_sweeps=2) + sweep = np.linspace(0.0, 1.0, 64, dtype=np.float32) + freqs = np.linspace(3.3, 14.3, 64, dtype=np.float64) + ring.push(sweep, freqs) + + changed = ring.set_fft_mode("positive_only") + + self.assertTrue(changed) + self.assertEqual(ring.fft_mode, "positive_only") + self.assertIsNotNone(ring.last_fft_db) + self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,)) + self.assertIsNotNone(ring.distance_axis) + if __name__ == "__main__": unittest.main()