"""FFT helpers for line and waterfall views."""

from __future__ import annotations

from typing import Optional, Tuple

import numpy as np

from rfg_adc_plotter.constants import C_M_S, FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ


def prepare_fft_segment(
    sweep: np.ndarray,
    freqs: Optional[np.ndarray],
    fft_len: int = FFT_LEN,
) -> Optional[Tuple[np.ndarray, int]]:
    """Prepare a sweep segment for FFT on a uniform frequency grid.

    At most ``fft_len`` leading samples of ``sweep`` are used. When a usable
    frequency vector is supplied, the finite samples are sorted by frequency,
    de-duplicated and linearly interpolated onto an evenly spaced grid that
    spans the same frequency range. Otherwise the raw samples are returned
    with non-finite values replaced by zero.

    Args:
        sweep: Sweep samples; only the first ``min(sweep.size, fft_len)`` are
            used.
        freqs: Optional per-sample frequencies. Ignored (fallback is used)
            when ``None`` or shorter than the segment being taken.
        fft_len: Upper bound on the number of samples to take.

    Returns:
        ``(segment, take_fft)`` where ``segment`` is a float32 array of
        ``take_fft`` finite samples, or ``None`` when there is nothing to
        take (empty sweep or non-positive ``fft_len``).
    """
    take_fft = min(int(sweep.size), int(fft_len))
    if take_fft <= 0:
        return None

    sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32)
    # Zero out NaN *and* +/-inf. Leaving the nan_to_num defaults for inf
    # would inject +/-float32-max into the transform, whereas the resampling
    # path below already treats every non-finite sample as missing.
    fallback = np.nan_to_num(
        sweep_seg, nan=0.0, posinf=0.0, neginf=0.0
    ).astype(np.float32, copy=False)

    if freqs is None:
        return fallback, take_fft

    freq_arr = np.asarray(freqs)
    if freq_arr.size < take_fft:
        return fallback, take_fft

    freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64)
    valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg)
    if int(np.count_nonzero(valid)) < 2:
        # Interpolation needs at least two support points.
        return fallback, take_fft

    x_valid = freq_seg[valid]
    y_valid = sweep_seg[valid]

    # Stable sort so that, among duplicated frequencies, the sample that came
    # first in the sweep is the one np.unique keeps.
    order = np.argsort(x_valid, kind="mergesort")
    x_valid = x_valid[order]
    y_valid = y_valid[order]
    x_unique, unique_idx = np.unique(x_valid, return_index=True)
    y_unique = y_valid[unique_idx]

    if x_unique.size < 2 or x_unique[-1] <= x_unique[0]:
        return fallback, take_fft

    x_uniform = np.linspace(
        float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64
    )
    resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32)
    return resampled, take_fft


def build_symmetric_ifft_spectrum(
    sweep: np.ndarray,
    freqs: Optional[np.ndarray],
    fft_len: int = FFT_LEN,
) -> Optional[np.ndarray]:
    """Build a centered symmetric spectrum over [-f_max, f_max] for IFFT.

    A frequency axis of ``fft_len`` points is laid out from
    ``-SWEEP_FREQ_MAX_GHZ`` to ``+SWEEP_FREQ_MAX_GHZ``. The Hann-windowed
    sweep is written into the bins at or above ``+SWEEP_FREQ_MIN_GHZ`` and
    its mirror image into the bins at or below ``-SWEEP_FREQ_MIN_GHZ``; every
    other bin stays zero.

    Args:
        sweep: Sweep samples.
        freqs: Optional per-sample frequencies, forwarded to
            :func:`prepare_fft_segment`.
        fft_len: Length of the spectrum to build.

    Returns:
        A float32 array of length ``fft_len`` in centered (not yet
        ``ifftshift``-ed) order, or ``None`` when ``fft_len`` is not
        positive, the band holds fewer than two bins, or the sweep is empty.
    """
    if fft_len <= 0:
        return None

    freq_axis = np.linspace(
        -SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, int(fft_len), dtype=np.float64
    )
    neg_idx_all = np.flatnonzero(freq_axis <= (-SWEEP_FREQ_MIN_GHZ))
    pos_idx_all = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ)
    band_len = int(min(neg_idx_all.size, pos_idx_all.size))
    if band_len <= 1:
        return None

    # Keep the bins farthest from DC on both sides so the two bands have the
    # same length and mirror each other.
    neg_idx = neg_idx_all[:band_len]
    pos_idx = pos_idx_all[-band_len:]

    prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len)
    if prepared is None:
        return None

    fft_seg, take_fft = prepared
    # prepare_fft_segment never returns more than band_len samples, so the
    # only adjustment ever needed is zero-padding a short sweep.
    if take_fft < band_len:
        padded = np.zeros((band_len,), dtype=np.float32)
        padded[:take_fft] = fft_seg[:take_fft]
        fft_seg = padded

    window = np.hanning(band_len).astype(np.float32)
    band = np.asarray(fft_seg, dtype=np.float32) * window

    spectrum = np.zeros((int(fft_len),), dtype=np.float32)
    spectrum[pos_idx] = band
    spectrum[neg_idx] = band[::-1]
    return spectrum


def fft_mag_to_db(mag: np.ndarray) -> np.ndarray:
    """Convert magnitude to dB with safe zero handling.

    Negative magnitudes are clamped to zero and a ``1e-9`` offset is added
    before the logarithm so that zero maps to a finite value (-180 dB).
    NaN inputs stay NaN.
    """
    mag_arr = np.asarray(mag, dtype=np.float32)
    safe_mag = np.maximum(mag_arr, 0.0)
    return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False)


def _fit_row_to_bins(mag: np.ndarray, bins: int) -> np.ndarray:
    """Return ``mag`` truncated or NaN-padded to exactly ``bins`` samples."""
    bins = max(0, int(bins))
    if mag.shape[0] >= bins:
        return mag[:bins]
    # More bins requested than the transform produced: pad with NaN (the same
    # marker the failure paths use) so the row always matches the axis length.
    row = np.full((bins,), np.nan, dtype=np.float32)
    row[: mag.shape[0]] = mag
    return row


def _compute_fft_mag_row_direct(
    sweep: np.ndarray,
    freqs: Optional[np.ndarray],
    bins: int,
) -> np.ndarray:
    """Compute a magnitude row from the Hann-windowed, zero-padded sweep.

    The prepared segment is windowed over its own length, placed at the start
    of an ``FFT_LEN`` buffer and passed through ``np.fft.ifft``. The result
    always has ``bins`` elements; it is all-NaN when no segment is available.
    """
    prepared = prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN)
    if prepared is None:
        return np.full((bins,), np.nan, dtype=np.float32)

    fft_seg, take_fft = prepared
    fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
    window = np.hanning(take_fft).astype(np.float32)
    fft_in[:take_fft] = fft_seg * window

    spec = np.fft.ifft(fft_in)
    mag = np.abs(spec).astype(np.float32)
    return _fit_row_to_bins(mag, bins)


def compute_fft_mag_row(
    sweep: np.ndarray,
    freqs: Optional[np.ndarray],
    bins: int,
    *,
    symmetric: bool = True,
) -> np.ndarray:
    """Compute a linear FFT magnitude row.

    Args:
        sweep: Sweep samples.
        freqs: Optional per-sample frequencies.
        bins: Number of output bins.
        symmetric: When true, transform the mirrored spectrum produced by
            :func:`build_symmetric_ifft_spectrum`; otherwise transform the
            windowed sweep directly.

    Returns:
        A float32 array of exactly ``bins`` magnitudes (empty when ``bins``
        is not positive). The row is all-NaN when no spectrum can be built,
        and NaN-padded when ``bins`` exceeds the transform length.
    """
    if bins <= 0:
        return np.zeros((0,), dtype=np.float32)
    if not symmetric:
        return _compute_fft_mag_row_direct(sweep, freqs, bins)

    spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
    if spectrum_centered is None:
        return np.full((bins,), np.nan, dtype=np.float32)

    # The spectrum is built centered; ifftshift moves the zero-frequency bin
    # to index 0 as np.fft.ifft expects.
    spec = np.fft.ifft(np.fft.ifftshift(spectrum_centered))
    mag = np.abs(spec).astype(np.float32)
    return _fit_row_to_bins(mag, bins)


def compute_fft_row(
    sweep: np.ndarray,
    freqs: Optional[np.ndarray],
    bins: int,
    *,
    symmetric: bool = True,
) -> np.ndarray:
    """Compute a dB FFT row.

    Equivalent to :func:`fft_mag_to_db` applied to
    :func:`compute_fft_mag_row` with the same arguments.
    """
    return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, symmetric=symmetric))


def compute_distance_axis(
    freqs: Optional[np.ndarray],
    bins: int,
    *,
    symmetric: bool = True,
) -> np.ndarray:
    """Compute the one-way distance axis for IFFT output.

    Args:
        freqs: Optional sweep frequencies; only consulted when ``symmetric``
            is false. Presumably in GHz, since the derived step is scaled by
            ``1e9`` to obtain Hz -- confirm against callers.
        bins: Number of axis points.
        symmetric: When true, the frequency step is that of the
            ``[-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ]`` grid of ``FFT_LEN``
            points; otherwise it is the mean step between the first and last
            finite entries of ``freqs``.

    Returns:
        A float64 array of ``bins`` values spaced by
        ``C_M_S / (2 * FFT_LEN * df_hz)``. When no positive, finite frequency
        step can be determined, the plain bin indices ``0 .. bins - 1`` are
        returned instead. Empty when ``bins`` is not positive.
    """
    if bins <= 0:
        return np.zeros((0,), dtype=np.float64)

    if symmetric:
        df_ghz = (2.0 * float(SWEEP_FREQ_MAX_GHZ)) / max(1, FFT_LEN - 1)
    else:
        if freqs is None:
            return np.arange(bins, dtype=np.float64)
        freq_arr = np.asarray(freqs, dtype=np.float64)
        finite = freq_arr[np.isfinite(freq_arr)]
        if finite.size < 2:
            return np.arange(bins, dtype=np.float64)
        df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1))

    # abs() lets a descending sweep yield the same step as an ascending one.
    df_hz = abs(df_ghz) * 1e9
    if not np.isfinite(df_hz) or df_hz <= 0.0:
        return np.arange(bins, dtype=np.float64)

    step_m = C_M_S / (2.0 * FFT_LEN * df_hz)
    return np.arange(bins, dtype=np.float64) * step_m