"""FFT helpers for line and waterfall views.""" from __future__ import annotations from typing import Optional, Tuple import numpy as np from rfg_adc_plotter.constants import C_M_S, FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ def _finite_freq_bounds(freqs: Optional[np.ndarray]) -> Optional[Tuple[float, float]]: """Return finite frequency bounds for the current working segment.""" if freqs is None: return None freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1) finite = freq_arr[np.isfinite(freq_arr)] if finite.size < 2: return None f_min = float(np.min(finite)) f_max = float(np.max(finite)) if not np.isfinite(f_min) or not np.isfinite(f_max) or f_max <= f_min: return None return f_min, f_max def _coerce_sweep_array(sweep: np.ndarray) -> np.ndarray: values = np.asarray(sweep).reshape(-1) if np.iscomplexobj(values): return np.asarray(values, dtype=np.complex64) return np.asarray(values, dtype=np.float32) def _interp_signal(x_uniform: np.ndarray, x_known: np.ndarray, y_known: np.ndarray) -> np.ndarray: if np.iscomplexobj(y_known): real = np.interp(x_uniform, x_known, np.asarray(y_known.real, dtype=np.float64)) imag = np.interp(x_uniform, x_known, np.asarray(y_known.imag, dtype=np.float64)) return (real + (1j * imag)).astype(np.complex64) return np.interp(x_uniform, x_known, np.asarray(y_known, dtype=np.float64)).astype(np.float32) def _fit_complex_bins(values: np.ndarray, bins: int) -> np.ndarray: arr = np.asarray(values, dtype=np.complex64).reshape(-1) if bins <= 0: return np.zeros((0,), dtype=np.complex64) if arr.size == bins: return arr out = np.full((bins,), np.nan + 0j, dtype=np.complex64) take = min(arr.size, bins) out[:take] = arr[:take] return out def _extract_positive_exact_band( sweep: np.ndarray, freqs: Optional[np.ndarray], ) -> Optional[Tuple[np.ndarray, np.ndarray, float, float]]: """Return sorted positive band data and exact-grid parameters.""" if freqs is None: return None sweep_arr = _coerce_sweep_array(sweep) freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1) take = min(int(sweep_arr.size), int(freq_arr.size)) if take <= 1: return None sweep_seg = sweep_arr[:take] freq_seg = freq_arr[:take] valid = np.isfinite(freq_seg) & np.isfinite(sweep_seg) & (freq_seg > 0.0) if int(np.count_nonzero(valid)) < 2: return None freq_band = np.asarray(freq_seg[valid], dtype=np.float64) sweep_band = np.asarray(sweep_seg[valid]) order = np.argsort(freq_band, kind="mergesort") freq_band = freq_band[order] sweep_band = sweep_band[order] n_band = int(freq_band.size) if n_band <= 1: return None f_min = float(freq_band[0]) f_max = float(freq_band[-1]) if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min: return None df_ghz = float((f_max - f_min) / max(1, n_band - 1)) if (not np.isfinite(df_ghz)) or df_ghz <= 0.0: return None return freq_band, sweep_band, f_max, df_ghz def _positive_exact_shift_size(f_max: float, df_ghz: float) -> int: if (not np.isfinite(f_max)) or (not np.isfinite(df_ghz)) or f_max <= 0.0 or df_ghz <= 0.0: return 0 return int(np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64).size) def _resolve_positive_exact_band_size( f_min: float, f_max: float, n_band: int, max_shift_len: Optional[int], ) -> int: if n_band <= 2: return max(2, int(n_band)) if max_shift_len is None: return int(n_band) limit = int(max_shift_len) if limit <= 1: return max(2, int(n_band)) span = float(f_max - f_min) if (not np.isfinite(span)) or span <= 0.0: return int(n_band) df_current = float(span / max(1, int(n_band) - 1)) if _positive_exact_shift_size(f_max, df_current) <= limit: return int(n_band) denom = max(2.0 * f_max, 1e-12) approx = int(np.floor(1.0 + ((float(limit - 1) * span) / denom))) target = min(int(n_band), max(2, approx)) while target > 2: df_try = float(span / max(1, target - 1)) if _positive_exact_shift_size(f_max, df_try) <= limit: break target -= 1 return max(2, target) def _normalize_positive_exact_band( freq_band: np.ndarray, sweep_band: np.ndarray, *, max_shift_len: Optional[int] = None, ) -> Optional[Tuple[np.ndarray, np.ndarray, float, float]]: freq_arr = np.asarray(freq_band, dtype=np.float64).reshape(-1) sweep_arr = np.asarray(sweep_band).reshape(-1) width = min(int(freq_arr.size), int(sweep_arr.size)) if width <= 1: return None freq_arr = freq_arr[:width] sweep_arr = sweep_arr[:width] f_min = float(freq_arr[0]) f_max = float(freq_arr[-1]) if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min: return None target_band = _resolve_positive_exact_band_size(f_min, f_max, int(freq_arr.size), max_shift_len) if target_band < int(freq_arr.size): target_freqs = np.linspace(f_min, f_max, target_band, dtype=np.float64) target_sweep = _interp_signal(target_freqs, freq_arr, sweep_arr) freq_arr = target_freqs sweep_arr = np.asarray(target_sweep).reshape(-1) n_band = int(freq_arr.size) if n_band <= 1: return None df_ghz = float((f_max - f_min) / max(1, n_band - 1)) if (not np.isfinite(df_ghz)) or df_ghz <= 0.0: return None return freq_arr, sweep_arr, f_max, df_ghz def _resolve_positive_only_exact_geometry( freqs: Optional[np.ndarray], *, max_shift_len: Optional[int] = None, ) -> Optional[Tuple[int, float]]: """Return (N_shift, df_hz) for the exact centered positive-only mode.""" if freqs is None: return None freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1) finite = np.asarray(freq_arr[np.isfinite(freq_arr) & (freq_arr > 0.0)], dtype=np.float64) if finite.size < 2: return None finite.sort(kind="mergesort") f_min = float(finite[0]) f_max = float(finite[-1]) if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min: return None n_band = int(finite.size) target_band = _resolve_positive_exact_band_size(f_min, f_max, n_band, max_shift_len) n_band = max(2, min(n_band, target_band)) df_ghz = float((f_max - f_min) / max(1, n_band - 1)) if (not np.isfinite(df_ghz)) or df_ghz <= 0.0: return None n_shift = _positive_exact_shift_size(f_max, df_ghz) if n_shift <= 1: return None return int(n_shift), float(df_ghz * 1e9) def prepare_fft_segment( sweep: np.ndarray, freqs: Optional[np.ndarray], fft_len: int = FFT_LEN, ) -> Optional[Tuple[np.ndarray, int]]: """Prepare a sweep segment for FFT on a uniform frequency grid.""" take_fft = min(int(sweep.size), int(fft_len)) if take_fft <= 0: return None sweep_arr = _coerce_sweep_array(sweep) sweep_seg = sweep_arr[:take_fft] fallback_dtype = np.complex64 if np.iscomplexobj(sweep_seg) else np.float32 fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(fallback_dtype, copy=False) if freqs is None: return fallback, take_fft freq_arr = np.asarray(freqs) if freq_arr.size < take_fft: return fallback, take_fft freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64) valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg) if int(np.count_nonzero(valid)) < 2: return fallback, take_fft x_valid = freq_seg[valid] y_valid = sweep_seg[valid] order = np.argsort(x_valid, kind="mergesort") x_valid = x_valid[order] y_valid = y_valid[order] x_unique, unique_idx = np.unique(x_valid, return_index=True) y_unique = y_valid[unique_idx] if x_unique.size < 2 or x_unique[-1] <= x_unique[0]: return fallback, take_fft x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64) resampled = _interp_signal(x_uniform, x_unique, y_unique) return resampled, take_fft def build_symmetric_ifft_spectrum( sweep: np.ndarray, freqs: Optional[np.ndarray], fft_len: int = FFT_LEN, ) -> Optional[np.ndarray]: """Build a centered symmetric spectrum over [-f_max, f_max] for IFFT.""" if fft_len <= 0: return None bounds = _finite_freq_bounds(freqs) if bounds is None: f_min = float(SWEEP_FREQ_MIN_GHZ) f_max = float(SWEEP_FREQ_MAX_GHZ) else: f_min, f_max = bounds freq_axis = np.linspace(-f_max, f_max, int(fft_len), dtype=np.float64) neg_idx_all = np.flatnonzero(freq_axis <= (-f_min)) pos_idx_all = np.flatnonzero(freq_axis >= f_min) band_len = int(min(neg_idx_all.size, pos_idx_all.size)) if band_len <= 1: return None neg_idx = neg_idx_all[:band_len] pos_idx = pos_idx_all[-band_len:] prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len) if prepared is None: return None fft_seg, take_fft = prepared if take_fft != band_len: fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 fft_seg = np.asarray(fft_seg[:band_len], dtype=fft_dtype) if fft_seg.size < band_len: padded = np.zeros((band_len,), dtype=fft_dtype) padded[: fft_seg.size] = fft_seg fft_seg = padded window = np.hanning(band_len).astype(np.float32) band_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 band = np.nan_to_num(fft_seg, nan=0.0).astype(band_dtype, copy=False) * window spectrum = np.zeros((int(fft_len),), dtype=band_dtype) spectrum[pos_idx] = band spectrum[neg_idx] = np.conj(band[::-1]) if np.iscomplexobj(band) else band[::-1] return spectrum def build_positive_only_centered_ifft_spectrum( sweep: np.ndarray, freqs: Optional[np.ndarray], fft_len: int = FFT_LEN, ) -> Optional[np.ndarray]: """Build a centered spectrum with zeros from -f_max to +f_min.""" if fft_len <= 0: return None bounds = _finite_freq_bounds(freqs) if bounds is None: f_min = float(SWEEP_FREQ_MIN_GHZ) f_max = float(SWEEP_FREQ_MAX_GHZ) else: f_min, f_max = bounds freq_axis = np.linspace(-f_max, f_max, int(fft_len), dtype=np.float64) pos_idx = np.flatnonzero(freq_axis >= f_min) band_len = int(pos_idx.size) if band_len <= 1: return None prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len) if prepared is None: return None fft_seg, take_fft = prepared if take_fft != band_len: fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 fft_seg = np.asarray(fft_seg[:band_len], dtype=fft_dtype) if fft_seg.size < band_len: padded = np.zeros((band_len,), dtype=fft_dtype) padded[: fft_seg.size] = fft_seg fft_seg = padded window = np.hanning(band_len).astype(np.float32) band_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 band = np.nan_to_num(fft_seg, nan=0.0).astype(band_dtype, copy=False) * window spectrum = np.zeros((int(fft_len),), dtype=band_dtype) spectrum[pos_idx] = band return spectrum def build_positive_only_exact_centered_ifft_spectrum( sweep: np.ndarray, freqs: Optional[np.ndarray], *, max_shift_len: Optional[int] = None, ) -> Optional[np.ndarray]: """Build centered spectrum exactly as zeros[-f_max..+f_min) + measured positive band.""" prepared = _extract_positive_exact_band(sweep, freqs) if prepared is None: return None freq_band, sweep_band, _f_max, _df_ghz = prepared normalized = _normalize_positive_exact_band( freq_band, sweep_band, max_shift_len=max_shift_len, ) if normalized is None: return None freq_band, sweep_band, f_max, df_ghz = normalized f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64) if f_shift.size <= 1: return None band_dtype = np.complex64 if np.iscomplexobj(sweep_band) else np.float32 band = np.nan_to_num(np.asarray(sweep_band, dtype=band_dtype), nan=0.0) spectrum = np.zeros((int(f_shift.size),), dtype=band_dtype) idx = np.round((freq_band - f_shift[0]) / df_ghz).astype(np.int64) idx = np.clip(idx, 0, spectrum.size - 1) spectrum[idx] = band return spectrum def fft_mag_to_db(mag: np.ndarray) -> np.ndarray: """Convert magnitude to dB with safe zero handling.""" mag_arr = np.asarray(mag, dtype=np.float32) safe_mag = np.maximum(mag_arr, 0.0) return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False) def _compute_fft_complex_row_direct( sweep: np.ndarray, freqs: Optional[np.ndarray], bins: int, ) -> np.ndarray: prepared = prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN) if prepared is None: return np.full((bins,), np.nan + 0j, dtype=np.complex64) fft_seg, take_fft = prepared fft_in = np.zeros((FFT_LEN,), dtype=np.complex64) window = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = np.asarray(fft_seg, dtype=np.complex64) * window spec = np.fft.ifft(fft_in) return _fit_complex_bins(spec, bins) def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str: if symmetric is not None: return "symmetric" if symmetric else "direct" normalized = str(mode or "symmetric").strip().lower() if normalized in {"direct", "ordinary", "normal"}: return "direct" if normalized in {"symmetric", "sym", "mirror"}: return "symmetric" if normalized in {"positive_only", "positive-centered", "positive_centered", "zero_left"}: return "positive_only" if normalized in {"positive_only_exact", "positive-centered-exact", "positive_centered_exact", "zero_left_exact"}: return "positive_only_exact" raise ValueError(f"Unsupported FFT mode: {mode!r}") def compute_fft_complex_row( sweep: np.ndarray, freqs: Optional[np.ndarray], bins: int, *, mode: str = "symmetric", symmetric: Optional[bool] = None, ) -> np.ndarray: """Compute a complex FFT/IFFT row on the distance axis.""" if bins <= 0: return np.zeros((0,), dtype=np.complex64) fft_mode = _normalize_fft_mode(mode, symmetric) if fft_mode == "direct": return _compute_fft_complex_row_direct(sweep, freqs, bins) if fft_mode == "positive_only": spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) elif fft_mode == "positive_only_exact": spectrum_centered = build_positive_only_exact_centered_ifft_spectrum( sweep, freqs, max_shift_len=bins, ) else: spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) if spectrum_centered is None: return np.full((bins,), np.nan + 0j, dtype=np.complex64) spec = np.fft.ifft(np.fft.ifftshift(np.asarray(spectrum_centered, dtype=np.complex64))) return _fit_complex_bins(spec, bins) def compute_fft_mag_row( sweep: np.ndarray, freqs: Optional[np.ndarray], bins: int, *, mode: str = "symmetric", symmetric: Optional[bool] = None, ) -> np.ndarray: """Compute a linear FFT magnitude row.""" complex_row = compute_fft_complex_row(sweep, freqs, bins, mode=mode, symmetric=symmetric) return np.abs(complex_row).astype(np.float32, copy=False) def compute_fft_row( sweep: np.ndarray, freqs: Optional[np.ndarray], bins: int, *, mode: str = "symmetric", symmetric: Optional[bool] = None, ) -> np.ndarray: """Compute a dB FFT row.""" return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, mode=mode, symmetric=symmetric)) def compute_distance_axis( freqs: Optional[np.ndarray], bins: int, *, mode: str = "symmetric", symmetric: Optional[bool] = None, ) -> np.ndarray: """Compute the one-way distance axis for IFFT output.""" if bins <= 0: return np.zeros((0,), dtype=np.float64) fft_mode = _normalize_fft_mode(mode, symmetric) if fft_mode == "positive_only_exact": geometry = _resolve_positive_only_exact_geometry(freqs, max_shift_len=bins) if geometry is None: return np.arange(bins, dtype=np.float64) n_shift, df_hz = geometry if (not np.isfinite(df_hz)) or df_hz <= 0.0 or n_shift <= 0: return np.arange(bins, dtype=np.float64) step_m = C_M_S / (2.0 * float(n_shift) * df_hz) return np.arange(bins, dtype=np.float64) * step_m if fft_mode in {"symmetric", "positive_only"}: bounds = _finite_freq_bounds(freqs) if bounds is None: f_max = float(SWEEP_FREQ_MAX_GHZ) else: _, f_max = bounds df_ghz = (2.0 * f_max) / max(1, FFT_LEN - 1) else: if freqs is None: return np.arange(bins, dtype=np.float64) freq_arr = np.asarray(freqs, dtype=np.float64) finite = freq_arr[np.isfinite(freq_arr)] if finite.size < 2: return np.arange(bins, dtype=np.float64) df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1)) df_hz = abs(df_ghz) * 1e9 if not np.isfinite(df_hz) or df_hz <= 0.0: return np.arange(bins, dtype=np.float64) step_m = C_M_S / (2.0 * FFT_LEN * df_hz) return np.arange(bins, dtype=np.float64) * step_m