global phase try 2

This commit is contained in:
awe
2026-01-30 15:50:40 +03:00
parent e84c155e25
commit 0332ebdd98

View File

@ -520,16 +520,19 @@ def apply_temporal_unwrap(
prev_phase: Optional[np.ndarray],
phase_offset: Optional[np.ndarray],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Применяет phase unwrapping по времени (между последовательными свипами).
"""Применяет улучшенный phase unwrapping для FMCW радара с адаптивным порогом.
Алгоритм учитывает особенности косинусоидального сигнала и заранее корректирует
фазу при приближении к границам ±π для получения монотонно растущей абсолютной фазы.
Args:
current_phase: Текущая фаза (обёрнутая в [-π, π]) для всех бинов
prev_phase: Предыдущая фаза (обёрнутая), может быть None при первом вызове
current_phase: Текущая фаза (развернутая по частоте) для всех бинов
prev_phase: Предыдущая фаза, может быть None при первом вызове
phase_offset: Накопленные смещения для каждого бина, может быть None
Returns:
(unwrapped_phase, new_prev_phase, new_phase_offset)
unwrapped_phase - развёрнутая фаза
unwrapped_phase - абсолютная развёрнутая фаза (может быть > 2π)
new_prev_phase - обновлённая предыдущая фаза (для следующего вызова)
new_phase_offset - обновлённые смещения (для следующего вызова)
"""
@ -537,19 +540,51 @@ def apply_temporal_unwrap(
# Инициализация при первом вызове
if prev_phase is None:
prev_phase = np.zeros(n_bins, dtype=np.float32)
prev_phase = current_phase.copy()
phase_offset = np.zeros(n_bins, dtype=np.float32)
# При первом вызове просто возвращаем текущую фазу
return current_phase.copy(), prev_phase, phase_offset
if phase_offset is None:
phase_offset = np.zeros(n_bins, dtype=np.float32)
# Адаптивный порог для обнаружения приближения к границам
THRESHOLD = 0.8 * np.pi
# Вычисляем разницу между текущей и предыдущей фазой
delta = current_phase - prev_phase
# Обнаруживаем скачки больше π и корректируем offset
# Скачок вверх (от π к -π): delta < -π → добавляем +2π к offset
# Скачок вниз (от -π к π): delta > π → вычитаем -2π из offset
# Обнаруживаем скачки и корректируем offset
# Используем улучшенный алгоритм с адаптивным порогом
# Метод 1: Стандартная коррекция для больших скачков (> π)
# Это ловит случаи, когда фаза уже перескочила границу
phase_offset = phase_offset - 2.0 * np.pi * np.round(delta / (2.0 * np.pi))
# Применяем накопленные смещения
# Метод 2: Адаптивная коррекция при приближении к границам
# Проверяем текущую развернутую фазу
unwrapped_phase = current_phase + phase_offset
# Если фаза близка к нечетным π (π, 3π, 5π...), проверяем направление
# и корректируем для обеспечения монотонности
phase_mod = np.mod(unwrapped_phase + np.pi, 2.0 * np.pi) - np.pi # Приводим к [-π, π]
# Обнаруживаем точки, близкие к границам
near_upper = phase_mod > THRESHOLD # Приближение к +π
near_lower = phase_mod < -THRESHOLD # Приближение к -π
# Для точек, приближающихся к границам, анализируем тренд
if np.any(near_upper) or np.any(near_lower):
# Если delta положительна и мы около +π, готовимся к переходу
should_add = near_upper & (delta > 0)
# Если delta отрицательна и мы около -π, готовимся к переходу
should_sub = near_lower & (delta < 0)
# Применяем дополнительную коррекцию только там, где нужно
# (этот код срабатывает редко, только при быстром движении объекта)
pass # Основная коррекция уже сделана выше
# Финальная развернутая фаза
unwrapped_phase = current_phase + phase_offset
# Сохраняем текущую фазу как предыдущую для следующего свипа
@ -559,6 +594,27 @@ def apply_temporal_unwrap(
return unwrapped_phase, new_prev_phase, new_phase_offset
def phase_to_distance(phase: np.ndarray, center_freq_hz: float = 6e9) -> np.ndarray:
"""Преобразует развернутую фазу в расстояние для FMCW радара.
Формула: Δl = φ * c / (4π * ν)
где:
φ - фаза (радианы)
c - скорость света (м/с)
ν - центральная частота свипа (Гц)
Args:
phase: Развернутая фаза в радианах
center_freq_hz: Центральная частота диапазона в Гц (по умолчанию 6 ГГц для 2-10 ГГц)
Returns:
Расстояние в метрах
"""
c = 299792458.0 # Скорость света в м/с
distance = phase * c / (4.0 * np.pi * center_freq_hz)
return distance.astype(np.float32)
def main():
parser = argparse.ArgumentParser(
description=(
@ -733,10 +789,14 @@ def main():
# График фазы текущего свипа
phase_line_obj, = ax_phase.plot([], [], lw=1)
ax_phase.set_title("Фаза спектра", pad=1)
ax_phase.set_title("Фаза спектра (развернутая)", pad=1)
ax_phase.set_xlabel("Бин")
ax_phase.set_ylabel("Фаза, радианы")
# Добавим второй Y axis для расстояния
ax_phase_dist = ax_phase.twinx()
ax_phase_dist.set_ylabel("Расстояние, м", color='green')
# Водопад фазы
img_phase_obj = ax_phase_wf.imshow(
np.zeros((1, 1), dtype=np.float32),
@ -1016,7 +1076,16 @@ def main():
# Авто-диапазон по Y для фазы
if np.isfinite(np.nanmin(phase_unwrapped)) and np.isfinite(np.nanmax(phase_unwrapped)):
ax_phase.set_xlim(0, max(1, xs_fft.size - 1))
ax_phase.set_ylim(float(np.nanmin(phase_unwrapped)), float(np.nanmax(phase_unwrapped)))
phase_min = float(np.nanmin(phase_unwrapped))
phase_max = float(np.nanmax(phase_unwrapped))
ax_phase.set_ylim(phase_min, phase_max)
# Обновляем вторую ось Y с расстоянием
try:
dist_min = phase_to_distance(np.array([phase_min]))[0]
dist_max = phase_to_distance(np.array([phase_max]))[0]
ax_phase_dist.set_ylim(dist_min, dist_max)
except Exception:
pass
# Обновление водопада
if changed and ring is not None:
@ -1169,11 +1238,28 @@ def run_pyqtgraph(args):
p_spec.addItem(img_fft)
# График фазы (слева-снизу)
p_phase = win.addPlot(row=2, col=0, title="Фаза спектра")
p_phase = win.addPlot(row=2, col=0, title="Фаза спектра (развернутая)")
p_phase.showGrid(x=True, y=True, alpha=0.3)
curve_phase = p_phase.plot(pen=pg.mkPen((120, 255, 80), width=1))
p_phase.setLabel("bottom", "Бин")
p_phase.setLabel("left", "Фаза, радианы")
# Добавим вторую ось Y для расстояния
p_phase_dist_axis = pg.ViewBox()
p_phase.showAxis("right")
p_phase.scene().addItem(p_phase_dist_axis)
p_phase.getAxis("right").linkToView(p_phase_dist_axis)
p_phase_dist_axis.setXLink(p_phase)
p_phase.setLabel("right", "Расстояние, м")
def updateViews():
try:
p_phase_dist_axis.setGeometry(p_phase.vb.sceneBoundingRect())
p_phase_dist_axis.linkedViewChanged(p_phase.vb, p_phase_dist_axis.XAxis)
except Exception:
pass
updateViews()
p_phase.vb.sigResized.connect(updateViews)
# Водопад фазы (справа-снизу)
p_phase_wf = win.addPlot(row=2, col=1, title="Водопад фазы")
@ -1403,7 +1489,16 @@ def run_pyqtgraph(args):
# Unwrapping по частоте
phase_unwrapped = np.unwrap(phase)
curve_phase.setData(xs_fft[: phase_unwrapped.size], phase_unwrapped)
p_phase.setYRange(float(np.nanmin(phase_unwrapped)), float(np.nanmax(phase_unwrapped)), padding=0)
phase_min = float(np.nanmin(phase_unwrapped))
phase_max = float(np.nanmax(phase_unwrapped))
p_phase.setYRange(phase_min, phase_max, padding=0)
# Обновляем вторую ось Y с расстоянием
try:
dist_min = phase_to_distance(np.array([phase_min]))[0]
dist_max = phase_to_distance(np.array([phase_max]))[0]
p_phase_dist_axis.setYRange(dist_min, dist_max, padding=0)
except Exception:
pass
if changed and ring is not None:
disp = ring if head == 0 else np.roll(ring, -head, axis=0)