diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 0b1a8ac..f82769f 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -205,6 +205,7 @@ def run_pyqtgraph(args) -> None: bg_compute_cb = QtWidgets.QCheckBox("расчет фона") bg_subtract_cb = QtWidgets.QCheckBox("вычет фона") fft_bg_subtract_cb = QtWidgets.QCheckBox("FFT вычет фона") + fft_symmetric_cb = QtWidgets.QCheckBox("симм. IFFT") peak_search_cb = QtWidgets.QCheckBox("поиск пиков") calib_group = QtWidgets.QGroupBox("Калибровка") calib_group_layout = QtWidgets.QVBoxLayout(calib_group) @@ -239,6 +240,7 @@ def run_pyqtgraph(args) -> None: settings_layout.addWidget(bg_compute_cb) settings_layout.addWidget(bg_subtract_cb) settings_layout.addWidget(fft_bg_subtract_cb) + settings_layout.addWidget(fft_symmetric_cb) settings_layout.addWidget(peak_search_cb) status = pg.LabelItem(justify="left") @@ -248,6 +250,7 @@ def run_pyqtgraph(args) -> None: bg_compute_enabled = True bg_subtract_enabled = False fft_bg_subtract_enabled = False + fft_symmetric_enabled = True status_note = "" status_dirty = True fixed_ylim: Optional[Tuple[float, float]] = None @@ -441,11 +444,26 @@ def run_pyqtgraph(args) -> None: fft_bg_subtract_enabled = False runtime.mark_dirty() + def set_fft_symmetric_enabled() -> None: + nonlocal fft_symmetric_enabled + try: + fft_symmetric_enabled = bool(fft_symmetric_cb.isChecked()) + except Exception: + fft_symmetric_enabled = True + runtime.ring.set_symmetric_fft_enabled(fft_symmetric_enabled) + runtime.current_distances = runtime.ring.distance_axis + runtime.current_fft_db = None + set_status_note("IFFT: симметричный" if fft_symmetric_enabled else "IFFT: обычный") + update_physical_axes() + runtime.mark_dirty() + try: bg_compute_cb.setChecked(True) + fft_symmetric_cb.setChecked(True) except Exception: pass set_bg_compute_enabled() + set_fft_symmetric_enabled() try: calib_cb.stateChanged.connect(lambda _v: set_calib_enabled()) @@ -455,6 +473,7 @@ def run_pyqtgraph(args) -> None: bg_compute_cb.stateChanged.connect(lambda _v: set_bg_compute_enabled()) bg_subtract_cb.stateChanged.connect(lambda _v: set_bg_subtract_enabled()) fft_bg_subtract_cb.stateChanged.connect(lambda _v: set_fft_bg_subtract_enabled()) + fft_symmetric_cb.stateChanged.connect(lambda _v: set_fft_symmetric_enabled()) except Exception: pass @@ -745,7 +764,12 @@ def run_pyqtgraph(args) -> None: distance_axis = runtime.current_distances if runtime.current_distances is not None else runtime.ring.distance_axis if sweep_for_fft is not None and sweep_for_fft.size > 0 and distance_axis is not None: if runtime.current_fft_db is None or runtime.current_fft_db.size != distance_axis.size or runtime.plot_dirty: - runtime.current_fft_db = compute_fft_row(sweep_for_fft, runtime.current_freqs, distance_axis.size) + runtime.current_fft_db = compute_fft_row( + sweep_for_fft, + runtime.current_freqs, + distance_axis.size, + symmetric=fft_symmetric_enabled, + ) fft_vals = runtime.current_fft_db xs_fft = distance_axis[: fft_vals.size] if fft_bg_subtract_enabled and bg_fft_for_line is not None: diff --git a/rfg_adc_plotter/processing/fft.py b/rfg_adc_plotter/processing/fft.py index 3d35123..76884df 100644 --- a/rfg_adc_plotter/processing/fft.py +++ b/rfg_adc_plotter/processing/fft.py @@ -6,7 +6,7 @@ from typing import Optional, Tuple import numpy as np -from rfg_adc_plotter.constants import C_M_S, FFT_LEN +from rfg_adc_plotter.constants import C_M_S, FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ def prepare_fft_segment( @@ -48,6 +48,45 @@ def prepare_fft_segment( return resampled, take_fft +def build_symmetric_ifft_spectrum( + sweep: np.ndarray, + freqs: Optional[np.ndarray], + fft_len: int = FFT_LEN, +) -> Optional[np.ndarray]: + """Build a centered symmetric spectrum over [-f_max, f_max] for IFFT.""" + if fft_len <= 0: + return None + + freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, int(fft_len), dtype=np.float64) + neg_idx_all = np.flatnonzero(freq_axis <= (-SWEEP_FREQ_MIN_GHZ)) + pos_idx_all = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ) + band_len = int(min(neg_idx_all.size, pos_idx_all.size)) + if band_len <= 1: + return None + + neg_idx = neg_idx_all[:band_len] + pos_idx = pos_idx_all[-band_len:] + prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len) + if prepared is None: + return None + + fft_seg, take_fft = prepared + if take_fft != band_len: + fft_seg = np.asarray(fft_seg[:band_len], dtype=np.float32) + if fft_seg.size < band_len: + padded = np.zeros((band_len,), dtype=np.float32) + padded[: fft_seg.size] = fft_seg + fft_seg = padded + + window = np.hanning(band_len).astype(np.float32) + band = np.nan_to_num(fft_seg, nan=0.0).astype(np.float32, copy=False) * window + + spectrum = np.zeros((int(fft_len),), dtype=np.float32) + spectrum[pos_idx] = band + spectrum[neg_idx] = band[::-1] + return spectrum + + def fft_mag_to_db(mag: np.ndarray) -> np.ndarray: """Convert magnitude to dB with safe zero handling.""" mag_arr = np.asarray(mag, dtype=np.float32) @@ -55,15 +94,11 @@ def fft_mag_to_db(mag: np.ndarray) -> np.ndarray: return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False) -def compute_fft_mag_row( +def _compute_fft_mag_row_direct( sweep: np.ndarray, freqs: Optional[np.ndarray], bins: int, ) -> np.ndarray: - """Compute a linear FFT magnitude row.""" - if bins <= 0: - return np.zeros((0,), dtype=np.float32) - prepared = prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN) if prepared is None: return np.full((bins,), np.nan, dtype=np.float32) @@ -79,28 +114,56 @@ def compute_fft_mag_row( return mag +def compute_fft_mag_row( + sweep: np.ndarray, + freqs: Optional[np.ndarray], + bins: int, + *, + symmetric: bool = True, +) -> np.ndarray: + """Compute a linear FFT magnitude row.""" + if bins <= 0: + return np.zeros((0,), dtype=np.float32) + + if not symmetric: + return _compute_fft_mag_row_direct(sweep, freqs, bins) + + spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + if spectrum_centered is None: + return np.full((bins,), np.nan, dtype=np.float32) + + spec = np.fft.ifft(np.fft.ifftshift(spectrum_centered)) + mag = np.abs(spec).astype(np.float32) + if mag.shape[0] != bins: + mag = mag[:bins] + return mag + + def compute_fft_row( sweep: np.ndarray, freqs: Optional[np.ndarray], bins: int, + *, + symmetric: bool = True, ) -> np.ndarray: """Compute a dB FFT row.""" - return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins)) + return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, symmetric=symmetric)) -def compute_distance_axis(freqs: Optional[np.ndarray], bins: int) -> np.ndarray: +def compute_distance_axis(freqs: Optional[np.ndarray], bins: int, *, symmetric: bool = True) -> np.ndarray: """Compute the one-way distance axis for IFFT output.""" if bins <= 0: return np.zeros((0,), dtype=np.float64) - if freqs is None: - return np.arange(bins, dtype=np.float64) - - freq_arr = np.asarray(freqs, dtype=np.float64) - finite = freq_arr[np.isfinite(freq_arr)] - if finite.size < 2: - return np.arange(bins, dtype=np.float64) - - df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1)) + if symmetric: + df_ghz = (2.0 * float(SWEEP_FREQ_MAX_GHZ)) / max(1, FFT_LEN - 1) + else: + if freqs is None: + return np.arange(bins, dtype=np.float64) + freq_arr = np.asarray(freqs, dtype=np.float64) + finite = freq_arr[np.isfinite(freq_arr)] + if finite.size < 2: + return np.arange(bins, dtype=np.float64) + df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1)) df_hz = abs(df_ghz) * 1e9 if not np.isfinite(df_hz) or df_hz <= 0.0: return np.arange(bins, dtype=np.float64) diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py index e789595..faaa22a 100644 --- a/rfg_adc_plotter/state/ring_buffer.py +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -17,6 +17,7 @@ class RingBuffer: def __init__(self, max_sweeps: int): self.max_sweeps = int(max_sweeps) self.fft_bins = FFT_LEN // 2 + 1 + self.fft_symmetric = True self.width = 0 self.head = 0 self.ring: Optional[np.ndarray] = None @@ -25,6 +26,7 @@ class RingBuffer: self.x_shared: Optional[np.ndarray] = None self.distance_axis: Optional[np.ndarray] = None self.last_fft_db: Optional[np.ndarray] = None + self.last_freqs: Optional[np.ndarray] = None self.y_min_fft: Optional[float] = None self.y_max_fft: Optional[float] = None @@ -61,6 +63,50 @@ class RingBuffer: changed = True return changed + def set_symmetric_fft_enabled(self, enabled: bool) -> bool: + """Switch FFT mode and rebuild cached FFT rows from stored sweeps.""" + enabled_bool = bool(enabled) + if enabled_bool == self.fft_symmetric: + return False + + self.fft_symmetric = enabled_bool + self.y_min_fft = None + self.y_max_fft = None + + if self.ring is None or self.ring_fft is None: + return True + + self.ring_fft.fill(np.nan) + for row_idx in range(self.ring.shape[0]): + sweep_row = self.ring[row_idx] + if not np.any(np.isfinite(sweep_row)): + continue + fft_mag = compute_fft_mag_row( + sweep_row, + self.last_freqs, + self.fft_bins, + symmetric=self.fft_symmetric, + ) + self.ring_fft[row_idx, :] = fft_mag + + if self.last_freqs is not None: + self.distance_axis = compute_distance_axis( + self.last_freqs, + self.fft_bins, + symmetric=self.fft_symmetric, + ) + + last_idx = (self.head - 1) % self.max_sweeps + if self.ring_fft.shape[0] > 0: + last_fft = self.ring_fft[last_idx] + self.last_fft_db = fft_mag_to_db(last_fft) + finite = self.ring_fft[np.isfinite(self.ring_fft)] + if finite.size > 0: + finite_db = fft_mag_to_db(finite.astype(np.float32, copy=False)) + self.y_min_fft = float(np.nanmin(finite_db)) + self.y_max_fft = float(np.nanmax(finite_db)) + return True + def push(self, sweep: np.ndarray, freqs: Optional[np.ndarray] = None) -> None: """Push a processed sweep and refresh raw/FFT buffers.""" if sweep is None or sweep.size == 0: @@ -74,8 +120,10 @@ class RingBuffer: row[:take] = np.asarray(sweep[:take], dtype=np.float32) self.ring[self.head, :] = row self.ring_time[self.head] = time.time() + if freqs is not None: + self.last_freqs = np.asarray(freqs, dtype=np.float64).copy() - fft_mag = compute_fft_mag_row(sweep, freqs, self.fft_bins) + fft_mag = compute_fft_mag_row(sweep, freqs, self.fft_bins, symmetric=self.fft_symmetric) self.ring_fft[self.head, :] = fft_mag self.last_fft_db = fft_mag_to_db(fft_mag) @@ -85,7 +133,7 @@ class RingBuffer: self.y_min_fft = fr_min if self.y_min_fft is None else min(self.y_min_fft, fr_min) self.y_max_fft = fr_max if self.y_max_fft is None else max(self.y_max_fft, fr_max) - self.distance_axis = compute_distance_axis(freqs, self.fft_bins) + self.distance_axis = compute_distance_axis(freqs, self.fft_bins, symmetric=self.fft_symmetric) self.head = (self.head + 1) % self.max_sweeps def get_display_raw(self) -> np.ndarray: diff --git a/tests/test_processing.py b/tests/test_processing.py index dab5d61..02684c9 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -5,6 +5,7 @@ import tempfile import numpy as np import unittest +from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ from rfg_adc_plotter.processing.calibration import ( build_calib_envelope, calibrate_freqs, @@ -12,7 +13,12 @@ from rfg_adc_plotter.processing.calibration import ( recalculate_calibration_c, save_calib_envelope, ) -from rfg_adc_plotter.processing.fft import compute_distance_axis, compute_fft_mag_row, compute_fft_row +from rfg_adc_plotter.processing.fft import ( + build_symmetric_ifft_spectrum, + compute_distance_axis, + compute_fft_mag_row, + compute_fft_row, +) from rfg_adc_plotter.processing.normalization import ( build_calib_envelopes, normalize_by_calib, @@ -94,6 +100,23 @@ class ProcessingTests(unittest.TestCase): self.assertEqual(axis.shape, (513,)) self.assertTrue(np.all(np.diff(axis) >= 0.0)) + def test_symmetric_ifft_spectrum_has_zero_gap_and_mirrored_band(self): + sweep = np.linspace(1.0, 2.0, 128, dtype=np.float32) + freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, 128, dtype=np.float64) + spectrum = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + + self.assertIsNotNone(spectrum) + freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, FFT_LEN, dtype=np.float64) + neg_idx_all = np.flatnonzero(freq_axis <= (-SWEEP_FREQ_MIN_GHZ)) + pos_idx_all = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ) + band_len = int(min(neg_idx_all.size, pos_idx_all.size)) + neg_idx = neg_idx_all[:band_len] + pos_idx = pos_idx_all[-band_len:] + zero_mask = (freq_axis > (-SWEEP_FREQ_MIN_GHZ)) & (freq_axis < SWEEP_FREQ_MIN_GHZ) + + self.assertTrue(np.allclose(spectrum[zero_mask], 0.0)) + self.assertTrue(np.allclose(spectrum[neg_idx], spectrum[pos_idx][::-1])) + def test_peak_helpers_find_reference_and_peak_boxes(self): xs = np.linspace(0.0, 10.0, 200) ys = np.exp(-((xs - 5.0) ** 2) / 0.4) * 10.0 + 1.0 diff --git a/tests/test_ring_buffer.py b/tests/test_ring_buffer.py index 16c08fc..0cbeff2 100644 --- a/tests/test_ring_buffer.py +++ b/tests/test_ring_buffer.py @@ -39,6 +39,23 @@ class RingBufferTests(unittest.TestCase): self.assertIsNotNone(ring.last_fft_db) self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,)) + def test_ring_buffer_can_switch_fft_mode_and_rebuild_fft_rows(self): + ring = RingBuffer(max_sweeps=2) + sweep = np.linspace(0.0, 1.0, 64, dtype=np.float32) + freqs = np.linspace(3.3, 14.3, 64, dtype=np.float64) + ring.push(sweep, freqs) + fft_before = ring.last_fft_db.copy() + axis_before = ring.distance_axis.copy() + + changed = ring.set_symmetric_fft_enabled(False) + + self.assertTrue(changed) + self.assertFalse(ring.fft_symmetric) + self.assertEqual(ring.get_display_raw().shape[1], 2) + self.assertEqual(ring.last_fft_db.shape, fft_before.shape) + self.assertFalse(np.allclose(ring.last_fft_db, fft_before)) + self.assertFalse(np.allclose(ring.distance_axis, axis_before)) + if __name__ == "__main__": unittest.main()