plotter works. But rather slow

This commit is contained in:
2025-12-18 23:21:52 +03:00
commit 97f821d0a2

425
RFG_ADC_dataplotter.py Executable file
View File

@ -0,0 +1,425 @@
#!/usr/bin/env python3
"""
Реалтайм-плоттер для свипов из виртуального COM-порта.
Формат строк:
- "Sweep_start" — начало нового свипа (предыдущий считается завершённым)
- "curr_step X Y" — точка (индекс X, значение Y), все целые со знаком
Отрисовываются два графика:
- Левый: последний полученный свип (Y vs X)
- Правый: водопад (последние N свипов во времени)
Оптимизации для скорости:
- Парсинг и чтение в фоновой нити
- Анимация с обновлением только данных (без лишнего пересоздания фигур)
- Кольцевой буфер под водопад с фиксированным числом свипов
Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии
используется сырой доступ к TTY через termios.
"""
import argparse
import io
import os
import sys
import threading
import time
from queue import Queue, Empty, Full
from typing import Optional, Tuple
import numpy as np
def try_open_pyserial(path: str, baud: int, timeout: float):
try:
import serial # type: ignore
except Exception:
return None
try:
ser = serial.Serial(path, baudrate=baud, timeout=timeout)
return ser
except Exception:
return None
class FDReader:
"""Простой враппер чтения строк из файлового дескриптора TTY."""
def __init__(self, fd: int):
# Отдельно буферизуем для корректной readline()
raw = os.fdopen(fd, "rb", closefd=False)
self._buf = io.BufferedReader(raw, buffer_size=65536)
def readline(self) -> bytes:
return self._buf.readline()
def close(self):
try:
self._buf.close()
except Exception:
pass
def open_raw_tty(path: str, baud: int) -> Optional[FDReader]:
"""Открыть TTY без pyserial и настроить порт через termios.
Возвращает FDReader или None при ошибке.
"""
try:
import termios
import tty
except Exception:
return None
try:
fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
except Exception:
return None
try:
attrs = termios.tcgetattr(fd)
# Установим «сырое» состояние
tty.setraw(fd)
# Скорость
baud_map = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
230400: getattr(termios, "B230400", None),
460800: getattr(termios, "B460800", None),
}
b = baud_map.get(baud) or termios.B115200
attrs[4] = b # ispeed
attrs[5] = b # ospeed
# VMIN=1, VTIME=0 — блокирующее чтение по байту
cc = attrs[6]
cc[termios.VMIN] = 1
cc[termios.VTIME] = 0
attrs[6] = cc
termios.tcsetattr(fd, termios.TCSANOW, attrs)
except Exception:
try:
os.close(fd)
except Exception:
pass
return None
return FDReader(fd)
class SerialLineSource:
"""Единый интерфейс для чтения строк из порта (pyserial или raw TTY)."""
def __init__(self, path: str, baud: int, timeout: float = 1.0):
self._pyserial = try_open_pyserial(path, baud, timeout)
self._fdreader = None
self._using = "pyserial" if self._pyserial is not None else "raw"
if self._pyserial is None:
self._fdreader = open_raw_tty(path, baud)
if self._fdreader is None:
raise RuntimeError(
f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)"
)
def readline(self) -> bytes:
if self._pyserial is not None:
try:
return self._pyserial.readline()
except Exception:
return b""
else:
try:
return self._fdreader.readline() # type: ignore[union-attr]
except Exception:
return b""
def close(self):
try:
if self._pyserial is not None:
self._pyserial.close()
elif self._fdreader is not None:
self._fdreader.close()
except Exception:
pass
class SweepReader(threading.Thread):
"""Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь."""
def __init__(self, port_path: str, baud: int, out_queue: Queue, stop_event: threading.Event):
super().__init__(daemon=True)
self._port_path = port_path
self._baud = baud
self._q = out_queue
self._stop = stop_event
self._src: Optional[SerialLineSource] = None
def _finalize_current(self, xs, ys):
if not xs:
return
try:
max_x = max(xs)
except ValueError:
return
width = max_x + 1
sweep = np.full((width,), np.nan, dtype=np.float32)
# Заполнение известными точками
for x, y in zip(xs, ys):
if 0 <= x < width:
sweep[x] = float(y)
# Кладём готовый свип (если очередь полна — выбрасываем самый старый)
try:
self._q.put_nowait(sweep)
except Full:
try:
_ = self._q.get_nowait()
except Exception:
pass
try:
self._q.put_nowait(sweep)
except Exception:
pass
def run(self):
# Состояние текущего свипа
xs: list[int] = []
ys: list[int] = []
try:
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n")
except Exception as e:
sys.stderr.write(f"[error] {e}\n")
return
try:
while not self._stop.is_set():
raw = self._src.readline()
if not raw:
# timeout/ошибка/EOF — небольшой сон, чтобы не крутить CPU
time.sleep(0.001)
continue
try:
line = raw.decode("ascii", errors="ignore").strip()
except Exception:
continue
if not line:
continue
if line.startswith("Sweep_start"):
# Завершаем предыдущий, начинаем новый
self._finalize_current(xs, ys)
xs.clear()
ys.clear()
continue
# curr_step X Y
# Разрешим как с пробелами, так и табами
parts = line.split()
if len(parts) >= 3 and parts[0].lower() == "curr_step":
try:
x = int(parts[1], 10)
y = int(parts[2], 10)
except Exception:
continue
xs.append(x)
ys.append(y)
finally:
try:
# Завершаем оставшийся свип
self._finalize_current(xs, ys)
except Exception:
pass
try:
if self._src is not None:
self._src.close()
except Exception:
pass
def main():
parser = argparse.ArgumentParser(
description=(
"Читает свипы из виртуального COM-порта и рисует: "
"последний свип и водопад (реалтайм)."
)
)
parser.add_argument("port", help="Путь к порту, например /dev/ttyACM1")
parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)")
parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде")
parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с")
parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада")
parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна")
parser.add_argument(
"--ylim",
type=str,
default=None,
help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто",
)
args = parser.parse_args()
try:
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
except Exception as e:
sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n")
sys.exit(1)
# Очередь завершённых свипов и поток чтения
q: Queue[np.ndarray] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event)
reader.start()
# Графика
fig, (ax_line, ax_img) = plt.subplots(1, 2, figsize=(12, 6))
fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None
fig.tight_layout()
# Состояние для отображения
current_sweep: Optional[np.ndarray] = None
width: Optional[int] = None
max_sweeps = int(max(10, args.max_sweeps))
ring = None # type: Optional[np.ndarray]
head = 0
y_min, y_max = None, None
# Линейный график последнего свипа
line_obj, = ax_line.plot([], [], lw=1)
ax_line.set_title("Последний свип")
ax_line.set_xlabel("X")
ax_line.set_ylabel("Y")
fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim:
try:
y0, y1 = args.ylim.split(",")
fixed_ylim = (float(y0), float(y1))
ax_line.set_ylim(fixed_ylim)
except Exception:
sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n")
# Водопад (будет инициализирован при первом свипе)
img_obj = ax_img.imshow(
np.zeros((1, 1), dtype=np.float32),
aspect="auto",
interpolation="nearest",
origin="upper",
cmap=args.cmap,
)
ax_img.set_title("Водопад (последние свипы)")
ax_img.set_xlabel("X")
ax_img.set_ylabel("Номер свипа (время →)")
# Для контроля частоты обновления
max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps)
frames_since_ylim_update = 0
def ensure_buffer(w: int):
nonlocal ring, width, head
if ring is not None and width == w:
return
width = w
ring = np.full((max_sweeps, w), np.nan, dtype=np.float32)
head = 0
# Обновляем изображение под новые размеры
img_obj.set_data(ring)
img_obj.set_extent((0, w - 1 if w > 0 else 1, 0, max_sweeps - 1))
ax_img.set_xlim(0, max(1, w - 1))
ax_img.set_ylim(max_sweeps - 1, 0)
def push_sweep(s: np.ndarray):
nonlocal ring, head, y_min, y_max
if s is None or s.size == 0 or ring is None:
return
# Нормализуем длину
w = ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
ring[head, :] = row
head = (head + 1) % ring.shape[0]
# Обновляем мин/макс по данным (игнорим NaN)
sv_min = np.nanmin(row)
sv_max = np.nanmax(row)
if y_min is None or (not np.isnan(sv_min) and sv_min < y_min):
y_min = float(sv_min)
if y_max is None or (not np.isnan(sv_max) and sv_max > y_max):
y_max = float(sv_max)
def drain_queue():
nonlocal current_sweep
drained = 0
while True:
try:
s = q.get_nowait()
except Empty:
break
drained += 1
current_sweep = s
ensure_buffer(s.size)
push_sweep(s)
return drained
def make_display_ring():
# Возвращаем буфер с правильным порядком строк (старые сверху, новые снизу)
if ring is None:
return np.zeros((1, 1), dtype=np.float32)
if head == 0:
return ring
return np.roll(ring, -head, axis=0)
def update(_frame):
nonlocal frames_since_ylim_update
changed = drain_queue() > 0
# Обновление линии последнего свипа
if current_sweep is not None:
x = np.arange(current_sweep.size, dtype=np.int32)
line_obj.set_data(x, current_sweep)
# Лимиты по X постоянные под текущую ширину
ax_line.set_xlim(0, max(1, current_sweep.size - 1))
# Y-лимиты: фиксированные либо периодическая автоподстройка
if fixed_ylim is None:
frames_since_ylim_update += 1
if frames_since_ylim_update >= 3: # реже трогаем ось для скорости
y0 = np.nanmin(current_sweep)
y1 = np.nanmax(current_sweep)
if np.isfinite(y0) and np.isfinite(y1):
margin = 0.05 * max(1.0, float(y1 - y0))
ax_line.set_ylim(y0 - margin, y1 + margin)
frames_since_ylim_update = 0
# Обновление водопада
if changed and ring is not None:
disp = make_display_ring()
img_obj.set_data(disp)
# Актуализируем цветовую шкалу только при расширении экстремумов
if y_min is not None and y_max is not None and np.isfinite(y_min) and np.isfinite(y_max):
if y_min != y_max:
img_obj.set_clim(vmin=y_min, vmax=y_max)
# Возвращаем обновлённые артисты
return (line_obj, img_obj)
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
plt.show()
# Нормальное завершение при закрытии окна
stop_event.set()
reader.join(timeout=1.0)
if __name__ == "__main__":
main()