a bit faster realisation. But waterfall fails when number of points in a row varyes

This commit is contained in:
2025-12-18 23:29:43 +03:00
parent 97f821d0a2
commit 4959cb6347

View File

@ -264,9 +264,25 @@ def main():
default=None,
help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто",
)
parser.add_argument(
"--backend",
choices=["auto", "pg", "mpl"],
default="auto",
help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto",
)
args = parser.parse_args()
# Попробуем быстрый бэкенд (pyqtgraph) при auto/pg
if args.backend in ("auto", "pg"):
try:
return run_pyqtgraph(args)
except Exception as e:
if args.backend == "pg":
sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n")
sys.exit(1)
# При auto — тихо откатываемся на matplotlib
try:
import matplotlib
import matplotlib.pyplot as plt
@ -288,6 +304,7 @@ def main():
# Состояние для отображения
current_sweep: Optional[np.ndarray] = None
x_shared: Optional[np.ndarray] = None
width: Optional[int] = None
max_sweeps = int(max(10, args.max_sweeps))
ring = None # type: Optional[np.ndarray]
@ -327,10 +344,11 @@ def main():
frames_since_ylim_update = 0
def ensure_buffer(w: int):
nonlocal ring, width, head
nonlocal ring, width, head, x_shared
if ring is not None and width == w:
return
width = w
x_shared = np.arange(w, dtype=np.int32)
ring = np.full((max_sweeps, w), np.nan, dtype=np.float32)
head = 0
# Обновляем изображение под новые размеры
@ -386,8 +404,8 @@ def main():
# Обновление линии последнего свипа
if current_sweep is not None:
x = np.arange(current_sweep.size, dtype=np.int32)
line_obj.set_data(x, current_sweep)
xs = x_shared if x_shared is not None and x_shared.size == current_sweep.size else np.arange(current_sweep.size, dtype=np.int32)
line_obj.set_data(xs, current_sweep)
# Лимиты по X постоянные под текущую ширину
ax_line.set_xlim(0, max(1, current_sweep.size - 1))
# Y-лимиты: фиксированные либо периодическая автоподстройка
@ -421,5 +439,156 @@ def main():
reader.join(timeout=1.0)
def run_pyqtgraph(args):
"""Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6."""
try:
import pyqtgraph as pg
from PyQt5 import QtCore, QtWidgets # noqa: F401
except Exception:
# Возможно установлена PySide6
try:
import pyqtgraph as pg
from PySide6 import QtCore, QtWidgets # noqa: F401
except Exception as e:
raise RuntimeError(
"pyqtgraph/PyQt5(Pyside6) не найдены. Установите: pip install pyqtgraph PyQt5"
) from e
# Очередь завершённых свипов и поток чтения
q: Queue[np.ndarray] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event)
reader.start()
# Настройки скорости
max_sweeps = int(max(10, args.max_sweeps))
max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps)
# PyQtGraph настройки
pg.setConfigOptions(useOpenGL=True, antialias=False)
app = pg.mkQApp(args.title)
win = pg.GraphicsLayoutWidget(show=True, title=args.title)
win.resize(1200, 600)
# Плот последнего свипа (слева)
p_line = win.addPlot(row=0, col=0, title="Последний свип")
p_line.showGrid(x=True, y=True, alpha=0.3)
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
p_line.setLabel("bottom", "X")
p_line.setLabel("left", "Y")
# Водопад (справа)
p_img = win.addPlot(row=0, col=1, title="Водопад (последние свипы)")
p_img.invertY(True) # 0 сверху, новые снизу
p_img.showGrid(x=False, y=False)
p_img.setLabel("bottom", "X")
p_img.setLabel("left", "Номер свипа (время →)")
img = pg.ImageItem()
p_img.addItem(img)
# Состояние
ring: Optional[np.ndarray] = None
head = 0
width: Optional[int] = None
x_shared: Optional[np.ndarray] = None
current_sweep: Optional[np.ndarray] = None
y_min, y_max = None, None
fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim:
try:
y0, y1 = args.ylim.split(",")
fixed_ylim = (float(y0), float(y1))
p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0)
except Exception:
pass
def ensure_buffer(w: int):
nonlocal ring, head, width, x_shared
if ring is not None and width == w:
return
width = w
x_shared = np.arange(w, dtype=np.int32)
ring = np.full((max_sweeps, w), np.nan, dtype=np.float32)
head = 0
img.setImage(ring, autoLevels=False)
p_img.setRange(xRange=(0, max(1, w - 1)), yRange=(0, max_sweeps - 1), padding=0)
p_line.setXRange(0, max(1, w - 1), padding=0)
def push_sweep(s: np.ndarray):
nonlocal ring, head, y_min, y_max
if s is None or s.size == 0 or ring is None:
return
w = ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
ring[head, :] = row
head = (head + 1) % ring.shape[0]
# Обновление экстремумов
sv_min = np.nanmin(row)
sv_max = np.nanmax(row)
if y_min is None or (not np.isnan(sv_min) and sv_min < y_min):
y_min = float(sv_min)
if y_max is None or (not np.isnan(sv_max) and sv_max > y_max):
y_max = float(sv_max)
def drain_queue():
nonlocal current_sweep
drained = 0
while True:
try:
s = q.get_nowait()
except Empty:
break
drained += 1
current_sweep = s
ensure_buffer(s.size)
push_sweep(s)
return drained
# Попытка применить LUT из колормэпа (если доступен)
try:
cm_mod = getattr(pg, "colormap", None)
if cm_mod is not None:
cm = cm_mod.get(args.cmap)
img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256))
except Exception:
pass
def update():
changed = drain_queue() > 0
if current_sweep is not None and x_shared is not None:
curve.setData(x_shared[: current_sweep.size], current_sweep, autoDownsample=True)
if fixed_ylim is None:
y0 = float(np.nanmin(current_sweep))
y1 = float(np.nanmax(current_sweep))
if np.isfinite(y0) and np.isfinite(y1):
margin = 0.05 * max(1.0, (y1 - y0))
p_line.setYRange(y0 - margin, y1 + margin, padding=0)
if changed and ring is not None:
disp = ring if head == 0 else np.roll(ring, -head, axis=0)
if y_min is not None and y_max is not None and y_min != y_max and np.isfinite(y_min) and np.isfinite(y_max):
img.setImage(disp, autoLevels=False, levels=(y_min, y_max))
else:
img.setImage(disp, autoLevels=False)
timer = pg.QtCore.QTimer()
timer.timeout.connect(update)
timer.start(interval_ms)
def on_quit():
stop_event.set()
reader.join(timeout=1.0)
app.aboutToQuit.connect(on_quit)
win.show()
exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None)
exec_fn()
# На случай если aboutToQuit не сработал
on_quit()
if __name__ == "__main__":
main()