From cbd76cfd54373002e1f5a93b77a7141194aaba16 Mon Sep 17 00:00:00 2001 From: awe Date: Mon, 13 Apr 2026 14:15:56 +0300 Subject: [PATCH] thinking fft --- rfg_adc_plotter/processing/fft.py | 112 ++++++++++++++++++++++++++++-- tests/test_processing.py | 35 +++++++++- 2 files changed, 138 insertions(+), 9 deletions(-) diff --git a/rfg_adc_plotter/processing/fft.py b/rfg_adc_plotter/processing/fft.py index 45f1570..cc2ce11 100644 --- a/rfg_adc_plotter/processing/fft.py +++ b/rfg_adc_plotter/processing/fft.py @@ -93,7 +93,88 @@ def _extract_positive_exact_band( return freq_band, sweep_band, f_max, df_ghz -def _resolve_positive_only_exact_geometry(freqs: Optional[np.ndarray]) -> Optional[Tuple[int, float]]: +def _positive_exact_shift_size(f_max: float, df_ghz: float) -> int: + if (not np.isfinite(f_max)) or (not np.isfinite(df_ghz)) or f_max <= 0.0 or df_ghz <= 0.0: + return 0 + return int(np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64).size) + + +def _resolve_positive_exact_band_size( + f_min: float, + f_max: float, + n_band: int, + max_shift_len: Optional[int], +) -> int: + if n_band <= 2: + return max(2, int(n_band)) + if max_shift_len is None: + return int(n_band) + + limit = int(max_shift_len) + if limit <= 1: + return max(2, int(n_band)) + + span = float(f_max - f_min) + if (not np.isfinite(span)) or span <= 0.0: + return int(n_band) + + df_current = float(span / max(1, int(n_band) - 1)) + if _positive_exact_shift_size(f_max, df_current) <= limit: + return int(n_band) + + denom = max(2.0 * f_max, 1e-12) + approx = int(np.floor(1.0 + ((float(limit - 1) * span) / denom))) + target = min(int(n_band), max(2, approx)) + while target > 2: + df_try = float(span / max(1, target - 1)) + if _positive_exact_shift_size(f_max, df_try) <= limit: + break + target -= 1 + return max(2, target) + + +def _normalize_positive_exact_band( + freq_band: np.ndarray, + sweep_band: np.ndarray, + *, + max_shift_len: Optional[int] = None, +) -> Optional[Tuple[np.ndarray, np.ndarray, float, float]]: + freq_arr = np.asarray(freq_band, dtype=np.float64).reshape(-1) + sweep_arr = np.asarray(sweep_band).reshape(-1) + width = min(int(freq_arr.size), int(sweep_arr.size)) + if width <= 1: + return None + + freq_arr = freq_arr[:width] + sweep_arr = sweep_arr[:width] + f_min = float(freq_arr[0]) + f_max = float(freq_arr[-1]) + if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min: + return None + + target_band = _resolve_positive_exact_band_size(f_min, f_max, int(freq_arr.size), max_shift_len) + if target_band < int(freq_arr.size): + target_freqs = np.linspace(f_min, f_max, target_band, dtype=np.float64) + target_sweep = _interp_signal(target_freqs, freq_arr, sweep_arr) + freq_arr = target_freqs + sweep_arr = np.asarray(target_sweep).reshape(-1) + + n_band = int(freq_arr.size) + if n_band <= 1: + return None + + df_ghz = float((f_max - f_min) / max(1, n_band - 1)) + if (not np.isfinite(df_ghz)) or df_ghz <= 0.0: + return None + + return freq_arr, sweep_arr, f_max, df_ghz + + +def _resolve_positive_only_exact_geometry( + freqs: Optional[np.ndarray], + *, + max_shift_len: Optional[int] = None, +) -> Optional[Tuple[int, float]]: """Return (N_shift, df_hz) for the exact centered positive-only mode.""" if freqs is None: return None @@ -110,14 +191,16 @@ def _resolve_positive_only_exact_geometry(freqs: Optional[np.ndarray]) -> Option return None n_band = int(finite.size) + target_band = _resolve_positive_exact_band_size(f_min, f_max, n_band, max_shift_len) + n_band = max(2, min(n_band, target_band)) df_ghz = float((f_max - f_min) / max(1, n_band - 1)) if (not np.isfinite(df_ghz)) or df_ghz <= 0.0: return None - f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64) - if f_shift.size <= 1: + n_shift = _positive_exact_shift_size(f_max, df_ghz) + if n_shift <= 1: return None - return int(f_shift.size), float(df_ghz * 1e9) + return int(n_shift), float(df_ghz * 1e9) def prepare_fft_segment( @@ -256,13 +339,24 @@ def build_positive_only_centered_ifft_spectrum( def build_positive_only_exact_centered_ifft_spectrum( sweep: np.ndarray, freqs: Optional[np.ndarray], + *, + max_shift_len: Optional[int] = None, ) -> Optional[np.ndarray]: """Build centered spectrum exactly as zeros[-f_max..+f_min) + measured positive band.""" prepared = _extract_positive_exact_band(sweep, freqs) if prepared is None: return None - freq_band, sweep_band, f_max, df_ghz = prepared + freq_band, sweep_band, _f_max, _df_ghz = prepared + normalized = _normalize_positive_exact_band( + freq_band, + sweep_band, + max_shift_len=max_shift_len, + ) + if normalized is None: + return None + + freq_band, sweep_band, f_max, df_ghz = normalized f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64) if f_shift.size <= 1: return None @@ -334,7 +428,11 @@ def compute_fft_complex_row( if fft_mode == "positive_only": spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) elif fft_mode == "positive_only_exact": - spectrum_centered = build_positive_only_exact_centered_ifft_spectrum(sweep, freqs) + spectrum_centered = build_positive_only_exact_centered_ifft_spectrum( + sweep, + freqs, + max_shift_len=bins, + ) else: spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) if spectrum_centered is None: @@ -381,7 +479,7 @@ def compute_distance_axis( return np.zeros((0,), dtype=np.float64) fft_mode = _normalize_fft_mode(mode, symmetric) if fft_mode == "positive_only_exact": - geometry = _resolve_positive_only_exact_geometry(freqs) + geometry = _resolve_positive_only_exact_geometry(freqs, max_shift_len=bins) if geometry is None: return np.arange(bins, dtype=np.float64) n_shift, df_hz = geometry diff --git a/tests/test_processing.py b/tests/test_processing.py index 9a87aae..4d04216 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -513,14 +513,45 @@ class ProcessingTests(unittest.TestCase): bins = 8 axis = compute_distance_axis(freqs, bins, mode="positive_only_exact") - df_hz = 1e9 - n_shift = int(np.arange(-6.0, 6.0 + 0.5, 1.0, dtype=np.float64).size) + # With a small bins budget the exact-mode grid is downsampled so + # internal IFFT length does not exceed visible bins. + df_hz = 2e9 + n_shift = int(np.arange(-6.0, 6.0 + 1.0, 2.0, dtype=np.float64).size) expected_step = C_M_S / (2.0 * n_shift * df_hz) expected = np.arange(bins, dtype=np.float64) * expected_step self.assertEqual(axis.shape, (bins,)) self.assertTrue(np.allclose(axis, expected)) + def test_positive_only_exact_mode_remains_stable_when_input_points_double(self): + bins = FFT_LEN // 2 + 1 + tau_s = 45e-9 + + freqs_400 = np.linspace(3.3, 14.3, 400, dtype=np.float64) + freqs_800 = np.linspace(3.3, 14.3, 800, dtype=np.float64) + sweep_400 = np.exp(-1j * 2.0 * np.pi * freqs_400 * 1e9 * tau_s).astype(np.complex64) + sweep_800 = np.exp(-1j * 2.0 * np.pi * freqs_800 * 1e9 * tau_s).astype(np.complex64) + + mag_400 = compute_fft_mag_row(sweep_400, freqs_400, bins, mode="positive_only_exact") + mag_800 = compute_fft_mag_row(sweep_800, freqs_800, bins, mode="positive_only_exact") + + self.assertEqual(mag_400.shape, mag_800.shape) + finite = np.isfinite(mag_400) & np.isfinite(mag_800) + self.assertGreater(int(np.count_nonzero(finite)), int(0.95 * bins)) + + idx_400 = int(np.nanargmax(mag_400)) + idx_800 = int(np.nanargmax(mag_800)) + peak_400 = float(np.nanmax(mag_400)) + peak_800 = float(np.nanmax(mag_800)) + + self.assertLess(abs(idx_400 - idx_800), 64) + self.assertGreater(idx_400, 8) + self.assertGreater(idx_800, 8) + self.assertLess(idx_400, bins - 8) + self.assertLess(idx_800, bins - 8) + self.assertGreater(peak_400, 0.05) + self.assertGreater(peak_800, 0.05) + def test_resolve_visible_fft_curves_handles_complex_mode(self): complex_row = np.asarray([1.0 + 2.0j, -3.0 + 4.0j], dtype=np.complex64) mag = np.abs(complex_row).astype(np.float32)