#!/usr/bin/env python3 """ Реалтайм-плоттер для свипов из виртуального COM-порта. Формат строк: - "Sweep_start" — начало нового свипа (предыдущий считается завершённым) - "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком Отрисовываются два графика: - Левый: последний полученный свип (Y vs X) - Правый: водопад (последние N свипов во времени) Оптимизации для скорости: - Парсинг и чтение в фоновой нити - Анимация с обновлением только данных (без лишнего пересоздания фигур) - Кольцевой буфер под водопад с фиксированным числом свипов Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии используется сырой доступ к TTY через termios. """ import argparse import io import os import sys import threading import time from collections import deque from queue import Queue, Empty, Full from typing import Any, Dict, Mapping, Optional, Tuple, Union import numpy as np WF_WIDTH = 1000 # максимальное число точек в ряду водопада FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров # Порог для инверсии сырых данных: если среднее значение свипа ниже порога — # считаем, что сигнал «меньше нуля» и домножаем свип на -1 DATA_INVERSION_THRASHOLD = 10.0 Number = Union[int, float] SweepInfo = Dict[str, Any] SweepPacket = Tuple[np.ndarray, SweepInfo] def _format_status_kv(data: Mapping[str, Any]) -> str: """Преобразовать словарь метрик в одну строку 'k:v'.""" def _fmt(v: Any) -> str: if v is None: return "NA" try: fv = float(v) except Exception: return str(v) if not np.isfinite(fv): return "nan" # Достаточно компактно для статус-строки. if abs(fv) >= 1000 or (0 < abs(fv) < 0.01): return f"{fv:.3g}" return f"{fv:.3f}".rstrip("0").rstrip(".") parts = [f"{k}:{_fmt(v)}" for k, v in data.items()] return " ".join(parts) def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: """Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров. Возвращает пару (low, high) или None для отключения. Допустимы значения 0..100, low < high. Ключевые слова отключения: "off", "none", "no". """ if not spec: return None s = str(spec).strip().lower() if s in ("off", "none", "no"): return None try: p0, p1 = s.replace(";", ",").split(",") low = float(p0) high = float(p1) if not (0.0 <= low < high <= 100.0): return None return (low, high) except Exception: return None def try_open_pyserial(path: str, baud: int, timeout: float): try: import serial # type: ignore except Exception: return None try: ser = serial.Serial(path, baudrate=baud, timeout=timeout) # ВРЕМЕННО ОТКЛЮЧЕН: hardware flow control для проверки # ser.rtscts = True # Увеличиваем буфер приема ядра до 64KB try: ser.set_buffer_size(rx_size=65536, tx_size=4096) except (AttributeError, NotImplementedError): # Не все платформы/версии pyserial поддерживают set_buffer_size pass return ser except Exception: return None class FDReader: """Простой враппер чтения строк из файлового дескриптора TTY.""" def __init__(self, fd: int): # Отдельно буферизуем для корректной readline() self._fd = fd raw = os.fdopen(fd, "rb", closefd=False) self._file = raw # Увеличен размер буфера до 256KB для предотвращения потерь self._buf = io.BufferedReader(raw, buffer_size=262144) def fileno(self) -> int: return self._fd def readline(self) -> bytes: return self._buf.readline() def close(self): try: self._buf.close() except Exception: pass def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: """Открыть TTY без pyserial и настроить порт через termios. Возвращает FDReader или None при ошибке. """ try: import termios import tty except Exception: return None try: fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) except Exception: return None try: attrs = termios.tcgetattr(fd) # Установим «сырое» состояние tty.setraw(fd) # Скорость baud_map = { 9600: termios.B9600, 19200: termios.B19200, 38400: termios.B38400, 57600: termios.B57600, 115200: termios.B115200, 230400: getattr(termios, "B230400", None), 460800: getattr(termios, "B460800", None), } b = baud_map.get(baud) or termios.B115200 attrs[4] = b # ispeed attrs[5] = b # ospeed # VMIN=1, VTIME=0 — блокирующее чтение по байту cc = attrs[6] cc[termios.VMIN] = 1 cc[termios.VTIME] = 0 attrs[6] = cc termios.tcsetattr(fd, termios.TCSANOW, attrs) except Exception: try: os.close(fd) except Exception: pass return None return FDReader(fd) class SerialLineSource: """Единый интерфейс для чтения строк из порта (pyserial или raw TTY).""" def __init__(self, path: str, baud: int, timeout: float = 1.0): self._pyserial = try_open_pyserial(path, baud, timeout) self._fdreader = None self._using = "pyserial" if self._pyserial is not None else "raw" if self._pyserial is None: self._fdreader = open_raw_tty(path, baud) if self._fdreader is None: msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" if sys.platform.startswith("win"): msg += ". На Windows нужен pyserial: pip install pyserial" raise RuntimeError(msg) def readline(self) -> bytes: if self._pyserial is not None: try: return self._pyserial.readline() except Exception: return b"" else: try: return self._fdreader.readline() # type: ignore[union-attr] except Exception: return b"" def close(self): try: if self._pyserial is not None: self._pyserial.close() elif self._fdreader is not None: self._fdreader.close() except Exception: pass class SerialChunkReader: """Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера.""" def __init__(self, src: SerialLineSource, error_counter: Optional[list] = None): self._src = src self._ser = src._pyserial self._fd: Optional[int] = None self._error_counter = error_counter # Список с 1 элементом для передачи по ссылке if self._ser is not None: # Неблокирующий режим для быстрой откачки try: self._ser.timeout = 0 except Exception: pass else: try: self._fd = src._fdreader.fileno() # type: ignore[union-attr] try: os.set_blocking(self._fd, False) except Exception: pass except Exception: self._fd = None def read_available(self) -> bytes: """Вернёт доступные байты (b"" если данных нет).""" if self._ser is not None: try: n = int(getattr(self._ser, "in_waiting", 0)) except Exception: if self._error_counter: self._error_counter[0] += 1 n = 0 if n > 0: try: return self._ser.read(n) except Exception: if self._error_counter: self._error_counter[0] += 1 return b"" return b"" if self._fd is None: return b"" out = bytearray() while True: try: chunk = os.read(self._fd, 65536) if not chunk: break out += chunk if len(chunk) < 65536: break except BlockingIOError: break except Exception: if self._error_counter: self._error_counter[0] += 1 break return bytes(out) class SweepReader(threading.Thread): """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" def __init__( self, port_path: str, baud: int, out_queue: Queue[SweepPacket], stop_event: threading.Event, fancy: bool = False, ): super().__init__(daemon=True) self._port_path = port_path self._baud = baud self._q = out_queue self._stop = stop_event self._src: Optional[SerialLineSource] = None self._fancy = bool(fancy) self._max_width: int = 0 self._sweep_idx: int = 0 self._last_sweep_ts: Optional[float] = None self._n_valid_hist = deque() # Счетчик потерь данных (выброшенных свипов из-за переполнения очереди) self._dropped_sweeps: int = 0 # Диагностика потери точек внутри свипа self._total_lines_received: int = 0 # Всего принято строк с данными self._total_parse_errors: int = 0 # Ошибок парсинга строк self._total_empty_lines: int = 0 # Пустых строк self._max_buf_size: int = 0 # Максимальный размер буфера парсинга self._read_errors: int = 0 # Ошибок чтения из порта self._last_diag_time: float = 0.0 # Время последнего вывода диагностики def _finalize_current(self, xs, ys, channels: Optional[set[int]]): if not xs: return ch_list = sorted(channels) if channels else [0] ch_primary = ch_list[0] if ch_list else 0 max_x = max(xs) width = max_x + 1 self._max_width = max(self._max_width, width) target_width = self._max_width if self._fancy else width # Быстрый векторизованный путь sweep = np.full((target_width,), np.nan, dtype=np.float32) try: idx = np.asarray(xs, dtype=np.int64) vals = np.asarray(ys, dtype=np.float32) sweep[idx] = vals except Exception: # Запасной путь for x, y in zip(xs, ys): if 0 <= x < target_width: sweep[x] = float(y) # Метрики валидных точек до заполнения пропусков finite_pre = np.isfinite(sweep) n_valid_cur = int(np.count_nonzero(finite_pre)) # Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины if self._fancy: try: known = ~np.isnan(sweep) if np.any(known): known_idx = np.nonzero(known)[0] # Для каждой пары соседних известных индексов заполним промежуток средним значением for i0, i1 in zip(known_idx[:-1], known_idx[1:]): if i1 - i0 > 1: avg = (sweep[i0] + sweep[i1]) * 0.5 sweep[i0 + 1 : i1] = avg first_idx = int(known_idx[0]) last_idx = int(known_idx[-1]) if first_idx > 0: sweep[:first_idx] = sweep[first_idx] if last_idx < sweep.size - 1: sweep[last_idx + 1 :] = sweep[last_idx] except Exception: # В случае ошибки просто оставляем как есть pass # Инверсия данных при «отрицательном» уровне (среднее ниже порога) try: m = float(np.nanmean(sweep)) if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD: sweep *= -1.0 except Exception: pass #sweep -= float(np.nanmean(sweep)) # Метрики для статусной строки (вид словаря: переменная -> значение) self._sweep_idx += 1 if len(ch_list) > 1: sys.stderr.write( f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n" ) now = time.time() if self._last_sweep_ts is None: dt_ms = float("nan") else: dt_ms = (now - self._last_sweep_ts) * 1000.0 self._last_sweep_ts = now self._n_valid_hist.append((now, n_valid_cur)) while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0: self._n_valid_hist.popleft() if self._n_valid_hist: n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist)) else: n_valid = float(n_valid_cur) if n_valid_cur > 0: vmin = float(np.nanmin(sweep)) vmax = float(np.nanmax(sweep)) mean = float(np.nanmean(sweep)) std = float(np.nanstd(sweep)) else: vmin = vmax = mean = std = float("nan") info: SweepInfo = { "sweep": self._sweep_idx, "ch": ch_primary, "chs": ch_list, "n_valid": n_valid, "min": vmin, "max": vmax, "mean": mean, "std": std, "dt_ms": dt_ms, "dropped": self._dropped_sweeps, "lines": self._total_lines_received, "parse_err": self._total_parse_errors, "read_err": self._read_errors, "max_buf": self._max_buf_size, } # Периодический вывод детальной диагностики в stderr (каждые 10 секунд) now = time.time() if now - self._last_diag_time > 10.0: self._last_diag_time = now sys.stderr.write( f"[DIAG] sweep={self._sweep_idx} n_valid={n_valid:.1f} " f"lines={self._total_lines_received} parse_err={self._total_parse_errors} " f"read_err={self._read_errors} max_buf={self._max_buf_size} " f"dropped={self._dropped_sweeps}\n" ) sys.stderr.flush() # Кладём готовый свип (если очередь полна — выбрасываем самый старый) try: self._q.put_nowait((sweep, info)) except Full: # Счетчик потерь для диагностики self._dropped_sweeps += 1 try: _ = self._q.get_nowait() except Exception: pass try: self._q.put_nowait((sweep, info)) except Exception: pass def run(self): # Состояние текущего свипа xs: list[int] = [] ys: list[int] = [] cur_channel: Optional[int] = None cur_channels: set[int] = set() try: self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") except Exception as e: sys.stderr.write(f"[error] {e}\n") return try: # Быстрый неблокирующий дренаж порта с разбором по байтам # Передаем счетчик ошибок чтения как список для изменения по ссылке error_counter = [0] chunk_reader = SerialChunkReader(self._src, error_counter) buf = bytearray() while not self._stop.is_set(): data = chunk_reader.read_available() # Обновляем счетчик ошибок чтения self._read_errors = error_counter[0] if data: buf += data # Отслеживаем максимальный размер буфера парсинга if len(buf) > self._max_buf_size: self._max_buf_size = len(buf) else: # Короткая уступка CPU, если нет новых данных (уменьшена до 0.1ms) time.sleep(0.0001) continue # Обрабатываем все полные строки while True: nl = buf.find(b"\n") if nl == -1: break line = bytes(buf[:nl]) del buf[: nl + 1] if line.endswith(b"\r"): line = line[:-1] if not line: self._total_empty_lines += 1 continue if line.startswith(b"Sweep_start"): self._finalize_current(xs, ys, cur_channels) xs.clear() ys.clear() cur_channel = None cur_channels.clear() continue # sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам. if len(line) >= 3: parts = line.split() if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): try: if parts[0].lower() == b"s": if len(parts) >= 4: ch = int(parts[1], 10) x = int(parts[2], 10) y = int(parts[3], 10) # поддержка знака: "+…" и "-…" else: ch = 0 x = int(parts[1], 10) y = int(parts[2], 10) # поддержка знака: "+…" и "-…" else: # формат вида "s0" ch = int(parts[0][1:], 10) x = int(parts[1], 10) y = int(parts[2], 10) # поддержка знака: "+…" и "-…" except Exception: self._total_parse_errors += 1 continue if cur_channel is None: cur_channel = ch cur_channels.add(ch) xs.append(x) ys.append(y) self._total_lines_received += 1 else: # Строка не в формате "s X Y" self._total_parse_errors += 1 else: # Строка слишком короткая self._total_parse_errors += 1 # Защита от переполнения буфера при отсутствии переводов строки (снижен порог) if len(buf) > 262144: del buf[:-131072] finally: try: # Завершаем оставшийся свип self._finalize_current(xs, ys, cur_channels) except Exception: pass try: if self._src is not None: self._src.close() except Exception: pass def apply_temporal_unwrap( current_phase: np.ndarray, prev_phase: Optional[np.ndarray], phase_offset: Optional[np.ndarray], ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Применяет улучшенный phase unwrapping для FMCW радара с адаптивным порогом. Алгоритм учитывает особенности косинусоидального сигнала и заранее корректирует фазу при приближении к границам ±π для получения монотонно растущей абсолютной фазы. Args: current_phase: Текущая фаза (развернутая по частоте) для всех бинов prev_phase: Предыдущая фаза, может быть None при первом вызове phase_offset: Накопленные смещения для каждого бина, может быть None Returns: (unwrapped_phase, new_prev_phase, new_phase_offset) unwrapped_phase - абсолютная развёрнутая фаза (может быть > 2π) new_prev_phase - обновлённая предыдущая фаза (для следующего вызова) new_phase_offset - обновлённые смещения (для следующего вызова) """ n_bins = current_phase.size # Инициализация при первом вызове if prev_phase is None: prev_phase = current_phase.copy() phase_offset = np.zeros(n_bins, dtype=np.float32) # При первом вызове просто возвращаем текущую фазу return current_phase.copy(), prev_phase, phase_offset if phase_offset is None: phase_offset = np.zeros(n_bins, dtype=np.float32) # Адаптивный порог для обнаружения приближения к границам THRESHOLD = 0.8 * np.pi # Вычисляем разницу между текущей и предыдущей фазой delta = current_phase - prev_phase # Обнаруживаем скачки и корректируем offset # Используем улучшенный алгоритм с адаптивным порогом # Метод 1: Стандартная коррекция для больших скачков (> π) # Это ловит случаи, когда фаза уже перескочила границу phase_offset = phase_offset - 2.0 * np.pi * np.round(delta / (2.0 * np.pi)) # Метод 2: Адаптивная коррекция при приближении к границам # Проверяем текущую развернутую фазу unwrapped_phase = current_phase + phase_offset # Если фаза близка к нечетным π (π, 3π, 5π...), проверяем направление # и корректируем для обеспечения монотонности phase_mod = np.mod(unwrapped_phase + np.pi, 2.0 * np.pi) - np.pi # Приводим к [-π, π] # Обнаруживаем точки, близкие к границам near_upper = phase_mod > THRESHOLD # Приближение к +π near_lower = phase_mod < -THRESHOLD # Приближение к -π # Для точек, приближающихся к границам, анализируем тренд if np.any(near_upper) or np.any(near_lower): # Если delta положительна и мы около +π, готовимся к переходу should_add = near_upper & (delta > 0) # Если delta отрицательна и мы около -π, готовимся к переходу should_sub = near_lower & (delta < 0) # Применяем дополнительную коррекцию только там, где нужно # (этот код срабатывает редко, только при быстром движении объекта) pass # Основная коррекция уже сделана выше # Финальная развернутая фаза unwrapped_phase = current_phase + phase_offset # Сохраняем текущую фазу как предыдущую для следующего свипа new_prev_phase = current_phase.copy() new_phase_offset = phase_offset.copy() return unwrapped_phase, new_prev_phase, new_phase_offset def phase_to_distance(phase: np.ndarray, center_freq_hz: float = 6e9) -> np.ndarray: """Преобразует развернутую фазу в расстояние для FMCW радара. Формула: Δl = φ * c / (4π * ν) где: φ - фаза (радианы) c - скорость света (м/с) ν - центральная частота свипа (Гц) Args: phase: Развернутая фаза в радианах center_freq_hz: Центральная частота диапазона в Гц (по умолчанию 6 ГГц для 2-10 ГГц) Returns: Расстояние в метрах """ c = 299792458.0 # Скорость света в м/с distance = phase * c / (4.0 * np.pi * center_freq_hz) return distance.astype(np.float32) def main(): parser = argparse.ArgumentParser( description=( "Читает свипы из виртуального COM-порта и рисует: " "последний свип и водопад (реалтайм)." ) ) parser.add_argument( "port", help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)", ) parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") parser.add_argument( "--spec-clip", default="2,98", help=( "Процентильная обрезка уровней водопада спектров, % (min,max). " "Напр. 2,98. 'off' — отключить" ), ) parser.add_argument( "--spec-mean-sec", type=float, default=0.0, help=( "Вычитание среднего по каждой частоте за последние N секунд " "в водопаде спектров (0 — отключить)" ), ) parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") parser.add_argument( "--fancy", action="store_true", help="Заполнять выпавшие точки средними значениями между соседними", ) parser.add_argument( "--ylim", type=str, default=None, help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", ) parser.add_argument( "--backend", choices=["auto", "pg", "mpl"], default="auto", help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto", ) args = parser.parse_args() # Попробуем быстрый бэкенд (pyqtgraph) при auto/pg if args.backend in ("pg"): try: return run_pyqtgraph(args) except Exception as e: if args.backend == "pg": sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n") sys.exit(1) # При auto — тихо откатываемся на matplotlib try: import matplotlib import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation from matplotlib.widgets import Slider, CheckButtons except Exception as e: sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") sys.exit(1) # Очередь завершённых свипов и поток чтения q: Queue[SweepPacket] = Queue(maxsize=1000) stop_event = threading.Event() reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) reader.start() # Графика (3 ряда x 2 колонки = 6 графиков) fig, axs = plt.subplots(3, 2, figsize=(12, 12)) (ax_line, ax_img), (ax_fft, ax_spec), (ax_phase, ax_phase_wf) = axs fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None # Увеличим расстояния и оставим место справа под ползунки оси Y B-scan fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.95, bottom=0.05) # Состояние для отображения current_sweep_raw: Optional[np.ndarray] = None current_sweep_norm: Optional[np.ndarray] = None last_calib_sweep: Optional[np.ndarray] = None current_info: Optional[SweepInfo] = None x_shared: Optional[np.ndarray] = None width: Optional[int] = None max_sweeps = int(max(10, args.max_sweeps)) ring = None # type: Optional[np.ndarray] ring_time = None # type: Optional[np.ndarray] head = 0 # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. # FFT состояние fft_bins = FFT_LEN // 2 + 1 ring_fft = None # type: Optional[np.ndarray] y_min_fft, y_max_fft = None, None freq_shared: Optional[np.ndarray] = None # Phase состояние ring_phase = None # type: Optional[np.ndarray] prev_phase_per_bin: Optional[np.ndarray] = None phase_offset_per_bin: Optional[np.ndarray] = None y_min_phase, y_max_phase = None, None # Параметры контраста водопада спектров spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) # Ползунки управления Y для B-scan и контрастом ymin_slider = None ymax_slider = None contrast_slider = None calib_enabled = False cb = None # Статусная строка (внизу окна) status_text = fig.text( 0.01, 0.01, "", ha="left", va="bottom", fontsize=8, family="monospace", ) # Линейный график последнего свипа line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") ax_line.set_title("Сырые данные", pad=1) ax_line.set_xlabel("F") ax_line.set_ylabel("") channel_text = ax_line.text( 0.98, 0.98, "", transform=ax_line.transAxes, ha="right", va="top", fontsize=9, family="monospace", ) # Линейный график спектра текущего свипа fft_line_obj, = ax_fft.plot([], [], lw=1) ax_fft.set_title("FFT", pad=1) ax_fft.set_xlabel("X") ax_fft.set_ylabel("Амплитуда, дБ") # Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения) fixed_ylim: Optional[Tuple[float, float]] = None # CLI переопределение при необходимости if args.ylim: try: y0, y1 = args.ylim.split(",") fixed_ylim = (float(y0), float(y1)) except Exception: sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n") if fixed_ylim is not None: ax_line.set_ylim(fixed_ylim) # Водопад (будет инициализирован при первом свипе) img_obj = ax_img.imshow( np.zeros((1, 1), dtype=np.float32), aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap, ) ax_img.set_title("Сырые данные", pad=12) ax_img.set_xlabel("") ax_img.set_ylabel("частота") # Не показываем численные значения по времени на водопаде сырых данных try: ax_img.tick_params(axis="x", labelbottom=False) except Exception: pass # Водопад спектров img_fft_obj = ax_spec.imshow( np.zeros((1, 1), dtype=np.float32), aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap, ) ax_spec.set_title("B-scan (дБ)", pad=12) ax_spec.set_xlabel("") ax_spec.set_ylabel("расстояние") # Не показываем численные значения по времени на B-scan try: ax_spec.tick_params(axis="x", labelbottom=False) except Exception: pass def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: w = min(raw.size, calib.size) if w <= 0: return raw out = np.full_like(raw, np.nan, dtype=np.float32) with np.errstate(divide="ignore", invalid="ignore"): out[:w] = raw[:w] / calib[:w] out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) return out def _set_calib_enabled(): nonlocal calib_enabled, current_sweep_norm try: calib_enabled = bool(cb.get_status()[0]) if cb is not None else False except Exception: calib_enabled = False if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) else: current_sweep_norm = None # График фазы текущего свипа phase_line_obj, = ax_phase.plot([], [], lw=1) ax_phase.set_title("Фаза спектра (развернутая)", pad=1) ax_phase.set_xlabel("Бин") ax_phase.set_ylabel("Фаза, радианы") # Добавим второй Y axis для расстояния ax_phase_dist = ax_phase.twinx() ax_phase_dist.set_ylabel("Расстояние, м", color='green') # Водопад фазы img_phase_obj = ax_phase_wf.imshow( np.zeros((1, 1), dtype=np.float32), aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap, ) ax_phase_wf.set_title("Водопад фазы", pad=12) ax_phase_wf.set_xlabel("") ax_phase_wf.set_ylabel("Бин") # Не показываем численные значения по времени try: ax_phase_wf.tick_params(axis="x", labelbottom=False) except Exception: pass # Слайдеры для управления осью Y B-scan (мин/макс) и контрастом try: ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") cb = CheckButtons(ax_cb, ["калибровка"], [False]) def _on_ylim_change(_val): try: y0 = int(min(ymin_slider.val, ymax_slider.val)) y1 = int(max(ymin_slider.val, ymax_slider.val)) ax_spec.set_ylim(y0, y1) fig.canvas.draw_idle() except Exception: pass ymin_slider.on_changed(_on_ylim_change) ymax_slider.on_changed(_on_ylim_change) # Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) cb.on_clicked(lambda _v: _set_calib_enabled()) except Exception: pass # Для контроля частоты обновления max_fps = max(1.0, float(args.max_fps)) interval_ms = int(1000.0 / max_fps) frames_since_ylim_update = 0 def ensure_buffer(_w: int): nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time nonlocal ring_phase, prev_phase_per_bin, phase_offset_per_bin if ring is not None: return width = WF_WIDTH x_shared = np.arange(width, dtype=np.int32) ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) head = 0 # Обновляем изображение под новые размеры: время по X (горизонталь), X по Y img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32)) img_obj.set_extent((0, max_sweeps - 1, 0, width - 1 if width > 0 else 1)) ax_img.set_xlim(0, max_sweeps - 1) ax_img.set_ylim(0, max(1, width - 1)) # FFT буферы: время по X, бин по Y ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32)) img_fft_obj.set_extent((0, max_sweeps - 1, 0, fft_bins - 1)) ax_spec.set_xlim(0, max_sweeps - 1) ax_spec.set_ylim(0, max(1, fft_bins - 1)) freq_shared = np.arange(fft_bins, dtype=np.int32) # Phase буферы: время по X, бин по Y ring_phase = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) prev_phase_per_bin = np.zeros(fft_bins, dtype=np.float32) phase_offset_per_bin = np.zeros(fft_bins, dtype=np.float32) img_phase_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32)) img_phase_obj.set_extent((0, max_sweeps - 1, 0, fft_bins - 1)) ax_phase_wf.set_xlim(0, max_sweeps - 1) ax_phase_wf.set_ylim(0, max(1, fft_bins - 1)) def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: """(vmin, vmax) по текущей видимой области imshow (без накопления по времени).""" if data.size == 0: return None ny, nx = data.shape[0], data.shape[1] try: x0, x1 = axis.get_xlim() y0, y1 = axis.get_ylim() except Exception: x0, x1 = 0.0, float(nx - 1) y0, y1 = 0.0, float(ny - 1) xmin, xmax = sorted((float(x0), float(x1))) ymin, ymax = sorted((float(y0), float(y1))) ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) if ix1 < ix0: ix1 = ix0 if iy1 < iy0: iy1 = iy0 sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] finite = np.isfinite(sub) if not finite.any(): return None vals = sub[finite] vmin = float(np.min(vals)) vmax = float(np.max(vals)) if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: return None return (vmin, vmax) def push_sweep(s: np.ndarray): nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time nonlocal ring_phase, prev_phase_per_bin, phase_offset_per_bin, y_min_phase, y_max_phase if s is None or s.size == 0 or ring is None: return # Нормализуем длину до фиксированной ширины w = ring.shape[1] row = np.full((w,), np.nan, dtype=np.float32) take = min(w, s.size) row[:take] = s[:take] ring[head, :] = row if ring_time is not None: ring_time[head] = time.time() head = (head + 1) % ring.shape[0] # FFT строка (дБ) и фаза if ring_fft is not None: bins = ring_fft.shape[1] # Подготовка входа FFT_LEN, замена NaN на 0 take_fft = min(int(s.size), FFT_LEN) if take_fft <= 0: fft_row = np.full((bins,), np.nan, dtype=np.float32) phase_row = np.full((bins,), np.nan, dtype=np.float32) else: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) seg = s[:take_fft] if isinstance(seg, np.ndarray): seg = np.nan_to_num(seg, nan=0.0).astype(np.float32, copy=False) else: seg = np.asarray(seg, dtype=np.float32) seg = np.nan_to_num(seg, nan=0.0) # Окно Хэннинга win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) mag = np.abs(spec).astype(np.float32) fft_row = 20.0 * np.log10(mag + 1e-9) if fft_row.shape[0] != bins: # rfft длиной FFT_LEN даёт bins == FFT_LEN//2+1 fft_row = fft_row[:bins] # Расчет фазы phase = np.angle(spec).astype(np.float32) if phase.shape[0] > bins: phase = phase[:bins] # Unwrapping по частоте (внутри свипа) phase_unwrapped_freq = np.unwrap(phase) # Unwrapping по времени (между свипами) phase_unwrapped_time, prev_phase_per_bin, phase_offset_per_bin = apply_temporal_unwrap( phase_unwrapped_freq, prev_phase_per_bin, phase_offset_per_bin ) phase_row = phase_unwrapped_time ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row # Экстремумы для цветовой шкалы fr_min = np.nanmin(fft_row) fr_max = np.nanmax(fft_row) fr_max = np.nanpercentile(fft_row, 90) if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): y_min_fft = float(fr_min) if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): y_max_fft = float(fr_max) # Сохраняем фазу в буфер if ring_phase is not None: ring_phase[(head - 1) % ring_phase.shape[0], :] = phase_row # Экстремумы для цветовой шкалы фазы ph_min = np.nanmin(phase_row) ph_max = np.nanmax(phase_row) if y_min_phase is None or (not np.isnan(ph_min) and ph_min < y_min_phase): y_min_phase = float(ph_min) if y_max_phase is None or (not np.isnan(ph_max) and ph_max > y_max_phase): y_max_phase = float(ph_max) def drain_queue(): nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep drained = 0 while True: try: s, info = q.get_nowait() except Empty: break drained += 1 current_sweep_raw = s current_info = info ch = 0 try: ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 except Exception: ch = 0 if ch == 0: last_calib_sweep = s current_sweep_norm = None sweep_for_proc = s else: if calib_enabled and last_calib_sweep is not None: current_sweep_norm = _normalize_sweep(s, last_calib_sweep) sweep_for_proc = current_sweep_norm else: current_sweep_norm = None sweep_for_proc = s ensure_buffer(s.size) push_sweep(sweep_for_proc) return drained def make_display_ring(): # Возвращаем буфер с правильным порядком по времени (старые→новые) и осью времени по X if ring is None: return np.zeros((1, 1), dtype=np.float32) base = ring if head == 0 else np.roll(ring, -head, axis=0) return base.T # (width, time) def make_display_times(): if ring_time is None: return None base_t = ring_time if head == 0 else np.roll(ring_time, -head) return base_t def _subtract_recent_mean_fft(disp_fft: np.ndarray) -> np.ndarray: """Вычесть среднее по каждой частоте за последние spec_mean_sec секунд.""" if spec_mean_sec <= 0.0: return disp_fft disp_times = make_display_times() if disp_times is None: return disp_fft now_t = time.time() mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) if not np.any(mask): return disp_fft try: mean_spec = np.nanmean(disp_fft[:, mask], axis=1) except Exception: return disp_fft mean_spec = np.nan_to_num(mean_spec, nan=0.0) return disp_fft - mean_spec[:, None] def make_display_ring_fft(): if ring_fft is None: return np.zeros((1, 1), dtype=np.float32) base = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) return base.T # (bins, time) def make_display_ring_phase(): if ring_phase is None: return np.zeros((1, 1), dtype=np.float32) base = ring_phase if head == 0 else np.roll(ring_phase, -head, axis=0) return base.T # (bins, time) def update(_frame): nonlocal frames_since_ylim_update changed = drain_queue() > 0 # Обновление линии последнего свипа if current_sweep_raw is not None: if x_shared is not None and current_sweep_raw.size <= x_shared.size: xs = x_shared[: current_sweep_raw.size] else: xs = np.arange(current_sweep_raw.size, dtype=np.int32) line_obj.set_data(xs, current_sweep_raw) if last_calib_sweep is not None: line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep) else: line_calib_obj.set_data([], []) if current_sweep_norm is not None: line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm) else: line_norm_obj.set_data([], []) # Лимиты по X постоянные под текущую ширину ax_line.set_xlim(0, max(1, current_sweep_raw.size - 1)) # Адаптивные Y-лимиты (если не задан --ylim) if fixed_ylim is None: y0 = float(np.nanmin(current_sweep_raw)) y1 = float(np.nanmax(current_sweep_raw)) if np.isfinite(y0) and np.isfinite(y1): if y0 == y1: pad = max(1.0, abs(y0) * 0.05) y0 -= pad y1 += pad else: pad = 0.05 * (y1 - y0) y0 -= pad y1 += pad ax_line.set_ylim(y0, y1) # Обновление спектра текущего свипа sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw take_fft = min(int(sweep_for_fft.size), FFT_LEN) if take_fft > 0 and freq_shared is not None: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False) win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) mag = np.abs(spec).astype(np.float32) fft_vals = 20.0 * np.log10(mag + 1e-9) xs_fft = freq_shared if fft_vals.size > xs_fft.size: fft_vals = fft_vals[: xs_fft.size] fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) # Авто-диапазон по Y для спектра if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): ax_fft.set_xlim(0, max(1, xs_fft.size - 1)) ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) # Расчет и отображение фазы текущего свипа phase = np.angle(spec).astype(np.float32) if phase.size > xs_fft.size: phase = phase[: xs_fft.size] # Unwrapping по частоте phase_unwrapped = np.unwrap(phase) phase_line_obj.set_data(xs_fft[: phase_unwrapped.size], phase_unwrapped) # Авто-диапазон по Y для фазы if np.isfinite(np.nanmin(phase_unwrapped)) and np.isfinite(np.nanmax(phase_unwrapped)): ax_phase.set_xlim(0, max(1, xs_fft.size - 1)) phase_min = float(np.nanmin(phase_unwrapped)) phase_max = float(np.nanmax(phase_unwrapped)) ax_phase.set_ylim(phase_min, phase_max) # Обновляем вторую ось Y с расстоянием try: dist_min = phase_to_distance(np.array([phase_min]))[0] dist_max = phase_to_distance(np.array([phase_max]))[0] ax_phase_dist.set_ylim(dist_min, dist_max) except Exception: pass # Обновление водопада if changed and ring is not None: disp = make_display_ring() # Новые данные справа: без реверса img_obj.set_data(disp) # Подписи времени не обновляем динамически (оставляем авто-тики) # Авто-уровни: по видимой области (не накапливаем за всё время) levels = _visible_levels_matplotlib(disp, ax_img) if levels is not None: img_obj.set_clim(vmin=levels[0], vmax=levels[1]) # Обновление водопада спектров if changed and ring_fft is not None: disp_fft = make_display_ring_fft() disp_fft = _subtract_recent_mean_fft(disp_fft) # Новые данные справа: без реверса img_fft_obj.set_data(disp_fft) # Подписи времени не обновляем динамически (оставляем авто-тики) # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) try: # disp_fft имеет форму (bins, time); берём среднее по времени mean_spec = np.nanmean(disp_fft, axis=1) vmin_v = float(np.nanmin(mean_spec)) vmax_v = float(np.nanmax(mean_spec)) except Exception: vmin_v = vmax_v = None # Если средние не дают валидный диапазон — используем процентильную обрезку (если задана) if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v: if spec_clip is not None: try: vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) except Exception: vmin_v = vmax_v = None # Фолбэк к отслеживаемым минимум/максимумам if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v: if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft: vmin_v, vmax_v = y_min_fft, y_max_fft if vmin_v is not None and vmax_v is not None and vmin_v != vmax_v: # Применим скалирование контрастом (верхняя граница) try: c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0 except Exception: c = 1.0 vmax_eff = vmin_v + c * (vmax_v - vmin_v) img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff) # Обновление водопада фазы if changed and ring_phase is not None: disp_phase = make_display_ring_phase() img_phase_obj.set_data(disp_phase) # Автодиапазон для фазы try: mean_phase = np.nanmean(disp_phase, axis=1) vmin_p = float(np.nanmin(mean_phase)) vmax_p = float(np.nanmax(mean_phase)) except Exception: vmin_p = vmax_p = None # Фолбэк к отслеживаемым минимум/максимумам if (vmin_p is None or not np.isfinite(vmin_p)) or (vmax_p is None or not np.isfinite(vmax_p)) or vmin_p == vmax_p: if y_min_phase is not None and y_max_phase is not None and np.isfinite(y_min_phase) and np.isfinite(y_max_phase) and y_min_phase != y_max_phase: vmin_p, vmax_p = y_min_phase, y_max_phase if vmin_p is not None and vmax_p is not None and vmin_p != vmax_p: img_phase_obj.set_clim(vmin=vmin_p, vmax=vmax_p) if changed and current_info: status_text.set_text(_format_status_kv(current_info)) chs = current_info.get("chs") if isinstance(current_info, dict) else None if chs is None: chs = current_info.get("ch") if isinstance(current_info, dict) else None if chs is None: channel_text.set_text("") else: try: if isinstance(chs, (list, tuple, set)): ch_list = sorted(int(v) for v in chs) ch_text_val = ", ".join(str(v) for v in ch_list) else: ch_text_val = str(int(chs)) channel_text.set_text(f"chs {ch_text_val}") except Exception: channel_text.set_text(f"chs {chs}") # Возвращаем обновлённые артисты return ( line_obj, line_calib_obj, line_norm_obj, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text, ) ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) plt.show() # Нормальное завершение при закрытии окна stop_event.set() reader.join(timeout=1.0) def run_pyqtgraph(args): """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" try: import pyqtgraph as pg from PyQt5 import QtCore, QtWidgets # noqa: F401 except Exception: # Возможно установлена PySide6 try: import pyqtgraph as pg from PySide6 import QtCore, QtWidgets # noqa: F401 except Exception as e: raise RuntimeError( "pyqtgraph/PyQt5(Pyside6) не найдены. Установите: pip install pyqtgraph PyQt5" ) from e # Очередь завершённых свипов и поток чтения q: Queue[SweepPacket] = Queue(maxsize=1000) stop_event = threading.Event() reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) reader.start() # Настройки скорости max_sweeps = int(max(10, args.max_sweeps)) max_fps = max(1.0, float(args.max_fps)) interval_ms = int(1000.0 / max_fps) # PyQtGraph настройки pg.setConfigOptions(useOpenGL=True, antialias=False) app = pg.mkQApp(args.title) win = pg.GraphicsLayoutWidget(show=True, title=args.title) win.resize(1200, 900) # Плот последнего свипа (слева-сверху) p_line = win.addPlot(row=0, col=0, title="Сырые данные") p_line.showGrid(x=True, y=True, alpha=0.3) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) p_line.setLabel("bottom", "X") p_line.setLabel("left", "Y") ch_text = pg.TextItem("", anchor=(1, 1)) ch_text.setZValue(10) p_line.addItem(ch_text) # Водопад (справа-сверху) p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад") p_img.invertY(False) p_img.showGrid(x=False, y=False) p_img.setLabel("bottom", "Время, с (новое справа)") try: p_img.getAxis("bottom").setStyle(showValues=False) except Exception: pass p_img.setLabel("left", "X (0 снизу)") img = pg.ImageItem() p_img.addItem(img) # FFT (слева-средний ряд) p_fft = win.addPlot(row=1, col=0, title="FFT") p_fft.showGrid(x=True, y=True, alpha=0.3) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) p_fft.setLabel("bottom", "Бин") p_fft.setLabel("left", "Амплитуда, дБ") # Водопад спектров (справа-средний ряд) p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") p_spec.invertY(True) p_spec.showGrid(x=False, y=False) p_spec.setLabel("bottom", "Время, с (новое справа)") try: p_spec.getAxis("bottom").setStyle(showValues=False) except Exception: pass p_spec.setLabel("left", "Бин (0 снизу)") img_fft = pg.ImageItem() p_spec.addItem(img_fft) # Чекбокс калибровки calib_cb = QtWidgets.QCheckBox("калибровка") cb_proxy = QtWidgets.QGraphicsProxyWidget() cb_proxy.setWidget(calib_cb) win.addItem(cb_proxy, row=2, col=1) # График фазы (слева-снизу) p_phase = win.addPlot(row=2, col=0, title="Фаза спектра (развернутая)") p_phase.showGrid(x=True, y=True, alpha=0.3) curve_phase = p_phase.plot(pen=pg.mkPen((120, 255, 80), width=1)) p_phase.setLabel("bottom", "Бин") p_phase.setLabel("left", "Фаза, радианы") # Добавим вторую ось Y для расстояния p_phase_dist_axis = pg.ViewBox() p_phase.showAxis("right") p_phase.scene().addItem(p_phase_dist_axis) p_phase.getAxis("right").linkToView(p_phase_dist_axis) p_phase_dist_axis.setXLink(p_phase) p_phase.setLabel("right", "Расстояние, м") def updateViews(): try: p_phase_dist_axis.setGeometry(p_phase.vb.sceneBoundingRect()) p_phase_dist_axis.linkedViewChanged(p_phase.vb, p_phase_dist_axis.XAxis) except Exception: pass updateViews() p_phase.vb.sigResized.connect(updateViews) # Водопад фазы (справа-снизу) p_phase_wf = win.addPlot(row=2, col=1, title="Водопад фазы") p_phase_wf.invertY(True) p_phase_wf.showGrid(x=False, y=False) p_phase_wf.setLabel("bottom", "Время, с (новое справа)") try: p_phase_wf.getAxis("bottom").setStyle(showValues=False) except Exception: pass p_phase_wf.setLabel("left", "Бин (0 снизу)") img_phase = pg.ImageItem() p_phase_wf.addItem(img_phase) # Статусная строка (внизу окна) status = pg.LabelItem(justify="left") win.addItem(status, row=3, col=0, colspan=2) # Состояние ring: Optional[np.ndarray] = None ring_time: Optional[np.ndarray] = None head = 0 width: Optional[int] = None x_shared: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None current_sweep_norm: Optional[np.ndarray] = None last_calib_sweep: Optional[np.ndarray] = None current_info: Optional[SweepInfo] = None # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. # Для спектров fft_bins = FFT_LEN // 2 + 1 ring_fft: Optional[np.ndarray] = None freq_shared: Optional[np.ndarray] = None y_min_fft, y_max_fft = None, None # Phase состояние ring_phase: Optional[np.ndarray] = None prev_phase_per_bin: Optional[np.ndarray] = None phase_offset_per_bin: Optional[np.ndarray] = None y_min_phase, y_max_phase = None, None # Параметры контраста водопада спектров (процентильная обрезка) spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) calib_enabled = False # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: try: y0, y1 = args.ylim.split(",") fixed_ylim = (float(y0), float(y1)) except Exception: pass if fixed_ylim is not None: p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: w = min(raw.size, calib.size) if w <= 0: return raw out = np.full_like(raw, np.nan, dtype=np.float32) with np.errstate(divide="ignore", invalid="ignore"): out[:w] = raw[:w] / calib[:w] out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) return out def _set_calib_enabled(): nonlocal calib_enabled, current_sweep_norm try: calib_enabled = bool(calib_cb.isChecked()) except Exception: calib_enabled = False if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) else: current_sweep_norm = None try: calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled()) except Exception: pass def ensure_buffer(_w: int): nonlocal ring, ring_time, head, width, x_shared, ring_fft, freq_shared nonlocal ring_phase, prev_phase_per_bin, phase_offset_per_bin if ring is not None: return width = WF_WIDTH x_shared = np.arange(width, dtype=np.int32) ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) head = 0 # Водопад: время по оси X, X по оси Y img.setImage(ring.T, autoLevels=False) p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, width - 1)), padding=0) p_line.setXRange(0, max(1, width - 1), padding=0) # FFT: время по оси X, бин по оси Y ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) img_fft.setImage(ring_fft.T, autoLevels=False) p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, fft_bins - 1)), padding=0) p_fft.setXRange(0, max(1, fft_bins - 1), padding=0) freq_shared = np.arange(fft_bins, dtype=np.int32) # Phase: время по оси X, бин по оси Y ring_phase = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) prev_phase_per_bin = np.zeros(fft_bins, dtype=np.float32) phase_offset_per_bin = np.zeros(fft_bins, dtype=np.float32) img_phase.setImage(ring_phase.T, autoLevels=False) p_phase_wf.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, fft_bins - 1)), padding=0) p_phase.setXRange(0, max(1, fft_bins - 1), padding=0) def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]: """(vmin, vmax) по текущей видимой области ImageItem (без накопления по времени).""" if data.size == 0: return None ny, nx = data.shape[0], data.shape[1] try: (x0, x1), (y0, y1) = p_img.viewRange() except Exception: x0, x1 = 0.0, float(nx - 1) y0, y1 = 0.0, float(ny - 1) xmin, xmax = sorted((float(x0), float(x1))) ymin, ymax = sorted((float(y0), float(y1))) ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) if ix1 < ix0: ix1 = ix0 if iy1 < iy0: iy1 = iy0 sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] finite = np.isfinite(sub) if not finite.any(): return None vals = sub[finite] vmin = float(np.min(vals)) vmax = float(np.max(vals)) if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: return None return (vmin, vmax) def push_sweep(s: np.ndarray): nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft nonlocal ring_phase, prev_phase_per_bin, phase_offset_per_bin, y_min_phase, y_max_phase if s is None or s.size == 0 or ring is None: return w = ring.shape[1] row = np.full((w,), np.nan, dtype=np.float32) take = min(w, s.size) row[:take] = s[:take] ring[head, :] = row if ring_time is not None: ring_time[head] = time.time() head = (head + 1) % ring.shape[0] # FFT строка (дБ) и фаза if ring_fft is not None: bins = ring_fft.shape[1] take_fft = min(int(s.size), FFT_LEN) if take_fft > 0: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False) win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) mag = np.abs(spec).astype(np.float32) fft_row = 20.0 * np.log10(mag + 1e-9) if fft_row.shape[0] != bins: fft_row = fft_row[:bins] # Расчет фазы phase = np.angle(spec).astype(np.float32) if phase.shape[0] > bins: phase = phase[:bins] # Unwrapping по частоте (внутри свипа) phase_unwrapped_freq = np.unwrap(phase) # Unwrapping по времени (между свипами) phase_unwrapped_time, prev_phase_per_bin, phase_offset_per_bin = apply_temporal_unwrap( phase_unwrapped_freq, prev_phase_per_bin, phase_offset_per_bin ) phase_row = phase_unwrapped_time else: fft_row = np.full((bins,), np.nan, dtype=np.float32) phase_row = np.full((bins,), np.nan, dtype=np.float32) ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row fr_min = np.nanmin(fft_row) fr_max = np.nanmax(fft_row) if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): y_min_fft = float(fr_min) if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): y_max_fft = float(fr_max) # Сохраняем фазу в буфер if ring_phase is not None: ring_phase[(head - 1) % ring_phase.shape[0], :] = phase_row # Экстремумы для цветовой шкалы фазы ph_min = np.nanmin(phase_row) ph_max = np.nanmax(phase_row) if y_min_phase is None or (not np.isnan(ph_min) and ph_min < y_min_phase): y_min_phase = float(ph_min) if y_max_phase is None or (not np.isnan(ph_max) and ph_max > y_max_phase): y_max_phase = float(ph_max) def drain_queue(): nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep drained = 0 while True: try: s, info = q.get_nowait() except Empty: break drained += 1 current_sweep_raw = s current_info = info ch = 0 try: ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 except Exception: ch = 0 if ch == 0: last_calib_sweep = s current_sweep_norm = None sweep_for_proc = s else: if calib_enabled and last_calib_sweep is not None: current_sweep_norm = _normalize_sweep(s, last_calib_sweep) sweep_for_proc = current_sweep_norm else: current_sweep_norm = None sweep_for_proc = s ensure_buffer(s.size) push_sweep(sweep_for_proc) return drained # Попытка применить LUT из колормэпа (если доступен) try: cm_mod = getattr(pg, "colormap", None) if cm_mod is not None: cm = cm_mod.get(args.cmap) img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256)) except Exception: pass def update(): changed = drain_queue() > 0 if current_sweep_raw is not None and x_shared is not None: if current_sweep_raw.size <= x_shared.size: xs = x_shared[: current_sweep_raw.size] else: xs = np.arange(current_sweep_raw.size) curve.setData(xs, current_sweep_raw, autoDownsample=True) if last_calib_sweep is not None: curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True) else: curve_calib.setData([], []) if current_sweep_norm is not None: curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True) else: curve_norm.setData([], []) if fixed_ylim is None: y0 = float(np.nanmin(current_sweep_raw)) y1 = float(np.nanmax(current_sweep_raw)) if np.isfinite(y0) and np.isfinite(y1): margin = 0.05 * max(1.0, (y1 - y0)) p_line.setYRange(y0 - margin, y1 + margin, padding=0) # Обновим спектр sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw take_fft = min(int(sweep_for_fft.size), FFT_LEN) if take_fft > 0 and freq_shared is not None: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False) win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) mag = np.abs(spec).astype(np.float32) fft_vals = 20.0 * np.log10(mag + 1e-9) xs_fft = freq_shared if fft_vals.size > xs_fft.size: fft_vals = fft_vals[: xs_fft.size] curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) # Расчет и отображение фазы текущего свипа phase = np.angle(spec).astype(np.float32) if phase.size > xs_fft.size: phase = phase[: xs_fft.size] # Unwrapping по частоте phase_unwrapped = np.unwrap(phase) curve_phase.setData(xs_fft[: phase_unwrapped.size], phase_unwrapped) phase_min = float(np.nanmin(phase_unwrapped)) phase_max = float(np.nanmax(phase_unwrapped)) p_phase.setYRange(phase_min, phase_max, padding=0) # Обновляем вторую ось Y с расстоянием try: dist_min = phase_to_distance(np.array([phase_min]))[0] dist_max = phase_to_distance(np.array([phase_max]))[0] p_phase_dist_axis.setYRange(dist_min, dist_max, padding=0) except Exception: pass if changed and ring is not None: disp = ring if head == 0 else np.roll(ring, -head, axis=0) disp = disp.T[:, ::-1] # (width, time with newest at left) levels = _visible_levels_pyqtgraph(disp) if levels is not None: img.setImage(disp, autoLevels=False, levels=levels) else: img.setImage(disp, autoLevels=False) if changed and current_info: try: status.setText(_format_status_kv(current_info)) except Exception: pass try: chs = current_info.get("chs") if isinstance(current_info, dict) else None if chs is None: chs = current_info.get("ch") if isinstance(current_info, dict) else None if chs is None: ch_text.setText("") else: if isinstance(chs, (list, tuple, set)): ch_list = sorted(int(v) for v in chs) ch_text_val = ", ".join(str(v) for v in ch_list) else: ch_text_val = str(int(chs)) ch_text.setText(f"chs {ch_text_val}") (x0, x1), (y0, y1) = p_line.viewRange() dx = 0.01 * max(1.0, float(x1 - x0)) dy = 0.01 * max(1.0, float(y1 - y0)) ch_text.setPos(float(x1 - dx), float(y1 - dy)) except Exception: pass if changed and ring_fft is not None: disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) disp_fft = disp_fft.T[:, ::-1] if spec_mean_sec > 0.0 and ring_time is not None: disp_times = ring_time if head == 0 else np.roll(ring_time, -head) disp_times = disp_times[::-1] now_t = time.time() mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) if np.any(mask): try: mean_spec = np.nanmean(disp_fft[:, mask], axis=1) mean_spec = np.nan_to_num(mean_spec, nan=0.0) disp_fft = disp_fft - mean_spec[:, None] except Exception: pass # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) levels = None try: mean_spec = np.nanmean(disp_fft, axis=1) vmin_v = float(np.nanmin(mean_spec)) vmax_v = float(np.nanmax(mean_spec)) if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: levels = (vmin_v, vmax_v) except Exception: levels = None # Процентильная обрезка как запасной вариант if levels is None and spec_clip is not None: try: vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: levels = (vmin_v, vmax_v) except Exception: levels = None # Ещё один фолбэк — глобальные накопленные мин/макс if levels is None and y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft: levels = (y_min_fft, y_max_fft) if levels is not None: img_fft.setImage(disp_fft, autoLevels=False, levels=levels) else: img_fft.setImage(disp_fft, autoLevels=False) # Обновление водопада фазы if changed and ring_phase is not None: disp_phase = ring_phase if head == 0 else np.roll(ring_phase, -head, axis=0) disp_phase = disp_phase.T[:, ::-1] # Автодиапазон для фазы levels_phase = None try: mean_phase = np.nanmean(disp_phase, axis=1) vmin_p = float(np.nanmin(mean_phase)) vmax_p = float(np.nanmax(mean_phase)) if np.isfinite(vmin_p) and np.isfinite(vmax_p) and vmin_p != vmax_p: levels_phase = (vmin_p, vmax_p) except Exception: levels_phase = None # Фолбэк к отслеживаемым минимум/максимумам if levels_phase is None and y_min_phase is not None and y_max_phase is not None and np.isfinite(y_min_phase) and np.isfinite(y_max_phase) and y_min_phase != y_max_phase: levels_phase = (y_min_phase, y_max_phase) if levels_phase is not None: img_phase.setImage(disp_phase, autoLevels=False, levels=levels_phase) else: img_phase.setImage(disp_phase, autoLevels=False) timer = pg.QtCore.QTimer() timer.timeout.connect(update) timer.start(interval_ms) def on_quit(): stop_event.set() reader.join(timeout=1.0) app.aboutToQuit.connect(on_quit) win.show() exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None) exec_fn() # На случай если aboutToQuit не сработал on_quit() if __name__ == "__main__": main()