From 17540c3b11313e29640e4b3a576fb39029349ab6 Mon Sep 17 00:00:00 2001 From: awe Date: Fri, 10 Apr 2026 22:08:43 +0300 Subject: [PATCH] fft new mode --- rfg_adc_plotter/gui/pyqtgraph_backend.py | 2 + rfg_adc_plotter/processing/fft.py | 129 +++++++++++++++++++++-- rfg_adc_plotter/state/ring_buffer.py | 4 +- tests/test_processing.py | 49 ++++++++- tests/test_ring_buffer.py | 14 +++ 5 files changed, 188 insertions(+), 10 deletions(-) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 1ba6042..64b07a1 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -752,6 +752,7 @@ def run_pyqtgraph(args) -> None: fft_mode_combo.addItem("Обычный", "direct") fft_mode_combo.addItem("Симметричный", "symmetric") fft_mode_combo.addItem("Нули [-max,+min]", "positive_only") + fft_mode_combo.addItem("Нули [-max,+min] точный", "positive_only_exact") peak_search_cb = QtWidgets.QCheckBox("поиск пиков") calib_group = QtWidgets.QGroupBox("Калибровка") calib_group_layout = QtWidgets.QVBoxLayout(calib_group) @@ -1355,6 +1356,7 @@ def run_pyqtgraph(args) -> None: "direct": "IFFT: обычный", "symmetric": "IFFT: симметричный", "positive_only": "IFFT: нули [-max,+min]", + "positive_only_exact": "IFFT: нули [-max,+min] точный", }.get(fft_mode, f"IFFT: {fft_mode}") set_status_note(mode_label) update_physical_axes() diff --git a/rfg_adc_plotter/processing/fft.py b/rfg_adc_plotter/processing/fft.py index 86cb06e..45f1570 100644 --- a/rfg_adc_plotter/processing/fft.py +++ b/rfg_adc_plotter/processing/fft.py @@ -39,6 +39,87 @@ def _interp_signal(x_uniform: np.ndarray, x_known: np.ndarray, y_known: np.ndarr return np.interp(x_uniform, x_known, np.asarray(y_known, dtype=np.float64)).astype(np.float32) +def _fit_complex_bins(values: np.ndarray, bins: int) -> np.ndarray: + arr = np.asarray(values, dtype=np.complex64).reshape(-1) + if bins <= 0: + return np.zeros((0,), dtype=np.complex64) + if arr.size == bins: + return arr + out = np.full((bins,), np.nan + 0j, dtype=np.complex64) + take = min(arr.size, bins) + out[:take] = arr[:take] + return out + + +def _extract_positive_exact_band( + sweep: np.ndarray, + freqs: Optional[np.ndarray], +) -> Optional[Tuple[np.ndarray, np.ndarray, float, float]]: + """Return sorted positive band data and exact-grid parameters.""" + if freqs is None: + return None + + sweep_arr = _coerce_sweep_array(sweep) + freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1) + take = min(int(sweep_arr.size), int(freq_arr.size)) + if take <= 1: + return None + + sweep_seg = sweep_arr[:take] + freq_seg = freq_arr[:take] + valid = np.isfinite(freq_seg) & np.isfinite(sweep_seg) & (freq_seg > 0.0) + if int(np.count_nonzero(valid)) < 2: + return None + + freq_band = np.asarray(freq_seg[valid], dtype=np.float64) + sweep_band = np.asarray(sweep_seg[valid]) + order = np.argsort(freq_band, kind="mergesort") + freq_band = freq_band[order] + sweep_band = sweep_band[order] + + n_band = int(freq_band.size) + if n_band <= 1: + return None + + f_min = float(freq_band[0]) + f_max = float(freq_band[-1]) + if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min: + return None + + df_ghz = float((f_max - f_min) / max(1, n_band - 1)) + if (not np.isfinite(df_ghz)) or df_ghz <= 0.0: + return None + + return freq_band, sweep_band, f_max, df_ghz + + +def _resolve_positive_only_exact_geometry(freqs: Optional[np.ndarray]) -> Optional[Tuple[int, float]]: + """Return (N_shift, df_hz) for the exact centered positive-only mode.""" + if freqs is None: + return None + + freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1) + finite = np.asarray(freq_arr[np.isfinite(freq_arr) & (freq_arr > 0.0)], dtype=np.float64) + if finite.size < 2: + return None + + finite.sort(kind="mergesort") + f_min = float(finite[0]) + f_max = float(finite[-1]) + if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min: + return None + + n_band = int(finite.size) + df_ghz = float((f_max - f_min) / max(1, n_band - 1)) + if (not np.isfinite(df_ghz)) or df_ghz <= 0.0: + return None + + f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64) + if f_shift.size <= 1: + return None + return int(f_shift.size), float(df_ghz * 1e9) + + def prepare_fft_segment( sweep: np.ndarray, freqs: Optional[np.ndarray], @@ -172,6 +253,29 @@ def build_positive_only_centered_ifft_spectrum( return spectrum +def build_positive_only_exact_centered_ifft_spectrum( + sweep: np.ndarray, + freqs: Optional[np.ndarray], +) -> Optional[np.ndarray]: + """Build centered spectrum exactly as zeros[-f_max..+f_min) + measured positive band.""" + prepared = _extract_positive_exact_band(sweep, freqs) + if prepared is None: + return None + + freq_band, sweep_band, f_max, df_ghz = prepared + f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64) + if f_shift.size <= 1: + return None + + band_dtype = np.complex64 if np.iscomplexobj(sweep_band) else np.float32 + band = np.nan_to_num(np.asarray(sweep_band, dtype=band_dtype), nan=0.0) + spectrum = np.zeros((int(f_shift.size),), dtype=band_dtype) + idx = np.round((freq_band - f_shift[0]) / df_ghz).astype(np.int64) + idx = np.clip(idx, 0, spectrum.size - 1) + spectrum[idx] = band + return spectrum + + def fft_mag_to_db(mag: np.ndarray) -> np.ndarray: """Convert magnitude to dB with safe zero handling.""" mag_arr = np.asarray(mag, dtype=np.float32) @@ -192,10 +296,8 @@ def _compute_fft_complex_row_direct( fft_in = np.zeros((FFT_LEN,), dtype=np.complex64) window = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = np.asarray(fft_seg, dtype=np.complex64) * window - spec = np.fft.ifft(fft_in).astype(np.complex64, copy=False) - if spec.shape[0] != bins: - spec = spec[:bins] - return spec + spec = np.fft.ifft(fft_in) + return _fit_complex_bins(spec, bins) def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str: @@ -208,6 +310,8 @@ def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str: return "symmetric" if normalized in {"positive_only", "positive-centered", "positive_centered", "zero_left"}: return "positive_only" + if normalized in {"positive_only_exact", "positive-centered-exact", "positive_centered_exact", "zero_left_exact"}: + return "positive_only_exact" raise ValueError(f"Unsupported FFT mode: {mode!r}") @@ -229,16 +333,15 @@ def compute_fft_complex_row( if fft_mode == "positive_only": spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + elif fft_mode == "positive_only_exact": + spectrum_centered = build_positive_only_exact_centered_ifft_spectrum(sweep, freqs) else: spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) if spectrum_centered is None: return np.full((bins,), np.nan + 0j, dtype=np.complex64) spec = np.fft.ifft(np.fft.ifftshift(np.asarray(spectrum_centered, dtype=np.complex64))) - spec = np.asarray(spec, dtype=np.complex64) - if spec.shape[0] != bins: - spec = spec[:bins] - return spec + return _fit_complex_bins(spec, bins) def compute_fft_mag_row( @@ -277,6 +380,16 @@ def compute_distance_axis( if bins <= 0: return np.zeros((0,), dtype=np.float64) fft_mode = _normalize_fft_mode(mode, symmetric) + if fft_mode == "positive_only_exact": + geometry = _resolve_positive_only_exact_geometry(freqs) + if geometry is None: + return np.arange(bins, dtype=np.float64) + n_shift, df_hz = geometry + if (not np.isfinite(df_hz)) or df_hz <= 0.0 or n_shift <= 0: + return np.arange(bins, dtype=np.float64) + step_m = C_M_S / (2.0 * float(n_shift) * df_hz) + return np.arange(bins, dtype=np.float64) * step_m + if fft_mode in {"symmetric", "positive_only"}: bounds = _finite_freq_bounds(freqs) if bounds is None: diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py index 426cc36..cc327e2 100644 --- a/rfg_adc_plotter/state/ring_buffer.py +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -134,7 +134,9 @@ class RingBuffer: normalized_mode = "symmetric" if normalized_mode in {"positive-centered", "positive_centered", "zero_left"}: normalized_mode = "positive_only" - if normalized_mode not in {"direct", "symmetric", "positive_only"}: + if normalized_mode in {"positive-centered-exact", "positive_centered_exact", "zero_left_exact"}: + normalized_mode = "positive_only_exact" + if normalized_mode not in {"direct", "symmetric", "positive_only", "positive_only_exact"}: raise ValueError(f"Unsupported FFT mode: {mode!r}") if normalized_mode == self.fft_mode: return False diff --git a/tests/test_processing.py b/tests/test_processing.py index bad67b1..cd667bf 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -5,7 +5,7 @@ import tempfile import numpy as np import unittest -from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ +from rfg_adc_plotter.constants import C_M_S, FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ from rfg_adc_plotter.gui.pyqtgraph_backend import ( apply_working_range, apply_working_range_to_aux_curves, @@ -35,6 +35,7 @@ from rfg_adc_plotter.processing.background import ( subtract_fft_background, ) from rfg_adc_plotter.processing.fft import ( + build_positive_only_exact_centered_ifft_spectrum, build_positive_only_centered_ifft_spectrum, build_symmetric_ifft_spectrum, compute_distance_axis, @@ -411,6 +412,21 @@ class ProcessingTests(unittest.TestCase): self.assertTrue(np.allclose(spectrum[zero_mask], 0.0)) self.assertTrue(np.any(np.abs(spectrum[pos_idx]) > 0.0)) + def test_positive_only_exact_spectrum_uses_direct_index_insertion_without_window(self): + sweep = np.asarray([1.0, 2.0, 3.0], dtype=np.float32) + freqs = np.asarray([4.0, 5.0, 6.0], dtype=np.float64) + spectrum = build_positive_only_exact_centered_ifft_spectrum(sweep, freqs) + + self.assertIsNotNone(spectrum) + df = (6.0 - 4.0) / 2.0 + f_shift = np.arange(-6.0, 6.0 + (0.5 * df), df, dtype=np.float64) + idx = np.round((freqs - f_shift[0]) / df).astype(np.int64) + zero_mask = (f_shift > -6.0) & (f_shift < 4.0) + + self.assertEqual(int(spectrum.size), int(f_shift.size)) + self.assertTrue(np.allclose(spectrum[zero_mask], 0.0)) + self.assertTrue(np.allclose(spectrum[idx], sweep)) + def test_complex_symmetric_ifft_spectrum_uses_conjugate_mirror(self): sweep = np.exp(1j * np.linspace(0.0, np.pi, 128)).astype(np.complex64) freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64) @@ -442,6 +458,37 @@ class ProcessingTests(unittest.TestCase): self.assertTrue(np.any(np.isfinite(mag))) self.assertTrue(np.any(np.isfinite(row))) + def test_compute_fft_complex_row_positive_only_exact_matches_manual_ifftshift_ifft(self): + sweep = np.asarray([1.0 + 1.0j, 2.0 + 0.0j, 3.0 - 1.0j], dtype=np.complex64) + freqs = np.asarray([4.0, 5.0, 6.0], dtype=np.float64) + bins = 16 + row = compute_fft_complex_row(sweep, freqs, bins, mode="positive_only_exact") + + df = (6.0 - 4.0) / 2.0 + f_shift = np.arange(-6.0, 6.0 + (0.5 * df), df, dtype=np.float64) + manual_shift = np.zeros((f_shift.size,), dtype=np.complex64) + idx = np.round((freqs - f_shift[0]) / df).astype(np.int64) + manual_shift[idx] = sweep + manual_ifft = np.fft.ifft(np.fft.ifftshift(manual_shift)) + expected = np.full((bins,), np.nan + 0j, dtype=np.complex64) + expected[: manual_ifft.size] = np.asarray(manual_ifft, dtype=np.complex64) + + self.assertEqual(row.shape, (bins,)) + self.assertTrue(np.allclose(row, expected, equal_nan=True)) + + def test_positive_only_exact_distance_axis_uses_exact_grid_geometry(self): + freqs = np.asarray([4.0, 5.0, 6.0], dtype=np.float64) + bins = 8 + axis = compute_distance_axis(freqs, bins, mode="positive_only_exact") + + df_hz = 1e9 + n_shift = int(np.arange(-6.0, 6.0 + 0.5, 1.0, dtype=np.float64).size) + expected_step = C_M_S / (2.0 * n_shift * df_hz) + expected = np.arange(bins, dtype=np.float64) * expected_step + + self.assertEqual(axis.shape, (bins,)) + self.assertTrue(np.allclose(axis, expected)) + def test_resolve_visible_fft_curves_handles_complex_mode(self): complex_row = np.asarray([1.0 + 2.0j, -3.0 + 4.0j], dtype=np.complex64) mag = np.abs(complex_row).astype(np.float32) diff --git a/tests/test_ring_buffer.py b/tests/test_ring_buffer.py index b30a7ca..71cebfd 100644 --- a/tests/test_ring_buffer.py +++ b/tests/test_ring_buffer.py @@ -91,6 +91,20 @@ class RingBufferTests(unittest.TestCase): self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,)) self.assertIsNotNone(ring.distance_axis) + def test_ring_buffer_can_switch_to_positive_only_exact_fft_mode(self): + ring = RingBuffer(max_sweeps=2) + sweep = np.linspace(0.0, 1.0, 64, dtype=np.float32) + freqs = np.linspace(3.3, 14.3, 64, dtype=np.float64) + ring.push(sweep, freqs) + + changed = ring.set_fft_mode("positive_only_exact") + + self.assertTrue(changed) + self.assertEqual(ring.fft_mode, "positive_only_exact") + self.assertIsNotNone(ring.last_fft_db) + self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,)) + self.assertIsNotNone(ring.distance_axis) + def test_ring_buffer_rebuilds_fft_from_complex_input(self): ring = RingBuffer(max_sweeps=2) freqs = np.linspace(3.3, 14.3, 64, dtype=np.float64)