This commit is contained in:
awe
2026-03-12 18:09:44 +03:00
parent f02de1c3d0
commit 5054f8d3d7
5 changed files with 144 additions and 29 deletions

View File

@ -87,6 +87,41 @@ def build_symmetric_ifft_spectrum(
return spectrum
def build_positive_only_centered_ifft_spectrum(
sweep: np.ndarray,
freqs: Optional[np.ndarray],
fft_len: int = FFT_LEN,
) -> Optional[np.ndarray]:
"""Build a centered spectrum with zeros from -f_max to +f_min."""
if fft_len <= 0:
return None
freq_axis = np.linspace(-SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MAX_GHZ, int(fft_len), dtype=np.float64)
pos_idx = np.flatnonzero(freq_axis >= SWEEP_FREQ_MIN_GHZ)
band_len = int(pos_idx.size)
if band_len <= 1:
return None
prepared = prepare_fft_segment(sweep, freqs, fft_len=band_len)
if prepared is None:
return None
fft_seg, take_fft = prepared
if take_fft != band_len:
fft_seg = np.asarray(fft_seg[:band_len], dtype=np.float32)
if fft_seg.size < band_len:
padded = np.zeros((band_len,), dtype=np.float32)
padded[: fft_seg.size] = fft_seg
fft_seg = padded
window = np.hanning(band_len).astype(np.float32)
band = np.nan_to_num(fft_seg, nan=0.0).astype(np.float32, copy=False) * window
spectrum = np.zeros((int(fft_len),), dtype=np.float32)
spectrum[pos_idx] = band
return spectrum
def fft_mag_to_db(mag: np.ndarray) -> np.ndarray:
"""Convert magnitude to dB with safe zero handling."""
mag_arr = np.asarray(mag, dtype=np.float32)
@ -114,21 +149,39 @@ def _compute_fft_mag_row_direct(
return mag
def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str:
if symmetric is not None:
return "symmetric" if symmetric else "direct"
normalized = str(mode or "symmetric").strip().lower()
if normalized in {"direct", "ordinary", "normal"}:
return "direct"
if normalized in {"symmetric", "sym", "mirror"}:
return "symmetric"
if normalized in {"positive_only", "positive-centered", "positive_centered", "zero_left"}:
return "positive_only"
raise ValueError(f"Unsupported FFT mode: {mode!r}")
def compute_fft_mag_row(
sweep: np.ndarray,
freqs: Optional[np.ndarray],
bins: int,
*,
symmetric: bool = True,
mode: str = "symmetric",
symmetric: Optional[bool] = None,
) -> np.ndarray:
"""Compute a linear FFT magnitude row."""
if bins <= 0:
return np.zeros((0,), dtype=np.float32)
if not symmetric:
fft_mode = _normalize_fft_mode(mode, symmetric)
if fft_mode == "direct":
return _compute_fft_mag_row_direct(sweep, freqs, bins)
spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
if fft_mode == "positive_only":
spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
else:
spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
if spectrum_centered is None:
return np.full((bins,), np.nan, dtype=np.float32)
@ -144,17 +197,25 @@ def compute_fft_row(
freqs: Optional[np.ndarray],
bins: int,
*,
symmetric: bool = True,
mode: str = "symmetric",
symmetric: Optional[bool] = None,
) -> np.ndarray:
"""Compute a dB FFT row."""
return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, symmetric=symmetric))
return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins, mode=mode, symmetric=symmetric))
def compute_distance_axis(freqs: Optional[np.ndarray], bins: int, *, symmetric: bool = True) -> np.ndarray:
def compute_distance_axis(
freqs: Optional[np.ndarray],
bins: int,
*,
mode: str = "symmetric",
symmetric: Optional[bool] = None,
) -> np.ndarray:
"""Compute the one-way distance axis for IFFT output."""
if bins <= 0:
return np.zeros((0,), dtype=np.float64)
if symmetric:
fft_mode = _normalize_fft_mode(mode, symmetric)
if fft_mode in {"symmetric", "positive_only"}:
df_ghz = (2.0 * float(SWEEP_FREQ_MAX_GHZ)) / max(1, FFT_LEN - 1)
else:
if freqs is None: