512 lines
17 KiB
Python
512 lines
17 KiB
Python
"""FFT helpers for line and waterfall views."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
from rfg_adc_plotter.constants import C_M_S, FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ
|
|
|
|
|
|
def _finite_freq_bounds(freqs: Optional[np.ndarray]) -> Optional[Tuple[float, float]]:
|
|
"""Return finite frequency bounds for the current working segment."""
|
|
if freqs is None:
|
|
return None
|
|
freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1)
|
|
finite = freq_arr[np.isfinite(freq_arr)]
|
|
if finite.size < 2:
|
|
return None
|
|
f_min = float(np.min(finite))
|
|
f_max = float(np.max(finite))
|
|
if not np.isfinite(f_min) or not np.isfinite(f_max) or f_max <= f_min:
|
|
return None
|
|
return f_min, f_max
|
|
|
|
|
|
def _coerce_sweep_array(sweep: np.ndarray) -> np.ndarray:
|
|
values = np.asarray(sweep).reshape(-1)
|
|
if np.iscomplexobj(values):
|
|
return np.asarray(values, dtype=np.complex64)
|
|
return np.asarray(values, dtype=np.float32)
|
|
|
|
|
|
def _interp_signal(x_uniform: np.ndarray, x_known: np.ndarray, y_known: np.ndarray) -> np.ndarray:
|
|
if np.iscomplexobj(y_known):
|
|
real = np.interp(x_uniform, x_known, np.asarray(y_known.real, dtype=np.float64))
|
|
imag = np.interp(x_uniform, x_known, np.asarray(y_known.imag, dtype=np.float64))
|
|
return (real + (1j * imag)).astype(np.complex64)
|
|
return np.interp(x_uniform, x_known, np.asarray(y_known, dtype=np.float64)).astype(np.float32)
|
|
|
|
|
|
def _fit_complex_bins(values: np.ndarray, bins: int) -> np.ndarray:
|
|
arr = np.asarray(values, dtype=np.complex64).reshape(-1)
|
|
if bins <= 0:
|
|
return np.zeros((0,), dtype=np.complex64)
|
|
if arr.size == bins:
|
|
return arr
|
|
out = np.full((bins,), np.nan + 0j, dtype=np.complex64)
|
|
take = min(arr.size, bins)
|
|
out[:take] = arr[:take]
|
|
return out
|
|
|
|
|
|
def _extract_positive_exact_band(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
) -> Optional[Tuple[np.ndarray, np.ndarray, float, float]]:
|
|
"""Return sorted positive band data and exact-grid parameters."""
|
|
if freqs is None:
|
|
return None
|
|
|
|
sweep_arr = _coerce_sweep_array(sweep)
|
|
freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1)
|
|
take = min(int(sweep_arr.size), int(freq_arr.size))
|
|
if take <= 1:
|
|
return None
|
|
|
|
sweep_seg = sweep_arr[:take]
|
|
freq_seg = freq_arr[:take]
|
|
valid = np.isfinite(freq_seg) & np.isfinite(sweep_seg) & (freq_seg > 0.0)
|
|
if int(np.count_nonzero(valid)) < 2:
|
|
return None
|
|
|
|
freq_band = np.asarray(freq_seg[valid], dtype=np.float64)
|
|
sweep_band = np.asarray(sweep_seg[valid])
|
|
order = np.argsort(freq_band, kind="mergesort")
|
|
freq_band = freq_band[order]
|
|
sweep_band = sweep_band[order]
|
|
|
|
n_band = int(freq_band.size)
|
|
if n_band <= 1:
|
|
return None
|
|
|
|
f_min = float(freq_band[0])
|
|
f_max = float(freq_band[-1])
|
|
if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min:
|
|
return None
|
|
|
|
df_ghz = float((f_max - f_min) / max(1, n_band - 1))
|
|
if (not np.isfinite(df_ghz)) or df_ghz <= 0.0:
|
|
return None
|
|
|
|
return freq_band, sweep_band, f_max, df_ghz
|
|
|
|
|
|
def _positive_exact_shift_size(f_max: float, df_ghz: float) -> int:
|
|
if (not np.isfinite(f_max)) or (not np.isfinite(df_ghz)) or f_max <= 0.0 or df_ghz <= 0.0:
|
|
return 0
|
|
return int(np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64).size)
|
|
|
|
|
|
def _resolve_positive_exact_band_size(
|
|
f_min: float,
|
|
f_max: float,
|
|
n_band: int,
|
|
max_shift_len: Optional[int],
|
|
) -> int:
|
|
if n_band <= 2:
|
|
return max(2, int(n_band))
|
|
if max_shift_len is None:
|
|
return int(n_band)
|
|
|
|
limit = int(max_shift_len)
|
|
if limit <= 1:
|
|
return max(2, int(n_band))
|
|
|
|
span = float(f_max - f_min)
|
|
if (not np.isfinite(span)) or span <= 0.0:
|
|
return int(n_band)
|
|
|
|
df_current = float(span / max(1, int(n_band) - 1))
|
|
if _positive_exact_shift_size(f_max, df_current) <= limit:
|
|
return int(n_band)
|
|
|
|
denom = max(2.0 * f_max, 1e-12)
|
|
approx = int(np.floor(1.0 + ((float(limit - 1) * span) / denom)))
|
|
target = min(int(n_band), max(2, approx))
|
|
while target > 2:
|
|
df_try = float(span / max(1, target - 1))
|
|
if _positive_exact_shift_size(f_max, df_try) <= limit:
|
|
break
|
|
target -= 1
|
|
return max(2, target)
|
|
|
|
|
|
def _normalize_positive_exact_band(
|
|
freq_band: np.ndarray,
|
|
sweep_band: np.ndarray,
|
|
*,
|
|
max_shift_len: Optional[int] = None,
|
|
) -> Optional[Tuple[np.ndarray, np.ndarray, float, float]]:
|
|
freq_arr = np.asarray(freq_band, dtype=np.float64).reshape(-1)
|
|
sweep_arr = np.asarray(sweep_band).reshape(-1)
|
|
width = min(int(freq_arr.size), int(sweep_arr.size))
|
|
if width <= 1:
|
|
return None
|
|
|
|
freq_arr = freq_arr[:width]
|
|
sweep_arr = sweep_arr[:width]
|
|
f_min = float(freq_arr[0])
|
|
f_max = float(freq_arr[-1])
|
|
if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min:
|
|
return None
|
|
|
|
target_band = _resolve_positive_exact_band_size(f_min, f_max, int(freq_arr.size), max_shift_len)
|
|
if target_band < int(freq_arr.size):
|
|
target_freqs = np.linspace(f_min, f_max, target_band, dtype=np.float64)
|
|
target_sweep = _interp_signal(target_freqs, freq_arr, sweep_arr)
|
|
freq_arr = target_freqs
|
|
sweep_arr = np.asarray(target_sweep).reshape(-1)
|
|
|
|
n_band = int(freq_arr.size)
|
|
if n_band <= 1:
|
|
return None
|
|
|
|
df_ghz = float((f_max - f_min) / max(1, n_band - 1))
|
|
if (not np.isfinite(df_ghz)) or df_ghz <= 0.0:
|
|
return None
|
|
|
|
return freq_arr, sweep_arr, f_max, df_ghz
|
|
|
|
|
|
def _resolve_positive_only_exact_geometry(
|
|
freqs: Optional[np.ndarray],
|
|
*,
|
|
max_shift_len: Optional[int] = None,
|
|
) -> Optional[Tuple[int, float]]:
|
|
"""Return (N_shift, df_hz) for the exact centered positive-only mode."""
|
|
if freqs is None:
|
|
return None
|
|
|
|
freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1)
|
|
finite = np.asarray(freq_arr[np.isfinite(freq_arr) & (freq_arr > 0.0)], dtype=np.float64)
|
|
if finite.size < 2:
|
|
return None
|
|
|
|
finite.sort(kind="mergesort")
|
|
f_min = float(finite[0])
|
|
f_max = float(finite[-1])
|
|
if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min:
|
|
return None
|
|
|
|
n_band = int(finite.size)
|
|
target_band = _resolve_positive_exact_band_size(f_min, f_max, n_band, max_shift_len)
|
|
n_band = max(2, min(n_band, target_band))
|
|
df_ghz = float((f_max - f_min) / max(1, n_band - 1))
|
|
if (not np.isfinite(df_ghz)) or df_ghz <= 0.0:
|
|
return None
|
|
|
|
n_shift = _positive_exact_shift_size(f_max, df_ghz)
|
|
if n_shift <= 1:
|
|
return None
|
|
return int(n_shift), float(df_ghz * 1e9)
|
|
|
|
|
|
def prepare_fft_segment(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
fft_len: int = FFT_LEN,
|
|
) -> Optional[Tuple[np.ndarray, int]]:
|
|
"""Prepare a sweep segment for FFT on a uniform frequency grid."""
|
|
take_fft = min(int(sweep.size), int(fft_len))
|
|
if take_fft <= 0:
|
|
return None
|
|
|
|
sweep_arr = _coerce_sweep_array(sweep)
|
|
sweep_seg = sweep_arr[:take_fft]
|
|
fallback_dtype = np.complex64 if np.iscomplexobj(sweep_seg) else np.float32
|
|
fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(fallback_dtype, copy=False)
|
|
if freqs is None:
|
|
return fallback, take_fft
|
|
|
|
freq_arr = np.asarray(freqs)
|
|
if freq_arr.size < take_fft:
|
|
return fallback, take_fft
|
|
|
|
freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64)
|
|
valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg)
|
|
if int(np.count_nonzero(valid)) < 2:
|
|
return fallback, take_fft
|
|
|
|
x_valid = freq_seg[valid]
|
|
y_valid = sweep_seg[valid]
|
|
order = np.argsort(x_valid, kind="mergesort")
|
|
x_valid = x_valid[order]
|
|
y_valid = y_valid[order]
|
|
x_unique, unique_idx = np.unique(x_valid, return_index=True)
|
|
y_unique = y_valid[unique_idx]
|
|
if x_unique.size < 2 or x_unique[-1] <= x_unique[0]:
|
|
return fallback, take_fft
|
|
|
|
x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64)
|
|
resampled = _interp_signal(x_uniform, x_unique, y_unique)
|
|
return resampled, take_fft
|
|
|
|
|
|
def build_symmetric_ifft_spectrum(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
fft_len: int = FFT_LEN,
|
|
) -> Optional[np.ndarray]:
|
|
"""Build a centered symmetric spectrum over [-f_max, f_max] for IFFT."""
|
|
if fft_len <= 0:
|
|
return None
|
|
|
|
bounds = _finite_freq_bounds(freqs)
|
|
if bounds is None:
|
|
f_min = float(SWEEP_FREQ_MIN_GHZ)
|
|
f_max = float(SWEEP_FREQ_MAX_GHZ)
|
|
else:
|
|
f_min, f_max = bounds
|
|
|
|
freq_axis = np.linspace(-f_max, f_max, int(fft_len), dtype=np.float64)
|
|
neg_idx_all = np.flatnonzero(freq_axis <= (-f_min))
|
|
pos_idx_all = np.flatnonzero(freq_axis >= f_min)
|
|
band_len = int(min(neg_idx_all.size, pos_idx_all.size))
|
|
if band_len <= 1:
|
|
return None
|
|
|
|
neg_idx = neg_idx_all[:band_len]
|
|
pos_idx = pos_idx_all[-band_len:]
|
|
prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len)
|
|
if prepared is None:
|
|
return None
|
|
|
|
fft_seg, take_fft = prepared
|
|
if take_fft != band_len:
|
|
fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
|
fft_seg = np.asarray(fft_seg[:band_len], dtype=fft_dtype)
|
|
if fft_seg.size < band_len:
|
|
padded = np.zeros((band_len,), dtype=fft_dtype)
|
|
padded[: fft_seg.size] = fft_seg
|
|
fft_seg = padded
|
|
|
|
window = np.hanning(band_len).astype(np.float32)
|
|
band_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
|
band = np.nan_to_num(fft_seg, nan=0.0).astype(band_dtype, copy=False) * window
|
|
|
|
spectrum = np.zeros((int(fft_len),), dtype=band_dtype)
|
|
spectrum[pos_idx] = band
|
|
spectrum[neg_idx] = np.conj(band[::-1]) if np.iscomplexobj(band) else band[::-1]
|
|
return spectrum
|
|
|
|
|
|
def build_positive_only_centered_ifft_spectrum(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
fft_len: int = FFT_LEN,
|
|
) -> Optional[np.ndarray]:
|
|
"""Build a centered spectrum with zeros from -f_max to +f_min."""
|
|
if fft_len <= 0:
|
|
return None
|
|
|
|
bounds = _finite_freq_bounds(freqs)
|
|
if bounds is None:
|
|
f_min = float(SWEEP_FREQ_MIN_GHZ)
|
|
f_max = float(SWEEP_FREQ_MAX_GHZ)
|
|
else:
|
|
f_min, f_max = bounds
|
|
|
|
freq_axis = np.linspace(-f_max, f_max, int(fft_len), dtype=np.float64)
|
|
pos_idx = np.flatnonzero(freq_axis >= f_min)
|
|
band_len = int(pos_idx.size)
|
|
if band_len <= 1:
|
|
return None
|
|
|
|
prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len)
|
|
if prepared is None:
|
|
return None
|
|
|
|
fft_seg, take_fft = prepared
|
|
if take_fft != band_len:
|
|
fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
|
fft_seg = np.asarray(fft_seg[:band_len], dtype=fft_dtype)
|
|
if fft_seg.size < band_len:
|
|
padded = np.zeros((band_len,), dtype=fft_dtype)
|
|
padded[: fft_seg.size] = fft_seg
|
|
fft_seg = padded
|
|
|
|
window = np.hanning(band_len).astype(np.float32)
|
|
band_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
|
band = np.nan_to_num(fft_seg, nan=0.0).astype(band_dtype, copy=False) * window
|
|
|
|
spectrum = np.zeros((int(fft_len),), dtype=band_dtype)
|
|
spectrum[pos_idx] = band
|
|
return spectrum
|
|
|
|
|
|
def build_positive_only_exact_centered_ifft_spectrum(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
*,
|
|
max_shift_len: Optional[int] = None,
|
|
) -> Optional[np.ndarray]:
|
|
"""Build centered spectrum exactly as zeros[-f_max..+f_min) + measured positive band."""
|
|
prepared = _extract_positive_exact_band(sweep, freqs)
|
|
if prepared is None:
|
|
return None
|
|
|
|
freq_band, sweep_band, _f_max, _df_ghz = prepared
|
|
normalized = _normalize_positive_exact_band(
|
|
freq_band,
|
|
sweep_band,
|
|
max_shift_len=max_shift_len,
|
|
)
|
|
if normalized is None:
|
|
return None
|
|
|
|
freq_band, sweep_band, f_max, df_ghz = normalized
|
|
f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64)
|
|
if f_shift.size <= 1:
|
|
return None
|
|
|
|
band_dtype = np.complex64 if np.iscomplexobj(sweep_band) else np.float32
|
|
band = np.nan_to_num(np.asarray(sweep_band, dtype=band_dtype), nan=0.0)
|
|
spectrum = np.zeros((int(f_shift.size),), dtype=band_dtype)
|
|
idx = np.round((freq_band - f_shift[0]) / df_ghz).astype(np.int64)
|
|
idx = np.clip(idx, 0, spectrum.size - 1)
|
|
spectrum[idx] = band
|
|
return spectrum
|
|
|
|
|
|
def fft_mag_to_db(mag: np.ndarray) -> np.ndarray:
|
|
"""Convert magnitude to dB with safe zero handling."""
|
|
mag_arr = np.asarray(mag, dtype=np.float32)
|
|
safe_mag = np.maximum(mag_arr, 0.0)
|
|
return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False)
|
|
|
|
|
|
def _compute_fft_complex_row_direct(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
bins: int,
|
|
) -> np.ndarray:
|
|
prepared = prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN)
|
|
if prepared is None:
|
|
return np.full((bins,), np.nan + 0j, dtype=np.complex64)
|
|
|
|
fft_seg, take_fft = prepared
|
|
fft_in = np.zeros((FFT_LEN,), dtype=np.complex64)
|
|
window = np.hanning(take_fft).astype(np.float32)
|
|
fft_in[:take_fft] = np.asarray(fft_seg, dtype=np.complex64) * window
|
|
spec = np.fft.ifft(fft_in)
|
|
return _fit_complex_bins(spec, bins)
|
|
|
|
|
|
def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str:
|
|
if symmetric is not None:
|
|
return "symmetric" if symmetric else "direct"
|
|
normalized = str(mode or "symmetric").strip().lower()
|
|
if normalized in {"direct", "ordinary", "normal"}:
|
|
return "direct"
|
|
if normalized in {"symmetric", "sym", "mirror"}:
|
|
return "symmetric"
|
|
if normalized in {"positive_only", "positive-centered", "positive_centered", "zero_left"}:
|
|
return "positive_only"
|
|
if normalized in {"positive_only_exact", "positive-centered-exact", "positive_centered_exact", "zero_left_exact"}:
|
|
return "positive_only_exact"
|
|
raise ValueError(f"Unsupported FFT mode: {mode!r}")
|
|
|
|
|
|
def compute_fft_complex_row(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
bins: int,
|
|
*,
|
|
mode: str = "symmetric",
|
|
symmetric: Optional[bool] = None,
|
|
) -> np.ndarray:
|
|
"""Compute a complex FFT/IFFT row on the distance axis."""
|
|
if bins <= 0:
|
|
return np.zeros((0,), dtype=np.complex64)
|
|
|
|
fft_mode = _normalize_fft_mode(mode, symmetric)
|
|
if fft_mode == "direct":
|
|
return _compute_fft_complex_row_direct(sweep, freqs, bins)
|
|
|
|
if fft_mode == "positive_only":
|
|
spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
|
elif fft_mode == "positive_only_exact":
|
|
spectrum_centered = build_positive_only_exact_centered_ifft_spectrum(
|
|
sweep,
|
|
freqs,
|
|
max_shift_len=bins,
|
|
)
|
|
else:
|
|
spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
|
if spectrum_centered is None:
|
|
return np.full((bins,), np.nan + 0j, dtype=np.complex64)
|
|
|
|
spec = np.fft.ifft(np.fft.ifftshift(np.asarray(spectrum_centered, dtype=np.complex64)))
|
|
return _fit_complex_bins(spec, bins)
|
|
|
|
|
|
def compute_fft_mag_row(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
bins: int,
|
|
*,
|
|
mode: str = "symmetric",
|
|
symmetric: Optional[bool] = None,
|
|
) -> np.ndarray:
|
|
"""Compute a linear FFT magnitude row."""
|
|
complex_row = compute_fft_complex_row(sweep, freqs, bins, mode=mode, symmetric=symmetric)
|
|
return np.abs(complex_row).astype(np.float32, copy=False)
|
|
|
|
|
|
def compute_fft_row(
|
|
sweep: np.ndarray,
|
|
freqs: Optional[np.ndarray],
|
|
bins: int,
|
|
*,
|
|
mode: str = "symmetric",
|
|
symmetric: Optional[bool] = None,
|
|
) -> np.ndarray:
|
|
"""Compute a dB FFT row."""
|
|
return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, mode=mode, symmetric=symmetric))
|
|
|
|
|
|
def compute_distance_axis(
|
|
freqs: Optional[np.ndarray],
|
|
bins: int,
|
|
*,
|
|
mode: str = "symmetric",
|
|
symmetric: Optional[bool] = None,
|
|
) -> np.ndarray:
|
|
"""Compute the one-way distance axis for IFFT output."""
|
|
if bins <= 0:
|
|
return np.zeros((0,), dtype=np.float64)
|
|
fft_mode = _normalize_fft_mode(mode, symmetric)
|
|
if fft_mode == "positive_only_exact":
|
|
geometry = _resolve_positive_only_exact_geometry(freqs, max_shift_len=bins)
|
|
if geometry is None:
|
|
return np.arange(bins, dtype=np.float64)
|
|
n_shift, df_hz = geometry
|
|
if (not np.isfinite(df_hz)) or df_hz <= 0.0 or n_shift <= 0:
|
|
return np.arange(bins, dtype=np.float64)
|
|
step_m = C_M_S / (2.0 * float(n_shift) * df_hz)
|
|
return np.arange(bins, dtype=np.float64) * step_m
|
|
|
|
if fft_mode in {"symmetric", "positive_only"}:
|
|
bounds = _finite_freq_bounds(freqs)
|
|
if bounds is None:
|
|
f_max = float(SWEEP_FREQ_MAX_GHZ)
|
|
else:
|
|
_, f_max = bounds
|
|
df_ghz = (2.0 * f_max) / max(1, FFT_LEN - 1)
|
|
else:
|
|
if freqs is None:
|
|
return np.arange(bins, dtype=np.float64)
|
|
freq_arr = np.asarray(freqs, dtype=np.float64)
|
|
finite = freq_arr[np.isfinite(freq_arr)]
|
|
if finite.size < 2:
|
|
return np.arange(bins, dtype=np.float64)
|
|
df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1))
|
|
df_hz = abs(df_ghz) * 1e9
|
|
if not np.isfinite(df_hz) or df_hz <= 0.0:
|
|
return np.arange(bins, dtype=np.float64)
|
|
|
|
step_m = C_M_S / (2.0 * FFT_LEN * df_hz)
|
|
return np.arange(bins, dtype=np.float64) * step_m
|