fft: add GUI toggle for peak search with rolling-median reference and top-3 peak boxes

This commit is contained in:
2026-03-05 22:02:02 +03:00
parent c784cb5ffc
commit 6260d10c4f

View File

@ -28,7 +28,7 @@ import threading
import time import time
from collections import deque from collections import deque
from queue import Queue, Empty, Full from queue import Queue, Empty, Full
from typing import Any, Dict, Mapping, Optional, Tuple, Union from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
import numpy as np import numpy as np
@ -368,6 +368,141 @@ def _find_peak_width_markers(xs: np.ndarray, ys: np.ndarray) -> Optional[Dict[st
} }
def _rolling_median_ref(xs: np.ndarray, ys: np.ndarray, window_ghz: float) -> np.ndarray:
"""Скользящая медиана по оси частоты/расстояния в окне фиксированной ширины."""
x = np.asarray(xs, dtype=np.float64)
y = np.asarray(ys, dtype=np.float64)
out = np.full(y.shape, np.nan, dtype=np.float64)
if x.size == 0 or y.size == 0 or x.size != y.size:
return out
w = float(window_ghz)
if not np.isfinite(w) or w <= 0.0:
return out
half = 0.5 * w
for i in range(x.size):
xi = x[i]
if not np.isfinite(xi):
continue
left = np.searchsorted(x, xi - half, side="left")
right = np.searchsorted(x, xi + half, side="right")
if right <= left:
continue
seg = y[left:right]
finite = np.isfinite(seg)
if not np.any(finite):
continue
out[i] = float(np.nanmedian(seg))
return out
def _find_top_peaks_over_ref(
xs: np.ndarray,
ys: np.ndarray,
ref: np.ndarray,
top_n: int = 3,
) -> List[Dict[str, float]]:
"""Найти top-N пиков по превышению над референсной кривой."""
x = np.asarray(xs, dtype=np.float64)
y = np.asarray(ys, dtype=np.float64)
r = np.asarray(ref, dtype=np.float64)
if x.size < 3 or y.size != x.size or r.size != x.size:
return []
valid = np.isfinite(x) & np.isfinite(y) & np.isfinite(r)
if not np.any(valid):
return []
d = np.full_like(y, np.nan, dtype=np.float64)
d[valid] = y[valid] - r[valid]
cand: List[int] = []
for i in range(1, x.size - 1):
if not (np.isfinite(d[i - 1]) and np.isfinite(d[i]) and np.isfinite(d[i + 1])):
continue
if d[i] <= 0.0:
continue
left_ok = d[i] > d[i - 1]
right_ok = d[i] >= d[i + 1]
alt_left_ok = d[i] >= d[i - 1]
alt_right_ok = d[i] > d[i + 1]
if (left_ok and right_ok) or (alt_left_ok and alt_right_ok):
cand.append(i)
if not cand:
return []
cand.sort(key=lambda i: float(d[i]), reverse=True)
def _interp_cross(x0: float, y0: float, x1: float, y1: float, y_cross: float) -> float:
dy = y1 - y0
if not np.isfinite(dy) or dy == 0.0:
return x1
t = (y_cross - y0) / dy
t = min(1.0, max(0.0, t))
return x0 + t * (x1 - x0)
picked: List[Dict[str, float]] = []
for idx in cand:
peak_y = float(y[idx])
peak_ref = float(r[idx])
peak_h = float(d[idx])
if not (np.isfinite(peak_y) and np.isfinite(peak_ref) and np.isfinite(peak_h)) or peak_h <= 0.0:
continue
half_level = peak_ref + 0.5 * peak_h
left_x = float(x[0])
found_left = False
for i in range(idx, 0, -1):
y0 = float(y[i - 1])
y1 = float(y[i])
if np.isfinite(y0) and np.isfinite(y1) and (y0 <= half_level <= y1):
left_x = _interp_cross(float(x[i - 1]), y0, float(x[i]), y1, half_level)
found_left = True
break
if not found_left:
left_x = float(x[0])
right_x = float(x[-1])
found_right = False
for i in range(idx, x.size - 1):
y0 = float(y[i])
y1 = float(y[i + 1])
if np.isfinite(y0) and np.isfinite(y1) and (y0 >= half_level >= y1):
right_x = _interp_cross(float(x[i]), y0, float(x[i + 1]), y1, half_level)
found_right = True
break
if not found_right:
right_x = float(x[-1])
width = float(right_x - left_x)
if not np.isfinite(width) or width <= 0.0:
continue
overlap = False
for p in picked:
if not (right_x <= p["left"] or left_x >= p["right"]):
overlap = True
break
if overlap:
continue
picked.append(
{
"x": float(x[idx]),
"peak_y": peak_y,
"ref": peak_ref,
"height": peak_h,
"left": left_x,
"right": right_x,
"width": width,
}
)
if len(picked) >= int(max(1, top_n)):
break
picked.sort(key=lambda p: p["x"])
return picked
def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
"""Простая нормировка: поэлементное деление raw/calib.""" """Простая нормировка: поэлементное деление raw/calib."""
w = min(raw.size, calib.size) w = min(raw.size, calib.size)
@ -1477,6 +1612,20 @@ def main():
"границ и фона и выводит ширину пика в статус" "границ и фона и выводит ширину пика в статус"
), ),
) )
parser.add_argument(
"--peak_search",
action="store_true",
help=(
"Поиск топ-3 пиков на FFT относительно референса (скользящая медиана) "
"с отрисовкой bounding box и параметров пиков"
),
)
parser.add_argument(
"--peak_ref_window",
type=float,
default=1.0,
help="Ширина окна скользящей медианы для --peak_search, ГГц/м по оси FFT (по умолчанию 1.0)",
)
args = parser.parse_args() args = parser.parse_args()
@ -1557,12 +1706,19 @@ def main():
calib_enabled = False calib_enabled = False
bg_subtract_enabled = False bg_subtract_enabled = False
peak_calibrate_mode = bool(getattr(args, "calibrate", False)) peak_calibrate_mode = bool(getattr(args, "calibrate", False))
peak_search_enabled = bool(getattr(args, "peak_search", False))
peak_ref_window = float(getattr(args, "peak_ref_window", 1.0))
if (not np.isfinite(peak_ref_window)) or peak_ref_window <= 0.0:
peak_ref_window = 1.0
current_peak_width: Optional[float] = None current_peak_width: Optional[float] = None
current_peak_amplitude: Optional[float] = None current_peak_amplitude: Optional[float] = None
peak_candidates: List[Dict[str, float]] = []
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
cb = None cb = None
c_boxes = [] c_boxes = []
peak_textboxes = []
c_values_text = None c_values_text = None
peak_values_text = None
# Статусная строка (внизу окна) # Статусная строка (внизу окна)
status_text = fig.text( status_text = fig.text(
@ -1597,6 +1753,8 @@ def main():
# Линейный график спектра текущего свипа # Линейный график спектра текущего свипа
fft_line_obj, = ax_fft.plot([], [], lw=1) fft_line_obj, = ax_fft.plot([], [], lw=1)
fft_ref_obj, = ax_fft.plot([], [], lw=1, color="red", alpha=0.9, visible=False)
fft_peak_box_objs = [ax_fft.plot([], [], lw=1, color="red", visible=False)[0] for _ in range(3)]
fft_bg_obj = ax_fft.axhline(0.0, lw=1, color="red", visible=False) fft_bg_obj = ax_fft.axhline(0.0, lw=1, color="red", visible=False)
fft_left_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False) fft_left_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False)
fft_right_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False) fft_right_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False)
@ -1656,14 +1814,16 @@ def main():
return _normalize_by_calib(raw, calib, norm_type=norm_type) return _normalize_by_calib(raw, calib, norm_type=norm_type)
def _sync_checkbox_states(): def _sync_checkbox_states():
nonlocal calib_enabled, bg_subtract_enabled, current_sweep_norm nonlocal calib_enabled, bg_subtract_enabled, peak_search_enabled, current_sweep_norm
try: try:
states = cb.get_status() if cb is not None else () states = cb.get_status() if cb is not None else ()
calib_enabled = bool(states[0]) if len(states) > 0 else False calib_enabled = bool(states[0]) if len(states) > 0 else False
bg_subtract_enabled = bool(states[1]) if len(states) > 1 else False bg_subtract_enabled = bool(states[1]) if len(states) > 1 else False
peak_search_enabled = bool(states[2]) if len(states) > 2 else False
except Exception: except Exception:
calib_enabled = False calib_enabled = False
bg_subtract_enabled = False bg_subtract_enabled = False
peak_search_enabled = False
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
else: else:
@ -1674,11 +1834,15 @@ def main():
ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35])
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
ax_cb = fig.add_axes([0.90, 0.40, 0.10, 0.14]) ax_cb = fig.add_axes([0.90, 0.37, 0.10, 0.17])
ymin_slider = Slider(ax_smin, "R min", 0.0, 1.0, valinit=0.0, orientation="vertical") ymin_slider = Slider(ax_smin, "R min", 0.0, 1.0, valinit=0.0, orientation="vertical")
ymax_slider = Slider(ax_smax, "R max", 0.0, 1.0, valinit=1.0, orientation="vertical") ymax_slider = Slider(ax_smax, "R max", 0.0, 1.0, valinit=1.0, orientation="vertical")
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
cb = CheckButtons(ax_cb, ["нормировка", "вычет фона"], [False, False]) cb = CheckButtons(
ax_cb,
["нормировка", "вычет фона", "поиск пиков"],
[False, False, peak_search_enabled],
)
def _on_ylim_change(_val): def _on_ylim_change(_val):
try: try:
@ -1694,6 +1858,7 @@ def main():
# Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона) # Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона)
contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle())
cb.on_clicked(lambda _v: _sync_checkbox_states()) cb.on_clicked(lambda _v: _sync_checkbox_states())
_sync_checkbox_states()
except Exception: except Exception:
pass pass
@ -1741,6 +1906,56 @@ def main():
except Exception: except Exception:
pass pass
def _refresh_peak_values_text(peaks: List[Dict[str, float]]):
if peak_values_text is None:
return
lines = []
for idx in range(3):
if idx < len(peaks):
p = peaks[idx]
lines.append(f"peak {idx + 1}:")
lines.append(f" X: {p['x']:.4g} m")
lines.append(f" H: {p['height']:.4g} dB")
lines.append(f" W: {p['width']:.4g} m")
else:
lines.append(f"peak {idx + 1}:")
lines.append(" X: - m")
lines.append(" H: - dB")
lines.append(" W: - m")
if idx != 2:
lines.append("")
peak_values_text.set_text("\n".join(lines))
try:
def _set_peak_ref_window(text: str):
nonlocal peak_ref_window
try:
v = float(text.strip())
if np.isfinite(v) and v > 0.0:
peak_ref_window = v
except Exception:
pass
ax_peak_win = fig.add_axes([0.90, 0.20, 0.10, 0.045])
peak_window_tb = TextBox(ax_peak_win, "med,GHz", initial=f"{peak_ref_window:.6g}")
peak_window_tb.on_submit(_set_peak_ref_window)
peak_textboxes.append(peak_window_tb)
ax_peak_info = fig.add_axes([0.90, 0.01, 0.10, 0.19])
ax_peak_info.axis("off")
peak_values_text = ax_peak_info.text(
0.0,
1.0,
"",
ha="left",
va="top",
fontsize=6,
family="monospace",
transform=ax_peak_info.transAxes,
)
_refresh_peak_values_text([])
except Exception:
pass
# Для контроля частоты обновления # Для контроля частоты обновления
max_fps = max(1.0, float(args.max_fps)) max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps) interval_ms = int(1000.0 / max_fps)
@ -1855,7 +2070,8 @@ def main():
if ring_fft is not None: if ring_fft is not None:
bins = ring_fft.shape[1] bins = ring_fft.shape[1]
fft_mag = _compute_fft_mag_row(s, freqs, bins) fft_mag = _compute_fft_mag_row(s, freqs, bins)
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_mag row_idx = (head - 1) % ring_fft.shape[0]
ring_fft[row_idx, :] = fft_mag
fft_row = _fft_mag_to_db(fft_mag) fft_row = _fft_mag_to_db(fft_mag)
# Экстремумы для цветовой шкалы # Экстремумы для цветовой шкалы
fr_min = np.nanmin(fft_row) fr_min = np.nanmin(fft_row)
@ -1973,8 +2189,8 @@ def main():
return base.T # (bins, time) return base.T # (bins, time)
def update(_frame): def update(_frame):
nonlocal frames_since_ylim_update, current_peak_width, current_peak_amplitude nonlocal frames_since_ylim_update, current_peak_width, current_peak_amplitude, peak_candidates
if peak_calibrate_mode and any(getattr(tb, "capturekeystrokes", False) for tb in c_boxes): if any(getattr(tb, "capturekeystrokes", False) for tb in (c_boxes + peak_textboxes)):
return ( return (
line_obj, line_obj,
line_avg1_obj, line_avg1_obj,
@ -1983,6 +2199,8 @@ def main():
line_norm_obj, line_norm_obj,
img_obj, img_obj,
fft_line_obj, fft_line_obj,
fft_ref_obj,
*fft_peak_box_objs,
fft_bg_obj, fft_bg_obj,
fft_left_obj, fft_left_obj,
fft_right_obj, fft_right_obj,
@ -2039,15 +2257,53 @@ def main():
xs_fft = current_distances if current_distances is not None else distance_shared xs_fft = current_distances if current_distances is not None else distance_shared
if fft_vals.size > xs_fft.size: if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size] fft_vals = fft_vals[: xs_fft.size]
fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) xs_fft = xs_fft[: fft_vals.size]
fft_line_obj.set_data(xs_fft, fft_vals)
finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals)
y_for_range = fft_vals[finite_fft]
if peak_search_enabled:
fft_ref = _rolling_median_ref(xs_fft, fft_vals, peak_ref_window)
finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref)
if np.any(finite_ref):
fft_ref_obj.set_data(xs_fft[finite_ref], fft_ref[finite_ref])
fft_ref_obj.set_visible(True)
y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref]))
else:
fft_ref_obj.set_visible(False)
peak_candidates = _find_top_peaks_over_ref(xs_fft, fft_vals, fft_ref, top_n=3)
_refresh_peak_values_text(peak_candidates)
for idx, box_obj in enumerate(fft_peak_box_objs):
if idx < len(peak_candidates):
p = peak_candidates[idx]
box_obj.set_data(
[p["left"], p["left"], p["right"], p["right"], p["left"]],
[p["ref"], p["peak_y"], p["peak_y"], p["ref"], p["ref"]],
)
box_obj.set_visible(True)
else:
box_obj.set_visible(False)
else:
peak_candidates = []
fft_ref_obj.set_visible(False)
_refresh_peak_values_text([])
for box_obj in fft_peak_box_objs:
box_obj.set_visible(False)
# Авто-диапазон по Y для спектра # Авто-диапазон по Y для спектра
if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): if np.any(finite_fft):
finite_x = xs_fft[: fft_vals.size][np.isfinite(xs_fft[: fft_vals.size])] finite_x = xs_fft[finite_fft]
if finite_x.size > 0: if finite_x.size > 0:
ax_fft.set_xlim(float(np.min(finite_x)), float(np.max(finite_x))) ax_fft.set_xlim(float(np.min(finite_x)), float(np.max(finite_x)))
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) finite_y = y_for_range[np.isfinite(y_for_range)]
if finite_y.size > 0:
y0 = float(np.min(finite_y))
y1 = float(np.max(finite_y))
if y1 <= y0:
y1 = y0 + 1e-3
ax_fft.set_ylim(y0, y1)
if peak_calibrate_mode: if peak_calibrate_mode:
markers = _find_peak_width_markers(xs_fft[: fft_vals.size], fft_vals) markers = _find_peak_width_markers(xs_fft, fft_vals)
if markers is not None: if markers is not None:
fft_bg_obj.set_ydata([markers["background"], markers["background"]]) fft_bg_obj.set_ydata([markers["background"], markers["background"]])
fft_left_obj.set_xdata([markers["left"], markers["left"]]) fft_left_obj.set_xdata([markers["left"], markers["left"]])
@ -2077,6 +2333,12 @@ def main():
spec_right_obj.set_visible(False) spec_right_obj.set_visible(False)
current_peak_width = None current_peak_width = None
current_peak_amplitude = None current_peak_amplitude = None
else:
fft_ref_obj.set_visible(False)
for box_obj in fft_peak_box_objs:
box_obj.set_visible(False)
peak_candidates = []
_refresh_peak_values_text([])
# Обновление водопада # Обновление водопада
if changed and ring is not None: if changed and ring is not None:
@ -2179,6 +2441,8 @@ def main():
line_norm_obj, line_norm_obj,
img_obj, img_obj,
fft_line_obj, fft_line_obj,
fft_ref_obj,
*fft_peak_box_objs,
fft_bg_obj, fft_bg_obj,
fft_left_obj, fft_left_obj,
fft_right_obj, fft_right_obj,
@ -2231,6 +2495,7 @@ def main():
def run_pyqtgraph(args): def run_pyqtgraph(args):
"""Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6."""
peak_calibrate_mode = bool(getattr(args, "calibrate", False)) peak_calibrate_mode = bool(getattr(args, "calibrate", False))
peak_search_enabled = bool(getattr(args, "peak_search", False))
try: try:
import pyqtgraph as pg import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore
@ -2292,7 +2557,8 @@ def run_pyqtgraph(args):
settings_layout.setContentsMargins(6, 6, 6, 6) settings_layout.setContentsMargins(6, 6, 6, 6)
settings_layout.setSpacing(8) settings_layout.setSpacing(8)
try: try:
settings_widget.setMinimumWidth(170) settings_widget.setMinimumWidth(130)
settings_widget.setMaximumWidth(150)
except Exception: except Exception:
pass pass
main_layout.addWidget(settings_widget) main_layout.addWidget(settings_widget)
@ -2328,10 +2594,15 @@ def run_pyqtgraph(args):
p_fft = win.addPlot(row=1, col=0, title="FFT") p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3) p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
curve_fft_ref = p_fft.plot(pen=pg.mkPen((255, 0, 0), width=1))
peak_pen = pg.mkPen((255, 0, 0), width=1) peak_pen = pg.mkPen((255, 0, 0), width=1)
fft_peak_boxes = [p_fft.plot(pen=peak_pen) for _ in range(3)]
fft_bg_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen) fft_bg_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
fft_left_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen) fft_left_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen)
fft_right_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen) fft_right_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen)
curve_fft_ref.setVisible(False)
for box in fft_peak_boxes:
box.setVisible(False)
p_fft.addItem(fft_bg_line) p_fft.addItem(fft_bg_line)
p_fft.addItem(fft_left_line) p_fft.addItem(fft_left_line)
p_fft.addItem(fft_right_line) p_fft.addItem(fft_right_line)
@ -2363,6 +2634,7 @@ def run_pyqtgraph(args):
# Правая панель настроек внутри основного окна. # Правая панель настроек внутри основного окна.
calib_cb = QtWidgets.QCheckBox("нормировка") calib_cb = QtWidgets.QCheckBox("нормировка")
bg_subtract_cb = QtWidgets.QCheckBox("вычет фона") bg_subtract_cb = QtWidgets.QCheckBox("вычет фона")
peak_search_cb = QtWidgets.QCheckBox("поиск пиков")
try: try:
settings_title = QtWidgets.QLabel("Настройки") settings_title = QtWidgets.QLabel("Настройки")
settings_layout.addWidget(settings_title) settings_layout.addWidget(settings_title)
@ -2370,6 +2642,7 @@ def run_pyqtgraph(args):
pass pass
settings_layout.addWidget(calib_cb) settings_layout.addWidget(calib_cb)
settings_layout.addWidget(bg_subtract_cb) settings_layout.addWidget(bg_subtract_cb)
settings_layout.addWidget(peak_search_cb)
calib_window = None calib_window = None
# Статусная строка (внизу окна) # Статусная строка (внизу окна)
@ -2403,9 +2676,16 @@ def run_pyqtgraph(args):
bg_subtract_enabled = False bg_subtract_enabled = False
current_peak_width: Optional[float] = None current_peak_width: Optional[float] = None
current_peak_amplitude: Optional[float] = None current_peak_amplitude: Optional[float] = None
peak_candidates: List[Dict[str, float]] = []
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
c_edits = [] c_edits = []
c_value_labels = [] c_value_labels = []
peak_group = None
peak_window_edit = None
peak_params_label = None
peak_ref_window = float(getattr(args, "peak_ref_window", 1.0))
if (not np.isfinite(peak_ref_window)) or peak_ref_window <= 0.0:
peak_ref_window = 1.0
plot_dirty = False plot_dirty = False
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None fixed_ylim: Optional[Tuple[float, float]] = None
@ -2450,6 +2730,90 @@ def run_pyqtgraph(args):
except Exception: except Exception:
pass pass
def _refresh_peak_params_label(peaks: List[Dict[str, float]]):
if peak_params_label is None:
return
lines = []
for idx in range(3):
if idx < len(peaks):
p = peaks[idx]
lines.append(f"peak {idx + 1}:")
lines.append(f" X: {p['x']:.4g} m")
lines.append(f" H: {p['height']:.4g} dB")
lines.append(f" W: {p['width']:.4g} m")
else:
lines.append(f"peak {idx + 1}:")
lines.append(" X: - m")
lines.append(" H: - dB")
lines.append(" W: - m")
if idx != 2:
lines.append("")
peak_params_label.setText("\n".join(lines))
try:
peak_group = QtWidgets.QGroupBox("Поиск пиков")
peak_layout = QtWidgets.QFormLayout(peak_group)
peak_layout.setContentsMargins(6, 6, 6, 6)
peak_layout.setSpacing(6)
peak_window_edit = QtWidgets.QLineEdit(f"{peak_ref_window:.6g}")
peak_layout.addRow("Окно медианы, ГГц", peak_window_edit)
peak_params_label = QtWidgets.QLabel("")
try:
peak_params_label.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse)
except Exception:
pass
peak_layout.addRow("Параметры", peak_params_label)
settings_layout.addWidget(peak_group)
def _apply_peak_window():
nonlocal peak_ref_window, plot_dirty
if peak_window_edit is None:
return
try:
v = float(peak_window_edit.text().strip())
if np.isfinite(v) and v > 0.0:
peak_ref_window = v
plot_dirty = True
except Exception:
pass
try:
peak_window_edit.setText(f"{peak_ref_window:.6g}")
except Exception:
pass
peak_window_edit.editingFinished.connect(_apply_peak_window)
_refresh_peak_params_label([])
except Exception:
peak_group = None
peak_window_edit = None
peak_params_label = None
def _set_peak_search_enabled():
nonlocal peak_search_enabled, plot_dirty, peak_candidates
try:
peak_search_enabled = bool(peak_search_cb.isChecked())
except Exception:
peak_search_enabled = False
try:
if peak_group is not None:
peak_group.setEnabled(peak_search_enabled)
except Exception:
pass
if not peak_search_enabled:
peak_candidates = []
_refresh_peak_params_label([])
plot_dirty = True
try:
peak_search_cb.setChecked(peak_search_enabled)
except Exception:
pass
try:
peak_search_cb.stateChanged.connect(lambda _v: _set_peak_search_enabled())
except Exception:
pass
_set_peak_search_enabled()
if peak_calibrate_mode: if peak_calibrate_mode:
try: try:
calib_window = QtWidgets.QWidget() calib_window = QtWidgets.QWidget()
@ -2638,7 +3002,8 @@ def run_pyqtgraph(args):
if ring_fft is not None: if ring_fft is not None:
bins = ring_fft.shape[1] bins = ring_fft.shape[1]
fft_mag = _compute_fft_mag_row(s, freqs, bins) fft_mag = _compute_fft_mag_row(s, freqs, bins)
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_mag row_idx = (head - 1) % ring_fft.shape[0]
ring_fft[row_idx, :] = fft_mag
fft_row = _fft_mag_to_db(fft_mag) fft_row = _fft_mag_to_db(fft_mag)
current_fft_db = fft_row current_fft_db = fft_row
fr_min = np.nanmin(fft_row) fr_min = np.nanmin(fft_row)
@ -2703,9 +3068,11 @@ def run_pyqtgraph(args):
pass pass
def update(): def update():
nonlocal current_peak_width, current_peak_amplitude, plot_dirty, current_fft_db nonlocal current_peak_width, current_peak_amplitude, plot_dirty, current_fft_db, peak_candidates
if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits): if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits):
return return
if peak_search_enabled and peak_window_edit is not None and peak_window_edit.hasFocus():
return
changed = drain_queue() > 0 changed = drain_queue() > 0
redraw_needed = changed or plot_dirty redraw_needed = changed or plot_dirty
if redraw_needed and current_sweep_raw is not None and x_shared is not None: if redraw_needed and current_sweep_raw is not None and x_shared is not None:
@ -2756,13 +3123,51 @@ def run_pyqtgraph(args):
xs_fft = current_distances if current_distances is not None else distance_shared xs_fft = current_distances if current_distances is not None else distance_shared
if fft_vals.size > xs_fft.size: if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size] fft_vals = fft_vals[: xs_fft.size]
curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) xs_fft = xs_fft[: fft_vals.size]
finite_x = xs_fft[: fft_vals.size][np.isfinite(xs_fft[: fft_vals.size])] curve_fft.setData(xs_fft, fft_vals)
finite_x = xs_fft[np.isfinite(xs_fft)]
if finite_x.size > 0: if finite_x.size > 0:
p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0) p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals)
y_for_range = fft_vals[finite_fft]
if peak_search_enabled:
fft_ref = _rolling_median_ref(xs_fft, fft_vals, peak_ref_window)
finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref)
if np.any(finite_ref):
curve_fft_ref.setData(xs_fft[finite_ref], fft_ref[finite_ref])
curve_fft_ref.setVisible(True)
y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref]))
else:
curve_fft_ref.setVisible(False)
peak_candidates = _find_top_peaks_over_ref(xs_fft, fft_vals, fft_ref, top_n=3)
_refresh_peak_params_label(peak_candidates)
for idx, box in enumerate(fft_peak_boxes):
if idx < len(peak_candidates):
p = peak_candidates[idx]
box.setData(
[p["left"], p["left"], p["right"], p["right"], p["left"]],
[p["ref"], p["peak_y"], p["peak_y"], p["ref"], p["ref"]],
)
box.setVisible(True)
else:
box.setVisible(False)
else:
peak_candidates = []
_refresh_peak_params_label([])
curve_fft_ref.setVisible(False)
for box in fft_peak_boxes:
box.setVisible(False)
finite_y = y_for_range[np.isfinite(y_for_range)]
if finite_y.size > 0:
y0 = float(np.min(finite_y))
y1 = float(np.max(finite_y))
if y1 <= y0:
y1 = y0 + 1e-3
p_fft.setYRange(y0, y1, padding=0)
if peak_calibrate_mode: if peak_calibrate_mode:
markers = _find_peak_width_markers(xs_fft[: fft_vals.size], fft_vals) markers = _find_peak_width_markers(xs_fft, fft_vals)
if markers is not None: if markers is not None:
fft_bg_line.setValue(markers["background"]) fft_bg_line.setValue(markers["background"])
fft_left_line.setValue(markers["left"]) fft_left_line.setValue(markers["left"])
@ -2792,6 +3197,12 @@ def run_pyqtgraph(args):
spec_right_line.setVisible(False) spec_right_line.setVisible(False)
current_peak_width = None current_peak_width = None
current_peak_amplitude = None current_peak_amplitude = None
else:
curve_fft_ref.setVisible(False)
for box in fft_peak_boxes:
box.setVisible(False)
peak_candidates = []
_refresh_peak_params_label([])
plot_dirty = False plot_dirty = False
if changed and ring is not None: if changed and ring is not None: