173 lines
5.6 KiB
Python
173 lines
5.6 KiB
Python
"""FFT helpers for line and waterfall views."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
from rfg_adc_plotter.constants import C_M_S, FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ
|
|
|
|
|
|
def prepare_fft_segment(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
fft_len: int = FFT_LEN,
|
|
) -> Optional[Tuple[np.ndarray, int]]:
|
|
"""Prepare a sweep segment for FFT on a uniform frequency grid."""
|
|
take_fft = min(int(sweep.size), int(fft_len))
|
|
if take_fft <= 0:
|
|
return None
|
|
|
|
sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32)
|
|
fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(np.float32, copy=False)
|
|
if freqs is None:
|
|
return fallback, take_fft
|
|
|
|
freq_arr = np.asarray(freqs)
|
|
if freq_arr.size < take_fft:
|
|
return fallback, take_fft
|
|
|
|
freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64)
|
|
valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg)
|
|
if int(np.count_nonzero(valid)) < 2:
|
|
return fallback, take_fft
|
|
|
|
x_valid = freq_seg[valid]
|
|
y_valid = sweep_seg[valid]
|
|
order = np.argsort(x_valid, kind="mergesort")
|
|
x_valid = x_valid[order]
|
|
y_valid = y_valid[order]
|
|
x_unique, unique_idx = np.unique(x_valid, return_index=True)
|
|
y_unique = y_valid[unique_idx]
|
|
if x_unique.size < 2 or x_unique[-1] <= x_unique[0]:
|
|
return fallback, take_fft
|
|
|
|
x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64)
|
|
resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32)
|
|
return resampled, take_fft
|
|
|
|
|
|
def build_symmetric_ifft_spectrum(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
fft_len: int = FFT_LEN,
|
|
) -> Optional[np.ndarray]:
|
|
"""Build a centered symmetric spectrum over [-f_max, f_max] for IFFT."""
|
|
if fft_len <= 0:
|
|
return None
|
|
|
|
freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, int(fft_len), dtype=np.float64)
|
|
neg_idx_all = np.flatnonzero(freq_axis <= (-SWEEP_FREQ_MIN_GHZ))
|
|
pos_idx_all = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ)
|
|
band_len = int(min(neg_idx_all.size, pos_idx_all.size))
|
|
if band_len <= 1:
|
|
return None
|
|
|
|
neg_idx = neg_idx_all[:band_len]
|
|
pos_idx = pos_idx_all[-band_len:]
|
|
prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len)
|
|
if prepared is None:
|
|
return None
|
|
|
|
fft_seg, take_fft = prepared
|
|
if take_fft != band_len:
|
|
fft_seg = np.asarray(fft_seg[:band_len], dtype=np.float32)
|
|
if fft_seg.size < band_len:
|
|
padded = np.zeros((band_len,), dtype=np.float32)
|
|
padded[: fft_seg.size] = fft_seg
|
|
fft_seg = padded
|
|
|
|
window = np.hanning(band_len).astype(np.float32)
|
|
band = np.nan_to_num(fft_seg, nan=0.0).astype(np.float32, copy=False) * window
|
|
|
|
spectrum = np.zeros((int(fft_len),), dtype=np.float32)
|
|
spectrum[pos_idx] = band
|
|
spectrum[neg_idx] = band[::-1]
|
|
return spectrum
|
|
|
|
|
|
def fft_mag_to_db(mag: np.ndarray) -> np.ndarray:
|
|
"""Convert magnitude to dB with safe zero handling."""
|
|
mag_arr = np.asarray(mag, dtype=np.float32)
|
|
safe_mag = np.maximum(mag_arr, 0.0)
|
|
return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False)
|
|
|
|
|
|
def _compute_fft_mag_row_direct(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
bins: int,
|
|
) -> np.ndarray:
|
|
prepared = prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN)
|
|
if prepared is None:
|
|
return np.full((bins,), np.nan, dtype=np.float32)
|
|
|
|
fft_seg, take_fft = prepared
|
|
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
|
|
window = np.hanning(take_fft).astype(np.float32)
|
|
fft_in[:take_fft] = fft_seg * window
|
|
spec = np.fft.ifft(fft_in)
|
|
mag = np.abs(spec).astype(np.float32)
|
|
if mag.shape[0] != bins:
|
|
mag = mag[:bins]
|
|
return mag
|
|
|
|
|
|
def compute_fft_mag_row(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
bins: int,
|
|
*,
|
|
symmetric: bool = True,
|
|
) -> np.ndarray:
|
|
"""Compute a linear FFT magnitude row."""
|
|
if bins <= 0:
|
|
return np.zeros((0,), dtype=np.float32)
|
|
|
|
if not symmetric:
|
|
return _compute_fft_mag_row_direct(sweep, freqs, bins)
|
|
|
|
spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
|
if spectrum_centered is None:
|
|
return np.full((bins,), np.nan, dtype=np.float32)
|
|
|
|
spec = np.fft.ifft(np.fft.ifftshift(spectrum_centered))
|
|
mag = np.abs(spec).astype(np.float32)
|
|
if mag.shape[0] != bins:
|
|
mag = mag[:bins]
|
|
return mag
|
|
|
|
|
|
def compute_fft_row(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
bins: int,
|
|
*,
|
|
symmetric: bool = True,
|
|
) -> np.ndarray:
|
|
"""Compute a dB FFT row."""
|
|
return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, symmetric=symmetric))
|
|
|
|
|
|
def compute_distance_axis(freqs: Optional[np.ndarray], bins: int, *, symmetric: bool = True) -> np.ndarray:
|
|
"""Compute the one-way distance axis for IFFT output."""
|
|
if bins <= 0:
|
|
return np.zeros((0,), dtype=np.float64)
|
|
if symmetric:
|
|
df_ghz = (2.0 * float(SWEEP_FREQ_MAX_GHZ)) / max(1, FFT_LEN - 1)
|
|
else:
|
|
if freqs is None:
|
|
return np.arange(bins, dtype=np.float64)
|
|
freq_arr = np.asarray(freqs, dtype=np.float64)
|
|
finite = freq_arr[np.isfinite(freq_arr)]
|
|
if finite.size < 2:
|
|
return np.arange(bins, dtype=np.float64)
|
|
df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1))
|
|
df_hz = abs(df_ghz) * 1e9
|
|
if not np.isfinite(df_hz) or df_hz <= 0.0:
|
|
return np.arange(bins, dtype=np.float64)
|
|
|
|
step_m = C_M_S / (2.0 * FFT_LEN * df_hz)
|
|
return np.arange(bins, dtype=np.float64) * step_m
|