#!/usr/bin/env python3 """ Реалтайм-плоттер для свипов из виртуального COM-порта. Формат строк: - "Sweep_start" — начало нового свипа (предыдущий считается завершённым) - "stp X Y" — точка (индекс X, значение Y), все целые со знаком Отрисовываются два графика: - Левый: последний полученный свип (Y vs X) - Правый: водопад (последние N свипов во времени) Оптимизации для скорости: - Парсинг и чтение в фоновой нити - Анимация с обновлением только данных (без лишнего пересоздания фигур) - Кольцевой буфер под водопад с фиксированным числом свипов Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии используется сырой доступ к TTY через termios. """ import argparse import io import os import sys import threading import time from queue import Queue, Empty, Full from typing import Optional, Tuple import numpy as np WF_WIDTH = 1000 # максимальное число точек в ряду водопада FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров def try_open_pyserial(path: str, baud: int, timeout: float): try: import serial # type: ignore except Exception: return None try: ser = serial.Serial(path, baudrate=baud, timeout=timeout) return ser except Exception: return None class FDReader: """Простой враппер чтения строк из файлового дескриптора TTY.""" def __init__(self, fd: int): # Отдельно буферизуем для корректной readline() self._fd = fd raw = os.fdopen(fd, "rb", closefd=False) self._file = raw self._buf = io.BufferedReader(raw, buffer_size=65536) def fileno(self) -> int: return self._fd def readline(self) -> bytes: return self._buf.readline() def close(self): try: self._buf.close() except Exception: pass def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: """Открыть TTY без pyserial и настроить порт через termios. Возвращает FDReader или None при ошибке. """ try: import termios import tty except Exception: return None try: fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) except Exception: return None try: attrs = termios.tcgetattr(fd) # Установим «сырое» состояние tty.setraw(fd) # Скорость baud_map = { 9600: termios.B9600, 19200: termios.B19200, 38400: termios.B38400, 57600: termios.B57600, 115200: termios.B115200, 230400: getattr(termios, "B230400", None), 460800: getattr(termios, "B460800", None), } b = baud_map.get(baud) or termios.B115200 attrs[4] = b # ispeed attrs[5] = b # ospeed # VMIN=1, VTIME=0 — блокирующее чтение по байту cc = attrs[6] cc[termios.VMIN] = 1 cc[termios.VTIME] = 0 attrs[6] = cc termios.tcsetattr(fd, termios.TCSANOW, attrs) except Exception: try: os.close(fd) except Exception: pass return None return FDReader(fd) class SerialLineSource: """Единый интерфейс для чтения строк из порта (pyserial или raw TTY).""" def __init__(self, path: str, baud: int, timeout: float = 1.0): self._pyserial = try_open_pyserial(path, baud, timeout) self._fdreader = None self._using = "pyserial" if self._pyserial is not None else "raw" if self._pyserial is None: self._fdreader = open_raw_tty(path, baud) if self._fdreader is None: raise RuntimeError( f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" ) def readline(self) -> bytes: if self._pyserial is not None: try: return self._pyserial.readline() except Exception: return b"" else: try: return self._fdreader.readline() # type: ignore[union-attr] except Exception: return b"" def close(self): try: if self._pyserial is not None: self._pyserial.close() elif self._fdreader is not None: self._fdreader.close() except Exception: pass class SerialChunkReader: """Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера.""" def __init__(self, src: SerialLineSource): self._src = src self._ser = src._pyserial self._fd: Optional[int] = None if self._ser is not None: # Неблокирующий режим для быстрой откачки try: self._ser.timeout = 0 except Exception: pass else: try: self._fd = src._fdreader.fileno() # type: ignore[union-attr] try: os.set_blocking(self._fd, False) except Exception: pass except Exception: self._fd = None def read_available(self) -> bytes: """Вернёт доступные байты (b"" если данных нет).""" if self._ser is not None: try: n = int(getattr(self._ser, "in_waiting", 0)) except Exception: n = 0 if n > 0: try: return self._ser.read(n) except Exception: return b"" return b"" if self._fd is None: return b"" out = bytearray() while True: try: chunk = os.read(self._fd, 65536) if not chunk: break out += chunk if len(chunk) < 65536: break except BlockingIOError: break except Exception: break return bytes(out) class SweepReader(threading.Thread): """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" def __init__(self, port_path: str, baud: int, out_queue: Queue, stop_event: threading.Event): super().__init__(daemon=True) self._port_path = port_path self._baud = baud self._q = out_queue self._stop = stop_event self._src: Optional[SerialLineSource] = None def _finalize_current(self, xs, ys): if not xs: return max_x = max(xs) width = max_x + 1 # Быстрый векторизованный путь sweep = np.full((width,), np.nan, dtype=np.float32) try: idx = np.asarray(xs, dtype=np.int64) vals = np.asarray(ys, dtype=np.float32) sweep[idx] = vals except Exception: # Запасной путь for x, y in zip(xs, ys): if 0 <= x < width: sweep[x] = float(y) # Кладём готовый свип (если очередь полна — выбрасываем самый старый) try: self._q.put_nowait(sweep) except Full: try: _ = self._q.get_nowait() except Exception: pass try: self._q.put_nowait(sweep) except Exception: pass def run(self): # Состояние текущего свипа xs: list[int] = [] ys: list[int] = [] try: self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") except Exception as e: sys.stderr.write(f"[error] {e}\n") return try: # Быстрый неблокирующий дренаж порта с разбором по байтам chunk_reader = SerialChunkReader(self._src) buf = bytearray() while not self._stop.is_set(): data = chunk_reader.read_available() if data: buf += data else: # Короткая уступка CPU, если нет новых данных time.sleep(0.0005) continue # Обрабатываем все полные строки while True: nl = buf.find(b"\n") if nl == -1: break line = bytes(buf[:nl]) del buf[: nl + 1] if line.endswith(b"\r"): line = line[:-1] if not line: continue if line.startswith(b"Sweep_start"): self._finalize_current(xs, ys) xs.clear() ys.clear() continue # stp X Y (оба целые со знаком). Разделяем по любым пробелам/табам. if len(line) >= 3: parts = line.split() if len(parts) >= 3 and parts[0].lower() == b"stp": try: x = int(parts[1], 10) y = int(parts[2], 10) # поддержка знака: "+…" и "-…" except Exception: continue xs.append(x) ys.append(y) # Защита от переполнения буфера при отсутствии переводов строки if len(buf) > 1_000_000: del buf[:-262144] finally: try: # Завершаем оставшийся свип self._finalize_current(xs, ys) except Exception: pass try: if self._src is not None: self._src.close() except Exception: pass def main(): parser = argparse.ArgumentParser( description=( "Читает свипы из виртуального COM-порта и рисует: " "последний свип и водопад (реалтайм)." ) ) parser.add_argument("port", help="Путь к порту, например /dev/ttyACM1") parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") parser.add_argument( "--ylim", type=str, default=None, help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", ) parser.add_argument( "--backend", choices=["auto", "pg", "mpl"], default="auto", help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto", ) args = parser.parse_args() # Попробуем быстрый бэкенд (pyqtgraph) при auto/pg if args.backend in ("auto", "pg"): try: return run_pyqtgraph(args) except Exception as e: if args.backend == "pg": sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n") sys.exit(1) # При auto — тихо откатываемся на matplotlib try: import matplotlib import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation except Exception as e: sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") sys.exit(1) # Очередь завершённых свипов и поток чтения q: Queue[np.ndarray] = Queue(maxsize=1000) stop_event = threading.Event() reader = SweepReader(args.port, args.baud, q, stop_event) reader.start() # Графика fig, axs = plt.subplots(2, 2, figsize=(12, 8)) (ax_line, ax_img), (ax_fft, ax_spec) = axs fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None fig.tight_layout() # Состояние для отображения current_sweep: Optional[np.ndarray] = None x_shared: Optional[np.ndarray] = None width: Optional[int] = None max_sweeps = int(max(10, args.max_sweeps)) ring = None # type: Optional[np.ndarray] head = 0 y_min, y_max = None, None # FFT состояние fft_bins = FFT_LEN // 2 + 1 ring_fft = None # type: Optional[np.ndarray] y_min_fft, y_max_fft = None, None freq_shared: Optional[np.ndarray] = None # Линейный график последнего свипа line_obj, = ax_line.plot([], [], lw=1) ax_line.set_title("Последний свип") ax_line.set_xlabel("X") ax_line.set_ylabel("Y") # Линейный график спектра текущего свипа fft_line_obj, = ax_fft.plot([], [], lw=1) ax_fft.set_title("Спектр (|FFT|, дБ)") ax_fft.set_xlabel("Бин") ax_fft.set_ylabel("Амплитуда, дБ") # Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения) fixed_ylim: Optional[Tuple[float, float]] = None # CLI переопределение при необходимости if args.ylim: try: y0, y1 = args.ylim.split(",") fixed_ylim = (float(y0), float(y1)) except Exception: sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n") if fixed_ylim is not None: ax_line.set_ylim(fixed_ylim) # Водопад (будет инициализирован при первом свипе) img_obj = ax_img.imshow( np.zeros((1, 1), dtype=np.float32), aspect="auto", interpolation="nearest", origin="upper", cmap=args.cmap, ) ax_img.set_title("Водопад (последние свипы)") ax_img.set_xlabel("X") ax_img.set_ylabel("Номер свипа (время →)") # Водопад спектров img_fft_obj = ax_spec.imshow( np.zeros((1, 1), dtype=np.float32), aspect="auto", interpolation="nearest", origin="upper", cmap=args.cmap, ) ax_spec.set_title("Водопад спектров (дБ)") ax_spec.set_xlabel("Бин") ax_spec.set_ylabel("Номер свипа (время →)") # Для контроля частоты обновления max_fps = max(1.0, float(args.max_fps)) interval_ms = int(1000.0 / max_fps) frames_since_ylim_update = 0 def ensure_buffer(_w: int): nonlocal ring, width, head, x_shared, ring_fft, freq_shared if ring is not None: return width = WF_WIDTH x_shared = np.arange(width, dtype=np.int32) ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) head = 0 # Обновляем изображение под новые размеры img_obj.set_data(ring) img_obj.set_extent((0, width - 1 if width > 0 else 1, 0, max_sweeps - 1)) ax_img.set_xlim(0, max(1, width - 1)) ax_img.set_ylim(max_sweeps - 1, 0) # FFT буферы ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) img_fft_obj.set_data(ring_fft) img_fft_obj.set_extent((0, fft_bins - 1, 0, max_sweeps - 1)) ax_spec.set_xlim(0, max(1, fft_bins - 1)) ax_spec.set_ylim(max_sweeps - 1, 0) freq_shared = np.arange(fft_bins, dtype=np.int32) def push_sweep(s: np.ndarray): nonlocal ring, head, y_min, y_max, ring_fft, y_min_fft, y_max_fft if s is None or s.size == 0 or ring is None: return # Нормализуем длину до фиксированной ширины w = ring.shape[1] row = np.full((w,), np.nan, dtype=np.float32) take = min(w, s.size) row[:take] = s[:take] ring[head, :] = row head = (head + 1) % ring.shape[0] # Обновляем мин/макс по данным (игнорим NaN) sv_min = np.nanmin(row) sv_max = np.nanmax(row) if y_min is None or (not np.isnan(sv_min) and sv_min < y_min): y_min = float(sv_min) if y_max is None or (not np.isnan(sv_max) and sv_max > y_max): y_max = float(sv_max) # FFT строка (дБ) if ring_fft is not None: bins = ring_fft.shape[1] # Подготовка входа FFT_LEN, замена NaN на 0 take_fft = min(int(s.size), FFT_LEN) if take_fft <= 0: fft_row = np.full((bins,), np.nan, dtype=np.float32) else: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) seg = s[:take_fft] if isinstance(seg, np.ndarray): seg = np.nan_to_num(seg, nan=0.0).astype(np.float32, copy=False) else: seg = np.asarray(seg, dtype=np.float32) seg = np.nan_to_num(seg, nan=0.0) # Окно Хэннинга win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) mag = np.abs(spec).astype(np.float32) fft_row = 20.0 * np.log10(mag + 1e-9) if fft_row.shape[0] != bins: # rfft длиной FFT_LEN даёт bins == FFT_LEN//2+1 fft_row = fft_row[:bins] ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row # Экстремумы для цветовой шкалы fr_min = np.nanmin(fft_row) fr_max = np.nanmax(fft_row) if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): y_min_fft = float(fr_min) if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): y_max_fft = float(fr_max) def drain_queue(): nonlocal current_sweep drained = 0 while True: try: s = q.get_nowait() except Empty: break drained += 1 current_sweep = s ensure_buffer(s.size) push_sweep(s) return drained def make_display_ring(): # Возвращаем буфер с правильным порядком строк (старые сверху, новые снизу) if ring is None: return np.zeros((1, 1), dtype=np.float32) if head == 0: return ring return np.roll(ring, -head, axis=0) def make_display_ring_fft(): if ring_fft is None: return np.zeros((1, 1), dtype=np.float32) if head == 0: return ring_fft return np.roll(ring_fft, -head, axis=0) def update(_frame): nonlocal frames_since_ylim_update changed = drain_queue() > 0 # Обновление линии последнего свипа if current_sweep is not None: if x_shared is not None and current_sweep.size <= x_shared.size: xs = x_shared[: current_sweep.size] else: xs = np.arange(current_sweep.size, dtype=np.int32) line_obj.set_data(xs, current_sweep) # Лимиты по X постоянные под текущую ширину ax_line.set_xlim(0, max(1, current_sweep.size - 1)) # Адаптивные Y-лимиты (если не задан --ylim) if fixed_ylim is None: y0 = float(np.nanmin(current_sweep)) y1 = float(np.nanmax(current_sweep)) if np.isfinite(y0) and np.isfinite(y1): if y0 == y1: pad = max(1.0, abs(y0) * 0.05) y0 -= pad y1 += pad else: pad = 0.05 * (y1 - y0) y0 -= pad y1 += pad ax_line.set_ylim(y0, y1) # Обновление спектра текущего свипа take_fft = min(int(current_sweep.size), FFT_LEN) if take_fft > 0 and freq_shared is not None: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False) win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) mag = np.abs(spec).astype(np.float32) fft_vals = 20.0 * np.log10(mag + 1e-9) xs_fft = freq_shared if fft_vals.size > xs_fft.size: fft_vals = fft_vals[: xs_fft.size] fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) # Авто-диапазон по Y для спектра if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): ax_fft.set_xlim(0, max(1, xs_fft.size - 1)) ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) # Обновление водопада if changed and ring is not None: disp = make_display_ring() img_obj.set_data(disp) # Актуализируем цветовую шкалу только при расширении экстремумов if y_min is not None and y_max is not None and np.isfinite(y_min) and np.isfinite(y_max): if y_min != y_max: img_obj.set_clim(vmin=y_min, vmax=y_max) # Обновление водопада спектров if changed and ring_fft is not None: disp_fft = make_display_ring_fft() img_fft_obj.set_data(disp_fft) if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft): if y_min_fft != y_max_fft: img_fft_obj.set_clim(vmin=y_min_fft, vmax=y_max_fft) # Возвращаем обновлённые артисты return (line_obj, img_obj, fft_line_obj, img_fft_obj) ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) plt.show() # Нормальное завершение при закрытии окна stop_event.set() reader.join(timeout=1.0) def run_pyqtgraph(args): """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" try: import pyqtgraph as pg from PyQt5 import QtCore, QtWidgets # noqa: F401 except Exception: # Возможно установлена PySide6 try: import pyqtgraph as pg from PySide6 import QtCore, QtWidgets # noqa: F401 except Exception as e: raise RuntimeError( "pyqtgraph/PyQt5(Pyside6) не найдены. Установите: pip install pyqtgraph PyQt5" ) from e # Очередь завершённых свипов и поток чтения q: Queue[np.ndarray] = Queue(maxsize=1000) stop_event = threading.Event() reader = SweepReader(args.port, args.baud, q, stop_event) reader.start() # Настройки скорости max_sweeps = int(max(10, args.max_sweeps)) max_fps = max(1.0, float(args.max_fps)) interval_ms = int(1000.0 / max_fps) # PyQtGraph настройки pg.setConfigOptions(useOpenGL=True, antialias=False) app = pg.mkQApp(args.title) win = pg.GraphicsLayoutWidget(show=True, title=args.title) win.resize(1200, 600) # Плот последнего свипа (слева-сверху) p_line = win.addPlot(row=0, col=0, title="Последний свип") p_line.showGrid(x=True, y=True, alpha=0.3) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) p_line.setLabel("bottom", "X") p_line.setLabel("left", "Y") # Водопад (справа-сверху) p_img = win.addPlot(row=0, col=1, title="Водопад (последние свипы)") p_img.invertY(True) # 0 сверху, новые снизу p_img.showGrid(x=False, y=False) p_img.setLabel("bottom", "X") p_img.setLabel("left", "Номер свипа (время →)") img = pg.ImageItem() p_img.addItem(img) # Спектр (слева-снизу) p_fft = win.addPlot(row=1, col=0, title="Спектр (|FFT|, дБ)") p_fft.showGrid(x=True, y=True, alpha=0.3) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) p_fft.setLabel("bottom", "Бин") p_fft.setLabel("left", "Амплитуда, дБ") # Водопад спектров (справа-снизу) p_spec = win.addPlot(row=1, col=1, title="Водопад спектров (дБ)") p_spec.invertY(True) p_spec.showGrid(x=False, y=False) p_spec.setLabel("bottom", "Бин") p_spec.setLabel("left", "Номер свипа (время →)") img_fft = pg.ImageItem() p_spec.addItem(img_fft) # Состояние ring: Optional[np.ndarray] = None head = 0 width: Optional[int] = None x_shared: Optional[np.ndarray] = None current_sweep: Optional[np.ndarray] = None y_min, y_max = None, None # Для спектров fft_bins = FFT_LEN // 2 + 1 ring_fft: Optional[np.ndarray] = None freq_shared: Optional[np.ndarray] = None y_min_fft, y_max_fft = None, None # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: try: y0, y1 = args.ylim.split(",") fixed_ylim = (float(y0), float(y1)) except Exception: pass if fixed_ylim is not None: p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) def ensure_buffer(_w: int): nonlocal ring, head, width, x_shared, ring_fft, freq_shared if ring is not None: return width = WF_WIDTH x_shared = np.arange(width, dtype=np.int32) ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) head = 0 img.setImage(ring, autoLevels=False) p_img.setRange(xRange=(0, max(1, width - 1)), yRange=(0, max_sweeps - 1), padding=0) p_line.setXRange(0, max(1, width - 1), padding=0) # FFT ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) img_fft.setImage(ring_fft, autoLevels=False) p_spec.setRange(xRange=(0, max(1, fft_bins - 1)), yRange=(0, max_sweeps - 1), padding=0) p_fft.setXRange(0, max(1, fft_bins - 1), padding=0) freq_shared = np.arange(fft_bins, dtype=np.int32) def push_sweep(s: np.ndarray): nonlocal ring, head, y_min, y_max, ring_fft, y_min_fft, y_max_fft if s is None or s.size == 0 or ring is None: return w = ring.shape[1] row = np.full((w,), np.nan, dtype=np.float32) take = min(w, s.size) row[:take] = s[:take] ring[head, :] = row head = (head + 1) % ring.shape[0] # Обновление экстремумов sv_min = np.nanmin(row) sv_max = np.nanmax(row) if y_min is None or (not np.isnan(sv_min) and sv_min < y_min): y_min = float(sv_min) if y_max is None or (not np.isnan(sv_max) and sv_max > y_max): y_max = float(sv_max) # FFT строка (дБ) if ring_fft is not None: bins = ring_fft.shape[1] take_fft = min(int(s.size), FFT_LEN) if take_fft > 0: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False) win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) mag = np.abs(spec).astype(np.float32) fft_row = 20.0 * np.log10(mag + 1e-9) if fft_row.shape[0] != bins: fft_row = fft_row[:bins] else: fft_row = np.full((bins,), np.nan, dtype=np.float32) ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row fr_min = np.nanmin(fft_row) fr_max = np.nanmax(fft_row) if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): y_min_fft = float(fr_min) if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): y_max_fft = float(fr_max) def drain_queue(): nonlocal current_sweep drained = 0 while True: try: s = q.get_nowait() except Empty: break drained += 1 current_sweep = s ensure_buffer(s.size) push_sweep(s) return drained # Попытка применить LUT из колормэпа (если доступен) try: cm_mod = getattr(pg, "colormap", None) if cm_mod is not None: cm = cm_mod.get(args.cmap) img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256)) except Exception: pass def update(): changed = drain_queue() > 0 if current_sweep is not None and x_shared is not None: if current_sweep.size <= x_shared.size: xs = x_shared[: current_sweep.size] else: xs = np.arange(current_sweep.size) curve.setData(xs, current_sweep, autoDownsample=True) if fixed_ylim is None: y0 = float(np.nanmin(current_sweep)) y1 = float(np.nanmax(current_sweep)) if np.isfinite(y0) and np.isfinite(y1): margin = 0.05 * max(1.0, (y1 - y0)) p_line.setYRange(y0 - margin, y1 + margin, padding=0) # Обновим спектр take_fft = min(int(current_sweep.size), FFT_LEN) if take_fft > 0 and freq_shared is not None: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False) win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) mag = np.abs(spec).astype(np.float32) fft_vals = 20.0 * np.log10(mag + 1e-9) xs_fft = freq_shared if fft_vals.size > xs_fft.size: fft_vals = fft_vals[: xs_fft.size] curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) if changed and ring is not None: disp = ring if head == 0 else np.roll(ring, -head, axis=0) if y_min is not None and y_max is not None and y_min != y_max and np.isfinite(y_min) and np.isfinite(y_max): img.setImage(disp, autoLevels=False, levels=(y_min, y_max)) else: img.setImage(disp, autoLevels=False) if changed and ring_fft is not None: disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) if y_min_fft is not None and y_max_fft is not None and y_min_fft != y_max_fft and np.isfinite(y_min_fft) and np.isfinite(y_max_fft): img_fft.setImage(disp_fft, autoLevels=False, levels=(y_min_fft, y_max_fft)) else: img_fft.setImage(disp_fft, autoLevels=False) timer = pg.QtCore.QTimer() timer.timeout.connect(update) timer.start(interval_ms) def on_quit(): stop_event.set() reader.join(timeout=1.0) app.aboutToQuit.connect(on_quit) win.show() exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None) exec_fn() # На случай если aboutToQuit не сработал on_quit() if __name__ == "__main__": main()