diff --git a/replay_pty.py b/replay_pty.py new file mode 100644 index 0000000..00a0798 --- /dev/null +++ b/replay_pty.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +""" +Эмулятор серийного порта: воспроизводит лог-файл в цикле через PTY. + +Использование: + python3 replay_pty.py my_picocom_logfile.txt + python3 replay_pty.py my_picocom_logfile.txt --pty /tmp/ttyVIRT0 + python3 replay_pty.py my_picocom_logfile.txt --speed 2.0 # в 2 раза быстрее реального + python3 replay_pty.py my_picocom_logfile.txt --speed 0 # максимально быстро + +Затем в другом терминале: + python -m rfg_adc_plotter.main /tmp/ttyVIRT0 +""" + +import argparse +import os +import sys +import time + + +def main(): + parser = argparse.ArgumentParser( + description="Воспроизводит лог-файл через PTY как виртуальный серийный порт." + ) + parser.add_argument("file", help="Путь к лог-файлу (например my_picocom_logfile.txt)") + parser.add_argument( + "--pty", + default="/tmp/ttyVIRT0", + help="Путь симлинка PTY (по умолчанию /tmp/ttyVIRT0)", + ) + parser.add_argument( + "--speed", + type=float, + default=1.0, + help=( + "Множитель скорости воспроизведения: " + "1.0 = реальное время при --baud, " + "2.0 = вдвое быстрее, " + "0 = максимально быстро" + ), + ) + parser.add_argument( + "--baud", + type=int, + default=115200, + help="Скорость (бод) для расчёта задержек (по умолчанию 115200)", + ) + args = parser.parse_args() + + if not os.path.isfile(args.file): + sys.stderr.write(f"[error] Файл не найден: {args.file}\n") + sys.exit(1) + + # Открываем PTY-пару: master (мы пишем) / slave (GUI читает) + master_fd, slave_fd = os.openpty() + slave_path = os.ttyname(slave_fd) + os.close(slave_fd) # GUI откроет slave сам по симлинку + + # Симлинк с удобным именем + try: + os.unlink(args.pty) + except FileNotFoundError: + pass + os.symlink(slave_path, args.pty) + + print(f"PTY slave : {slave_path}") + print(f"Симлинк : {args.pty} → {slave_path}") + print(f"Запустите : python -m rfg_adc_plotter.main {args.pty}") + print("Ctrl+C для остановки.\n") + + # Задержка на байт: 10 бит (8N1) / скорость / множитель + if args.speed > 0: + bytes_per_sec = args.baud / 10.0 * args.speed + delay_per_byte = 1.0 / bytes_per_sec + else: + delay_per_byte = 0.0 + + loop = 0 + try: + while True: + loop += 1 + print(f"[loop {loop}] {args.file}") + with open(args.file, "rb") as f: + for line in f: + os.write(master_fd, line) + if delay_per_byte > 0: + time.sleep(delay_per_byte * len(line)) + except KeyboardInterrupt: + print("\nОстановлено.") + finally: + try: + os.unlink(args.pty) + except Exception: + pass + try: + os.close(master_fd) + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/rfg_adc_plotter/__init__.py b/rfg_adc_plotter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/constants.py b/rfg_adc_plotter/constants.py new file mode 100644 index 0000000..47ca55c --- /dev/null +++ b/rfg_adc_plotter/constants.py @@ -0,0 +1,5 @@ +WF_WIDTH = 1000 # максимальное число точек в ряду водопада +FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров +# Порог для инверсии сырых данных: если среднее значение свипа ниже порога — +# считаем, что сигнал «меньше нуля» и домножаем свип на -1 +DATA_INVERSION_THRESHOLD = 10.0 diff --git a/rfg_adc_plotter/gui/__init__.py b/rfg_adc_plotter/gui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/gui/matplotlib_backend.py b/rfg_adc_plotter/gui/matplotlib_backend.py new file mode 100644 index 0000000..c6664b0 --- /dev/null +++ b/rfg_adc_plotter/gui/matplotlib_backend.py @@ -0,0 +1,284 @@ +"""Matplotlib-бэкенд реалтайм-плоттера свипов.""" + +import sys +import threading +from queue import Queue +from typing import Optional, Tuple + +import numpy as np + +from rfg_adc_plotter.constants import FFT_LEN +from rfg_adc_plotter.io.sweep_reader import SweepReader +from rfg_adc_plotter.state.app_state import AppState, format_status +from rfg_adc_plotter.state.ring_buffer import RingBuffer +from rfg_adc_plotter.types import SweepPacket + + +def _parse_ylim(ylim_str: Optional[str]) -> Optional[Tuple[float, float]]: + if not ylim_str: + return None + try: + y0, y1 = ylim_str.split(",") + return (float(y0), float(y1)) + except Exception: + sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n") + return None + + +def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: + if not spec: + return None + s = str(spec).strip().lower() + if s in ("off", "none", "no"): + return None + try: + p0, p1 = s.replace(";", ",").split(",") + low, high = float(p0), float(p1) + if not (0.0 <= low < high <= 100.0): + return None + return (low, high) + except Exception: + return None + + +def _visible_levels(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: + """(vmin, vmax) по текущей видимой области imshow.""" + if data.size == 0: + return None + ny, nx = data.shape[0], data.shape[1] + try: + x0, x1 = axis.get_xlim() + y0, y1 = axis.get_ylim() + except Exception: + x0, x1 = 0.0, float(nx - 1) + y0, y1 = 0.0, float(ny - 1) + xmin, xmax = sorted((float(x0), float(x1))) + ymin, ymax = sorted((float(y0), float(y1))) + ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) + ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) + iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) + iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) + if ix1 < ix0: + ix1 = ix0 + if iy1 < iy0: + iy1 = iy0 + sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] + finite = np.isfinite(sub) + if not finite.any(): + return None + vals = sub[finite] + vmin = float(np.min(vals)) + vmax = float(np.max(vals)) + if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: + return None + return (vmin, vmax) + + +def run_matplotlib(args): + try: + import matplotlib + import matplotlib.pyplot as plt + from matplotlib.animation import FuncAnimation + from matplotlib.widgets import CheckButtons, Slider + except Exception as e: + sys.stderr.write(f"[error] Нужны matplotlib и её зависимости: {e}\n") + sys.exit(1) + + q: Queue[SweepPacket] = Queue(maxsize=1000) + stop_event = threading.Event() + reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) + reader.start() + + max_sweeps = int(max(10, args.max_sweeps)) + max_fps = max(1.0, float(args.max_fps)) + interval_ms = int(1000.0 / max_fps) + spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) + spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) + fixed_ylim = _parse_ylim(getattr(args, "ylim", None)) + norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() + + state = AppState(norm_type=norm_type) + ring = RingBuffer(max_sweeps) + + # --- Создание фигуры --- + fig, axs = plt.subplots(2, 2, figsize=(12, 8)) + (ax_line, ax_img), (ax_fft, ax_spec) = axs + if hasattr(fig.canvas.manager, "set_window_title"): + fig.canvas.manager.set_window_title(args.title) + fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) + + # Статусная строка + status_text = fig.text(0.01, 0.01, "", ha="left", va="bottom", fontsize=8, family="monospace") + + # График последнего свипа + line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") + line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") + line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") + ax_line.set_title("Сырые данные", pad=1) + ax_line.set_xlabel("F") + channel_text = ax_line.text( + 0.98, 0.98, "", transform=ax_line.transAxes, + ha="right", va="top", fontsize=9, family="monospace", + ) + if fixed_ylim is not None: + ax_line.set_ylim(fixed_ylim) + + # График спектра + fft_line_obj, = ax_fft.plot([], [], lw=1) + ax_fft.set_title("FFT", pad=1) + ax_fft.set_xlabel("X") + ax_fft.set_ylabel("Амплитуда, дБ") + + # Водопад сырых данных + img_obj = ax_img.imshow( + np.zeros((1, 1), dtype=np.float32), + aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap, + ) + ax_img.set_title("Сырые данные", pad=12) + ax_img.set_ylabel("частота") + try: + ax_img.tick_params(axis="x", labelbottom=False) + except Exception: + pass + + # Водопад спектров + img_fft_obj = ax_spec.imshow( + np.zeros((1, 1), dtype=np.float32), + aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap, + ) + ax_spec.set_title("B-scan (дБ)", pad=12) + ax_spec.set_ylabel("расстояние") + try: + ax_spec.tick_params(axis="x", labelbottom=False) + except Exception: + pass + + # Слайдеры и чекбокс + contrast_slider = None + try: + fft_bins = ring.fft_bins + ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) + ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) + ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) + ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) + ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") + ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") + contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") + calib_cb = CheckButtons(ax_cb, ["калибровка"], [False]) + + def _on_ylim_change(_val): + try: + y0 = int(min(ymin_slider.val, ymax_slider.val)) + y1 = int(max(ymin_slider.val, ymax_slider.val)) + ax_spec.set_ylim(y0, y1) + fig.canvas.draw_idle() + except Exception: + pass + + ymin_slider.on_changed(_on_ylim_change) + ymax_slider.on_changed(_on_ylim_change) + contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) + calib_cb.on_clicked(lambda _v: state.set_calib_enabled( + bool(calib_cb.get_status()[0]) + )) + except Exception: + calib_cb = None + + # --- Инициализация imshow при первом свипе --- + def _init_imshow_extents(): + w = ring.width + ms = ring.max_sweeps + fb = ring.fft_bins + img_obj.set_data(np.zeros((w, ms), dtype=np.float32)) + img_obj.set_extent((0, ms - 1, 0, w - 1 if w > 0 else 1)) + ax_img.set_xlim(0, ms - 1) + ax_img.set_ylim(0, max(1, w - 1)) + img_fft_obj.set_data(np.zeros((fb, ms), dtype=np.float32)) + img_fft_obj.set_extent((0, ms - 1, 0, fb - 1)) + ax_spec.set_xlim(0, ms - 1) + ax_spec.set_ylim(0, max(1, fb - 1)) + + _imshow_initialized = [False] + + def update(_frame): + changed = state.drain_queue(q, ring) > 0 + + if changed and not _imshow_initialized[0] and ring.is_ready: + _init_imshow_extents() + _imshow_initialized[0] = True + + # Линейный график свипа + if state.current_sweep_raw is not None: + raw = state.current_sweep_raw + if ring.x_shared is not None and raw.size <= ring.x_shared.size: + xs = ring.x_shared[: raw.size] + else: + xs = np.arange(raw.size, dtype=np.int32) + line_obj.set_data(xs, raw) + if state.last_calib_sweep is not None: + line_calib_obj.set_data(xs[: state.last_calib_sweep.size], state.last_calib_sweep) + else: + line_calib_obj.set_data([], []) + if state.current_sweep_norm is not None: + line_norm_obj.set_data(xs[: state.current_sweep_norm.size], state.current_sweep_norm) + else: + line_norm_obj.set_data([], []) + ax_line.set_xlim(0, max(1, raw.size - 1)) + if fixed_ylim is None: + y0 = float(np.nanmin(raw)) + y1 = float(np.nanmax(raw)) + if np.isfinite(y0) and np.isfinite(y1): + if y0 == y1: + pad = max(1.0, abs(y0) * 0.05) + y0 -= pad + y1 += pad + else: + pad = 0.05 * (y1 - y0) + y0 -= pad + y1 += pad + ax_line.set_ylim(y0, y1) + + # Спектр — используем уже вычисленный в ring FFT + if ring.last_fft_vals is not None and ring.freq_shared is not None: + fft_vals = ring.last_fft_vals + xs_fft = ring.freq_shared + if fft_vals.size > xs_fft.size: + fft_vals = fft_vals[: xs_fft.size] + fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) + if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): + ax_fft.set_xlim(0, max(1, xs_fft.size - 1)) + ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) + + # Водопад сырых данных + if changed and ring.is_ready: + disp = ring.get_display_ring() + img_obj.set_data(disp) + levels = _visible_levels(disp, ax_img) + if levels is not None: + img_obj.set_clim(vmin=levels[0], vmax=levels[1]) + + # Водопад спектров + if changed and ring.is_ready: + disp_fft = ring.get_display_ring_fft() + disp_fft = ring.subtract_recent_mean_fft(disp_fft, spec_mean_sec) + img_fft_obj.set_data(disp_fft) + levels = ring.compute_fft_levels(disp_fft, spec_clip) + if levels is not None: + try: + c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0 + except Exception: + c = 1.0 + vmax_eff = levels[0] + c * (levels[1] - levels[0]) + img_fft_obj.set_clim(vmin=levels[0], vmax=vmax_eff) + + # Статус и подпись канала + if changed and state.current_info: + status_text.set_text(format_status(state.current_info)) + channel_text.set_text(state.format_channel_label()) + + return (line_obj, line_calib_obj, line_norm_obj, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text) + + ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) + plt.show() + stop_event.set() + reader.join(timeout=1.0) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py new file mode 100644 index 0000000..afe01eb --- /dev/null +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -0,0 +1,272 @@ +"""PyQtGraph-бэкенд реалтайм-плоттера свипов.""" + +import sys +import threading +from queue import Queue +from typing import Optional, Tuple + +import numpy as np + +from rfg_adc_plotter.io.sweep_reader import SweepReader +from rfg_adc_plotter.state.app_state import AppState, format_status +from rfg_adc_plotter.state.ring_buffer import RingBuffer +from rfg_adc_plotter.types import SweepPacket + + +def _parse_ylim(ylim_str: Optional[str]) -> Optional[Tuple[float, float]]: + if not ylim_str: + return None + try: + y0, y1 = ylim_str.split(",") + return (float(y0), float(y1)) + except Exception: + return None + + +def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: + if not spec: + return None + s = str(spec).strip().lower() + if s in ("off", "none", "no"): + return None + try: + p0, p1 = s.replace(";", ",").split(",") + low, high = float(p0), float(p1) + if not (0.0 <= low < high <= 100.0): + return None + return (low, high) + except Exception: + return None + + +def _visible_levels(data: np.ndarray, plot_item) -> Optional[Tuple[float, float]]: + """(vmin, vmax) по текущей видимой области ImageItem.""" + if data.size == 0: + return None + ny, nx = data.shape[0], data.shape[1] + try: + (x0, x1), (y0, y1) = plot_item.viewRange() + except Exception: + x0, x1 = 0.0, float(nx - 1) + y0, y1 = 0.0, float(ny - 1) + xmin, xmax = sorted((float(x0), float(x1))) + ymin, ymax = sorted((float(y0), float(y1))) + ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) + ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) + iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) + iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) + if ix1 < ix0: + ix1 = ix0 + if iy1 < iy0: + iy1 = iy0 + sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] + finite = np.isfinite(sub) + if not finite.any(): + return None + vals = sub[finite] + vmin = float(np.min(vals)) + vmax = float(np.max(vals)) + if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: + return None + return (vmin, vmax) + + +def run_pyqtgraph(args): + """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" + try: + import pyqtgraph as pg + from PyQt5 import QtCore, QtWidgets # noqa: F401 + except Exception: + try: + import pyqtgraph as pg + from PySide6 import QtCore, QtWidgets # noqa: F401 + except Exception as e: + raise RuntimeError( + "pyqtgraph/PyQt5(PySide6) не найдены. Установите: pip install pyqtgraph PyQt5" + ) from e + + q: Queue[SweepPacket] = Queue(maxsize=1000) + stop_event = threading.Event() + reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) + reader.start() + + max_sweeps = int(max(10, args.max_sweeps)) + max_fps = max(1.0, float(args.max_fps)) + interval_ms = int(1000.0 / max_fps) + spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) + spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) + fixed_ylim = _parse_ylim(getattr(args, "ylim", None)) + norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() + + state = AppState(norm_type=norm_type) + ring = RingBuffer(max_sweeps) + + # --- Создание окна --- + pg.setConfigOptions(useOpenGL=True, antialias=False) + app = pg.mkQApp(args.title) + win = pg.GraphicsLayoutWidget(show=True, title=args.title) + win.resize(1200, 600) + + # График последнего свипа (слева-сверху) + p_line = win.addPlot(row=0, col=0, title="Сырые данные") + p_line.showGrid(x=True, y=True, alpha=0.3) + curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) + curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) + curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) + p_line.setLabel("bottom", "X") + p_line.setLabel("left", "Y") + ch_text = pg.TextItem("", anchor=(1, 1)) + ch_text.setZValue(10) + p_line.addItem(ch_text) + if fixed_ylim is not None: + p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) + + # Водопад (справа-сверху) + p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад") + p_img.invertY(False) + p_img.showGrid(x=False, y=False) + p_img.setLabel("bottom", "Время (новое справа)") + try: + p_img.getAxis("bottom").setStyle(showValues=False) + except Exception: + pass + p_img.setLabel("left", "X (0 снизу)") + img = pg.ImageItem() + p_img.addItem(img) + + # Применяем LUT из цветовой карты + try: + cm = pg.colormap.get(args.cmap) + img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256)) + except Exception: + pass + + # FFT (слева-снизу) + p_fft = win.addPlot(row=1, col=0, title="FFT") + p_fft.showGrid(x=True, y=True, alpha=0.3) + curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) + p_fft.setLabel("bottom", "Бин") + p_fft.setLabel("left", "Амплитуда, дБ") + + # Водопад спектров (справа-снизу) + p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") + p_spec.invertY(True) + p_spec.showGrid(x=False, y=False) + p_spec.setLabel("bottom", "Время (новое справа)") + try: + p_spec.getAxis("bottom").setStyle(showValues=False) + except Exception: + pass + p_spec.setLabel("left", "Бин (0 снизу)") + img_fft = pg.ImageItem() + p_spec.addItem(img_fft) + + # Чекбокс калибровки + calib_cb = QtWidgets.QCheckBox("калибровка") + cb_proxy = QtWidgets.QGraphicsProxyWidget() + cb_proxy.setWidget(calib_cb) + win.addItem(cb_proxy, row=2, col=1) + calib_cb.stateChanged.connect(lambda _v: state.set_calib_enabled(calib_cb.isChecked())) + + # Статусная строка + status = pg.LabelItem(justify="left") + win.addItem(status, row=3, col=0, colspan=2) + + _imshow_initialized = [False] + + def _init_imshow_extents(): + w = ring.width + ms = ring.max_sweeps + fb = ring.fft_bins + img.setImage(ring.ring.T, autoLevels=False) + p_img.setRange(xRange=(0, ms - 1), yRange=(0, max(1, w - 1)), padding=0) + p_line.setXRange(0, max(1, w - 1), padding=0) + img_fft.setImage(ring.ring_fft.T, autoLevels=False) + p_spec.setRange(xRange=(0, ms - 1), yRange=(0, max(1, fb - 1)), padding=0) + p_fft.setXRange(0, max(1, fb - 1), padding=0) + + def update(): + changed = state.drain_queue(q, ring) > 0 + + if changed and not _imshow_initialized[0] and ring.is_ready: + _init_imshow_extents() + _imshow_initialized[0] = True + + # Линейный график свипа + if state.current_sweep_raw is not None and ring.x_shared is not None: + raw = state.current_sweep_raw + xs = ring.x_shared[: raw.size] if raw.size <= ring.x_shared.size else np.arange(raw.size) + curve.setData(xs, raw, autoDownsample=True) + if state.last_calib_sweep is not None: + curve_calib.setData(xs[: state.last_calib_sweep.size], state.last_calib_sweep, autoDownsample=True) + else: + curve_calib.setData([], []) + if state.current_sweep_norm is not None: + curve_norm.setData(xs[: state.current_sweep_norm.size], state.current_sweep_norm, autoDownsample=True) + else: + curve_norm.setData([], []) + if fixed_ylim is None: + y0 = float(np.nanmin(raw)) + y1 = float(np.nanmax(raw)) + if np.isfinite(y0) and np.isfinite(y1): + margin = 0.05 * max(1.0, (y1 - y0)) + p_line.setYRange(y0 - margin, y1 + margin, padding=0) + + # Спектр — используем уже вычисленный в ring FFT + if ring.last_fft_vals is not None and ring.freq_shared is not None: + fft_vals = ring.last_fft_vals + xs_fft = ring.freq_shared + if fft_vals.size > xs_fft.size: + fft_vals = fft_vals[: xs_fft.size] + curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) + p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) + + # Позиция подписи канала + try: + (x0, x1), (y0, y1) = p_line.viewRange() + dx = 0.01 * max(1.0, float(x1 - x0)) + dy = 0.01 * max(1.0, float(y1 - y0)) + ch_text.setPos(float(x1 - dx), float(y1 - dy)) + except Exception: + pass + + # Водопад сырых данных — новые данные справа (без реверса) + if changed and ring.is_ready: + disp = ring.get_display_ring() # (width, time), новые справа + levels = _visible_levels(disp, p_img) + if levels is not None: + img.setImage(disp, autoLevels=False, levels=levels) + else: + img.setImage(disp, autoLevels=False) + + # Статус и подпись канала + if changed and state.current_info: + try: + status.setText(format_status(state.current_info)) + except Exception: + pass + ch_text.setText(state.format_channel_label()) + + # Водопад спектров — новые данные справа (без реверса) + if changed and ring.is_ready: + disp_fft = ring.get_display_ring_fft() # (bins, time), новые справа + disp_fft = ring.subtract_recent_mean_fft(disp_fft, spec_mean_sec) + levels = ring.compute_fft_levels(disp_fft, spec_clip) + if levels is not None: + img_fft.setImage(disp_fft, autoLevels=False, levels=levels) + else: + img_fft.setImage(disp_fft, autoLevels=False) + + timer = pg.QtCore.QTimer() + timer.timeout.connect(update) + timer.start(interval_ms) + + def on_quit(): + stop_event.set() + reader.join(timeout=1.0) + + app.aboutToQuit.connect(on_quit) + win.show() + exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None) + exec_fn() + on_quit() diff --git a/rfg_adc_plotter/io/__init__.py b/rfg_adc_plotter/io/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/io/serial_source.py b/rfg_adc_plotter/io/serial_source.py new file mode 100644 index 0000000..31a2ba4 --- /dev/null +++ b/rfg_adc_plotter/io/serial_source.py @@ -0,0 +1,181 @@ +"""Источники последовательного ввода: обёртки над pyserial и raw TTY.""" + +import io +import os +import sys +from typing import Optional + + +def try_open_pyserial(path: str, baud: int, timeout: float): + try: + import serial # type: ignore + except Exception: + return None + try: + ser = serial.Serial(path, baudrate=baud, timeout=timeout) + return ser + except Exception: + return None + + +class FDReader: + """Простой враппер чтения строк из файлового дескриптора TTY.""" + + def __init__(self, fd: int): + self._fd = fd + raw = os.fdopen(fd, "rb", closefd=False) + self._file = raw + self._buf = io.BufferedReader(raw, buffer_size=65536) + + def fileno(self) -> int: + return self._fd + + def readline(self) -> bytes: + return self._buf.readline() + + def close(self): + try: + self._buf.close() + except Exception: + pass + + +def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: + """Открыть TTY без pyserial и настроить порт через termios. + + Возвращает FDReader или None при ошибке. + """ + try: + import termios + import tty + except Exception: + return None + + try: + fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) + except Exception: + return None + + try: + attrs = termios.tcgetattr(fd) + tty.setraw(fd) + + baud_map = { + 9600: termios.B9600, + 19200: termios.B19200, + 38400: termios.B38400, + 57600: termios.B57600, + 115200: termios.B115200, + 230400: getattr(termios, "B230400", None), + 460800: getattr(termios, "B460800", None), + } + b = baud_map.get(baud) or termios.B115200 + + attrs[4] = b # ispeed + attrs[5] = b # ospeed + + # VMIN=1, VTIME=0 — блокирующее чтение по байту + cc = attrs[6] + cc[termios.VMIN] = 1 + cc[termios.VTIME] = 0 + attrs[6] = cc + + termios.tcsetattr(fd, termios.TCSANOW, attrs) + except Exception: + try: + os.close(fd) + except Exception: + pass + return None + + return FDReader(fd) + + +class SerialLineSource: + """Единый интерфейс для чтения строк из порта (pyserial или raw TTY).""" + + def __init__(self, path: str, baud: int, timeout: float = 1.0): + self._pyserial = try_open_pyserial(path, baud, timeout) + self._fdreader = None + self._using = "pyserial" if self._pyserial is not None else "raw" + if self._pyserial is None: + self._fdreader = open_raw_tty(path, baud) + if self._fdreader is None: + msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" + if sys.platform.startswith("win"): + msg += ". На Windows нужен pyserial: pip install pyserial" + raise RuntimeError(msg) + + def readline(self) -> bytes: + if self._pyserial is not None: + try: + return self._pyserial.readline() + except Exception: + return b"" + else: + try: + return self._fdreader.readline() # type: ignore[union-attr] + except Exception: + return b"" + + def close(self): + try: + if self._pyserial is not None: + self._pyserial.close() + elif self._fdreader is not None: + self._fdreader.close() + except Exception: + pass + + +class SerialChunkReader: + """Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера.""" + + def __init__(self, src: SerialLineSource): + self._src = src + self._ser = src._pyserial + self._fd: Optional[int] = None + if self._ser is not None: + try: + self._ser.timeout = 0 + except Exception: + pass + else: + try: + self._fd = src._fdreader.fileno() # type: ignore[union-attr] + try: + os.set_blocking(self._fd, False) + except Exception: + pass + except Exception: + self._fd = None + + def read_available(self) -> bytes: + """Вернёт доступные байты (b"" если данных нет).""" + if self._ser is not None: + try: + n = int(getattr(self._ser, "in_waiting", 0)) + except Exception: + n = 0 + if n > 0: + try: + return self._ser.read(n) + except Exception: + return b"" + return b"" + if self._fd is None: + return b"" + out = bytearray() + while True: + try: + chunk = os.read(self._fd, 65536) + if not chunk: + break + out += chunk + if len(chunk) < 65536: + break + except BlockingIOError: + break + except Exception: + break + return bytes(out) diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py new file mode 100644 index 0000000..31768a6 --- /dev/null +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -0,0 +1,217 @@ +"""Фоновый поток чтения и парсинга свипов из последовательного порта.""" + +import sys +import threading +import time +from collections import deque +from queue import Full, Queue +from typing import Optional + +import numpy as np + +from rfg_adc_plotter.constants import DATA_INVERSION_THRESHOLD +from rfg_adc_plotter.io.serial_source import SerialChunkReader, SerialLineSource +from rfg_adc_plotter.types import SweepInfo, SweepPacket + + +class SweepReader(threading.Thread): + """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" + + def __init__( + self, + port_path: str, + baud: int, + out_queue: "Queue[SweepPacket]", + stop_event: threading.Event, + fancy: bool = False, + ): + super().__init__(daemon=True) + self._port_path = port_path + self._baud = baud + self._q = out_queue + self._stop = stop_event + self._src: Optional[SerialLineSource] = None + self._fancy = bool(fancy) + self._max_width: int = 0 + self._sweep_idx: int = 0 + self._last_sweep_ts: Optional[float] = None + self._n_valid_hist = deque() + + def _finalize_current(self, xs, ys, channels: Optional[set]): + if not xs: + return + ch_list = sorted(channels) if channels else [0] + ch_primary = ch_list[0] if ch_list else 0 + max_x = max(xs) + width = max_x + 1 + self._max_width = max(self._max_width, width) + target_width = self._max_width if self._fancy else width + + sweep = np.full((target_width,), np.nan, dtype=np.float32) + try: + idx = np.asarray(xs, dtype=np.int64) + vals = np.asarray(ys, dtype=np.float32) + sweep[idx] = vals + except Exception: + for x, y in zip(xs, ys): + if 0 <= x < target_width: + sweep[x] = float(y) + + finite_pre = np.isfinite(sweep) + n_valid_cur = int(np.count_nonzero(finite_pre)) + + if self._fancy: + try: + known = ~np.isnan(sweep) + if np.any(known): + known_idx = np.nonzero(known)[0] + for i0, i1 in zip(known_idx[:-1], known_idx[1:]): + if i1 - i0 > 1: + avg = (sweep[i0] + sweep[i1]) * 0.5 + sweep[i0 + 1 : i1] = avg + first_idx = int(known_idx[0]) + last_idx = int(known_idx[-1]) + if first_idx > 0: + sweep[:first_idx] = sweep[first_idx] + if last_idx < sweep.size - 1: + sweep[last_idx + 1 :] = sweep[last_idx] + except Exception: + pass + + try: + m = float(np.nanmean(sweep)) + if np.isfinite(m) and m < DATA_INVERSION_THRESHOLD: + sweep *= -1.0 + except Exception: + pass + + self._sweep_idx += 1 + if len(ch_list) > 1: + sys.stderr.write( + f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n" + ) + now = time.time() + if self._last_sweep_ts is None: + dt_ms = float("nan") + else: + dt_ms = (now - self._last_sweep_ts) * 1000.0 + self._last_sweep_ts = now + self._n_valid_hist.append((now, n_valid_cur)) + while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0: + self._n_valid_hist.popleft() + if self._n_valid_hist: + n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist)) + else: + n_valid = float(n_valid_cur) + + if n_valid_cur > 0: + vmin = float(np.nanmin(sweep)) + vmax = float(np.nanmax(sweep)) + mean = float(np.nanmean(sweep)) + std = float(np.nanstd(sweep)) + else: + vmin = vmax = mean = std = float("nan") + info: SweepInfo = { + "sweep": self._sweep_idx, + "ch": ch_primary, + "chs": ch_list, + "n_valid": n_valid, + "min": vmin, + "max": vmax, + "mean": mean, + "std": std, + "dt_ms": dt_ms, + } + + try: + self._q.put_nowait((sweep, info)) + except Full: + try: + _ = self._q.get_nowait() + except Exception: + pass + try: + self._q.put_nowait((sweep, info)) + except Exception: + pass + + def run(self): + xs: list = [] + ys: list = [] + cur_channel: Optional[int] = None + cur_channels: set = set() + + try: + self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) + sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") + except Exception as e: + sys.stderr.write(f"[error] {e}\n") + return + + try: + chunk_reader = SerialChunkReader(self._src) + buf = bytearray() + while not self._stop.is_set(): + data = chunk_reader.read_available() + if data: + buf += data + else: + time.sleep(0.0005) + continue + + while True: + nl = buf.find(b"\n") + if nl == -1: + break + line = bytes(buf[:nl]) + del buf[: nl + 1] + if line.endswith(b"\r"): + line = line[:-1] + if not line: + continue + + if line.startswith(b"Sweep_start"): + self._finalize_current(xs, ys, cur_channels) + xs.clear() + ys.clear() + cur_channel = None + cur_channels.clear() + continue + + if len(line) >= 3: + parts = line.split() + if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): + try: + if parts[0].lower() == b"s": + if len(parts) >= 4: + ch = int(parts[1], 10) + x = int(parts[2], 10) + y = int(parts[3], 10) + else: + ch = 0 + x = int(parts[1], 10) + y = int(parts[2], 10) + else: + ch = int(parts[0][1:], 10) + x = int(parts[1], 10) + y = int(parts[2], 10) + except Exception: + continue + if cur_channel is None: + cur_channel = ch + cur_channels.add(ch) + xs.append(x) + ys.append(y) + + if len(buf) > 1_000_000: + del buf[:-262144] + finally: + try: + self._finalize_current(xs, ys, cur_channels) + except Exception: + pass + try: + if self._src is not None: + self._src.close() + except Exception: + pass diff --git a/rfg_adc_plotter/main.py b/rfg_adc_plotter/main.py new file mode 100644 index 0000000..78e4e02 --- /dev/null +++ b/rfg_adc_plotter/main.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +""" +Реалтайм-плоттер для свипов из виртуального COM-порта. + +Формат строк: + - "Sweep_start" — начало нового свипа (предыдущий считается завершённым) + - "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком + +Отрисовываются четыре графика: + - Сырые данные: последний полученный свип (Y vs X) + - Водопад сырых данных: последние N свипов + - FFT текущего свипа + - B-scan: водопад FFT-строк + +Зависимости: numpy. PySerial опционален — при его отсутствии +используется сырой доступ к TTY через termios. +GUI: matplotlib (совместимый) или pyqtgraph (быстрый). +""" + +import argparse +import sys + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=( + "Читает свипы из виртуального COM-порта и рисует: " + "последний свип и водопад (реалтайм)." + ) + ) + parser.add_argument( + "port", + help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)", + ) + parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") + parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") + parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") + parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") + parser.add_argument( + "--spec-clip", + default="2,98", + help=( + "Процентильная обрезка уровней водопада спектров, %% (min,max). " + "Напр. 2,98. 'off' — отключить" + ), + ) + parser.add_argument( + "--spec-mean-sec", + type=float, + default=0.0, + help=( + "Вычитание среднего по каждой частоте за последние N секунд " + "в водопаде спектров (0 — отключить)" + ), + ) + parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") + parser.add_argument( + "--fancy", + action="store_true", + help="Заполнять выпавшие точки средними значениями между соседними", + ) + parser.add_argument( + "--ylim", + type=str, + default=None, + help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", + ) + parser.add_argument( + "--backend", + choices=["auto", "pg", "mpl"], + default="auto", + help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto", + ) + parser.add_argument( + "--norm-type", + choices=["projector", "simple"], + default="projector", + help="Тип нормировки: projector (по огибающим в [-1000,+1000]) или simple (raw/calib)", + ) + return parser + + +def main(): + args = build_parser().parse_args() + + if args.backend == "pg": + from rfg_adc_plotter.gui.pyqtgraph_backend import run_pyqtgraph + try: + run_pyqtgraph(args) + except Exception as e: + sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n") + sys.exit(1) + return + + if args.backend == "auto": + try: + from rfg_adc_plotter.gui.pyqtgraph_backend import run_pyqtgraph + run_pyqtgraph(args) + return + except Exception: + pass # Откатываемся на matplotlib + + from rfg_adc_plotter.gui.matplotlib_backend import run_matplotlib + run_matplotlib(args) + + +if __name__ == "__main__": + main() diff --git a/rfg_adc_plotter/processing/__init__.py b/rfg_adc_plotter/processing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/processing/normalizer.py b/rfg_adc_plotter/processing/normalizer.py new file mode 100644 index 0000000..10780d5 --- /dev/null +++ b/rfg_adc_plotter/processing/normalizer.py @@ -0,0 +1,115 @@ +"""Алгоритмы нормировки свипов по калибровочной кривой.""" + +from typing import Tuple + +import numpy as np + + +def normalize_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + """Простая нормировка: поэлементное деление raw/calib.""" + w = min(raw.size, calib.size) + if w <= 0: + return raw + out = np.full_like(raw, np.nan, dtype=np.float32) + with np.errstate(divide="ignore", invalid="ignore"): + out[:w] = raw[:w] / calib[:w] + out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) + return out + + +def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """Оценить нижнюю/верхнюю огибающие калибровочной кривой.""" + n = int(calib.size) + if n <= 0: + empty = np.zeros((0,), dtype=np.float32) + return empty, empty + + y = np.asarray(calib, dtype=np.float32) + finite = np.isfinite(y) + if not np.any(finite): + zeros = np.zeros_like(y, dtype=np.float32) + return zeros, zeros + + if not np.all(finite): + x = np.arange(n, dtype=np.float32) + y = y.copy() + y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32) + + if n < 3: + return y.copy(), y.copy() + + dy = np.diff(y) + s = np.sign(dy).astype(np.int8, copy=False) + + if np.any(s == 0): + for i in range(1, s.size): + if s[i] == 0: + s[i] = s[i - 1] + for i in range(s.size - 2, -1, -1): + if s[i] == 0: + s[i] = s[i + 1] + s[s == 0] = 1 + + max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1 + min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1 + + x = np.arange(n, dtype=np.float32) + + def _interp_nodes(nodes: np.ndarray) -> np.ndarray: + if nodes.size == 0: + idx = np.array([0, n - 1], dtype=np.int64) + else: + idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64) + return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32) + + upper = _interp_nodes(max_idx) + lower = _interp_nodes(min_idx) + + swap = lower > upper + if np.any(swap): + tmp = upper[swap].copy() + upper[swap] = lower[swap] + lower[swap] = tmp + + return lower, upper + + +def normalize_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + """Нормировка через проекцию между огибающими калибровки в диапазон [-1000, +1000].""" + w = min(raw.size, calib.size) + if w <= 0: + return raw + + out = np.full_like(raw, np.nan, dtype=np.float32) + raw_seg = np.asarray(raw[:w], dtype=np.float32) + lower, upper = build_calib_envelopes(np.asarray(calib[:w], dtype=np.float32)) + span = upper - lower + + finite_span = span[np.isfinite(span) & (span > 0)] + if finite_span.size > 0: + eps = max(float(np.median(finite_span)) * 1e-6, 1e-9) + else: + eps = 1e-9 + + valid = ( + np.isfinite(raw_seg) + & np.isfinite(lower) + & np.isfinite(upper) + & (span > eps) + ) + if np.any(valid): + proj = np.empty_like(raw_seg, dtype=np.float32) + proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0 + proj[valid] = np.clip(proj[valid], -1000.0, 1000.0) + proj[~valid] = np.nan + out[:w] = proj + + return out + + +def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray: + """Нормировка свипа по выбранному алгоритму.""" + nt = str(norm_type).strip().lower() + if nt == "simple": + return normalize_simple(raw, calib) + return normalize_projector(raw, calib) diff --git a/rfg_adc_plotter/state/__init__.py b/rfg_adc_plotter/state/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/state/app_state.py b/rfg_adc_plotter/state/app_state.py new file mode 100644 index 0000000..1ad5682 --- /dev/null +++ b/rfg_adc_plotter/state/app_state.py @@ -0,0 +1,119 @@ +"""Состояние приложения: текущие свипы и настройки калибровки/нормировки.""" + +from queue import Empty, Queue +from typing import Any, Dict, Mapping, Optional + +import numpy as np + +from rfg_adc_plotter.processing.normalizer import normalize_by_calib +from rfg_adc_plotter.state.ring_buffer import RingBuffer +from rfg_adc_plotter.types import SweepInfo, SweepPacket + + +def format_status(data: Mapping[str, Any]) -> str: + """Преобразовать словарь метрик в одну строку 'k:v'.""" + + def _fmt(v: Any) -> str: + if v is None: + return "NA" + try: + fv = float(v) + except Exception: + return str(v) + if not np.isfinite(fv): + return "nan" + if abs(fv) >= 1000 or (0 < abs(fv) < 0.01): + return f"{fv:.3g}" + return f"{fv:.3f}".rstrip("0").rstrip(".") + + parts = [f"{k}:{_fmt(v)}" for k, v in data.items()] + return " ".join(parts) + + +class AppState: + """Весь изменяемый GUI-state: текущие данные, калибровка, настройки. + + Методы drain_queue и set_calib_enabled заменяют одноимённые closures + с nonlocal из оригинального кода. + """ + + def __init__(self, norm_type: str = "projector"): + self.current_sweep_raw: Optional[np.ndarray] = None + self.current_sweep_norm: Optional[np.ndarray] = None + self.last_calib_sweep: Optional[np.ndarray] = None + self.current_info: Optional[SweepInfo] = None + self.calib_enabled: bool = False + self.norm_type: str = norm_type + + def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + return normalize_by_calib(raw, calib, self.norm_type) + + def set_calib_enabled(self, enabled: bool): + """Включить/выключить режим калибровки, пересчитать norm-свип.""" + self.calib_enabled = enabled + if ( + self.calib_enabled + and self.current_sweep_raw is not None + and self.last_calib_sweep is not None + ): + self.current_sweep_norm = self._normalize( + self.current_sweep_raw, self.last_calib_sweep + ) + else: + self.current_sweep_norm = None + + def drain_queue(self, q: "Queue[SweepPacket]", ring: RingBuffer) -> int: + """Вытащить все ожидающие свипы из очереди, обновить state и ring. + + Возвращает количество обработанных свипов. + """ + drained = 0 + while True: + try: + s, info = q.get_nowait() + except Empty: + break + drained += 1 + self.current_sweep_raw = s + self.current_info = info + + ch = 0 + try: + ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 + except Exception: + ch = 0 + + # Канал 0 — опорный (калибровочный) свип + if ch == 0: + self.last_calib_sweep = s + self.current_sweep_norm = None + sweep_for_ring = s + else: + if self.calib_enabled and self.last_calib_sweep is not None: + self.current_sweep_norm = self._normalize(s, self.last_calib_sweep) + sweep_for_ring = self.current_sweep_norm + else: + self.current_sweep_norm = None + sweep_for_ring = s + + ring.ensure_init(s.size) + ring.push(sweep_for_ring) + return drained + + def format_channel_label(self) -> str: + """Строка с номерами каналов для подписи на графике.""" + if self.current_info is None: + return "" + info = self.current_info + chs = info.get("chs") if isinstance(info, dict) else None + if chs is None: + chs = info.get("ch") if isinstance(info, dict) else None + if chs is None: + return "" + try: + if isinstance(chs, (list, tuple, set)): + ch_list = sorted(int(v) for v in chs) + return "chs " + ", ".join(str(v) for v in ch_list) + return f"chs {int(chs)}" + except Exception: + return f"chs {chs}" diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py new file mode 100644 index 0000000..56d3334 --- /dev/null +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -0,0 +1,166 @@ +"""Кольцевой буфер свипов и FFT-строк для водопадного отображения.""" + +import time +from typing import Optional, Tuple + +import numpy as np + +from rfg_adc_plotter.constants import FFT_LEN, WF_WIDTH + + +class RingBuffer: + """Хранит последние N свипов и соответствующие FFT-строки. + + Все мутабельные поля водопада инкапсулированы здесь, + что устраняет необходимость nonlocal в GUI-коде. + """ + + def __init__(self, max_sweeps: int): + self.max_sweeps = max_sweeps + self.fft_bins = FFT_LEN // 2 + 1 + + # Инициализируются при первом свипе (ensure_init) + self.ring: Optional[np.ndarray] = None # (max_sweeps, WF_WIDTH) + self.ring_fft: Optional[np.ndarray] = None # (max_sweeps, fft_bins) + self.ring_time: Optional[np.ndarray] = None # (max_sweeps,) + self.head: int = 0 + self.width: Optional[int] = None + self.x_shared: Optional[np.ndarray] = None + self.freq_shared: Optional[np.ndarray] = None + self.y_min_fft: Optional[float] = None + self.y_max_fft: Optional[float] = None + # FFT последнего свипа (для отображения без повторного вычисления) + self.last_fft_vals: Optional[np.ndarray] = None + + @property + def is_ready(self) -> bool: + return self.ring is not None + + def ensure_init(self, sweep_width: int): + """Инициализировать буферы при первом свипе. Повторные вызовы — no-op.""" + if self.ring is not None: + return + self.width = WF_WIDTH + self.x_shared = np.arange(self.width, dtype=np.int32) + self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32) + self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64) + self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32) + self.freq_shared = np.arange(self.fft_bins, dtype=np.int32) + self.head = 0 + + def push(self, s: np.ndarray): + """Добавить строку свипа в кольцевой буфер, вычислить FFT-строку.""" + if s is None or s.size == 0 or self.ring is None: + return + w = self.ring.shape[1] + row = np.full((w,), np.nan, dtype=np.float32) + take = min(w, s.size) + row[:take] = s[:take] + self.ring[self.head, :] = row + self.ring_time[self.head] = time.time() + self.head = (self.head + 1) % self.ring.shape[0] + + self._push_fft(s) + + def _push_fft(self, s: np.ndarray): + bins = self.ring_fft.shape[1] + take_fft = min(int(s.size), FFT_LEN) + if take_fft <= 0: + fft_row = np.full((bins,), np.nan, dtype=np.float32) + else: + fft_in = np.zeros((FFT_LEN,), dtype=np.float32) + seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False) + win = np.hanning(take_fft).astype(np.float32) + fft_in[:take_fft] = seg * win + spec = np.fft.rfft(fft_in) + mag = np.abs(spec).astype(np.float32) + fft_row = (20.0 * np.log10(mag + 1e-9)).astype(np.float32) + if fft_row.shape[0] != bins: + fft_row = fft_row[:bins] + + prev_head = (self.head - 1) % self.ring_fft.shape[0] + self.ring_fft[prev_head, :] = fft_row + self.last_fft_vals = fft_row + + fr_min = np.nanmin(fft_row) + fr_max = float(np.nanpercentile(fft_row, 90)) + if self.y_min_fft is None or (not np.isnan(fr_min) and fr_min < self.y_min_fft): + self.y_min_fft = float(fr_min) + if self.y_max_fft is None or (not np.isnan(fr_max) and fr_max > self.y_max_fft): + self.y_max_fft = float(fr_max) + + def get_display_ring(self) -> np.ndarray: + """Кольцо в порядке от старого к новому, ось времени по X. Форма: (width, time).""" + if self.ring is None: + return np.zeros((1, 1), dtype=np.float32) + base = self.ring if self.head == 0 else np.roll(self.ring, -self.head, axis=0) + return base.T # (width, time) + + def get_display_ring_fft(self) -> np.ndarray: + """FFT-кольцо в порядке от старого к новому. Форма: (bins, time).""" + if self.ring_fft is None: + return np.zeros((1, 1), dtype=np.float32) + base = self.ring_fft if self.head == 0 else np.roll(self.ring_fft, -self.head, axis=0) + return base.T # (bins, time) + + def get_display_times(self) -> Optional[np.ndarray]: + """Временные метки строк в порядке от старого к новому.""" + if self.ring_time is None: + return None + return self.ring_time if self.head == 0 else np.roll(self.ring_time, -self.head) + + def subtract_recent_mean_fft( + self, disp_fft: np.ndarray, spec_mean_sec: float + ) -> np.ndarray: + """Вычесть среднее по каждой частоте за последние spec_mean_sec секунд.""" + if spec_mean_sec <= 0.0: + return disp_fft + disp_times = self.get_display_times() + if disp_times is None: + return disp_fft + now_t = time.time() + mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) + if not np.any(mask): + return disp_fft + try: + mean_spec = np.nanmean(disp_fft[:, mask], axis=1) + except Exception: + return disp_fft + mean_spec = np.nan_to_num(mean_spec, nan=0.0) + return disp_fft - mean_spec[:, None] + + def compute_fft_levels( + self, disp_fft: np.ndarray, spec_clip: Optional[Tuple[float, float]] + ) -> Optional[Tuple[float, float]]: + """Вычислить (vmin, vmax) для отображения водопада спектров.""" + # 1. По среднему спектру за видимое время + try: + mean_spec = np.nanmean(disp_fft, axis=1) + vmin_v = float(np.nanmin(mean_spec)) + vmax_v = float(np.nanmax(mean_spec)) + if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: + return (vmin_v, vmax_v) + except Exception: + pass + + # 2. Процентильная обрезка + if spec_clip is not None: + try: + vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) + vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) + if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: + return (vmin_v, vmax_v) + except Exception: + pass + + # 3. Глобальные накопленные мин/макс + if ( + self.y_min_fft is not None + and self.y_max_fft is not None + and np.isfinite(self.y_min_fft) + and np.isfinite(self.y_max_fft) + and self.y_min_fft != self.y_max_fft + ): + return (self.y_min_fft, self.y_max_fft) + + return None diff --git a/rfg_adc_plotter/types.py b/rfg_adc_plotter/types.py new file mode 100644 index 0000000..7b53fb3 --- /dev/null +++ b/rfg_adc_plotter/types.py @@ -0,0 +1,7 @@ +from typing import Any, Dict, Tuple, Union + +import numpy as np + +Number = Union[int, float] +SweepInfo = Dict[str, Any] +SweepPacket = Tuple[np.ndarray, SweepInfo]