implemented --calibrate mode. In this mode frequency calibration coeffs can be entered via GUI. Also fixed some bugs in PyQT backend. Problem: matplotlib is so slow...

This commit is contained in:
2026-03-04 14:34:41 +03:00
parent 283631c52e
commit c171ae07e0

View File

@ -47,6 +47,7 @@ SweepInfo = Dict[str, Any]
SweepData = Dict[str, np.ndarray] SweepData = Dict[str, np.ndarray]
SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]] SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]]
SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves] SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves]
CALIBRATION_C = np.asarray([0.0, 1.0, 0.025], dtype=np.float64)
def _format_status_kv(data: Mapping[str, Any]) -> str: def _format_status_kv(data: Mapping[str, Any]) -> str:
@ -136,8 +137,7 @@ def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData:
F = np.asarray(sweep["F"], dtype=np.float64).copy() F = np.asarray(sweep["F"], dtype=np.float64).copy()
I = np.asarray(sweep["I"], dtype=np.float64).copy() I = np.asarray(sweep["I"], dtype=np.float64).copy()
tmp = [] tmp = []
C = np.asarray(CALIBRATION_C, dtype=np.float64)
C = [0,1,0]
for f in F: for f in F:
val = C[0] + (f**1) * C[1] + (f**2) * C[2] val = C[0] + (f**1) * C[1] + (f**2) * C[2]
@ -244,6 +244,84 @@ def _compute_distance_axis(freqs: Optional[np.ndarray], bins: int) -> np.ndarray
return np.arange(bins, dtype=np.float64) * step_m return np.arange(bins, dtype=np.float64) * step_m
def _find_peak_width_markers(xs: np.ndarray, ys: np.ndarray) -> Optional[Dict[str, float]]:
"""Найти главный ненулевой пик и его ширину по уровню половины высоты над фоном."""
x_arr = np.asarray(xs, dtype=np.float64)
y_arr = np.asarray(ys, dtype=np.float64)
valid = np.isfinite(x_arr) & np.isfinite(y_arr) & (x_arr > 0.0)
if int(np.count_nonzero(valid)) < 3:
return None
x = x_arr[valid]
y = y_arr[valid]
x_min = float(x[0])
x_max = float(x[-1])
x_span = x_max - x_min
central_mask = (x >= (x_min + 0.25 * x_span)) & (x <= (x_min + 0.75 * x_span))
if int(np.count_nonzero(central_mask)) > 0:
central_idx = np.flatnonzero(central_mask)
peak_idx = int(central_idx[int(np.argmax(y[central_mask]))])
else:
peak_idx = int(np.argmax(y))
peak_y = float(y[peak_idx])
shoulder_gap = max(1, min(8, y.size // 64 if y.size > 0 else 1))
shoulder_width = max(4, min(32, y.size // 16 if y.size > 0 else 4))
left_lo = max(0, peak_idx - shoulder_gap - shoulder_width)
left_hi = max(0, peak_idx - shoulder_gap)
right_lo = min(y.size, peak_idx + shoulder_gap + 1)
right_hi = min(y.size, right_lo + shoulder_width)
background_parts = []
if left_hi > left_lo:
background_parts.append(float(np.nanmedian(y[left_lo:left_hi])))
if right_hi > right_lo:
background_parts.append(float(np.nanmedian(y[right_lo:right_hi])))
if background_parts:
background = float(np.mean(background_parts))
else:
background = float(np.nanpercentile(y, 10))
if not np.isfinite(peak_y) or not np.isfinite(background) or peak_y <= background:
return None
half_level = background + 0.5 * (peak_y - background)
def _interp_cross(x0: float, y0: float, x1: float, y1: float) -> float:
if not (np.isfinite(x0) and np.isfinite(y0) and np.isfinite(x1) and np.isfinite(y1)):
return x1
dy = y1 - y0
if dy == 0.0:
return x1
t = (half_level - y0) / dy
t = min(1.0, max(0.0, t))
return x0 + t * (x1 - x0)
left_x = float(x[0])
for i in range(peak_idx, 0, -1):
if y[i - 1] <= half_level <= y[i]:
left_x = _interp_cross(float(x[i - 1]), float(y[i - 1]), float(x[i]), float(y[i]))
break
else:
left_x = float(x[0])
right_x = float(x[-1])
for i in range(peak_idx, x.size - 1):
if y[i] >= half_level >= y[i + 1]:
right_x = _interp_cross(float(x[i]), float(y[i]), float(x[i + 1]), float(y[i + 1]))
break
else:
right_x = float(x[-1])
width = right_x - left_x
if not np.isfinite(width) or width <= 0.0:
return None
return {
"background": background,
"left": left_x,
"right": right_x,
"width": width,
}
def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
"""Простая нормировка: поэлементное деление raw/calib.""" """Простая нормировка: поэлементное деление raw/calib."""
w = min(raw.size, calib.size) w = min(raw.size, calib.size)
@ -1028,6 +1106,14 @@ def main():
"а свип считается как 10**(avg_1*0.001) - 10**(avg_2*0.001)" "а свип считается как 10**(avg_1*0.001) - 10**(avg_2*0.001)"
), ),
) )
parser.add_argument(
"--calibrate",
action="store_true",
help=(
"Режим измерения ширины главного пика FFT: рисует красные маркеры "
"границ и фона и выводит ширину пика в статус"
),
)
args = parser.parse_args() args = parser.parse_args()
@ -1045,7 +1131,7 @@ def main():
import matplotlib import matplotlib
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation from matplotlib.animation import FuncAnimation
from matplotlib.widgets import Slider, CheckButtons from matplotlib.widgets import Slider, CheckButtons, TextBox
except Exception as e: except Exception as e:
sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n")
sys.exit(1) sys.exit(1)
@ -1099,8 +1185,11 @@ def main():
ymax_slider = None ymax_slider = None
contrast_slider = None contrast_slider = None
calib_enabled = False calib_enabled = False
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
current_peak_width: Optional[float] = None
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
cb = None cb = None
c_boxes = []
# Статусная строка (внизу окна) # Статусная строка (внизу окна)
status_text = fig.text( status_text = fig.text(
@ -1135,6 +1224,9 @@ def main():
# Линейный график спектра текущего свипа # Линейный график спектра текущего свипа
fft_line_obj, = ax_fft.plot([], [], lw=1) fft_line_obj, = ax_fft.plot([], [], lw=1)
fft_bg_obj = ax_fft.axhline(0.0, lw=1, color="red", visible=False)
fft_left_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False)
fft_right_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False)
ax_fft.set_title("FFT", pad=1) ax_fft.set_title("FFT", pad=1)
ax_fft.set_xlabel("Расстояние, м") ax_fft.set_xlabel("Расстояние, м")
ax_fft.set_ylabel("дБ") ax_fft.set_ylabel("дБ")
@ -1184,6 +1276,8 @@ def main():
ax_spec.tick_params(axis="x", labelbottom=False) ax_spec.tick_params(axis="x", labelbottom=False)
except Exception: except Exception:
pass pass
spec_left_obj = ax_spec.axhline(0.0, lw=1, color="red", visible=False)
spec_right_obj = ax_spec.axhline(0.0, lw=1, color="red", visible=False)
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
return _normalize_by_calib(raw, calib, norm_type=norm_type) return _normalize_by_calib(raw, calib, norm_type=norm_type)
@ -1227,6 +1321,23 @@ def main():
except Exception: except Exception:
pass pass
if peak_calibrate_mode:
try:
def _set_c_value(idx: int, text: str):
global CALIBRATION_C
try:
CALIBRATION_C[idx] = float(text.strip())
except Exception:
pass
for idx, ypos in enumerate((0.36, 0.30, 0.24)):
ax_c = fig.add_axes([0.92, ypos, 0.08, 0.045])
tb = TextBox(ax_c, f"C{idx}", initial=f"{float(CALIBRATION_C[idx]):.6g}")
tb.on_submit(lambda text, i=idx: _set_c_value(i, text))
c_boxes.append(tb)
except Exception:
pass
# Для контроля частоты обновления # Для контроля частоты обновления
max_fps = max(1.0, float(args.max_fps)) max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps) interval_ms = int(1000.0 / max_fps)
@ -1431,7 +1542,25 @@ def main():
return base.T # (bins, time) return base.T # (bins, time)
def update(_frame): def update(_frame):
nonlocal frames_since_ylim_update nonlocal frames_since_ylim_update, current_peak_width
if peak_calibrate_mode and any(getattr(tb, "capturekeystrokes", False) for tb in c_boxes):
return (
line_obj,
line_avg1_obj,
line_avg2_obj,
line_calib_obj,
line_norm_obj,
img_obj,
fft_line_obj,
fft_bg_obj,
fft_left_obj,
fft_right_obj,
img_fft_obj,
spec_left_obj,
spec_right_obj,
status_text,
channel_text,
)
changed = drain_queue() > 0 changed = drain_queue() > 0
# Обновление линии последнего свипа # Обновление линии последнего свипа
@ -1486,6 +1615,34 @@ def main():
if finite_x.size > 0: if finite_x.size > 0:
ax_fft.set_xlim(float(np.min(finite_x)), float(np.max(finite_x))) ax_fft.set_xlim(float(np.min(finite_x)), float(np.max(finite_x)))
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
if peak_calibrate_mode:
markers = _find_peak_width_markers(xs_fft[: fft_vals.size], fft_vals)
if markers is not None:
fft_bg_obj.set_ydata([markers["background"], markers["background"]])
fft_left_obj.set_xdata([markers["left"], markers["left"]])
fft_right_obj.set_xdata([markers["right"], markers["right"]])
spec_left_obj.set_ydata([markers["left"], markers["left"]])
spec_right_obj.set_ydata([markers["right"], markers["right"]])
fft_bg_obj.set_visible(True)
fft_left_obj.set_visible(True)
fft_right_obj.set_visible(True)
spec_left_obj.set_visible(True)
spec_right_obj.set_visible(True)
current_peak_width = markers["width"]
else:
fft_bg_obj.set_visible(False)
fft_left_obj.set_visible(False)
fft_right_obj.set_visible(False)
spec_left_obj.set_visible(False)
spec_right_obj.set_visible(False)
current_peak_width = None
else:
fft_bg_obj.set_visible(False)
fft_left_obj.set_visible(False)
fft_right_obj.set_visible(False)
spec_left_obj.set_visible(False)
spec_right_obj.set_visible(False)
current_peak_width = None
# Обновление водопада # Обновление водопада
if changed and ring is not None: if changed and ring is not None:
@ -1535,7 +1692,10 @@ def main():
img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff) img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff)
if changed and current_info: if changed and current_info:
status_text.set_text(_format_status_kv(current_info)) status_payload = dict(current_info)
if peak_calibrate_mode and current_peak_width is not None:
status_payload["peak_w"] = current_peak_width
status_text.set_text(_format_status_kv(status_payload))
chs = current_info.get("chs") if isinstance(current_info, dict) else None chs = current_info.get("chs") if isinstance(current_info, dict) else None
if chs is None: if chs is None:
chs = current_info.get("ch") if isinstance(current_info, dict) else None chs = current_info.get("ch") if isinstance(current_info, dict) else None
@ -1561,7 +1721,12 @@ def main():
line_norm_obj, line_norm_obj,
img_obj, img_obj,
fft_line_obj, fft_line_obj,
fft_bg_obj,
fft_left_obj,
fft_right_obj,
img_fft_obj, img_fft_obj,
spec_left_obj,
spec_right_obj,
status_text, status_text,
channel_text, channel_text,
) )
@ -1576,6 +1741,7 @@ def main():
def run_pyqtgraph(args): def run_pyqtgraph(args):
"""Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6."""
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
try: try:
import pyqtgraph as pg import pyqtgraph as pg
from PyQt5 import QtCore, QtWidgets # noqa: F401 from PyQt5 import QtCore, QtWidgets # noqa: F401
@ -1609,8 +1775,14 @@ def run_pyqtgraph(args):
interval_ms = int(1000.0 / max_fps) interval_ms = int(1000.0 / max_fps)
# PyQtGraph настройки # PyQtGraph настройки
pg.setConfigOptions(useOpenGL=True, antialias=False) pg.setConfigOptions(useOpenGL=not peak_calibrate_mode, antialias=False)
app = pg.mkQApp(args.title) app = QtWidgets.QApplication.instance()
if app is None:
app = QtWidgets.QApplication([])
try:
app.setApplicationName(str(args.title))
except Exception:
pass
win = pg.GraphicsLayoutWidget(show=True, title=args.title) win = pg.GraphicsLayoutWidget(show=True, title=args.title)
win.resize(1200, 600) win.resize(1200, 600)
@ -1645,6 +1817,16 @@ def run_pyqtgraph(args):
p_fft = win.addPlot(row=1, col=0, title="FFT") p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3) p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
peak_pen = pg.mkPen((255, 0, 0), width=1)
fft_bg_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
fft_left_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen)
fft_right_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen)
p_fft.addItem(fft_bg_line)
p_fft.addItem(fft_left_line)
p_fft.addItem(fft_right_line)
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
p_fft.setLabel("bottom", "Расстояние, м") p_fft.setLabel("bottom", "Расстояние, м")
p_fft.setLabel("left", "дБ") p_fft.setLabel("left", "дБ")
@ -1660,12 +1842,24 @@ def run_pyqtgraph(args):
p_spec.setLabel("left", "Расстояние, м") p_spec.setLabel("left", "Расстояние, м")
img_fft = pg.ImageItem() img_fft = pg.ImageItem()
p_spec.addItem(img_fft) p_spec.addItem(img_fft)
spec_left_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
spec_right_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
p_spec.addItem(spec_left_line)
p_spec.addItem(spec_right_line)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
# Чекбокс калибровки # Отдельное окно контролов: GraphicsLayoutWidget не принимает обычные QWidget через addItem.
calib_cb = QtWidgets.QCheckBox("калибровка") calib_cb = QtWidgets.QCheckBox("калибровка")
cb_proxy = QtWidgets.QGraphicsProxyWidget() control_window = QtWidgets.QWidget()
cb_proxy.setWidget(calib_cb) try:
win.addItem(cb_proxy, row=2, col=1) control_window.setWindowTitle(f"{args.title} controls")
except Exception:
pass
control_layout = QtWidgets.QVBoxLayout(control_window)
control_layout.setContentsMargins(8, 8, 8, 8)
control_layout.setSpacing(6)
control_layout.addWidget(calib_cb)
# Статусная строка (внизу окна) # Статусная строка (внизу окна)
status = pg.LabelItem(justify="left") status = pg.LabelItem(justify="left")
@ -1694,7 +1888,9 @@ def run_pyqtgraph(args):
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
calib_enabled = False calib_enabled = False
current_peak_width: Optional[float] = None
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
c_edits = []
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim: if args.ylim:
@ -1725,6 +1921,42 @@ def run_pyqtgraph(args):
except Exception: except Exception:
pass pass
if peak_calibrate_mode:
try:
c_widget = QtWidgets.QWidget()
c_layout = QtWidgets.QFormLayout(c_widget)
c_layout.setContentsMargins(0, 0, 0, 0)
def _apply_c_value(idx: int, edit):
global CALIBRATION_C
try:
CALIBRATION_C[idx] = float(edit.text().strip())
except Exception:
try:
edit.setText(f"{float(CALIBRATION_C[idx]):.6g}")
except Exception:
pass
for idx in range(3):
edit = QtWidgets.QLineEdit(f"{float(CALIBRATION_C[idx]):.6g}")
try:
edit.setMaximumWidth(120)
except Exception:
pass
try:
edit.editingFinished.connect(lambda i=idx, e=edit: _apply_c_value(i, e))
except Exception:
pass
c_layout.addRow(f"C{idx}", edit)
c_edits.append(edit)
control_layout.addWidget(c_widget)
except Exception:
pass
try:
control_window.show()
except Exception:
pass
def ensure_buffer(_w: int): def ensure_buffer(_w: int):
nonlocal ring, ring_time, head, width, x_shared, ring_fft, distance_shared nonlocal ring, ring_time, head, width, x_shared, ring_fft, distance_shared
if ring is not None: if ring is not None:
@ -1875,6 +2107,9 @@ def run_pyqtgraph(args):
pass pass
def update(): def update():
nonlocal current_peak_width
if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits):
return
changed = drain_queue() > 0 changed = drain_queue() > 0
if current_sweep_raw is not None and x_shared is not None: if current_sweep_raw is not None and x_shared is not None:
if current_freqs is not None and current_freqs.size == current_sweep_raw.size: if current_freqs is not None and current_freqs.size == current_sweep_raw.size:
@ -1923,6 +2158,34 @@ def run_pyqtgraph(args):
if finite_x.size > 0: if finite_x.size > 0:
p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0) p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
if peak_calibrate_mode:
markers = _find_peak_width_markers(xs_fft[: fft_vals.size], fft_vals)
if markers is not None:
fft_bg_line.setValue(markers["background"])
fft_left_line.setValue(markers["left"])
fft_right_line.setValue(markers["right"])
spec_left_line.setValue(markers["left"])
spec_right_line.setValue(markers["right"])
fft_bg_line.setVisible(True)
fft_left_line.setVisible(True)
fft_right_line.setVisible(True)
spec_left_line.setVisible(True)
spec_right_line.setVisible(True)
current_peak_width = markers["width"]
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
current_peak_width = None
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
current_peak_width = None
if changed and ring is not None: if changed and ring is not None:
disp = ring if head == 0 else np.roll(ring, -head, axis=0) disp = ring if head == 0 else np.roll(ring, -head, axis=0)
@ -1935,7 +2198,10 @@ def run_pyqtgraph(args):
if changed and current_info: if changed and current_info:
try: try:
status.setText(_format_status_kv(current_info)) status_payload = dict(current_info)
if peak_calibrate_mode and current_peak_width is not None:
status_payload["peak_w"] = current_peak_width
status.setText(_format_status_kv(status_payload))
except Exception: except Exception:
pass pass
try: try:
@ -2007,6 +2273,10 @@ def run_pyqtgraph(args):
def on_quit(): def on_quit():
stop_event.set() stop_event.set()
reader.join(timeout=1.0) reader.join(timeout=1.0)
try:
control_window.close()
except Exception:
pass
app.aboutToQuit.connect(on_quit) app.aboutToQuit.connect(on_quit)
win.show() win.show()