commit 97f821d0a2c6e1be1d465975884079709d7df360 Author: Theodor Chikin Date: Thu Dec 18 23:21:52 2025 +0300 plotter works. But rather slow diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py new file mode 100755 index 0000000..024fca9 --- /dev/null +++ b/RFG_ADC_dataplotter.py @@ -0,0 +1,425 @@ +#!/usr/bin/env python3 +""" +Реалтайм-плоттер для свипов из виртуального COM-порта. + +Формат строк: + - "Sweep_start" — начало нового свипа (предыдущий считается завершённым) + - "curr_step X Y" — точка (индекс X, значение Y), все целые со знаком + +Отрисовываются два графика: + - Левый: последний полученный свип (Y vs X) + - Правый: водопад (последние N свипов во времени) + +Оптимизации для скорости: + - Парсинг и чтение в фоновой нити + - Анимация с обновлением только данных (без лишнего пересоздания фигур) + - Кольцевой буфер под водопад с фиксированным числом свипов + +Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии +используется сырой доступ к TTY через termios. +""" + +import argparse +import io +import os +import sys +import threading +import time +from queue import Queue, Empty, Full +from typing import Optional, Tuple + +import numpy as np + + +def try_open_pyserial(path: str, baud: int, timeout: float): + try: + import serial # type: ignore + except Exception: + return None + try: + ser = serial.Serial(path, baudrate=baud, timeout=timeout) + return ser + except Exception: + return None + + +class FDReader: + """Простой враппер чтения строк из файлового дескриптора TTY.""" + + def __init__(self, fd: int): + # Отдельно буферизуем для корректной readline() + raw = os.fdopen(fd, "rb", closefd=False) + self._buf = io.BufferedReader(raw, buffer_size=65536) + + def readline(self) -> bytes: + return self._buf.readline() + + def close(self): + try: + self._buf.close() + except Exception: + pass + + +def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: + """Открыть TTY без pyserial и настроить порт через termios. + + Возвращает FDReader или None при ошибке. + """ + try: + import termios + import tty + except Exception: + return None + + try: + fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) + except Exception: + return None + + try: + attrs = termios.tcgetattr(fd) + # Установим «сырое» состояние + tty.setraw(fd) + + # Скорость + baud_map = { + 9600: termios.B9600, + 19200: termios.B19200, + 38400: termios.B38400, + 57600: termios.B57600, + 115200: termios.B115200, + 230400: getattr(termios, "B230400", None), + 460800: getattr(termios, "B460800", None), + } + b = baud_map.get(baud) or termios.B115200 + + attrs[4] = b # ispeed + attrs[5] = b # ospeed + + # VMIN=1, VTIME=0 — блокирующее чтение по байту + cc = attrs[6] + cc[termios.VMIN] = 1 + cc[termios.VTIME] = 0 + attrs[6] = cc + + termios.tcsetattr(fd, termios.TCSANOW, attrs) + except Exception: + try: + os.close(fd) + except Exception: + pass + return None + + return FDReader(fd) + + +class SerialLineSource: + """Единый интерфейс для чтения строк из порта (pyserial или raw TTY).""" + + def __init__(self, path: str, baud: int, timeout: float = 1.0): + self._pyserial = try_open_pyserial(path, baud, timeout) + self._fdreader = None + self._using = "pyserial" if self._pyserial is not None else "raw" + if self._pyserial is None: + self._fdreader = open_raw_tty(path, baud) + if self._fdreader is None: + raise RuntimeError( + f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" + ) + + def readline(self) -> bytes: + if self._pyserial is not None: + try: + return self._pyserial.readline() + except Exception: + return b"" + else: + try: + return self._fdreader.readline() # type: ignore[union-attr] + except Exception: + return b"" + + def close(self): + try: + if self._pyserial is not None: + self._pyserial.close() + elif self._fdreader is not None: + self._fdreader.close() + except Exception: + pass + + +class SweepReader(threading.Thread): + """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" + + def __init__(self, port_path: str, baud: int, out_queue: Queue, stop_event: threading.Event): + super().__init__(daemon=True) + self._port_path = port_path + self._baud = baud + self._q = out_queue + self._stop = stop_event + self._src: Optional[SerialLineSource] = None + + def _finalize_current(self, xs, ys): + if not xs: + return + try: + max_x = max(xs) + except ValueError: + return + width = max_x + 1 + sweep = np.full((width,), np.nan, dtype=np.float32) + # Заполнение известными точками + for x, y in zip(xs, ys): + if 0 <= x < width: + sweep[x] = float(y) + # Кладём готовый свип (если очередь полна — выбрасываем самый старый) + try: + self._q.put_nowait(sweep) + except Full: + try: + _ = self._q.get_nowait() + except Exception: + pass + try: + self._q.put_nowait(sweep) + except Exception: + pass + + def run(self): + # Состояние текущего свипа + xs: list[int] = [] + ys: list[int] = [] + + try: + self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) + sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") + except Exception as e: + sys.stderr.write(f"[error] {e}\n") + return + + try: + while not self._stop.is_set(): + raw = self._src.readline() + if not raw: + # timeout/ошибка/EOF — небольшой сон, чтобы не крутить CPU + time.sleep(0.001) + continue + try: + line = raw.decode("ascii", errors="ignore").strip() + except Exception: + continue + + if not line: + continue + + if line.startswith("Sweep_start"): + # Завершаем предыдущий, начинаем новый + self._finalize_current(xs, ys) + xs.clear() + ys.clear() + continue + + # curr_step X Y + # Разрешим как с пробелами, так и табами + parts = line.split() + if len(parts) >= 3 and parts[0].lower() == "curr_step": + try: + x = int(parts[1], 10) + y = int(parts[2], 10) + except Exception: + continue + xs.append(x) + ys.append(y) + finally: + try: + # Завершаем оставшийся свип + self._finalize_current(xs, ys) + except Exception: + pass + try: + if self._src is not None: + self._src.close() + except Exception: + pass + + +def main(): + parser = argparse.ArgumentParser( + description=( + "Читает свипы из виртуального COM-порта и рисует: " + "последний свип и водопад (реалтайм)." + ) + ) + parser.add_argument("port", help="Путь к порту, например /dev/ttyACM1") + parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") + parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") + parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") + parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") + parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") + parser.add_argument( + "--ylim", + type=str, + default=None, + help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", + ) + + args = parser.parse_args() + + try: + import matplotlib + import matplotlib.pyplot as plt + from matplotlib.animation import FuncAnimation + except Exception as e: + sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") + sys.exit(1) + + # Очередь завершённых свипов и поток чтения + q: Queue[np.ndarray] = Queue(maxsize=1000) + stop_event = threading.Event() + reader = SweepReader(args.port, args.baud, q, stop_event) + reader.start() + + # Графика + fig, (ax_line, ax_img) = plt.subplots(1, 2, figsize=(12, 6)) + fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None + fig.tight_layout() + + # Состояние для отображения + current_sweep: Optional[np.ndarray] = None + width: Optional[int] = None + max_sweeps = int(max(10, args.max_sweeps)) + ring = None # type: Optional[np.ndarray] + head = 0 + y_min, y_max = None, None + + # Линейный график последнего свипа + line_obj, = ax_line.plot([], [], lw=1) + ax_line.set_title("Последний свип") + ax_line.set_xlabel("X") + ax_line.set_ylabel("Y") + + fixed_ylim: Optional[Tuple[float, float]] = None + if args.ylim: + try: + y0, y1 = args.ylim.split(",") + fixed_ylim = (float(y0), float(y1)) + ax_line.set_ylim(fixed_ylim) + except Exception: + sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n") + + # Водопад (будет инициализирован при первом свипе) + img_obj = ax_img.imshow( + np.zeros((1, 1), dtype=np.float32), + aspect="auto", + interpolation="nearest", + origin="upper", + cmap=args.cmap, + ) + ax_img.set_title("Водопад (последние свипы)") + ax_img.set_xlabel("X") + ax_img.set_ylabel("Номер свипа (время →)") + + # Для контроля частоты обновления + max_fps = max(1.0, float(args.max_fps)) + interval_ms = int(1000.0 / max_fps) + frames_since_ylim_update = 0 + + def ensure_buffer(w: int): + nonlocal ring, width, head + if ring is not None and width == w: + return + width = w + ring = np.full((max_sweeps, w), np.nan, dtype=np.float32) + head = 0 + # Обновляем изображение под новые размеры + img_obj.set_data(ring) + img_obj.set_extent((0, w - 1 if w > 0 else 1, 0, max_sweeps - 1)) + ax_img.set_xlim(0, max(1, w - 1)) + ax_img.set_ylim(max_sweeps - 1, 0) + + def push_sweep(s: np.ndarray): + nonlocal ring, head, y_min, y_max + if s is None or s.size == 0 or ring is None: + return + # Нормализуем длину + w = ring.shape[1] + row = np.full((w,), np.nan, dtype=np.float32) + take = min(w, s.size) + row[:take] = s[:take] + ring[head, :] = row + head = (head + 1) % ring.shape[0] + # Обновляем мин/макс по данным (игнорим NaN) + sv_min = np.nanmin(row) + sv_max = np.nanmax(row) + if y_min is None or (not np.isnan(sv_min) and sv_min < y_min): + y_min = float(sv_min) + if y_max is None or (not np.isnan(sv_max) and sv_max > y_max): + y_max = float(sv_max) + + def drain_queue(): + nonlocal current_sweep + drained = 0 + while True: + try: + s = q.get_nowait() + except Empty: + break + drained += 1 + current_sweep = s + ensure_buffer(s.size) + push_sweep(s) + return drained + + def make_display_ring(): + # Возвращаем буфер с правильным порядком строк (старые сверху, новые снизу) + if ring is None: + return np.zeros((1, 1), dtype=np.float32) + if head == 0: + return ring + return np.roll(ring, -head, axis=0) + + def update(_frame): + nonlocal frames_since_ylim_update + changed = drain_queue() > 0 + + # Обновление линии последнего свипа + if current_sweep is not None: + x = np.arange(current_sweep.size, dtype=np.int32) + line_obj.set_data(x, current_sweep) + # Лимиты по X постоянные под текущую ширину + ax_line.set_xlim(0, max(1, current_sweep.size - 1)) + # Y-лимиты: фиксированные либо периодическая автоподстройка + if fixed_ylim is None: + frames_since_ylim_update += 1 + if frames_since_ylim_update >= 3: # реже трогаем ось для скорости + y0 = np.nanmin(current_sweep) + y1 = np.nanmax(current_sweep) + if np.isfinite(y0) and np.isfinite(y1): + margin = 0.05 * max(1.0, float(y1 - y0)) + ax_line.set_ylim(y0 - margin, y1 + margin) + frames_since_ylim_update = 0 + + # Обновление водопада + if changed and ring is not None: + disp = make_display_ring() + img_obj.set_data(disp) + # Актуализируем цветовую шкалу только при расширении экстремумов + if y_min is not None and y_max is not None and np.isfinite(y_min) and np.isfinite(y_max): + if y_min != y_max: + img_obj.set_clim(vmin=y_min, vmax=y_max) + + # Возвращаем обновлённые артисты + return (line_obj, img_obj) + + ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) + + plt.show() + # Нормальное завершение при закрытии окна + stop_event.set() + reader.join(timeout=1.0) + + +if __name__ == "__main__": + main()