fft new mode
This commit is contained in:
@ -39,6 +39,87 @@ def _interp_signal(x_uniform: np.ndarray, x_known: np.ndarray, y_known: np.ndarr
|
||||
return np.interp(x_uniform, x_known, np.asarray(y_known, dtype=np.float64)).astype(np.float32)
|
||||
|
||||
|
||||
def _fit_complex_bins(values: np.ndarray, bins: int) -> np.ndarray:
|
||||
arr = np.asarray(values, dtype=np.complex64).reshape(-1)
|
||||
if bins <= 0:
|
||||
return np.zeros((0,), dtype=np.complex64)
|
||||
if arr.size == bins:
|
||||
return arr
|
||||
out = np.full((bins,), np.nan + 0j, dtype=np.complex64)
|
||||
take = min(arr.size, bins)
|
||||
out[:take] = arr[:take]
|
||||
return out
|
||||
|
||||
|
||||
def _extract_positive_exact_band(
|
||||
sweep: np.ndarray,
|
||||
freqs: Optional[np.ndarray],
|
||||
) -> Optional[Tuple[np.ndarray, np.ndarray, float, float]]:
|
||||
"""Return sorted positive band data and exact-grid parameters."""
|
||||
if freqs is None:
|
||||
return None
|
||||
|
||||
sweep_arr = _coerce_sweep_array(sweep)
|
||||
freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1)
|
||||
take = min(int(sweep_arr.size), int(freq_arr.size))
|
||||
if take <= 1:
|
||||
return None
|
||||
|
||||
sweep_seg = sweep_arr[:take]
|
||||
freq_seg = freq_arr[:take]
|
||||
valid = np.isfinite(freq_seg) & np.isfinite(sweep_seg) & (freq_seg > 0.0)
|
||||
if int(np.count_nonzero(valid)) < 2:
|
||||
return None
|
||||
|
||||
freq_band = np.asarray(freq_seg[valid], dtype=np.float64)
|
||||
sweep_band = np.asarray(sweep_seg[valid])
|
||||
order = np.argsort(freq_band, kind="mergesort")
|
||||
freq_band = freq_band[order]
|
||||
sweep_band = sweep_band[order]
|
||||
|
||||
n_band = int(freq_band.size)
|
||||
if n_band <= 1:
|
||||
return None
|
||||
|
||||
f_min = float(freq_band[0])
|
||||
f_max = float(freq_band[-1])
|
||||
if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min:
|
||||
return None
|
||||
|
||||
df_ghz = float((f_max - f_min) / max(1, n_band - 1))
|
||||
if (not np.isfinite(df_ghz)) or df_ghz <= 0.0:
|
||||
return None
|
||||
|
||||
return freq_band, sweep_band, f_max, df_ghz
|
||||
|
||||
|
||||
def _resolve_positive_only_exact_geometry(freqs: Optional[np.ndarray]) -> Optional[Tuple[int, float]]:
|
||||
"""Return (N_shift, df_hz) for the exact centered positive-only mode."""
|
||||
if freqs is None:
|
||||
return None
|
||||
|
||||
freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1)
|
||||
finite = np.asarray(freq_arr[np.isfinite(freq_arr) & (freq_arr > 0.0)], dtype=np.float64)
|
||||
if finite.size < 2:
|
||||
return None
|
||||
|
||||
finite.sort(kind="mergesort")
|
||||
f_min = float(finite[0])
|
||||
f_max = float(finite[-1])
|
||||
if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min:
|
||||
return None
|
||||
|
||||
n_band = int(finite.size)
|
||||
df_ghz = float((f_max - f_min) / max(1, n_band - 1))
|
||||
if (not np.isfinite(df_ghz)) or df_ghz <= 0.0:
|
||||
return None
|
||||
|
||||
f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64)
|
||||
if f_shift.size <= 1:
|
||||
return None
|
||||
return int(f_shift.size), float(df_ghz * 1e9)
|
||||
|
||||
|
||||
def prepare_fft_segment(
|
||||
sweep: np.ndarray,
|
||||
freqs: Optional[np.ndarray],
|
||||
@ -172,6 +253,29 @@ def build_positive_only_centered_ifft_spectrum(
|
||||
return spectrum
|
||||
|
||||
|
||||
def build_positive_only_exact_centered_ifft_spectrum(
|
||||
sweep: np.ndarray,
|
||||
freqs: Optional[np.ndarray],
|
||||
) -> Optional[np.ndarray]:
|
||||
"""Build centered spectrum exactly as zeros[-f_max..+f_min) + measured positive band."""
|
||||
prepared = _extract_positive_exact_band(sweep, freqs)
|
||||
if prepared is None:
|
||||
return None
|
||||
|
||||
freq_band, sweep_band, f_max, df_ghz = prepared
|
||||
f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64)
|
||||
if f_shift.size <= 1:
|
||||
return None
|
||||
|
||||
band_dtype = np.complex64 if np.iscomplexobj(sweep_band) else np.float32
|
||||
band = np.nan_to_num(np.asarray(sweep_band, dtype=band_dtype), nan=0.0)
|
||||
spectrum = np.zeros((int(f_shift.size),), dtype=band_dtype)
|
||||
idx = np.round((freq_band - f_shift[0]) / df_ghz).astype(np.int64)
|
||||
idx = np.clip(idx, 0, spectrum.size - 1)
|
||||
spectrum[idx] = band
|
||||
return spectrum
|
||||
|
||||
|
||||
def fft_mag_to_db(mag: np.ndarray) -> np.ndarray:
|
||||
"""Convert magnitude to dB with safe zero handling."""
|
||||
mag_arr = np.asarray(mag, dtype=np.float32)
|
||||
@ -192,10 +296,8 @@ def _compute_fft_complex_row_direct(
|
||||
fft_in = np.zeros((FFT_LEN,), dtype=np.complex64)
|
||||
window = np.hanning(take_fft).astype(np.float32)
|
||||
fft_in[:take_fft] = np.asarray(fft_seg, dtype=np.complex64) * window
|
||||
spec = np.fft.ifft(fft_in).astype(np.complex64, copy=False)
|
||||
if spec.shape[0] != bins:
|
||||
spec = spec[:bins]
|
||||
return spec
|
||||
spec = np.fft.ifft(fft_in)
|
||||
return _fit_complex_bins(spec, bins)
|
||||
|
||||
|
||||
def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str:
|
||||
@ -208,6 +310,8 @@ def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str:
|
||||
return "symmetric"
|
||||
if normalized in {"positive_only", "positive-centered", "positive_centered", "zero_left"}:
|
||||
return "positive_only"
|
||||
if normalized in {"positive_only_exact", "positive-centered-exact", "positive_centered_exact", "zero_left_exact"}:
|
||||
return "positive_only_exact"
|
||||
raise ValueError(f"Unsupported FFT mode: {mode!r}")
|
||||
|
||||
|
||||
@ -229,16 +333,15 @@ def compute_fft_complex_row(
|
||||
|
||||
if fft_mode == "positive_only":
|
||||
spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
||||
elif fft_mode == "positive_only_exact":
|
||||
spectrum_centered = build_positive_only_exact_centered_ifft_spectrum(sweep, freqs)
|
||||
else:
|
||||
spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
||||
if spectrum_centered is None:
|
||||
return np.full((bins,), np.nan + 0j, dtype=np.complex64)
|
||||
|
||||
spec = np.fft.ifft(np.fft.ifftshift(np.asarray(spectrum_centered, dtype=np.complex64)))
|
||||
spec = np.asarray(spec, dtype=np.complex64)
|
||||
if spec.shape[0] != bins:
|
||||
spec = spec[:bins]
|
||||
return spec
|
||||
return _fit_complex_bins(spec, bins)
|
||||
|
||||
|
||||
def compute_fft_mag_row(
|
||||
@ -277,6 +380,16 @@ def compute_distance_axis(
|
||||
if bins <= 0:
|
||||
return np.zeros((0,), dtype=np.float64)
|
||||
fft_mode = _normalize_fft_mode(mode, symmetric)
|
||||
if fft_mode == "positive_only_exact":
|
||||
geometry = _resolve_positive_only_exact_geometry(freqs)
|
||||
if geometry is None:
|
||||
return np.arange(bins, dtype=np.float64)
|
||||
n_shift, df_hz = geometry
|
||||
if (not np.isfinite(df_hz)) or df_hz <= 0.0 or n_shift <= 0:
|
||||
return np.arange(bins, dtype=np.float64)
|
||||
step_m = C_M_S / (2.0 * float(n_shift) * df_hz)
|
||||
return np.arange(bins, dtype=np.float64) * step_m
|
||||
|
||||
if fft_mode in {"symmetric", "positive_only"}:
|
||||
bounds = _finite_freq_bounds(freqs)
|
||||
if bounds is None:
|
||||
|
||||
Reference in New Issue
Block a user