"""Преобразование свипа в IFFT-профиль по глубине (м). Новый pipeline перед IFFT: 1) нормировка по max(abs(sweep)) 2) clip в [-1, 1] 3) phi = arccos(x) 4) непрерывная развёртка фазы (nearest-continuous) 5) s_complex = exp(1j * phi) 6) IFFT с учётом смещения частотной сетки """ from __future__ import annotations import logging from typing import Optional import numpy as np from rfg_adc_plotter.constants import ( FREQ_MAX_GHZ, FREQ_MIN_GHZ, FREQ_SPAN_GHZ, IFFT_LEN, SPEED_OF_LIGHT_M_S, ) logger = logging.getLogger(__name__) _EPS = 1e-12 _TWO_PI = float(2.0 * np.pi) def _fallback_depth_response( size: int, values: Optional[np.ndarray] = None, ) -> tuple[np.ndarray, np.ndarray]: """Безопасный fallback для GUI/ring: всегда возвращает ненулевую длину.""" n = max(1, int(size)) depth = np.linspace(0.0, 1.0, n, dtype=np.float32) if values is None: return depth, np.zeros((n,), dtype=np.float32) arr = np.asarray(values) if arr.size == 0: return depth, np.zeros((n,), dtype=np.float32) if np.iscomplexobj(arr): src = np.abs(arr) else: src = np.abs(np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)) src = np.asarray(src, dtype=np.float32).ravel() out = np.zeros((n,), dtype=np.float32) take = min(n, src.size) if take > 0: out[:take] = src[:take] return depth, out def build_ifft_time_axis_ns() -> np.ndarray: """Legacy helper: старая временная ось IFFT в наносекундах (фиксированная длина).""" return ( np.arange(IFFT_LEN, dtype=np.float64) / (FREQ_SPAN_GHZ * 1e9) * 1e9 ).astype(np.float32) def build_frequency_axis_hz(sweep_width: int) -> np.ndarray: """Построить частотную сетку (Гц) для текущей длины свипа.""" n = int(sweep_width) if n <= 0: return np.zeros((0,), dtype=np.float64) if n == 1: return np.array([FREQ_MIN_GHZ * 1e9], dtype=np.float64) return np.linspace(FREQ_MIN_GHZ * 1e9, FREQ_MAX_GHZ * 1e9, n, dtype=np.float64) def normalize_sweep_for_phase(sweep: np.ndarray) -> np.ndarray: """Нормировать свип на max(abs(.)) и вернуть float64.""" x = np.asarray(sweep, dtype=np.float64).ravel() if x.size == 0: return x x = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0) amax = float(np.max(np.abs(x))) if (not np.isfinite(amax)) or amax <= _EPS: return np.zeros_like(x, dtype=np.float64) return x / amax def unwrap_arccos_phase_continuous(x_norm: np.ndarray) -> np.ndarray: """Непрерывно развернуть фазу, восстановленную через arccos. Для каждой точки рассматриваются ветви ±phi + 2πk и выбирается кандидат, ближайший к предыдущей фазе (nearest continuous). """ x = np.asarray(x_norm, dtype=np.float64).ravel() if x.size == 0: return np.zeros((0,), dtype=np.float64) x = np.nan_to_num(x, nan=0.0, posinf=1.0, neginf=-1.0) x = np.clip(x, -1.0, 1.0) phi0 = np.arccos(x) out = np.empty_like(phi0, dtype=np.float64) out[0] = float(phi0[0]) for i in range(1, phi0.size): base_phi = float(phi0[i]) prev = float(out[i - 1]) best_cand: Optional[float] = None best_key: Optional[tuple[float, float]] = None for sign in (1.0, -1.0): base = sign * base_phi # Ищем ближайший сдвиг 2πk относительно prev именно для этой ветви. k_center = int(np.round((prev - base) / _TWO_PI)) for k in (k_center - 1, k_center, k_center + 1): cand = base + _TWO_PI * float(k) step = abs(cand - prev) # Tie-break: при равенстве шага предпочесть больший кандидат. key = (step, -cand) if best_key is None or key < best_key: best_key = key best_cand = cand out[i] = prev if best_cand is None else float(best_cand) return out def reconstruct_complex_spectrum_from_real_trace(sweep: np.ndarray) -> np.ndarray: """Восстановить комплексный спектр из вещественного следа через arccos+Euler.""" x_norm = normalize_sweep_for_phase(sweep) if x_norm.size == 0: return np.zeros((0,), dtype=np.complex128) x_norm = np.clip(x_norm, -1.0, 1.0) phi = unwrap_arccos_phase_continuous(x_norm) return np.exp(1j * phi).astype(np.complex128, copy=False) def perform_ifft_depth_response( s_array: np.ndarray, frequencies_hz: np.ndarray, *, axis: str = "abs", start_hz: float | None = None, stop_hz: float | None = None, ) -> tuple[np.ndarray, np.ndarray]: """Frequency-to-depth conversion with zero-padding and frequency offset handling.""" try: s_in = np.asarray(s_array, dtype=np.complex128).ravel() f_in = np.asarray(frequencies_hz, dtype=np.float64).ravel() m = min(s_in.size, f_in.size) if m < 2: raise ValueError("Not enough points") s = s_in[:m] f = f_in[:m] lo = float(FREQ_MIN_GHZ * 1e9 if start_hz is None else start_hz) hi = float(FREQ_MAX_GHZ * 1e9 if stop_hz is None else stop_hz) if hi < lo: lo, hi = hi, lo mask = ( np.isfinite(f) & np.isfinite(np.real(s)) & np.isfinite(np.imag(s)) & (f >= lo) & (f <= hi) ) f = f[mask] s = s[mask] n = int(f.size) if n < 2: raise ValueError("Not enough frequency points after filtering") if np.any(np.diff(f) <= 0.0): raise ValueError("Non-increasing frequency grid") df = float((f[-1] - f[0]) / (n - 1)) if not np.isfinite(df) or df <= 0.0: raise ValueError("Invalid frequency step") k0 = int(np.round(float(f[0]) / df)) if k0 < 0: raise ValueError("Negative frequency offset index") min_len = int(2 * (k0 + n - 1)) if min_len <= 0: raise ValueError("Invalid FFT length") n_fft = 1 << int(np.ceil(np.log2(float(min_len)))) dt = 1.0 / (n_fft * df) t_sec = np.arange(n_fft, dtype=np.float64) * dt h = np.zeros((n_fft,), dtype=np.complex128) end = k0 + n if end > n_fft: raise ValueError("Spectrum placement exceeds FFT buffer") h[k0:end] = s y = np.fft.ifft(h) depth_m = t_sec * SPEED_OF_LIGHT_M_S axis_name = str(axis).strip().lower() if axis_name == "abs": y_fin = np.abs(y) elif axis_name == "real": y_fin = np.real(y) elif axis_name == "imag": y_fin = np.imag(y) elif axis_name == "phase": y_fin = np.angle(y) else: raise ValueError(f"Invalid axis parameter: {axis!r}") return depth_m.astype(np.float32, copy=False), np.asarray(y_fin, dtype=np.float32) except Exception as exc: # noqa: BLE001 logger.error("IFFT depth response failed: %r", exc) return _fallback_depth_response(np.asarray(s_array).size, np.asarray(s_array)) def compute_ifft_profile_from_sweep(sweep: Optional[np.ndarray]) -> tuple[np.ndarray, np.ndarray]: """Высокоуровневый pipeline: sweep -> complex spectrum -> IFFT(abs) depth profile.""" if sweep is None: return _fallback_depth_response(1, None) try: s = np.asarray(sweep, dtype=np.float64).ravel() if s.size == 0: return _fallback_depth_response(1, None) freqs_hz = build_frequency_axis_hz(s.size) s_complex = reconstruct_complex_spectrum_from_real_trace(s) depth_m, y = perform_ifft_depth_response(s_complex, freqs_hz, axis="abs") n = min(depth_m.size, y.size) if n <= 0: return _fallback_depth_response(s.size, s) return depth_m[:n].astype(np.float32, copy=False), y[:n].astype(np.float32, copy=False) except Exception as exc: # noqa: BLE001 logger.error("compute_ifft_profile_from_sweep failed: %r", exc) return _fallback_depth_response(np.asarray(sweep).size if sweep is not None else 1, sweep) def compute_ifft_db_profile(sweep: Optional[np.ndarray]) -> np.ndarray: """Legacy wrapper (deprecated name): возвращает линейный |IFFT| профиль.""" _depth_m, y = compute_ifft_profile_from_sweep(sweep) return y