last implement diff

This commit is contained in:
awe
2026-02-27 17:43:32 +03:00
parent 33e1976233
commit c199ab7f28
8 changed files with 265 additions and 34 deletions

View File

@ -1,12 +1,8 @@
"""Преобразование свипа в IFFT-профиль по глубине (м).
Новый pipeline перед IFFT:
1) нормировка по max(abs(sweep))
2) clip в [-1, 1]
3) phi = arccos(x)
4) непрерывная развёртка фазы (nearest-continuous)
5) s_complex = exp(1j * phi)
6) IFFT с учётом смещения частотной сетки
Поддерживает несколько режимов восстановления комплексного спектра перед IFFT:
- ``arccos``: phi = arccos(x), continuous unwrap, z = exp(1j*phi)
- ``diff``: x ~= cos(phi), diff(x) -> sin(phi), z = cos + 1j*sin (с проекцией на единичную окружность)
"""
from __future__ import annotations
@ -29,6 +25,7 @@ logger = logging.getLogger(__name__)
_EPS = 1e-12
_TWO_PI = float(2.0 * np.pi)
_VALID_COMPLEX_MODES = {"arccos", "diff"}
def _fallback_depth_response(
@ -58,6 +55,13 @@ def _fallback_depth_response(
return depth, out
def _normalize_complex_mode(mode: str) -> str:
m = str(mode).strip().lower()
if m not in _VALID_COMPLEX_MODES:
raise ValueError(f"Invalid complex reconstruction mode: {mode!r}")
return m
def build_ifft_time_axis_ns() -> np.ndarray:
"""Legacy helper: старая временная ось IFFT в наносекундах (фиксированная длина)."""
return (
@ -69,22 +73,27 @@ def build_frequency_axis_hz(sweep_width: int) -> np.ndarray:
"""Построить частотную сетку (Гц) для текущей длины свипа."""
n = int(sweep_width)
if n <= 0:
return np.zeros((0,), dtype=np.float128)
return np.zeros((0,), dtype=np.float64)
if n == 1:
return np.array([FREQ_MIN_GHZ * 1e9], dtype=np.float64)
return np.linspace(FREQ_MIN_GHZ * 1e9, FREQ_MAX_GHZ * 1e9, n, dtype=np.float64)
def normalize_sweep_for_phase(sweep: np.ndarray) -> np.ndarray:
"""Нормировать свип на max(abs(.)) и вернуть float64."""
x = np.asarray(sweep, dtype=np.float64).ravel()
if x.size == 0:
return x
x = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0)
amax = float(np.max(np.abs(x)))
def normalize_trace_unit_range(x: np.ndarray) -> np.ndarray:
"""Signed-нормировка массива по max(abs(.)) в диапазон около [-1, 1]."""
arr = np.asarray(x, dtype=np.float64).ravel()
if arr.size == 0:
return arr
arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)
amax = float(np.max(np.abs(arr)))
if (not np.isfinite(amax)) or amax <= _EPS:
return np.zeros_like(x, dtype=np.float64)
return x / amax
return np.zeros_like(arr, dtype=np.float64)
return arr / amax
def normalize_sweep_for_phase(sweep: np.ndarray) -> np.ndarray:
"""Совместимый alias: нормировка свипа перед восстановлением фазы."""
return normalize_trace_unit_range(sweep)
def unwrap_arccos_phase_continuous(x_norm: np.ndarray) -> np.ndarray:
@ -101,6 +110,7 @@ def unwrap_arccos_phase_continuous(x_norm: np.ndarray) -> np.ndarray:
phi0 = np.arccos(x)
out = np.empty_like(phi0, dtype=np.float64)
out[0] = float(phi0[0])
for i in range(1, phi0.size):
base_phi = float(phi0[i])
prev = float(out[i - 1])
@ -110,7 +120,6 @@ def unwrap_arccos_phase_continuous(x_norm: np.ndarray) -> np.ndarray:
for sign in (1.0, -1.0):
base = sign * base_phi
# Ищем ближайший сдвиг 2πk относительно prev именно для этой ветви.
k_center = int(np.round((prev - base) / _TWO_PI))
for k in (k_center - 1, k_center, k_center + 1):
cand = base + _TWO_PI * float(k)
@ -122,20 +131,55 @@ def unwrap_arccos_phase_continuous(x_norm: np.ndarray) -> np.ndarray:
best_cand = cand
out[i] = prev if best_cand is None else float(best_cand)
return out
return phi0
def reconstruct_complex_spectrum_from_real_trace(sweep: np.ndarray) -> np.ndarray:
"""Восстановить комплексный спектр из вещественного следа через arccos+Euler."""
x_norm = normalize_sweep_for_phase(sweep)
def reconstruct_complex_spectrum_arccos(sweep: np.ndarray) -> np.ndarray:
"""Режим arccos: cos(phi) -> phi -> exp(i*phi)."""
x_norm = normalize_trace_unit_range(sweep)
if x_norm.size == 0:
return np.zeros((0,), dtype=np.complex128)
x_norm = np.clip(x_norm, -1.0, 1.0)
phi = unwrap_arccos_phase_continuous(x_norm)
phi = unwrap_arccos_phase_continuous(np.clip(x_norm, -1.0, 1.0))
return np.exp(1j * phi).astype(np.complex128, copy=False)
def reconstruct_complex_spectrum_diff(sweep: np.ndarray) -> np.ndarray:
"""Режим diff: x~=cos(phi), diff(x)->sin(phi), z=cos+i*sin с проекцией на единичную окружность."""
cos_phi = normalize_trace_unit_range(sweep)
if cos_phi.size == 0:
return np.zeros((0,), dtype=np.complex128)
cos_phi = np.clip(cos_phi, -1.0, 1.0)
if cos_phi.size < 2:
sin_est = np.zeros_like(cos_phi, dtype=np.float64)
else:
d = np.gradient(cos_phi)
sin_est = normalize_trace_unit_range(d)
sin_est = np.clip(sin_est, -1.0, 1.0)
z = cos_phi.astype(np.complex128, copy=False) + 1j * sin_est.astype(np.complex128, copy=False)
mag = np.abs(z)
z_unit = np.ones_like(z, dtype=np.complex128)
mask = mag > _EPS
if np.any(mask):
z_unit[mask] = z[mask] / mag[mask]
return mag
def reconstruct_complex_spectrum_from_real_trace(
sweep: np.ndarray,
*,
complex_mode: str = "arccos",
) -> np.ndarray:
"""Восстановить комплексный спектр из вещественного свипа в выбранном режиме."""
mode = _normalize_complex_mode(complex_mode)
if mode == "arccos":
return reconstruct_complex_spectrum_arccos(sweep)
if mode == "diff":
return reconstruct_complex_spectrum_diff(sweep)
raise ValueError(f"Unsupported complex reconstruction mode: {complex_mode!r}")
def perform_ifft_depth_response(
s_array: np.ndarray,
frequencies_hz: np.ndarray,
@ -173,7 +217,6 @@ def perform_ifft_depth_response(
n = int(f.size)
if n < 2:
raise ValueError("Not enough frequency points after filtering")
if np.any(np.diff(f) <= 0.0):
raise ValueError("Non-increasing frequency grid")
@ -221,7 +264,11 @@ def perform_ifft_depth_response(
return _fallback_depth_response(np.asarray(s_array).size, np.asarray(s_array))
def compute_ifft_profile_from_sweep(sweep: Optional[np.ndarray]) -> tuple[np.ndarray, np.ndarray]:
def compute_ifft_profile_from_sweep(
sweep: Optional[np.ndarray],
*,
complex_mode: str = "arccos",
) -> tuple[np.ndarray, np.ndarray]:
"""Высокоуровневый pipeline: sweep -> complex spectrum -> IFFT(abs) depth profile."""
if sweep is None:
return _fallback_depth_response(1, None)
@ -232,12 +279,12 @@ def compute_ifft_profile_from_sweep(sweep: Optional[np.ndarray]) -> tuple[np.nda
return _fallback_depth_response(1, None)
freqs_hz = build_frequency_axis_hz(s.size)
s_complex = reconstruct_complex_spectrum_from_real_trace(s)
s_complex = reconstruct_complex_spectrum_from_real_trace(s, complex_mode=complex_mode)
depth_m, y = perform_ifft_depth_response(s_complex, freqs_hz, axis="abs")
n = min(depth_m.size, y.size)
if n <= 0:
return _fallback_depth_response(s.size, s)
return depth_m[:n].astype(np.float128, copy=False), y[:n].astype(np.float128, copy=False) *20
return depth_m[:n].astype(np.float32, copy=False), y[:n].astype(np.float32, copy=False) # log10 для лучшей визуализации в водопаде
except Exception as exc: # noqa: BLE001
logger.error("compute_ifft_profile_from_sweep failed: %r", exc)
return _fallback_depth_response(np.asarray(sweep).size if sweep is not None else 1, sweep)
@ -245,6 +292,6 @@ def compute_ifft_profile_from_sweep(sweep: Optional[np.ndarray]) -> tuple[np.nda
def compute_ifft_db_profile(sweep: Optional[np.ndarray]) -> np.ndarray:
"""Legacy wrapper (deprecated name): возвращает линейный |IFFT| профиль."""
_depth_m, y = compute_ifft_profile_from_sweep(sweep)
_depth_m, y = compute_ifft_profile_from_sweep(sweep, complex_mode="arccos")
return y