From 9a9279c8f8724021c532bdb5b3d9b36460d0ca8d Mon Sep 17 00:00:00 2001 From: Theodor Chikin Date: Tue, 10 Mar 2026 14:50:58 +0300 Subject: [PATCH] simplified refactoring --- RFG_ADC_dataplotter.py | 3389 +--------------------------------------- 1 file changed, 3 insertions(+), 3386 deletions(-) mode change 100755 => 100644 RFG_ADC_dataplotter.py diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py old mode 100755 new mode 100644 index d9c286d..225e4d3 --- a/RFG_ADC_dataplotter.py +++ b/RFG_ADC_dataplotter.py @@ -1,3391 +1,8 @@ #!/usr/bin/env python3 -""" -Реалтайм-плоттер для свипов из виртуального COM-порта. +"""Backward-compatible launcher for the refactored dataplotter.""" -Формат строк: - - "Sweep_start" — начало нового свипа (предыдущий считается завершённым) - - "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком - -Отрисовываются два графика: - - Левый: последний полученный свип (Y vs X) - - Правый: водопад (последние N свипов во времени) - -Оптимизации для скорости: - - Парсинг и чтение в фоновой нити - - Анимация с обновлением только данных (без лишнего пересоздания фигур) - - Кольцевой буфер под водопад с фиксированным числом свипов - -Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии -используется сырой доступ к TTY через termios. -""" - -import argparse -import io -import os -import signal -import sys -import threading -import time -from collections import deque -from queue import Queue, Empty, Full -from typing import Any, Dict, List, Mapping, Optional, Tuple, Union - -import numpy as np - -WF_WIDTH = 1000 # максимальное число точек в ряду водопада -FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров -SWEEP_FREQ_MIN_GHZ = 3.3 -SWEEP_FREQ_MAX_GHZ = 14.3 -LOG_BASE = 10.0 -LOG_SCALER = 0.001 # int32 значения приходят в fixed-point лог-шкале с шагом 1e-3 -LOG_POSTSCALER = 10 -LOG_EXP_LIMIT = 300.0 # запас до переполнения float64 при возведении LOG_BASE в степень -C_M_S = 299_792_458.0 -# Порог для инверсии сырых данных: если среднее значение свипа ниже порога — -# считаем, что сигнал «меньше нуля» и домножаем свип на -1 -DATA_INVERSION_THRASHOLD = 10.0 - -Number = Union[int, float] -SweepInfo = Dict[str, Any] -SweepData = Dict[str, np.ndarray] -SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]] -SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves] - - -def recalculate_calibration_c( - base_coeffs: np.ndarray, - f_min: float = SWEEP_FREQ_MIN_GHZ, - f_max: float = SWEEP_FREQ_MAX_GHZ, -) -> np.ndarray: - """Пересчитать коэффициенты так, чтобы калибровка сохраняла края диапазона свипа.""" - coeffs = np.asarray(base_coeffs, dtype=np.float64).reshape(-1) - if coeffs.size < 3: - out = np.zeros((3,), dtype=np.float64) - out[: coeffs.size] = coeffs - coeffs = out - c0, c1, c2 = float(coeffs[0]), float(coeffs[1]), float(coeffs[2]) - x0 = float(f_min) - x1 = float(f_max) - y0 = c0 + c1 * x0 + c2 * (x0 ** 2) - y1 = c0 + c1 * x1 + c2 * (x1 ** 2) - if not (np.isfinite(y0) and np.isfinite(y1)) or y1 == y0: - return np.asarray([c0, c1, c2], dtype=np.float64) - scale = (x1 - x0) / (y1 - y0) - shift = x0 - scale * y0 - return np.asarray( - [ - shift + scale * c0, - scale * c1, - scale * c2, - ], - dtype=np.float64, - ) - - -CALIBRATION_C_BASE = np.asarray([0.0, 1.0, 0.025], dtype=np.float64) -CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE) - - - -def _format_status_kv(data: Mapping[str, Any]) -> str: - """Преобразовать словарь метрик в одну строку 'k:v'.""" - - def _fmt(v: Any) -> str: - if v is None: - return "NA" - try: - fv = float(v) - except Exception: - return str(v) - if not np.isfinite(fv): - return "nan" - # Достаточно компактно для статус-строки. - if abs(fv) >= 1000 or (0 < abs(fv) < 0.01): - return f"{fv:.3g}" - return f"{fv:.3f}".rstrip("0").rstrip(".") - - parts = [f"{k}:{_fmt(v)}" for k, v in data.items()] - return " ".join(parts) - - -def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: - """Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров. - - Возвращает пару (low, high) или None для отключения. Допустимы значения 0..100, low < high. - Ключевые слова отключения: "off", "none", "no". - """ - if not spec: - return None - s = str(spec).strip().lower() - if s in ("off", "none", "no"): - return None - try: - p0, p1 = s.replace(";", ",").split(",") - low = float(p0) - high = float(p1) - if not (0.0 <= low < high <= 100.0): - return None - return (low, high) - except Exception: - return None - - -def _log_value_to_linear(value: int) -> float: - """Преобразовать fixed-point логарифмическое значение в линейную шкалу.""" - exponent = max(-LOG_EXP_LIMIT, min(LOG_EXP_LIMIT, float(value) * LOG_SCALER)) - return float(LOG_BASE ** exponent) - - -def _log_pair_to_sweep(avg_1: int, avg_2: int) -> float: - """Разность двух логарифмических усреднений в линейной шкале.""" - return (_log_value_to_linear(avg_1) - _log_value_to_linear(avg_2))*LOG_POSTSCALER - - -def _compute_auto_ylim(*series_list: Optional[np.ndarray]) -> Optional[Tuple[float, float]]: - """Общий Y-диапазон по всем переданным кривым с небольшим запасом.""" - y_min: Optional[float] = None - y_max: Optional[float] = None - for series in series_list: - if series is None: - continue - arr = np.asarray(series) - if arr.size == 0: - continue - finite = arr[np.isfinite(arr)] - if finite.size == 0: - continue - cur_min = float(np.min(finite)) - cur_max = float(np.max(finite)) - y_min = cur_min if y_min is None else min(y_min, cur_min) - y_max = cur_max if y_max is None else max(y_max, cur_max) - - if y_min is None or y_max is None: - return None - if y_min == y_max: - pad = max(1.0, abs(y_min) * 0.05) - else: - pad = 0.05 * (y_max - y_min) - return (y_min - pad, y_max + pad) - - -def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData: - """Вернуть копию свипа с применённой калибровкой частотной оси.""" - F = np.asarray(sweep["F"], dtype=np.float64).copy() - I = np.asarray(sweep["I"], dtype=np.float64).copy() - C = np.asarray(CALIBRATION_C, dtype=np.float64) - if F.size > 0: - F = C[0] + C[1] * F + C[2] * (F * F) - - if F.size >= 2: - F_cal = np.linspace(float(F[0]), float(F[-1]), F.size, dtype=np.float64) - I_cal = np.interp(F_cal, F, I).astype(np.float64) - else: - F_cal = F.copy() - I_cal = I.copy() - - return { - "F": F_cal, - "I": I_cal, - } - -def _prepare_fft_segment( - sweep: np.ndarray, - freqs: Optional[np.ndarray], - fft_len: int = FFT_LEN, -) -> Optional[Tuple[np.ndarray, int]]: - """Подготовить свип к FFT, пересэмплируя его на равномерную сетку по частоте.""" - take_fft = min(int(sweep.size), int(fft_len)) - if take_fft <= 0: - return None - - sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32) - fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(np.float32, copy=False) - if freqs is None: - return fallback, take_fft - - freq_arr = np.asarray(freqs) - if freq_arr.size < take_fft: - return fallback, take_fft - - freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64) - valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg) - if int(np.count_nonzero(valid)) < 2: - return fallback, take_fft - - x_valid = freq_seg[valid] - y_valid = sweep_seg[valid] - order = np.argsort(x_valid, kind="mergesort") - x_valid = x_valid[order] - y_valid = y_valid[order] - x_unique, unique_idx = np.unique(x_valid, return_index=True) - y_unique = y_valid[unique_idx] - if x_unique.size < 2 or x_unique[-1] <= x_unique[0]: - return fallback, take_fft - - x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64) - resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32) - return resampled, take_fft - - -def _fft_mag_to_db(mag: np.ndarray) -> np.ndarray: - """Перевод модуля спектра в дБ с отсечкой отрицательных значений после вычитания фона.""" - mag_arr = np.asarray(mag, dtype=np.float32) - safe_mag = np.maximum(mag_arr, 0.0) - return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False) - - -def _compute_fft_mag_row( - sweep: np.ndarray, - freqs: Optional[np.ndarray], - bins: int, -) -> np.ndarray: - """Посчитать линейный модуль FFT-строки, используя калиброванную частотную ось.""" - if bins <= 0: - return np.zeros((0,), dtype=np.float32) - - prepared = _prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN) - if prepared is None: - return np.full((bins,), np.nan, dtype=np.float32) - - fft_seg, take_fft = prepared - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = fft_seg * win - spec = np.fft.ifft(fft_in) - mag = np.abs(spec).astype(np.float32) - if mag.shape[0] != bins: - mag = mag[:bins] - return mag - - -def _compute_fft_row( - sweep: np.ndarray, - freqs: Optional[np.ndarray], - bins: int, -) -> np.ndarray: - """Посчитать FFT-строку в дБ.""" - return _fft_mag_to_db(_compute_fft_mag_row(sweep, freqs, bins)) - - -def _compute_distance_axis(freqs: Optional[np.ndarray], bins: int) -> np.ndarray: - """Рассчитать ось расстояния для IFFT по равномерной частотной сетке.""" - if bins <= 0: - return np.zeros((0,), dtype=np.float64) - - if freqs is None: - return np.arange(bins, dtype=np.float64) - - freq_arr = np.asarray(freqs, dtype=np.float64) - finite = freq_arr[np.isfinite(freq_arr)] - if finite.size < 2: - return np.arange(bins, dtype=np.float64) - - df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1)) - df_hz = abs(df_ghz) * 1e9 - if not np.isfinite(df_hz) or df_hz <= 0.0: - return np.arange(bins, dtype=np.float64) - - step_m = C_M_S / (2.0 * FFT_LEN * df_hz) - return np.arange(bins, dtype=np.float64) * step_m - - -def _find_peak_width_markers(xs: np.ndarray, ys: np.ndarray) -> Optional[Dict[str, float]]: - """Найти главный ненулевой пик и его ширину по уровню половины высоты над фоном.""" - x_arr = np.asarray(xs, dtype=np.float64) - y_arr = np.asarray(ys, dtype=np.float64) - valid = np.isfinite(x_arr) & np.isfinite(y_arr) & (x_arr > 0.0) - if int(np.count_nonzero(valid)) < 3: - return None - - x = x_arr[valid] - y = y_arr[valid] - x_min = float(x[0]) - x_max = float(x[-1]) - x_span = x_max - x_min - central_mask = (x >= (x_min + 0.25 * x_span)) & (x <= (x_min + 0.75 * x_span)) - if int(np.count_nonzero(central_mask)) > 0: - central_idx = np.flatnonzero(central_mask) - peak_idx = int(central_idx[int(np.argmax(y[central_mask]))]) - else: - peak_idx = int(np.argmax(y)) - peak_y = float(y[peak_idx]) - shoulder_gap = max(1, min(8, y.size // 64 if y.size > 0 else 1)) - shoulder_width = max(4, min(32, y.size // 16 if y.size > 0 else 4)) - left_lo = max(0, peak_idx - shoulder_gap - shoulder_width) - left_hi = max(0, peak_idx - shoulder_gap) - right_lo = min(y.size, peak_idx + shoulder_gap + 1) - right_hi = min(y.size, right_lo + shoulder_width) - background_parts = [] - if left_hi > left_lo: - background_parts.append(float(np.nanmedian(y[left_lo:left_hi]))) - if right_hi > right_lo: - background_parts.append(float(np.nanmedian(y[right_lo:right_hi]))) - if background_parts: - background = float(np.mean(background_parts)) - else: - background = float(np.nanpercentile(y, 10)) - if not np.isfinite(peak_y) or not np.isfinite(background) or peak_y <= background: - return None - - half_level = background + 0.5 * (peak_y - background) - - def _interp_cross(x0: float, y0: float, x1: float, y1: float) -> float: - if not (np.isfinite(x0) and np.isfinite(y0) and np.isfinite(x1) and np.isfinite(y1)): - return x1 - dy = y1 - y0 - if dy == 0.0: - return x1 - t = (half_level - y0) / dy - t = min(1.0, max(0.0, t)) - return x0 + t * (x1 - x0) - - left_x = float(x[0]) - for i in range(peak_idx, 0, -1): - if y[i - 1] <= half_level <= y[i]: - left_x = _interp_cross(float(x[i - 1]), float(y[i - 1]), float(x[i]), float(y[i])) - break - else: - left_x = float(x[0]) - - right_x = float(x[-1]) - for i in range(peak_idx, x.size - 1): - if y[i] >= half_level >= y[i + 1]: - right_x = _interp_cross(float(x[i]), float(y[i]), float(x[i + 1]), float(y[i + 1])) - break - else: - right_x = float(x[-1]) - - width = right_x - left_x - if not np.isfinite(width) or width <= 0.0: - return None - - return { - "background": background, - "left": left_x, - "right": right_x, - "width": width, - "amplitude": peak_y, - } - - -def _rolling_median_ref(xs: np.ndarray, ys: np.ndarray, window_ghz: float) -> np.ndarray: - """Скользящая медиана по оси частоты/расстояния в окне фиксированной ширины.""" - x = np.asarray(xs, dtype=np.float64) - y = np.asarray(ys, dtype=np.float64) - out = np.full(y.shape, np.nan, dtype=np.float64) - if x.size == 0 or y.size == 0 or x.size != y.size: - return out - w = float(window_ghz) - if not np.isfinite(w) or w <= 0.0: - return out - half = 0.5 * w - for i in range(x.size): - xi = x[i] - if not np.isfinite(xi): - continue - left = np.searchsorted(x, xi - half, side="left") - right = np.searchsorted(x, xi + half, side="right") - if right <= left: - continue - seg = y[left:right] - finite = np.isfinite(seg) - if not np.any(finite): - continue - out[i] = float(np.nanmedian(seg)) - return out - - -def _find_top_peaks_over_ref( - xs: np.ndarray, - ys: np.ndarray, - ref: np.ndarray, - top_n: int = 3, -) -> List[Dict[str, float]]: - """Найти top-N пиков по превышению над референсной кривой.""" - x = np.asarray(xs, dtype=np.float64) - y = np.asarray(ys, dtype=np.float64) - r = np.asarray(ref, dtype=np.float64) - if x.size < 3 or y.size != x.size or r.size != x.size: - return [] - - valid = np.isfinite(x) & np.isfinite(y) & np.isfinite(r) - if not np.any(valid): - return [] - d = np.full_like(y, np.nan, dtype=np.float64) - d[valid] = y[valid] - r[valid] - - cand: List[int] = [] - for i in range(1, x.size - 1): - if not (np.isfinite(d[i - 1]) and np.isfinite(d[i]) and np.isfinite(d[i + 1])): - continue - if d[i] <= 0.0: - continue - left_ok = d[i] > d[i - 1] - right_ok = d[i] >= d[i + 1] - alt_left_ok = d[i] >= d[i - 1] - alt_right_ok = d[i] > d[i + 1] - if (left_ok and right_ok) or (alt_left_ok and alt_right_ok): - cand.append(i) - if not cand: - return [] - - cand.sort(key=lambda i: float(d[i]), reverse=True) - - def _interp_cross(x0: float, y0: float, x1: float, y1: float, y_cross: float) -> float: - dy = y1 - y0 - if not np.isfinite(dy) or dy == 0.0: - return x1 - t = (y_cross - y0) / dy - t = min(1.0, max(0.0, t)) - return x0 + t * (x1 - x0) - - picked: List[Dict[str, float]] = [] - for idx in cand: - peak_y = float(y[idx]) - peak_ref = float(r[idx]) - peak_h = float(d[idx]) - if not (np.isfinite(peak_y) and np.isfinite(peak_ref) and np.isfinite(peak_h)) or peak_h <= 0.0: - continue - - half_level = peak_ref + 0.5 * peak_h - - left_x = float(x[0]) - found_left = False - for i in range(idx, 0, -1): - y0 = float(y[i - 1]) - y1 = float(y[i]) - if np.isfinite(y0) and np.isfinite(y1) and (y0 <= half_level <= y1): - left_x = _interp_cross(float(x[i - 1]), y0, float(x[i]), y1, half_level) - found_left = True - break - if not found_left: - left_x = float(x[0]) - - right_x = float(x[-1]) - found_right = False - for i in range(idx, x.size - 1): - y0 = float(y[i]) - y1 = float(y[i + 1]) - if np.isfinite(y0) and np.isfinite(y1) and (y0 >= half_level >= y1): - right_x = _interp_cross(float(x[i]), y0, float(x[i + 1]), y1, half_level) - found_right = True - break - if not found_right: - right_x = float(x[-1]) - - width = float(right_x - left_x) - if not np.isfinite(width) or width <= 0.0: - continue - - overlap = False - for p in picked: - if not (right_x <= p["left"] or left_x >= p["right"]): - overlap = True - break - if overlap: - continue - - picked.append( - { - "x": float(x[idx]), - "peak_y": peak_y, - "ref": peak_ref, - "height": peak_h, - "left": left_x, - "right": right_x, - "width": width, - } - ) - if len(picked) >= int(max(1, top_n)): - break - - picked.sort(key=lambda p: p["x"]) - return picked - - -def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - """Простая нормировка: поэлементное деление raw/calib.""" - w = min(raw.size, calib.size) - if w <= 0: - return raw - out = np.full_like(raw, np.nan, dtype=np.float32) - with np.errstate(divide="ignore", invalid="ignore"): - out[:w] = raw[:w] / calib[:w] - out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) - return out - - -def _build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: - """Оценить нижнюю/верхнюю огибающие калибровочной кривой.""" - n = int(calib.size) - if n <= 0: - empty = np.zeros((0,), dtype=np.float32) - return empty, empty - - y = np.asarray(calib, dtype=np.float32) - finite = np.isfinite(y) - if not np.any(finite): - zeros = np.zeros_like(y, dtype=np.float32) - return zeros, zeros - - if not np.all(finite): - x = np.arange(n, dtype=np.float32) - y = y.copy() - y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32) - - if n < 3: - return y.copy(), y.copy() - - dy = np.diff(y) - s = np.sign(dy).astype(np.int8, copy=False) - - if np.any(s == 0): - for i in range(1, s.size): - if s[i] == 0: - s[i] = s[i - 1] - for i in range(s.size - 2, -1, -1): - if s[i] == 0: - s[i] = s[i + 1] - s[s == 0] = 1 - - max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1 - min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1 - - x = np.arange(n, dtype=np.float32) - - def _interp_nodes(nodes: np.ndarray) -> np.ndarray: - if nodes.size == 0: - idx = np.array([0, n - 1], dtype=np.int64) - else: - idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64) - return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32) - - upper = _interp_nodes(max_idx) - lower = _interp_nodes(min_idx) - - swap = lower > upper - if np.any(swap): - tmp = upper[swap].copy() - upper[swap] = lower[swap] - lower[swap] = tmp - - return lower, upper - - -def _normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - """Нормировка через проекцию между огибающими калибровки в диапазон [-1, +1].""" - w = min(raw.size, calib.size) - if w <= 0: - return raw - - out = np.full_like(raw, np.nan, dtype=np.float32) - raw_seg = np.asarray(raw[:w], dtype=np.float32) - lower, upper = _build_calib_envelopes(np.asarray(calib[:w], dtype=np.float32)) - span = upper - lower - - finite_span = span[np.isfinite(span) & (span > 0)] - if finite_span.size > 0: - eps = max(float(np.median(finite_span)) * 1e-6, 1e-9) - else: - eps = 1e-9 - - valid = ( - np.isfinite(raw_seg) - & np.isfinite(lower) - & np.isfinite(upper) - & (span > eps) - ) - if np.any(valid): - proj = np.empty_like(raw_seg, dtype=np.float32) - proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0 - proj[valid] = np.clip(proj[valid], -1000.0, 1000.0) - proj[~valid] = np.nan - out[:w] = proj - - return out - - -def _normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray: - """Нормировка свипа по выбранному алгоритму.""" - nt = str(norm_type).strip().lower() - if nt == "simple": - return _normalize_sweep_simple(raw, calib) - return _normalize_sweep_projector(raw, calib) - - -def try_open_pyserial(path: str, baud: int, timeout: float): - try: - import serial # type: ignore - except Exception: - return None - try: - ser = serial.Serial(path, baudrate=baud, timeout=timeout) - return ser - except Exception: - return None - - -class FDReader: - """Простой враппер чтения строк из файлового дескриптора TTY.""" - - def __init__(self, fd: int): - # Отдельно буферизуем для корректной readline() - self._fd = fd - raw = os.fdopen(fd, "rb", closefd=False) - self._file = raw - self._buf = io.BufferedReader(raw, buffer_size=65536) - - def fileno(self) -> int: - return self._fd - - def readline(self) -> bytes: - return self._buf.readline() - - def close(self): - try: - self._buf.close() - except Exception: - pass - - -def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: - """Открыть TTY без pyserial и настроить порт через termios. - - Возвращает FDReader или None при ошибке. - """ - try: - import termios - import tty - except Exception: - return None - - try: - fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) - except Exception: - return None - - try: - attrs = termios.tcgetattr(fd) - # Установим «сырое» состояние - tty.setraw(fd) - - # Скорость - baud_map = { - 9600: termios.B9600, - 19200: termios.B19200, - 38400: termios.B38400, - 57600: termios.B57600, - 115200: termios.B115200, - 230400: getattr(termios, "B230400", None), - 460800: getattr(termios, "B460800", None), - } - b = baud_map.get(baud) or termios.B115200 - - attrs[4] = b # ispeed - attrs[5] = b # ospeed - - # VMIN=1, VTIME=0 — блокирующее чтение по байту - cc = attrs[6] - cc[termios.VMIN] = 1 - cc[termios.VTIME] = 0 - attrs[6] = cc - - termios.tcsetattr(fd, termios.TCSANOW, attrs) - except Exception: - try: - os.close(fd) - except Exception: - pass - return None - - return FDReader(fd) - - -class SerialLineSource: - """Единый интерфейс для чтения строк из порта (pyserial или raw TTY).""" - - def __init__(self, path: str, baud: int, timeout: float = 1.0): - self._pyserial = try_open_pyserial(path, baud, timeout) - #self._pyserial = None - self._fdreader = None - self._using = "pyserial" if self._pyserial is not None else "raw" - if self._pyserial is None: - self._fdreader = open_raw_tty(path, baud) - if self._fdreader is None: - msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" - if sys.platform.startswith("win"): - msg += ". На Windows нужен pyserial: pip install pyserial" - raise RuntimeError(msg) - - def readline(self) -> bytes: - if self._pyserial is not None: - try: - return self._pyserial.readline() - except Exception: - return b"" - else: - try: - return self._fdreader.readline() # type: ignore[union-attr] - except Exception: - return b"" - - def close(self): - try: - if self._pyserial is not None: - self._pyserial.close() - elif self._fdreader is not None: - self._fdreader.close() - except Exception: - pass - - -class SerialChunkReader: - """Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера.""" - - def __init__(self, src: SerialLineSource): - self._src = src - self._ser = src._pyserial - self._fd: Optional[int] = None - if self._ser is not None: - # Неблокирующий режим для быстрой откачки - try: - self._ser.timeout = 0 - except Exception: - pass - else: - try: - self._fd = src._fdreader.fileno() # type: ignore[union-attr] - try: - os.set_blocking(self._fd, False) - except Exception: - pass - except Exception: - self._fd = None - - def read_available(self) -> bytes: - """Вернёт доступные байты (b"" если данных нет).""" - if self._ser is not None: - try: - n = int(getattr(self._ser, "in_waiting", 0)) - except Exception: - n = 0 - if n > 0: - try: - return self._ser.read(n) - except Exception: - return b"" - return b"" - if self._fd is None: - return b"" - out = bytearray() - while True: - try: - chunk = os.read(self._fd, 65536) - if not chunk: - break - out += chunk - if len(chunk) < 65536: - break - except BlockingIOError: - break - except Exception: - break - return bytes(out) - - -class SweepReader(threading.Thread): - """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" - - def __init__( - self, - port_path: str, - baud: int, - out_queue: Queue[SweepPacket], - stop_event: threading.Event, - fancy: bool = False, - bin_mode: bool = False, - logscale: bool = False, - parser_16_bit_x2: bool = False, - parser_test: bool = False, - ): - super().__init__(daemon=True) - self._port_path = port_path - self._baud = baud - self._q = out_queue - self._stop = stop_event - self._src: Optional[SerialLineSource] = None - self._fancy = bool(fancy) - self._bin_mode = bool(bin_mode) - self._logscale = bool(logscale) - self._parser_16_bit_x2 = bool(parser_16_bit_x2) - self._parser_test = bool(parser_test) - self._max_width: int = 0 - self._sweep_idx: int = 0 - self._last_sweep_ts: Optional[float] = None - self._n_valid_hist = deque() - - @staticmethod - def _u32_to_i32(v: int) -> int: - """Преобразование 32-bit слова в знаковое значение.""" - return v - 0x1_0000_0000 if (v & 0x8000_0000) else v - - @staticmethod - def _u16_to_i16(v: int) -> int: - """Преобразование 16-bit слова в знаковое значение.""" - return v - 0x1_0000 if (v & 0x8000) else v - - def _finalize_current( - self, - xs, - ys, - channels: Optional[set[int]], - raw_curves: Optional[Tuple[list[int], list[int]]] = None, - apply_inversion: bool = True, - ): - if not xs: - return - ch_list = sorted(channels) if channels else [0] - ch_primary = ch_list[0] if ch_list else 0 - max_x = max(xs) - width = max_x + 1 - self._max_width = max(self._max_width, width) - target_width = self._max_width if self._fancy else width - - def _scatter(values, dtype) -> np.ndarray: - series = np.full((target_width,), np.nan, dtype=dtype) - try: - idx = np.asarray(xs, dtype=np.int64) - vals = np.asarray(values, dtype=dtype) - series[idx] = vals - except Exception: - for x, y in zip(xs, values): - if 0 <= x < target_width: - series[x] = y - return series - - def _fill_missing(series: np.ndarray): - known = ~np.isnan(series) - if not np.any(known): - return - known_idx = np.nonzero(known)[0] - for i0, i1 in zip(known_idx[:-1], known_idx[1:]): - if i1 - i0 > 1: - avg = (series[i0] + series[i1]) * 0.5 - series[i0 + 1 : i1] = avg - first_idx = int(known_idx[0]) - last_idx = int(known_idx[-1]) - if first_idx > 0: - series[:first_idx] = series[first_idx] - if last_idx < series.size - 1: - series[last_idx + 1 :] = series[last_idx] - - # Быстрый векторизованный путь - sweep = _scatter(ys, np.float32) - aux_curves: SweepAuxCurves = None - if raw_curves is not None: - aux_curves = ( - _scatter(raw_curves[0], np.float32), - _scatter(raw_curves[1], np.float32), - ) - # Метрики валидных точек до заполнения пропусков - finite_pre = np.isfinite(sweep) - n_valid_cur = int(np.count_nonzero(finite_pre)) - - # Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины - if self._fancy: - try: - _fill_missing(sweep) - if aux_curves is not None: - _fill_missing(aux_curves[0]) - _fill_missing(aux_curves[1]) - except Exception: - # В случае ошибки просто оставляем как есть - pass - # Инверсия данных при «отрицательном» уровне (среднее ниже порога) - if apply_inversion: - try: - m = float(np.nanmean(sweep)) - if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD: - sweep *= -1.0 - except Exception: - pass - #sweep = np.abs(sweep) - #sweep -= float(np.nanmean(sweep)) - - # Метрики для статусной строки (вид словаря: переменная -> значение) - self._sweep_idx += 1 - if len(ch_list) > 1: - sys.stderr.write( - f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n" - ) - now = time.time() - if self._last_sweep_ts is None: - dt_ms = float("nan") - else: - dt_ms = (now - self._last_sweep_ts) * 1000.0 - self._last_sweep_ts = now - self._n_valid_hist.append((now, n_valid_cur)) - while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0: - self._n_valid_hist.popleft() - if self._n_valid_hist: - n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist)) - else: - n_valid = float(n_valid_cur) - - if n_valid_cur > 0: - vmin = float(np.nanmin(sweep)) - vmax = float(np.nanmax(sweep)) - mean = float(np.nanmean(sweep)) - std = float(np.nanstd(sweep)) - else: - vmin = vmax = mean = std = float("nan") - info: SweepInfo = { - "sweep": self._sweep_idx, - "ch": ch_primary, - "chs": ch_list, - "n_valid": n_valid, - "min": vmin, - "max": vmax, - "mean": mean, - "std": std, - "dt_ms": dt_ms, - } - - # Кладём готовый свип (если очередь полна — выбрасываем самый старый) - try: - self._q.put_nowait((sweep, info, aux_curves)) - except Full: - try: - _ = self._q.get_nowait() - except Exception: - pass - try: - self._q.put_nowait((sweep, info, aux_curves)) - except Exception: - pass - - def _run_ascii_stream(self, chunk_reader: SerialChunkReader): - xs: list[int] = [] - ys: list[int] = [] - cur_channel: Optional[int] = None - cur_channels: set[int] = set() - buf = bytearray() - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - time.sleep(0.0005) - continue - - while True: - nl = buf.find(b"\n") - if nl == -1: - break - line = bytes(buf[:nl]) - del buf[: nl + 1] - if line.endswith(b"\r"): - line = line[:-1] - if not line: - continue - - if line.startswith(b"Sweep_start"): - self._finalize_current(xs, ys, cur_channels) - xs.clear() - ys.clear() - cur_channel = None - cur_channels.clear() - continue - - # sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам. - if len(line) >= 3: - parts = line.split() - if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): - try: - if parts[0].lower() == b"s": - if len(parts) >= 4: - ch = int(parts[1], 10) - x = int(parts[2], 10) - y = int(parts[3], 10) # поддержка знака: "+…" и "-…" - else: - ch = 0 - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - else: - # формат вида "s0" - ch = int(parts[0][1:], 10) - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - except Exception: - continue - if cur_channel is None: - cur_channel = ch - cur_channels.add(ch) - xs.append(x) - ys.append(y) - - if len(buf) > 1_000_000: - del buf[:-262144] - - self._finalize_current(xs, ys, cur_channels) - - def _run_binary_stream(self, chunk_reader: SerialChunkReader): - xs: list[int] = [] - ys: list[int] = [] - cur_channel: Optional[int] = None - cur_channels: set[int] = set() - expected_step: Optional[int] = None - syncing = False - words = deque() - - buf = bytearray() - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - time.sleep(0.0005) - continue - - usable = len(buf) & ~1 - if usable == 0: - continue - - i = 0 - while i < usable: - w = int(buf[i]) | (int(buf[i + 1]) << 8) - words.append(w) - i += 2 - - - - - - # Бинарный протокол: - # старт свипа: 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A - # точка: step, value_hi, value_lo, 0x000A - while len(words) >= 4: - w0 = int(words[0]) - w1 = int(words[1]) - w2 = int(words[2]) - w3 = int(words[3]) - - if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A: - if len(words) < 5: - break - if int(words[4]) != 1: - words.popleft() - continue - self._finalize_current(xs, ys, cur_channels) - xs.clear() - ys.clear() - cur_channels.clear() - cur_channel = (w3 >> 8) & 0x00FF - cur_channels.add(cur_channel) - expected_step = 1 - syncing = False - for _ in range(4): - words.popleft() - continue - - if syncing: - if w0 == 0x000A: - syncing = False - expected_step = None - words.popleft() - continue - - if w3 != 0x000A: - syncing = True - continue - - if w0 <= 0: - syncing = True - continue - - if expected_step is not None and w0 < expected_step: - syncing = True - continue - - if cur_channel is not None: - cur_channels.add(cur_channel) - xs.append(w0) - value_u32 = (w1 << 16) | w2 - ys.append(self._u32_to_i32(value_u32)) - expected_step = w0 + 1 - for _ in range(4): - words.popleft() - continue - - # Поток может начаться с середины пакета; сдвигаемся по слову до ресинхронизации. - words.popleft() - - del buf[:usable] - if len(buf) > 1_000_000: - del buf[:-262144] - - self._finalize_current(xs, ys, cur_channels) - - def _run_logscale_binary_stream(self, chunk_reader: SerialChunkReader): - xs: list[int] = [] - ys: list[float] = [] - avg_1_vals: list[int] = [] - avg_2_vals: list[int] = [] - cur_channel: Optional[int] = None - cur_channels: set[int] = set() - expected_step: Optional[int] = None - syncing = False - words = deque() - - buf = bytearray() - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - time.sleep(0.0005) - continue - - usable = len(buf) & ~1 - if usable == 0: - continue - - i = 0 - while i < usable: - w = int(buf[i]) | (int(buf[i + 1]) << 8) - words.append(w) - i += 2 - - # Бинарный logscale-протокол: - # старт свипа: 0xFFFF x5, затем (ch<<8)|0x0A - # точка: step, avg1_hi, avg1_lo, avg2_hi, avg2_lo, 0x000A - while len(words) >= 6: - w0 = int(words[0]) - w1 = int(words[1]) - w2 = int(words[2]) - w3 = int(words[3]) - w4 = int(words[4]) - w5 = int(words[5]) - - if ( - w0 == 0xFFFF - and w1 == 0xFFFF - and w2 == 0xFFFF - and w3 == 0xFFFF - and w4 == 0xFFFF - and (w5 & 0x00FF) == 0x000A - ): - if len(words) < 7: - break - if int(words[6]) != 1: - words.popleft() - continue - self._finalize_current( - xs, - ys, - cur_channels, - raw_curves=(avg_1_vals, avg_2_vals), - apply_inversion=False, - ) - xs.clear() - ys.clear() - avg_1_vals.clear() - avg_2_vals.clear() - cur_channels.clear() - cur_channel = (w5 >> 8) & 0x00FF - cur_channels.add(cur_channel) - expected_step = 1 - syncing = False - for _ in range(6): - words.popleft() - continue - - if syncing: - if w0 == 0x000A: - syncing = False - expected_step = None - words.popleft() - continue - - if w5 != 0x000A: - syncing = True - continue - - if w0 <= 0: - syncing = True - continue - - if expected_step is not None and w0 < expected_step: - syncing = True - continue - - if cur_channel is not None: - cur_channels.add(cur_channel) - avg_1 = self._u32_to_i32((w1 << 16) | w2) - avg_2 = self._u32_to_i32((w3 << 16) | w4) - xs.append(w0) - avg_1_vals.append(avg_1) - avg_2_vals.append(avg_2) - ys.append(_log_pair_to_sweep(avg_1, avg_2)) - expected_step = w0 + 1 - #ys.append(LOG_BASE**(avg_1/LOG_SCALER) - LOG_BASE**(avg_2/LOG_SCALER)) - for _ in range(6): - words.popleft() - continue - - words.popleft() - - del buf[:usable] - if len(buf) > 1_000_000: - del buf[:-262144] - - self._finalize_current( - xs, - ys, - cur_channels, - raw_curves=(avg_1_vals, avg_2_vals), - apply_inversion=False, - ) - - def _run_logscale_16_bit_x2_binary_stream(self, chunk_reader: SerialChunkReader): - xs: list[int] = [] - ys: list[float] = [] - avg_1_vals: list[int] = [] - avg_2_vals: list[int] = [] - cur_channel: Optional[int] = None - cur_channels: set[int] = set() - expected_step: Optional[int] = None - syncing = False - words = deque() - - buf = bytearray() - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - time.sleep(0.0005) - continue - - usable = len(buf) & ~1 - if usable == 0: - continue - - i = 0 - while i < usable: - w = int(buf[i]) | (int(buf[i + 1]) << 8) - words.append(w) - i += 2 - #print(i) - #print(words) - # Бинарный logscale-протокол (16-bit x2): - # старт свипа: 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A - # точка: step, avg1_lo16, avg2_lo16, 0xFFFF - while len(words) >= 4: - w0 = int(words[0]) - w1 = int(words[1]) - w2 = int(words[2]) - w3 = int(words[3]) - - if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A: - if len(words) < 5: - break - if int(words[4]) != 1: - words.popleft() - continue - self._finalize_current( - xs, - ys, - cur_channels, - raw_curves=(avg_1_vals, avg_2_vals), - apply_inversion=False, - ) - xs.clear() - ys.clear() - avg_1_vals.clear() - avg_2_vals.clear() - cur_channels.clear() - cur_channel = (w3 >> 8) & 0x00FF - cur_channels.add(cur_channel) - expected_step = 1 - syncing = False - for _ in range(4): - words.popleft() - continue - - if syncing: - if w0 == 0xFFFF: - syncing = False - expected_step = None - words.popleft() - continue - - if w3 != 0xFFFF: - syncing = True - continue - - if w0 <= 0: - syncing = True - continue - - if expected_step is not None and w0 < expected_step: - syncing = True - continue - - if cur_channel is not None: - cur_channels.add(cur_channel) - avg_1 = self._u16_to_i16(w1) - avg_2 = self._u16_to_i16(w2) - xs.append(w0) - avg_1_vals.append(avg_1) - avg_2_vals.append(avg_2) - ys.append(_log_pair_to_sweep(avg_1, avg_2)) - expected_step = w0 + 1 - for _ in range(4): - words.popleft() - continue - - words.popleft() - - del buf[:usable] - if len(buf) > 1_000_000: - del buf[:-262144] - - self._finalize_current( - xs, - ys, - cur_channels, - raw_curves=(avg_1_vals, avg_2_vals), - apply_inversion=False, - ) - - def _run_parser_test_stream(self, chunk_reader: SerialChunkReader): - """Тестовый парсер 16-bit x2: - - разделитель точки: одиночный 0xFFFF - - маркер старта свипа: серия 0xFFFF (>=2), затем (ch<<8)|0x0A - """ - xs: list[int] = [] - ys: list[float] = [] - avg_1_vals: list[int] = [] - avg_2_vals: list[int] = [] - cur_channel: Optional[int] = None - cur_channels: set[int] = set() - expected_step: Optional[int] = None - in_sweep = False - point_buf: list[int] = [] - ffff_run = 0 - local_resync = False - - def _finalize_sweep(): - self._finalize_current( - xs, - ys, - cur_channels, - raw_curves=(avg_1_vals, avg_2_vals), - apply_inversion=False, - ) - xs.clear() - ys.clear() - avg_1_vals.clear() - avg_2_vals.clear() - cur_channels.clear() - - def _consume_point() -> bool: - nonlocal expected_step - if len(point_buf) != 3: - return False - step = int(point_buf[0]) - if step <= 0: - return False - if expected_step is not None and step < expected_step: - return False - if cur_channel is not None: - cur_channels.add(cur_channel) - avg_1 = self._u16_to_i16(int(point_buf[1])) - avg_2 = self._u16_to_i16(int(point_buf[2])) - xs.append(step) - avg_1_vals.append(avg_1) - avg_2_vals.append(avg_2) - ys.append(_log_pair_to_sweep(avg_1, avg_2)) - expected_step = step + 1 - return True - - buf = bytearray() - buf_pos = 0 - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - time.sleep(0.000005) - continue - - # Обрабатываем поток строго по одному слову (16 бит) за шаг. - while (buf_pos + 1) < len(buf): - w = int(buf[buf_pos]) | (int(buf[buf_pos + 1]) << 8) - buf_pos += 2 - - if w == 0xFFFF: - ffff_run += 1 - continue - - if ffff_run > 0: - bad_point_on_this_delim = False - if in_sweep and point_buf and (not local_resync): - if not _consume_point(): - local_resync = True - bad_point_on_this_delim = True - point_buf.clear() - - if ffff_run >= 2: - if (w & 0x00FF) == 0x000A: - _finalize_sweep() - cur_channel = (w >> 8) & 0x00FF - cur_channels.add(cur_channel) - in_sweep = True - expected_step = 1 - local_resync = False - point_buf.clear() - ffff_run = 0 - continue - # Некорректная серия FFFF: остаёмся в текущем свипе, ждём - # следующего одиночного разделителя точки. - if in_sweep: - local_resync = True - ffff_run = 0 - continue - - # ffff_run == 1: это просто разделитель точки - if local_resync and (not bad_point_on_this_delim): - # Нашли следующий одиночный разделитель: выходим из локального resync. - local_resync = False - point_buf.clear() - ffff_run = 0 - - if in_sweep and (not local_resync): - point_buf.append(w) - if len(point_buf) > 3: - point_buf.clear() - local_resync = True - - # Периодически уплотняем буфер, сохраняя возможный хвост из 1 байта. - if buf_pos >= 262144: - del buf[:buf_pos] - buf_pos = 0 - # Аварийный лимит на остаток неразобранных данных. - if (len(buf) - buf_pos) > 1_000_000: - tail = buf[buf_pos:] - if len(tail) > 262144: - tail = tail[-262144:] - buf = bytearray(tail) - buf_pos = 0 - - # Попробуем завершить последнюю точку, если поток закончился ровно на разделителе. - if in_sweep and (not local_resync) and ffff_run == 1 and point_buf: - _consume_point() - point_buf.clear() - _finalize_sweep() - - def run(self): - try: - self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) - sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") - except Exception as e: - sys.stderr.write(f"[error] {e}\n") - return - - try: - chunk_reader = SerialChunkReader(self._src) - if self._parser_test: - self._run_parser_test_stream(chunk_reader) - elif self._parser_16_bit_x2: - self._run_logscale_16_bit_x2_binary_stream(chunk_reader) - elif self._logscale: - self._run_logscale_binary_stream(chunk_reader) - elif self._bin_mode: - self._run_binary_stream(chunk_reader) - else: - self._run_ascii_stream(chunk_reader) - finally: - try: - if self._src is not None: - self._src.close() - except Exception: - pass - - -def main(): - parser = argparse.ArgumentParser( - description=( - "Читает свипы из виртуального COM-порта и рисует: " - "последний свип и водопад (реалтайм)." - ) - ) - parser.add_argument( - "port", - help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)", - ) - parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") - parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") - parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") - parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") - parser.add_argument( - "--spec-clip", - default="2,98", - help=( - "Процентильная обрезка уровней водопада спектров, % (min,max). " - "Напр. 2,98. 'off' — отключить" - ), - ) - parser.add_argument( - "--spec-mean-sec", - type=float, - default=0.0, - help=( - "Вычитание среднего по каждой частоте за последние N секунд " - "в водопаде спектров (0 — отключить)" - ), - ) - parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") - parser.add_argument( - "--fancy", - action="store_true", - help="Заполнять выпавшие точки средними значениями между соседними", - ) - parser.add_argument( - "--ylim", - type=str, - default=None, - help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", - ) - parser.add_argument( - "--backend", - choices=["auto", "pg", "mpl"], - default="pg", - help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию pg", - ) - parser.add_argument( - "--norm-type", - choices=["projector", "simple"], - default="projector", - help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)", - ) - parser.add_argument( - "--bin", - dest="bin_mode", - action="store_true", - help=( - "Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; " - "точки step,uint32(hi16,lo16),0x000A" - ), - ) - parser.add_argument( - "--logscale", - action="store_true", - default=True, - help=( - "Новый бинарный протокол: точка несёт пару int32 (avg_1, avg_2), " - "а свип считается как 10**(avg_1*0.001) - 10**(avg_2*0.001)" - ), - ) - parser.add_argument( - "--parser_16_bit_x2", - action="store_true", - help=( - "Бинарный logscale-протокол c парой int16 (avg_1, avg_2): " - "старт 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; точка step,avg1_lo16,avg2_lo16,0xFFFF" - ), - ) - parser.add_argument( - "--parser_test", - action="store_true", - help=( - "Тестовый парсер для формата 16-bit x2: " - "одиночный 0xFFFF завершает точку, серия 0xFFFF начинает новый свип" - ), - ) - parser.add_argument( - "--calibrate", - action="store_true", - help=( - "Режим измерения ширины главного пика FFT: рисует красные маркеры " - "границ и фона и выводит ширину пика в статус" - ), - ) - parser.add_argument( - "--peak_search", - action="store_true", - help=( - "Поиск топ-3 пиков на FFT относительно референса (скользящая медиана) " - "с отрисовкой bounding box и параметров пиков" - ), - ) - parser.add_argument( - "--peak_ref_window", - type=float, - default=1.0, - help="Ширина окна скользящей медианы для --peak_search, ГГц/м по оси FFT (по умолчанию 1.0)", - ) - - args = parser.parse_args() - - # Попробуем быстрый бэкенд (pyqtgraph) при auto/pg - if args.backend == "pg": - try: - return run_pyqtgraph(args) - except Exception as e: - sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n") - sys.exit(1) - - if args.backend == "auto": - try: - return run_pyqtgraph(args) - except Exception: - pass # При auto — тихо откатываемся на matplotlib - - try: - import matplotlib - import matplotlib.pyplot as plt - from matplotlib.animation import FuncAnimation - from matplotlib.widgets import Slider, CheckButtons, TextBox - except Exception as e: - sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") - sys.exit(1) - - # Очередь завершённых свипов и поток чтения - q: Queue[SweepPacket] = Queue(maxsize=1000) - stop_event = threading.Event() - reader = SweepReader( - args.port, - args.baud, - q, - stop_event, - fancy=bool(args.fancy), - bin_mode=bool(args.bin_mode), - logscale=bool(args.logscale), - parser_16_bit_x2=bool(args.parser_16_bit_x2), - parser_test=bool(args.parser_test), - ) - reader.start() - - # Графика - fig, axs = plt.subplots(2, 2, figsize=(12, 8)) - (ax_line, ax_img), (ax_fft, ax_spec) = axs - fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None - # Увеличим расстояния и оставим место справа под ползунки оси Y B-scan - fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) - - # Состояние для отображения - current_freqs: Optional[np.ndarray] = None - current_distances: Optional[np.ndarray] = None - current_sweep_raw: Optional[np.ndarray] = None - current_aux_curves: SweepAuxCurves = None - current_sweep_norm: Optional[np.ndarray] = None - current_fft_db: Optional[np.ndarray] = None - last_calib_sweep: Optional[np.ndarray] = None - current_info: Optional[SweepInfo] = None - x_shared: Optional[np.ndarray] = None - width: Optional[int] = None - max_sweeps = int(max(10, args.max_sweeps)) - ring = None # type: Optional[np.ndarray] - ring_time = None # type: Optional[np.ndarray] - head = 0 - # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. - # FFT состояние - fft_bins = FFT_LEN // 2 + 1 - ring_fft = None # type: Optional[np.ndarray] - y_min_fft, y_max_fft = None, None - distance_shared: Optional[np.ndarray] = None - # Параметры контраста водопада спектров - spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) - spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) - # Ползунки управления Y для B-scan и контрастом - ymin_slider = None - ymax_slider = None - contrast_slider = None - calib_enabled = False - bg_subtract_enabled = False - peak_calibrate_mode = bool(getattr(args, "calibrate", False)) - peak_search_enabled = bool(getattr(args, "peak_search", False)) - peak_ref_window = float(getattr(args, "peak_ref_window", 1.0)) - if (not np.isfinite(peak_ref_window)) or peak_ref_window <= 0.0: - peak_ref_window = 1.0 - current_peak_width: Optional[float] = None - current_peak_amplitude: Optional[float] = None - peak_candidates: List[Dict[str, float]] = [] - norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() - cb = None - c_boxes = [] - peak_textboxes = [] - c_values_text = None - peak_values_text = None - - # Статусная строка (внизу окна) - status_text = fig.text( - 0.01, - 0.01, - "", - ha="left", - va="bottom", - fontsize=8, - family="monospace", - ) - - # Линейный график последнего свипа - line_avg1_obj, = ax_line.plot([], [], lw=1, color="0.65") - line_avg2_obj, = ax_line.plot([], [], lw=1, color="0.45") - line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") - line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") - line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") - ax_line.set_title("Сырые данные", pad=1) - ax_line.set_xlabel("ГГц") - ax_line.set_ylabel("") - channel_text = ax_line.text( - 0.98, - 0.98, - "", - transform=ax_line.transAxes, - ha="right", - va="top", - fontsize=9, - family="monospace", - ) - - # Линейный график спектра текущего свипа - fft_line_obj, = ax_fft.plot([], [], lw=1) - fft_ref_obj, = ax_fft.plot([], [], lw=1, color="red", alpha=0.9, visible=False) - fft_peak_box_objs = [ax_fft.plot([], [], lw=1, color="red", visible=False)[0] for _ in range(3)] - fft_bg_obj = ax_fft.axhline(0.0, lw=1, color="red", visible=False) - fft_left_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False) - fft_right_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False) - ax_fft.set_title("FFT", pad=1) - ax_fft.set_xlabel("Расстояние, м") - ax_fft.set_ylabel("дБ") - - # Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения) - fixed_ylim: Optional[Tuple[float, float]] = None - # CLI переопределение при необходимости - if args.ylim: - try: - y0, y1 = args.ylim.split(",") - fixed_ylim = (float(y0), float(y1)) - except Exception: - sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n") - if fixed_ylim is not None: - ax_line.set_ylim(fixed_ylim) - - # Водопад (будет инициализирован при первом свипе) - img_obj = ax_img.imshow( - np.zeros((1, 1), dtype=np.float32), - aspect="auto", - interpolation="nearest", - origin="lower", - cmap=args.cmap, - ) - ax_img.set_title("Сырые данные", pad=12) - ax_img.set_xlabel("") - ax_img.set_ylabel("ГГц") - # Не показываем численные значения по времени на водопаде сырых данных - try: - ax_img.tick_params(axis="x", labelbottom=False) - except Exception: - pass - - # Водопад спектров - img_fft_obj = ax_spec.imshow( - np.zeros((1, 1), dtype=np.float32), - aspect="auto", - interpolation="nearest", - origin="lower", - cmap=args.cmap, - ) - ax_spec.set_title("B-scan (дБ)", pad=12) - ax_spec.set_xlabel("") - ax_spec.set_ylabel("Расстояние, м") - # Не показываем численные значения по времени на B-scan - try: - ax_spec.tick_params(axis="x", labelbottom=False) - except Exception: - pass - spec_left_obj = ax_spec.axhline(0.0, lw=1, color="red", visible=False) - spec_right_obj = ax_spec.axhline(0.0, lw=1, color="red", visible=False) - - def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - return _normalize_by_calib(raw, calib, norm_type=norm_type) - - def _sync_checkbox_states(): - nonlocal calib_enabled, bg_subtract_enabled, peak_search_enabled, current_sweep_norm - try: - states = cb.get_status() if cb is not None else () - calib_enabled = bool(states[0]) if len(states) > 0 else False - bg_subtract_enabled = bool(states[1]) if len(states) > 1 else False - peak_search_enabled = bool(states[2]) if len(states) > 2 else False - except Exception: - calib_enabled = False - bg_subtract_enabled = False - peak_search_enabled = False - if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) - else: - current_sweep_norm = None - - # Слайдеры для управления осью Y B-scan (мин/макс) и контрастом - try: - ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) - ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) - ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) - ax_cb = fig.add_axes([0.90, 0.37, 0.10, 0.17]) - ymin_slider = Slider(ax_smin, "R min", 0.0, 1.0, valinit=0.0, orientation="vertical") - ymax_slider = Slider(ax_smax, "R max", 0.0, 1.0, valinit=1.0, orientation="vertical") - contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") - cb = CheckButtons( - ax_cb, - ["нормировка", "вычет фона", "поиск пиков"], - [False, False, peak_search_enabled], - ) - - def _on_ylim_change(_val): - try: - y0 = float(min(ymin_slider.val, ymax_slider.val)) - y1 = float(max(ymin_slider.val, ymax_slider.val)) - ax_spec.set_ylim(y0, y1) - fig.canvas.draw_idle() - except Exception: - pass - - ymin_slider.on_changed(_on_ylim_change) - ymax_slider.on_changed(_on_ylim_change) - # Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона) - contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) - cb.on_clicked(lambda _v: _sync_checkbox_states()) - _sync_checkbox_states() - except Exception: - pass - - if peak_calibrate_mode: - try: - def _refresh_c_values_text(): - if c_values_text is None: - return - try: - c_values_text.set_text( - "\n".join( - f"C*{idx}={float(CALIBRATION_C[idx]):.6g}" for idx in range(3) - ) - ) - except Exception: - pass - - def _set_c_value(idx: int, text: str): - global CALIBRATION_C, CALIBRATION_C_BASE - try: - CALIBRATION_C_BASE[idx] = float(text.strip()) - CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE) - except Exception: - pass - _refresh_c_values_text() - - for idx, ypos in enumerate((0.36, 0.30, 0.24)): - ax_c = fig.add_axes([0.92, ypos, 0.08, 0.045]) - tb = TextBox(ax_c, f"C{idx}", initial=f"{float(CALIBRATION_C_BASE[idx]):.6g}") - tb.on_submit(lambda text, i=idx: _set_c_value(i, text)) - c_boxes.append(tb) - ax_c_info = fig.add_axes([0.90, 0.14, 0.10, 0.08]) - ax_c_info.axis("off") - c_values_text = ax_c_info.text( - 0.0, - 1.0, - "", - ha="left", - va="top", - fontsize=7, - family="monospace", - transform=ax_c_info.transAxes, - ) - _refresh_c_values_text() - except Exception: - pass - - def _refresh_peak_values_text(peaks: List[Dict[str, float]]): - if peak_values_text is None: - return - lines = [] - for idx in range(3): - if idx < len(peaks): - p = peaks[idx] - lines.append(f"peak {idx + 1}:") - lines.append(f" X: {p['x']:.4g} m") - lines.append(f" H: {p['height']:.4g} dB") - lines.append(f" W: {p['width']:.4g} m") - else: - lines.append(f"peak {idx + 1}:") - lines.append(" X: - m") - lines.append(" H: - dB") - lines.append(" W: - m") - if idx != 2: - lines.append("") - peak_values_text.set_text("\n".join(lines)) - - try: - def _set_peak_ref_window(text: str): - nonlocal peak_ref_window - try: - v = float(text.strip()) - if np.isfinite(v) and v > 0.0: - peak_ref_window = v - except Exception: - pass - - ax_peak_win = fig.add_axes([0.90, 0.20, 0.10, 0.045]) - peak_window_tb = TextBox(ax_peak_win, "med,GHz", initial=f"{peak_ref_window:.6g}") - peak_window_tb.on_submit(_set_peak_ref_window) - peak_textboxes.append(peak_window_tb) - ax_peak_info = fig.add_axes([0.90, 0.01, 0.10, 0.19]) - ax_peak_info.axis("off") - peak_values_text = ax_peak_info.text( - 0.0, - 1.0, - "", - ha="left", - va="top", - fontsize=6, - family="monospace", - transform=ax_peak_info.transAxes, - ) - _refresh_peak_values_text([]) - except Exception: - pass - - # Для контроля частоты обновления - max_fps = max(1.0, float(args.max_fps)) - interval_ms = int(1000.0 / max_fps) - frames_since_ylim_update = 0 - spec_slider_initialized = False - - - def ensure_buffer(_w: int): - nonlocal ring, width, head, x_shared, ring_fft, distance_shared, ring_time - if ring is not None: - return - width = WF_WIDTH - x_shared = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, width, dtype=np.float32) - ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) - ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) - head = 0 - # Обновляем изображение под новые размеры: время по X (горизонталь), X по Y - img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32)) - img_obj.set_extent((0, max_sweeps - 1, SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ)) - ax_img.set_xlim(0, max_sweeps - 1) - ax_img.set_ylim(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ) - # FFT буферы: время по X, бин по Y - ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) - img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32)) - img_fft_obj.set_extent((0, max_sweeps - 1, 0.0, 1.0)) - ax_spec.set_xlim(0, max_sweeps - 1) - ax_spec.set_ylim(0.0, 1.0) - distance_shared = _compute_distance_axis(current_freqs, fft_bins) - - def _update_physical_axes(): - nonlocal distance_shared, spec_slider_initialized - if current_freqs is not None and current_freqs.size > 0: - finite_f = current_freqs[np.isfinite(current_freqs)] - if finite_f.size > 0: - f_min = float(np.min(finite_f)) - f_max = float(np.max(finite_f)) - if f_max <= f_min: - f_max = f_min + 1.0 - img_obj.set_extent((0, max_sweeps - 1, f_min, f_max)) - ax_img.set_ylim(f_min, f_max) - - distance_shared = _compute_distance_axis(current_freqs, fft_bins) - if distance_shared.size > 0: - d_min = float(distance_shared[0]) - d_max = float(distance_shared[-1]) if distance_shared.size > 1 else float(distance_shared[0] + 1.0) - if d_max <= d_min: - d_max = d_min + 1.0 - img_fft_obj.set_extent((0, max_sweeps - 1, d_min, d_max)) - ax_spec.set_ylim(d_min, d_max) - if ymin_slider is not None and ymax_slider is not None: - try: - ymin_slider.valmin = d_min - ymin_slider.valmax = d_max - ymax_slider.valmin = d_min - ymax_slider.valmax = d_max - ymin_slider.ax.set_ylim(d_min, d_max) - ymax_slider.ax.set_ylim(d_min, d_max) - if (not spec_slider_initialized) or (not (d_min <= ymin_slider.val <= d_max)): - ymin_slider.set_val(d_min) - if (not spec_slider_initialized) or (not (d_min <= ymax_slider.val <= d_max)): - ymax_slider.set_val(d_max) - spec_slider_initialized = True - except Exception: - pass - - def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по центральным 90% значений в видимой области imshow.""" - if data.size == 0: - return None - ny, nx = data.shape[0], data.shape[1] - try: - x0, x1 = axis.get_xlim() - y0, y1 = axis.get_ylim() - except Exception: - x0, x1 = 0.0, float(nx - 1) - y0, y1 = 0.0, float(ny - 1) - xmin, xmax = sorted((float(x0), float(x1))) - ymin, ymax = sorted((float(y0), float(y1))) - ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) - ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) - iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) - if ix1 < ix0: - ix1 = ix0 - if iy1 < iy0: - iy1 = iy0 - sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] - finite = np.isfinite(sub) - if not finite.any(): - return None - vals = sub[finite] - vmin = float(np.nanpercentile(vals, 5)) - vmax = float(np.nanpercentile(vals, 95)) - if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: - return None - return (vmin, vmax) - - def push_sweep(s: np.ndarray, freqs: Optional[np.ndarray] = None): - nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time - if s is None or s.size == 0 or ring is None: - return - # Нормализуем длину до фиксированной ширины - w = ring.shape[1] - row = np.full((w,), np.nan, dtype=np.float32) - take = min(w, s.size) - row[:take] = s[:take] - ring[head, :] = row - if ring_time is not None: - ring_time[head] = time.time() - head = (head + 1) % ring.shape[0] - # FFT строка (линейный модуль; перевод в дБ делаем при отображении) - if ring_fft is not None: - bins = ring_fft.shape[1] - fft_mag = _compute_fft_mag_row(s, freqs, bins) - row_idx = (head - 1) % ring_fft.shape[0] - ring_fft[row_idx, :] = fft_mag - fft_row = _fft_mag_to_db(fft_mag) - # Экстремумы для цветовой шкалы - fr_min = np.nanmin(fft_row) - fr_max = np.nanmax(fft_row) - fr_max = np.nanpercentile(fft_row, 90) - if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): - y_min_fft = float(fr_min) - if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): - y_max_fft = float(fr_max) - - def drain_queue(): - nonlocal current_freqs, current_distances, current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep - drained = 0 - while True: - try: - s, info, aux_curves = q.get_nowait() - except Empty: - break - drained += 1 - calibrated = calibrate_freqs( - { - "F": np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, s.size, dtype=np.float64), - "I": s, - } - ) - current_freqs = calibrated["F"] - current_distances = _compute_distance_axis(current_freqs, fft_bins) - s = calibrated["I"] - current_sweep_raw = s - current_aux_curves = aux_curves - current_info = info - ch = 0 - try: - ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 - except Exception: - ch = 0 - if ch == 0: - last_calib_sweep = s - current_sweep_norm = None - sweep_for_proc = s - else: - if calib_enabled and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(s, last_calib_sweep) - sweep_for_proc = current_sweep_norm - else: - current_sweep_norm = None - sweep_for_proc = s - ensure_buffer(s.size) - _update_physical_axes() - push_sweep(sweep_for_proc, current_freqs) - return drained - - def make_display_ring(): - # Возвращаем буфер с правильным порядком по времени (старые→новые) и осью времени по X - if ring is None: - return np.zeros((1, 1), dtype=np.float32) - base = ring if head == 0 else np.roll(ring, -head, axis=0) - return base.T # (width, time) - - def make_display_times(): - if ring_time is None: - return None - base_t = ring_time if head == 0 else np.roll(ring_time, -head) - return base_t - - def _subtract_recent_mean_fft(disp_fft: np.ndarray) -> np.ndarray: - """Вычесть среднее по каждой дальности за последние spec_mean_sec секунд в линейной области.""" - if spec_mean_sec <= 0.0: - return disp_fft - disp_times = make_display_times() - if disp_times is None: - return disp_fft - now_t = time.time() - mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) - if not np.any(mask): - return disp_fft - try: - mean_spec = np.nanmean(disp_fft[:, mask], axis=1) - except Exception: - return disp_fft - mean_spec = np.nan_to_num(mean_spec, nan=0.0) - return disp_fft - mean_spec[:, None] - - def _visible_bg_fft(disp_fft: np.ndarray) -> Optional[np.ndarray]: - """Оценка фона по медиане в текущем видимом по времени окне B-scan.""" - if not bg_subtract_enabled or disp_fft.size == 0: - return None - ny, nx = disp_fft.shape - if ny <= 0 or nx <= 0: - return None - try: - x0, x1 = ax_spec.get_xlim() - except Exception: - x0, x1 = 0.0, float(nx - 1) - xmin, xmax = sorted((float(x0), float(x1))) - ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) - ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - if ix1 < ix0: - ix1 = ix0 - window = disp_fft[:, ix0 : ix1 + 1] - if window.size == 0: - return None - try: - bg_spec = np.nanmedian(window, axis=1) - except Exception: - return None - if not np.any(np.isfinite(bg_spec)): - return None - return np.nan_to_num(bg_spec, nan=0.0).astype(np.float32, copy=False) - - def make_display_ring_fft(): - if ring_fft is None: - return np.zeros((1, 1), dtype=np.float32) - base = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) - return base.T # (bins, time) - - def update(_frame): - nonlocal frames_since_ylim_update, current_peak_width, current_peak_amplitude, peak_candidates - if any(getattr(tb, "capturekeystrokes", False) for tb in (c_boxes + peak_textboxes)): - return ( - line_obj, - line_avg1_obj, - line_avg2_obj, - line_calib_obj, - line_norm_obj, - img_obj, - fft_line_obj, - fft_ref_obj, - *fft_peak_box_objs, - fft_bg_obj, - fft_left_obj, - fft_right_obj, - img_fft_obj, - spec_left_obj, - spec_right_obj, - status_text, - channel_text, - ) - changed = drain_queue() > 0 - - # Обновление линии последнего свипа - if current_sweep_raw is not None: - if current_freqs is not None and current_freqs.size == current_sweep_raw.size: - xs = current_freqs - elif x_shared is not None and current_sweep_raw.size <= x_shared.size: - xs = x_shared[: current_sweep_raw.size] - else: - xs = np.arange(current_sweep_raw.size, dtype=np.int32) - line_obj.set_data(xs, current_sweep_raw) - if current_aux_curves is not None: - avg_1_curve, avg_2_curve = current_aux_curves - line_avg1_obj.set_data(xs[: avg_1_curve.size], avg_1_curve) - line_avg2_obj.set_data(xs[: avg_2_curve.size], avg_2_curve) - else: - line_avg1_obj.set_data([], []) - line_avg2_obj.set_data([], []) - if last_calib_sweep is not None: - line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep) - else: - line_calib_obj.set_data([], []) - if current_sweep_norm is not None: - line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm) - else: - line_norm_obj.set_data([], []) - # Лимиты по X: номинальный диапазон свипа - if isinstance(xs, np.ndarray) and xs.size > 0 and np.isfinite(xs[0]) and np.isfinite(xs[-1]): - ax_line.set_xlim(float(np.nanmin(xs)), float(np.nanmax(xs))) - else: - ax_line.set_xlim(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ) - # Адаптивные Y-лимиты (если не задан --ylim) - if fixed_ylim is None: - y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm] - if current_aux_curves is not None: - y_series.extend(current_aux_curves) - y_limits = _compute_auto_ylim(*y_series) - if y_limits is not None: - ax_line.set_ylim(y_limits[0], y_limits[1]) - - # Обновление спектра текущего свипа - sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw - if sweep_for_fft.size > 0 and distance_shared is not None: - fft_vals = _compute_fft_row(sweep_for_fft, current_freqs, distance_shared.size) - xs_fft = current_distances if current_distances is not None else distance_shared - if fft_vals.size > xs_fft.size: - fft_vals = fft_vals[: xs_fft.size] - xs_fft = xs_fft[: fft_vals.size] - fft_line_obj.set_data(xs_fft, fft_vals) - - finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals) - y_for_range = fft_vals[finite_fft] - if peak_search_enabled: - fft_ref = _rolling_median_ref(xs_fft, fft_vals, peak_ref_window) - finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref) - if np.any(finite_ref): - fft_ref_obj.set_data(xs_fft[finite_ref], fft_ref[finite_ref]) - fft_ref_obj.set_visible(True) - y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref])) - else: - fft_ref_obj.set_visible(False) - peak_candidates = _find_top_peaks_over_ref(xs_fft, fft_vals, fft_ref, top_n=3) - _refresh_peak_values_text(peak_candidates) - for idx, box_obj in enumerate(fft_peak_box_objs): - if idx < len(peak_candidates): - p = peak_candidates[idx] - box_obj.set_data( - [p["left"], p["left"], p["right"], p["right"], p["left"]], - [p["ref"], p["peak_y"], p["peak_y"], p["ref"], p["ref"]], - ) - box_obj.set_visible(True) - else: - box_obj.set_visible(False) - else: - peak_candidates = [] - fft_ref_obj.set_visible(False) - _refresh_peak_values_text([]) - for box_obj in fft_peak_box_objs: - box_obj.set_visible(False) - - # Авто-диапазон по Y для спектра - if np.any(finite_fft): - finite_x = xs_fft[finite_fft] - if finite_x.size > 0: - ax_fft.set_xlim(float(np.min(finite_x)), float(np.max(finite_x))) - finite_y = y_for_range[np.isfinite(y_for_range)] - if finite_y.size > 0: - y0 = float(np.min(finite_y)) - y1 = float(np.max(finite_y)) - if y1 <= y0: - y1 = y0 + 1e-3 - ax_fft.set_ylim(y0, y1) - if peak_calibrate_mode: - markers = _find_peak_width_markers(xs_fft, fft_vals) - if markers is not None: - fft_bg_obj.set_ydata([markers["background"], markers["background"]]) - fft_left_obj.set_xdata([markers["left"], markers["left"]]) - fft_right_obj.set_xdata([markers["right"], markers["right"]]) - spec_left_obj.set_ydata([markers["left"], markers["left"]]) - spec_right_obj.set_ydata([markers["right"], markers["right"]]) - fft_bg_obj.set_visible(True) - fft_left_obj.set_visible(True) - fft_right_obj.set_visible(True) - spec_left_obj.set_visible(True) - spec_right_obj.set_visible(True) - current_peak_width = markers["width"] - current_peak_amplitude = markers["amplitude"] - else: - fft_bg_obj.set_visible(False) - fft_left_obj.set_visible(False) - fft_right_obj.set_visible(False) - spec_left_obj.set_visible(False) - spec_right_obj.set_visible(False) - current_peak_width = None - current_peak_amplitude = None - else: - fft_bg_obj.set_visible(False) - fft_left_obj.set_visible(False) - fft_right_obj.set_visible(False) - spec_left_obj.set_visible(False) - spec_right_obj.set_visible(False) - current_peak_width = None - current_peak_amplitude = None - else: - fft_ref_obj.set_visible(False) - for box_obj in fft_peak_box_objs: - box_obj.set_visible(False) - peak_candidates = [] - _refresh_peak_values_text([]) - - # Обновление водопада - if changed and ring is not None: - disp = make_display_ring() - # Новые данные справа: без реверса - img_obj.set_data(disp) - # Подписи времени не обновляем динамически (оставляем авто-тики) - # Авто-уровни: по видимой области (не накапливаем за всё время) - levels = _visible_levels_matplotlib(disp, ax_img) - if levels is not None: - img_obj.set_clim(vmin=levels[0], vmax=levels[1]) - - # Обновление водопада спектров - if changed and ring_fft is not None: - disp_fft_lin = make_display_ring_fft() - disp_fft_lin = _subtract_recent_mean_fft(disp_fft_lin) - bg_spec = _visible_bg_fft(disp_fft_lin) - if bg_spec is not None: - num = np.maximum(disp_fft_lin, 0.0).astype(np.float32, copy=False) + 1e-9 - den = bg_spec[:, None] + 1e-9 - disp_fft = (20.0 * np.log10(num / den)).astype(np.float32, copy=False) - else: - disp_fft = _fft_mag_to_db(disp_fft_lin) - # Новые данные справа: без реверса - img_fft_obj.set_data(disp_fft) - # Подписи времени не обновляем динамически (оставляем авто-тики) - # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) - if bg_spec is not None: - try: - p5 = float(np.nanpercentile(disp_fft, 5)) - p95 = float(np.nanpercentile(disp_fft, 95)) - span = max(abs(p5), abs(p95)) - except Exception: - span = float("nan") - if np.isfinite(span) and span > 0.0: - try: - c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0 - except Exception: - c = 1.0 - span_eff = max(span * c, 1e-6) - img_fft_obj.set_clim(vmin=-span_eff, vmax=span_eff) - else: - try: - # disp_fft имеет форму (bins, time); берём среднее по времени - mean_spec = np.nanmean(disp_fft, axis=1) - vmin_v = float(np.nanmin(mean_spec)) - vmax_v = float(np.nanmax(mean_spec)) - except Exception: - vmin_v = vmax_v = None - # Если средние не дают валидный диапазон — используем процентильную обрезку (если задана) - if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v: - if spec_clip is not None: - try: - vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) - vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) - except Exception: - vmin_v = vmax_v = None - # Фолбэк к отслеживаемым минимум/максимумам - if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v: - if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft: - vmin_v, vmax_v = y_min_fft, y_max_fft - if vmin_v is not None and vmax_v is not None and vmin_v != vmax_v: - # Применим скалирование контрастом (верхняя граница) - try: - c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0 - except Exception: - c = 1.0 - vmax_eff = vmin_v + c * (vmax_v - vmin_v) - img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff) - - if changed and current_info: - status_payload = dict(current_info) - if peak_calibrate_mode and current_peak_width is not None: - status_payload["peak_w"] = current_peak_width - if peak_calibrate_mode and current_peak_amplitude is not None: - status_payload["peak_a"] = current_peak_amplitude - status_text.set_text(_format_status_kv(status_payload)) - chs = current_info.get("chs") if isinstance(current_info, dict) else None - if chs is None: - chs = current_info.get("ch") if isinstance(current_info, dict) else None - if chs is None: - channel_text.set_text("") - else: - try: - if isinstance(chs, (list, tuple, set)): - ch_list = sorted(int(v) for v in chs) - ch_text_val = ", ".join(str(v) for v in ch_list) - else: - ch_text_val = str(int(chs)) - channel_text.set_text(f"chs {ch_text_val}") - except Exception: - channel_text.set_text(f"chs {chs}") - - # Возвращаем обновлённые артисты - return ( - line_obj, - line_avg1_obj, - line_avg2_obj, - line_calib_obj, - line_norm_obj, - img_obj, - fft_line_obj, - fft_ref_obj, - *fft_peak_box_objs, - fft_bg_obj, - fft_left_obj, - fft_right_obj, - img_fft_obj, - spec_left_obj, - spec_right_obj, - status_text, - channel_text, - ) - - ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) - cleanup_done = False - - def _cleanup(): - nonlocal cleanup_done - if cleanup_done: - return - cleanup_done = True - stop_event.set() - reader.join(timeout=1.0) - - def _handle_sigint(_signum, _frame): - _cleanup() - try: - plt.close(fig) - except Exception: - pass - - prev_sigint = signal.getsignal(signal.SIGINT) - try: - fig.canvas.mpl_connect("close_event", lambda _evt: _cleanup()) - except Exception: - pass - try: - signal.signal(signal.SIGINT, _handle_sigint) - except Exception: - prev_sigint = None - - try: - plt.show() - finally: - _cleanup() - if prev_sigint is not None: - try: - signal.signal(signal.SIGINT, prev_sigint) - except Exception: - pass - - -def run_pyqtgraph(args): - """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" - peak_calibrate_mode = bool(getattr(args, "calibrate", False)) - peak_search_enabled = bool(getattr(args, "peak_search", False)) - try: - import pyqtgraph as pg - from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore - except Exception as e: - raise RuntimeError( - "pyqtgraph и совместимый Qt backend не найдены. Установите: pip install pyqtgraph PyQt5" - ) from e - - # Очередь завершённых свипов и поток чтения - q: Queue[SweepPacket] = Queue(maxsize=1000) - stop_event = threading.Event() - reader = SweepReader( - args.port, - args.baud, - q, - stop_event, - fancy=bool(args.fancy), - bin_mode=bool(args.bin_mode), - logscale=bool(args.logscale), - parser_16_bit_x2=bool(args.parser_16_bit_x2), - parser_test=bool(args.parser_test), - ) - reader.start() - - # Настройки скорости - max_sweeps = int(max(10, args.max_sweeps)) - max_fps = max(1.0, float(args.max_fps)) - interval_ms = int(1000.0 / max_fps) - - # PyQtGraph настройки - pg.setConfigOptions( - useOpenGL=not peak_calibrate_mode, - antialias=False, - imageAxisOrder="row-major", - ) - app = QtWidgets.QApplication.instance() - if app is None: - app = QtWidgets.QApplication([]) - try: - app.setApplicationName(str(args.title)) - except Exception: - pass - try: - app.setQuitOnLastWindowClosed(True) - except Exception: - pass - main_window = QtWidgets.QWidget() - try: - main_window.setWindowTitle(str(args.title)) - except Exception: - pass - main_layout = QtWidgets.QHBoxLayout(main_window) - main_layout.setContentsMargins(6, 6, 6, 6) - main_layout.setSpacing(6) - win = pg.GraphicsLayoutWidget(show=False, title=args.title) - main_layout.addWidget(win) - settings_widget = QtWidgets.QWidget() - settings_layout = QtWidgets.QVBoxLayout(settings_widget) - settings_layout.setContentsMargins(6, 6, 6, 6) - settings_layout.setSpacing(8) - try: - settings_widget.setMinimumWidth(130) - settings_widget.setMaximumWidth(150) - except Exception: - pass - main_layout.addWidget(settings_widget) - - # Плот последнего свипа (слева-сверху) - p_line = win.addPlot(row=0, col=0, title="Сырые данные") - p_line.showGrid(x=True, y=True, alpha=0.3) - curve_avg1 = p_line.plot(pen=pg.mkPen((170, 170, 170), width=1)) - curve_avg2 = p_line.plot(pen=pg.mkPen((110, 110, 110), width=1)) - curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) - curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) - curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) - p_line.setLabel("bottom", "ГГц") - p_line.setLabel("left", "Y") - ch_text = pg.TextItem("", anchor=(1, 1)) - ch_text.setZValue(10) - p_line.addItem(ch_text) - - # Водопад (справа-сверху) - p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад") - p_img.invertY(False) - p_img.showGrid(x=False, y=False) - p_img.setLabel("bottom", "Время, с (новое справа)") - try: - p_img.getAxis("bottom").setStyle(showValues=False) - except Exception: - pass - p_img.setLabel("left", "ГГц") - img = pg.ImageItem() - p_img.addItem(img) - - # FFT (слева-снизу) - p_fft = win.addPlot(row=1, col=0, title="FFT") - p_fft.showGrid(x=True, y=True, alpha=0.3) - curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) - curve_fft_ref = p_fft.plot(pen=pg.mkPen((255, 0, 0), width=1)) - peak_pen = pg.mkPen((255, 0, 0), width=1) - fft_peak_boxes = [p_fft.plot(pen=peak_pen) for _ in range(3)] - fft_bg_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen) - fft_left_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen) - fft_right_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen) - curve_fft_ref.setVisible(False) - for box in fft_peak_boxes: - box.setVisible(False) - p_fft.addItem(fft_bg_line) - p_fft.addItem(fft_left_line) - p_fft.addItem(fft_right_line) - fft_bg_line.setVisible(False) - fft_left_line.setVisible(False) - fft_right_line.setVisible(False) - p_fft.setLabel("bottom", "Расстояние, м") - p_fft.setLabel("left", "дБ") - - # Водопад спектров (справа-снизу) - p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") - p_spec.invertY(False) - p_spec.showGrid(x=False, y=False) - p_spec.setLabel("bottom", "Время, с (новое справа)") - try: - p_spec.getAxis("bottom").setStyle(showValues=False) - except Exception: - pass - p_spec.setLabel("left", "Расстояние, м") - img_fft = pg.ImageItem() - p_spec.addItem(img_fft) - spec_left_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen) - spec_right_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen) - p_spec.addItem(spec_left_line) - p_spec.addItem(spec_right_line) - spec_left_line.setVisible(False) - spec_right_line.setVisible(False) - - # Правая панель настроек внутри основного окна. - calib_cb = QtWidgets.QCheckBox("нормировка") - bg_subtract_cb = QtWidgets.QCheckBox("вычет фона") - peak_search_cb = QtWidgets.QCheckBox("поиск пиков") - try: - settings_title = QtWidgets.QLabel("Настройки") - settings_layout.addWidget(settings_title) - except Exception: - pass - settings_layout.addWidget(calib_cb) - settings_layout.addWidget(bg_subtract_cb) - settings_layout.addWidget(peak_search_cb) - calib_window = None - - # Статусная строка (внизу окна) - status = pg.LabelItem(justify="left") - win.addItem(status, row=3, col=0, colspan=2) - - # Состояние - ring: Optional[np.ndarray] = None - ring_time: Optional[np.ndarray] = None - head = 0 - width: Optional[int] = None - x_shared: Optional[np.ndarray] = None - current_freqs: Optional[np.ndarray] = None - current_distances: Optional[np.ndarray] = None - current_sweep_raw: Optional[np.ndarray] = None - current_aux_curves: SweepAuxCurves = None - current_sweep_norm: Optional[np.ndarray] = None - current_fft_db: Optional[np.ndarray] = None - last_calib_sweep: Optional[np.ndarray] = None - current_info: Optional[SweepInfo] = None - # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. - # Для спектров - fft_bins = FFT_LEN // 2 + 1 - ring_fft: Optional[np.ndarray] = None - distance_shared: Optional[np.ndarray] = None - y_min_fft, y_max_fft = None, None - # Параметры контраста водопада спектров (процентильная обрезка) - spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) - spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) - calib_enabled = False - bg_subtract_enabled = False - current_peak_width: Optional[float] = None - current_peak_amplitude: Optional[float] = None - peak_candidates: List[Dict[str, float]] = [] - norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() - c_edits = [] - c_value_labels = [] - peak_group = None - peak_window_edit = None - peak_params_label = None - peak_ref_window = float(getattr(args, "peak_ref_window", 1.0)) - if (not np.isfinite(peak_ref_window)) or peak_ref_window <= 0.0: - peak_ref_window = 1.0 - plot_dirty = False - # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) - fixed_ylim: Optional[Tuple[float, float]] = None - if args.ylim: - try: - y0, y1 = args.ylim.split(",") - fixed_ylim = (float(y0), float(y1)) - except Exception: - pass - if fixed_ylim is not None: - p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) - - def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - return _normalize_by_calib(raw, calib, norm_type=norm_type) - - def _set_calib_enabled(): - nonlocal calib_enabled, current_sweep_norm, plot_dirty - try: - calib_enabled = bool(calib_cb.isChecked()) - except Exception: - calib_enabled = False - if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) - else: - current_sweep_norm = None - plot_dirty = True - - def _set_bg_subtract_enabled(): - nonlocal bg_subtract_enabled, plot_dirty - try: - bg_subtract_enabled = bool(bg_subtract_cb.isChecked()) - except Exception: - bg_subtract_enabled = False - plot_dirty = True - - try: - calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled()) - except Exception: - pass - try: - bg_subtract_cb.stateChanged.connect(lambda _v: _set_bg_subtract_enabled()) - except Exception: - pass - - def _refresh_peak_params_label(peaks: List[Dict[str, float]]): - if peak_params_label is None: - return - lines = [] - for idx in range(3): - if idx < len(peaks): - p = peaks[idx] - lines.append(f"peak {idx + 1}:") - lines.append(f" X: {p['x']:.4g} m") - lines.append(f" H: {p['height']:.4g} dB") - lines.append(f" W: {p['width']:.4g} m") - else: - lines.append(f"peak {idx + 1}:") - lines.append(" X: - m") - lines.append(" H: - dB") - lines.append(" W: - m") - if idx != 2: - lines.append("") - peak_params_label.setText("\n".join(lines)) - - try: - peak_group = QtWidgets.QGroupBox("Поиск пиков") - peak_layout = QtWidgets.QFormLayout(peak_group) - peak_layout.setContentsMargins(6, 6, 6, 6) - peak_layout.setSpacing(6) - peak_window_edit = QtWidgets.QLineEdit(f"{peak_ref_window:.6g}") - peak_layout.addRow("Окно медианы, ГГц", peak_window_edit) - peak_params_label = QtWidgets.QLabel("") - try: - peak_params_label.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse) - except Exception: - pass - peak_layout.addRow("Параметры", peak_params_label) - settings_layout.addWidget(peak_group) - - def _apply_peak_window(): - nonlocal peak_ref_window, plot_dirty - if peak_window_edit is None: - return - try: - v = float(peak_window_edit.text().strip()) - if np.isfinite(v) and v > 0.0: - peak_ref_window = v - plot_dirty = True - except Exception: - pass - try: - peak_window_edit.setText(f"{peak_ref_window:.6g}") - except Exception: - pass - - peak_window_edit.editingFinished.connect(_apply_peak_window) - _refresh_peak_params_label([]) - except Exception: - peak_group = None - peak_window_edit = None - peak_params_label = None - - def _set_peak_search_enabled(): - nonlocal peak_search_enabled, plot_dirty, peak_candidates - try: - peak_search_enabled = bool(peak_search_cb.isChecked()) - except Exception: - peak_search_enabled = False - try: - if peak_group is not None: - peak_group.setEnabled(peak_search_enabled) - except Exception: - pass - if not peak_search_enabled: - peak_candidates = [] - _refresh_peak_params_label([]) - plot_dirty = True - - try: - peak_search_cb.setChecked(peak_search_enabled) - except Exception: - pass - try: - peak_search_cb.stateChanged.connect(lambda _v: _set_peak_search_enabled()) - except Exception: - pass - _set_peak_search_enabled() - - if peak_calibrate_mode: - try: - calib_window = QtWidgets.QWidget() - try: - calib_window.setWindowTitle(f"{args.title} freq calibration") - except Exception: - pass - calib_layout = QtWidgets.QFormLayout(calib_window) - calib_layout.setContentsMargins(8, 8, 8, 8) - - def _refresh_c_value_labels(): - for idx, label in enumerate(c_value_labels): - try: - label.setText(f"{float(CALIBRATION_C[idx]):.6g}") - except Exception: - pass - - def _apply_c_value(idx: int, edit): - global CALIBRATION_C, CALIBRATION_C_BASE - try: - CALIBRATION_C_BASE[idx] = float(edit.text().strip()) - CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE) - except Exception: - try: - edit.setText(f"{float(CALIBRATION_C_BASE[idx]):.6g}") - except Exception: - pass - _refresh_c_value_labels() - - def _apply_all_c_values(): - for idx, edit in enumerate(c_edits): - _apply_c_value(idx, edit) - - for idx in range(3): - edit = QtWidgets.QLineEdit(f"{float(CALIBRATION_C_BASE[idx]):.6g}") - try: - edit.setMaximumWidth(120) - except Exception: - pass - try: - edit.editingFinished.connect(lambda i=idx, e=edit: _apply_c_value(i, e)) - except Exception: - pass - calib_layout.addRow(f"C{idx}", edit) - c_edits.append(edit) - try: - update_btn = QtWidgets.QPushButton("Update") - update_btn.clicked.connect(lambda _checked=False: _apply_all_c_values()) - calib_layout.addRow(update_btn) - except Exception: - pass - try: - calib_layout.addRow(QtWidgets.QLabel("Working C"), QtWidgets.QLabel("")) - except Exception: - pass - for idx in range(3): - label = QtWidgets.QLabel(f"{float(CALIBRATION_C[idx]):.6g}") - calib_layout.addRow(f"C*{idx}", label) - c_value_labels.append(label) - _refresh_c_value_labels() - try: - calib_window.show() - except Exception: - pass - except Exception: - pass - try: - settings_layout.addStretch(1) - except Exception: - pass - - def ensure_buffer(_w: int): - nonlocal ring, ring_time, head, width, x_shared, ring_fft, distance_shared - if ring is not None: - return - width = WF_WIDTH - x_shared = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, width, dtype=np.float32) - ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) - ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) - head = 0 - # Водопад: время по оси X, X по оси Y (ось Y: номинальный диапазон свипа) - img.setImage(ring.T, autoLevels=False) - img.setRect(0, SWEEP_FREQ_MIN_GHZ, max_sweeps, SWEEP_FREQ_MAX_GHZ - SWEEP_FREQ_MIN_GHZ) - p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ), padding=0) - p_line.setXRange(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, padding=0) - # FFT: время по оси X, бин по оси Y - ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) - img_fft.setImage(ring_fft.T, autoLevels=False) - img_fft.setRect(0, 0.0, max_sweeps, 1.0) - p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0.0, 1.0), padding=0) - p_fft.setXRange(0.0, 1.0, padding=0) - distance_shared = _compute_distance_axis(current_freqs, fft_bins) - - def _update_physical_axes(): - nonlocal distance_shared - if current_freqs is not None and current_freqs.size > 0: - finite_f = current_freqs[np.isfinite(current_freqs)] - if finite_f.size > 0: - f_min = float(np.min(finite_f)) - f_max = float(np.max(finite_f)) - if f_max <= f_min: - f_max = f_min + 1.0 - img.setRect(0, f_min, max_sweeps, f_max - f_min) - p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0) - - distance_shared = _compute_distance_axis(current_freqs, fft_bins) - if distance_shared.size > 0: - d_min = float(distance_shared[0]) - d_max = float(distance_shared[-1]) if distance_shared.size > 1 else float(distance_shared[0] + 1.0) - if d_max <= d_min: - d_max = d_min + 1.0 - img_fft.setRect(0, d_min, max_sweeps, d_max - d_min) - p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0) - - def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по центральным 90% значений в видимой области ImageItem.""" - if data.size == 0: - return None - ny, nx = data.shape[0], data.shape[1] - try: - (x0, x1), (y0, y1) = p_img.viewRange() - except Exception: - x0, x1 = 0.0, float(nx - 1) - y0, y1 = 0.0, float(ny - 1) - xmin, xmax = sorted((float(x0), float(x1))) - ymin, ymax = sorted((float(y0), float(y1))) - ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) - ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) - iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) - if ix1 < ix0: - ix1 = ix0 - if iy1 < iy0: - iy1 = iy0 - sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] - finite = np.isfinite(sub) - if not finite.any(): - return None - vals = sub[finite] - vmin = float(np.nanpercentile(vals, 5)) - vmax = float(np.nanpercentile(vals, 95)) - if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: - return None - return (vmin, vmax) - - def _visible_bg_fft(disp_fft: np.ndarray) -> Optional[np.ndarray]: - """Оценка фона по медиане в текущем видимом по времени окне B-scan.""" - if not bg_subtract_enabled or disp_fft.size == 0: - return None - ny, nx = disp_fft.shape - if ny <= 0 or nx <= 0: - return None - try: - (x0, x1), _ = p_spec.viewRange() - except Exception: - x0, x1 = 0.0, float(nx - 1) - xmin, xmax = sorted((float(x0), float(x1))) - ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) - ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - if ix1 < ix0: - ix1 = ix0 - window = disp_fft[:, ix0 : ix1 + 1] - if window.size == 0: - return None - try: - bg_spec = np.nanmedian(window, axis=1) - except Exception: - return None - if not np.any(np.isfinite(bg_spec)): - return None - return np.nan_to_num(bg_spec, nan=0.0).astype(np.float32, copy=False) - - def push_sweep(s: np.ndarray, freqs: Optional[np.ndarray] = None): - nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft, current_fft_db - if s is None or s.size == 0 or ring is None: - return - w = ring.shape[1] - row = np.full((w,), np.nan, dtype=np.float32) - take = min(w, s.size) - row[:take] = s[:take] - ring[head, :] = row - if ring_time is not None: - ring_time[head] = time.time() - head = (head + 1) % ring.shape[0] - # FFT строка (линейный модуль; перевод в дБ делаем при отображении) - if ring_fft is not None: - bins = ring_fft.shape[1] - fft_mag = _compute_fft_mag_row(s, freqs, bins) - row_idx = (head - 1) % ring_fft.shape[0] - ring_fft[row_idx, :] = fft_mag - fft_row = _fft_mag_to_db(fft_mag) - current_fft_db = fft_row - fr_min = np.nanmin(fft_row) - fr_max = np.nanmax(fft_row) - if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): - y_min_fft = float(fr_min) - if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): - y_max_fft = float(fr_max) - - def drain_queue(): - nonlocal current_freqs, current_distances, current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep - drained = 0 - while True: - try: - s, info, aux_curves = q.get_nowait() - except Empty: - break - drained += 1 - calibrated = calibrate_freqs( - { - "F": np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, s.size, dtype=np.float64), - "I": s, - } - ) - current_freqs = calibrated["F"] - current_distances = _compute_distance_axis(current_freqs, fft_bins) - s = calibrated["I"] - current_sweep_raw = s - current_aux_curves = aux_curves - current_info = info - ch = 0 - try: - ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 - except Exception: - ch = 0 - if ch == 0: - last_calib_sweep = s - current_sweep_norm = None - sweep_for_proc = s - else: - if calib_enabled and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(s, last_calib_sweep) - sweep_for_proc = current_sweep_norm - else: - current_sweep_norm = None - sweep_for_proc = s - ensure_buffer(s.size) - push_sweep(sweep_for_proc, current_freqs) - if drained > 0: - _update_physical_axes() - return drained - - # Попытка применить LUT из колормэпа (если доступен) - try: - cm_mod = getattr(pg, "colormap", None) - if cm_mod is not None: - cm = cm_mod.get(args.cmap) - lut = cm.getLookupTable(0.0, 1.0, 256) - img.setLookupTable(lut) - img_fft.setLookupTable(lut) - except Exception: - pass - - def update(): - nonlocal current_peak_width, current_peak_amplitude, plot_dirty, current_fft_db, peak_candidates - if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits): - return - if peak_search_enabled and peak_window_edit is not None and peak_window_edit.hasFocus(): - return - changed = drain_queue() > 0 - redraw_needed = changed or plot_dirty - if redraw_needed and current_sweep_raw is not None and x_shared is not None: - if current_freqs is not None and current_freqs.size == current_sweep_raw.size: - xs = current_freqs - elif current_sweep_raw.size <= x_shared.size: - xs = x_shared[: current_sweep_raw.size] - else: - xs = np.arange(current_sweep_raw.size) - curve.setData(xs, current_sweep_raw, autoDownsample=True) - if current_aux_curves is not None: - avg_1_curve, avg_2_curve = current_aux_curves - curve_avg1.setData(xs[: avg_1_curve.size], avg_1_curve, autoDownsample=True) - curve_avg2.setData(xs[: avg_2_curve.size], avg_2_curve, autoDownsample=True) - else: - curve_avg1.setData([], []) - curve_avg2.setData([], []) - if last_calib_sweep is not None: - curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True) - else: - curve_calib.setData([], []) - if current_sweep_norm is not None: - curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True) - else: - curve_norm.setData([], []) - if fixed_ylim is None: - y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm] - if current_aux_curves is not None: - y_series.extend(current_aux_curves) - y_limits = _compute_auto_ylim(*y_series) - if y_limits is not None: - p_line.setYRange(y_limits[0], y_limits[1], padding=0) - if isinstance(xs, np.ndarray) and xs.size > 0: - finite_x = xs[np.isfinite(xs)] - if finite_x.size > 0: - p_line.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0) - - # Обновим спектр - sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw - if sweep_for_fft.size > 0 and distance_shared is not None: - if ( - current_fft_db is None - or current_fft_db.size != distance_shared.size - or plot_dirty - ): - current_fft_db = _compute_fft_row(sweep_for_fft, current_freqs, distance_shared.size) - fft_vals = current_fft_db - xs_fft = current_distances if current_distances is not None else distance_shared - if fft_vals.size > xs_fft.size: - fft_vals = fft_vals[: xs_fft.size] - xs_fft = xs_fft[: fft_vals.size] - curve_fft.setData(xs_fft, fft_vals) - finite_x = xs_fft[np.isfinite(xs_fft)] - if finite_x.size > 0: - p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0) - - finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals) - y_for_range = fft_vals[finite_fft] - if peak_search_enabled: - fft_ref = _rolling_median_ref(xs_fft, fft_vals, peak_ref_window) - finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref) - if np.any(finite_ref): - curve_fft_ref.setData(xs_fft[finite_ref], fft_ref[finite_ref]) - curve_fft_ref.setVisible(True) - y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref])) - else: - curve_fft_ref.setVisible(False) - peak_candidates = _find_top_peaks_over_ref(xs_fft, fft_vals, fft_ref, top_n=3) - _refresh_peak_params_label(peak_candidates) - for idx, box in enumerate(fft_peak_boxes): - if idx < len(peak_candidates): - p = peak_candidates[idx] - box.setData( - [p["left"], p["left"], p["right"], p["right"], p["left"]], - [p["ref"], p["peak_y"], p["peak_y"], p["ref"], p["ref"]], - ) - box.setVisible(True) - else: - box.setVisible(False) - else: - peak_candidates = [] - _refresh_peak_params_label([]) - curve_fft_ref.setVisible(False) - for box in fft_peak_boxes: - box.setVisible(False) - - finite_y = y_for_range[np.isfinite(y_for_range)] - if finite_y.size > 0: - y0 = float(np.min(finite_y)) - y1 = float(np.max(finite_y)) - if y1 <= y0: - y1 = y0 + 1e-3 - p_fft.setYRange(y0, y1, padding=0) - if peak_calibrate_mode: - markers = _find_peak_width_markers(xs_fft, fft_vals) - if markers is not None: - fft_bg_line.setValue(markers["background"]) - fft_left_line.setValue(markers["left"]) - fft_right_line.setValue(markers["right"]) - spec_left_line.setValue(markers["left"]) - spec_right_line.setValue(markers["right"]) - fft_bg_line.setVisible(True) - fft_left_line.setVisible(True) - fft_right_line.setVisible(True) - spec_left_line.setVisible(True) - spec_right_line.setVisible(True) - current_peak_width = markers["width"] - current_peak_amplitude = markers["amplitude"] - else: - fft_bg_line.setVisible(False) - fft_left_line.setVisible(False) - fft_right_line.setVisible(False) - spec_left_line.setVisible(False) - spec_right_line.setVisible(False) - current_peak_width = None - current_peak_amplitude = None - else: - fft_bg_line.setVisible(False) - fft_left_line.setVisible(False) - fft_right_line.setVisible(False) - spec_left_line.setVisible(False) - spec_right_line.setVisible(False) - current_peak_width = None - current_peak_amplitude = None - else: - curve_fft_ref.setVisible(False) - for box in fft_peak_boxes: - box.setVisible(False) - peak_candidates = [] - _refresh_peak_params_label([]) - plot_dirty = False - - if changed and ring is not None: - disp = ring if head == 0 else np.roll(ring, -head, axis=0) - disp = disp.T # (width, time with newest at right) - levels = _visible_levels_pyqtgraph(disp) - if levels is not None: - img.setImage(disp, autoLevels=False, levels=levels) - else: - img.setImage(disp, autoLevels=False) - - if changed and current_info: - try: - status_payload = dict(current_info) - if peak_calibrate_mode and current_peak_width is not None: - status_payload["peak_w"] = current_peak_width - if peak_calibrate_mode and current_peak_amplitude is not None: - status_payload["peak_a"] = current_peak_amplitude - status.setText(_format_status_kv(status_payload)) - except Exception: - pass - try: - chs = current_info.get("chs") if isinstance(current_info, dict) else None - if chs is None: - chs = current_info.get("ch") if isinstance(current_info, dict) else None - if chs is None: - ch_text.setText("") - else: - if isinstance(chs, (list, tuple, set)): - ch_list = sorted(int(v) for v in chs) - ch_text_val = ", ".join(str(v) for v in ch_list) - else: - ch_text_val = str(int(chs)) - ch_text.setText(f"chs {ch_text_val}") - (x0, x1), (y0, y1) = p_line.viewRange() - dx = 0.01 * max(1.0, float(x1 - x0)) - dy = 0.01 * max(1.0, float(y1 - y0)) - ch_text.setPos(float(x1 - dx), float(y1 - dy)) - except Exception: - pass - - if changed and ring_fft is not None: - disp_fft_lin = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) - disp_fft_lin = disp_fft_lin.T - if spec_mean_sec > 0.0 and ring_time is not None: - disp_times = ring_time if head == 0 else np.roll(ring_time, -head) - now_t = time.time() - mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) - if np.any(mask): - try: - mean_spec = np.nanmean(disp_fft_lin[:, mask], axis=1) - mean_spec = np.nan_to_num(mean_spec, nan=0.0) - disp_fft_lin = disp_fft_lin - mean_spec[:, None] - except Exception: - pass - bg_spec = _visible_bg_fft(disp_fft_lin) - if bg_spec is not None: - num = np.maximum(disp_fft_lin, 0.0).astype(np.float32, copy=False) + 1e-9 - den = bg_spec[:, None] + 1e-9 - disp_fft = (20.0 * np.log10(num / den)).astype(np.float32, copy=False) - else: - disp_fft = _fft_mag_to_db(disp_fft_lin) - # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) - levels = None - if bg_spec is not None: - try: - p5 = float(np.nanpercentile(disp_fft, 5)) - p95 = float(np.nanpercentile(disp_fft, 95)) - span = max(abs(p5), abs(p95)) - if np.isfinite(span) and span > 0.0: - levels = (-span, span) - except Exception: - levels = None - else: - try: - mean_spec = np.nanmean(disp_fft, axis=1) - vmin_v = float(np.nanmin(mean_spec)) - vmax_v = float(np.nanmax(mean_spec)) - if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: - levels = (vmin_v, vmax_v) - except Exception: - levels = None - # Процентильная обрезка как запасной вариант - if levels is None and spec_clip is not None: - try: - vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) - vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) - if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: - levels = (vmin_v, vmax_v) - except Exception: - levels = None - # Ещё один фолбэк — глобальные накопленные мин/макс - if levels is None and y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft: - levels = (y_min_fft, y_max_fft) - if levels is not None: - img_fft.setImage(disp_fft, autoLevels=False, levels=levels) - else: - img_fft.setImage(disp_fft, autoLevels=False) - - timer = pg.QtCore.QTimer() - timer.timeout.connect(update) - timer.start(interval_ms) - sigint_requested = threading.Event() - sigint_timer = pg.QtCore.QTimer() - sigint_timer.setInterval(50) - sigint_timer.timeout.connect(lambda: app.quit() if sigint_requested.is_set() else None) - sigint_timer.start() - cleanup_done = False - - def on_quit(): - nonlocal cleanup_done - if cleanup_done: - return - cleanup_done = True - try: - timer.stop() - except Exception: - pass - try: - sigint_timer.stop() - except Exception: - pass - stop_event.set() - reader.join(timeout=1.0) - try: - main_window.close() - except Exception: - pass - if calib_window is not None: - try: - calib_window.close() - except Exception: - pass - - def _handle_sigint(_signum, _frame): - sigint_requested.set() - - prev_sigint = signal.getsignal(signal.SIGINT) - try: - signal.signal(signal.SIGINT, _handle_sigint) - except Exception: - prev_sigint = None - - orig_close_event = getattr(main_window, "closeEvent", None) - - def _close_event(event): - try: - if callable(orig_close_event): - orig_close_event(event) - else: - event.accept() - except Exception: - try: - event.accept() - except Exception: - pass - try: - app.quit() - except Exception: - pass - - try: - main_window.closeEvent = _close_event # type: ignore[method-assign] - except Exception: - pass - - app.aboutToQuit.connect(on_quit) - try: - main_window.resize(1200, 680) - except Exception: - pass - main_window.show() - exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None) - try: - exec_fn() - finally: - on_quit() - if prev_sigint is not None: - try: - signal.signal(signal.SIGINT, prev_sigint) - except Exception: - pass +from rfg_adc_plotter.main import main if __name__ == "__main__": - main() + raise SystemExit(main())