diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c18c73e --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +my_picocom_logfile.txt +*pyc +__pycache__/ +*.log +*.tmp +*.bak +*.swp +*.swo +acm_9 diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py deleted file mode 100755 index b3ab06d..0000000 --- a/RFG_ADC_dataplotter.py +++ /dev/null @@ -1,1446 +0,0 @@ -#!/usr/bin/env python3 -""" -Реалтайм-плоттер для свипов из виртуального COM-порта. - -Формат строк: - - "Sweep_start" — начало нового свипа (предыдущий считается завершённым) - - "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком - -Отрисовываются два графика: - - Левый: последний полученный свип (Y vs X) - - Правый: водопад (последние N свипов во времени) - -Оптимизации для скорости: - - Парсинг и чтение в фоновой нити - - Анимация с обновлением только данных (без лишнего пересоздания фигур) - - Кольцевой буфер под водопад с фиксированным числом свипов - -Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии -используется сырой доступ к TTY через termios. -""" - -import argparse -import io -import os -import sys -import threading -import time -from collections import deque -from queue import Queue, Empty, Full -from typing import Any, Dict, Mapping, Optional, Tuple, Union - -import numpy as np - -WF_WIDTH = 1000 # максимальное число точек в ряду водопада -FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров -# Порог для инверсии сырых данных: если среднее значение свипа ниже порога — -# считаем, что сигнал «меньше нуля» и домножаем свип на -1 -DATA_INVERSION_THRASHOLD = 10.0 - -Number = Union[int, float] -SweepInfo = Dict[str, Any] -SweepPacket = Tuple[np.ndarray, SweepInfo] - - -def _format_status_kv(data: Mapping[str, Any]) -> str: - """Преобразовать словарь метрик в одну строку 'k:v'.""" - - def _fmt(v: Any) -> str: - if v is None: - return "NA" - try: - fv = float(v) - except Exception: - return str(v) - if not np.isfinite(fv): - return "nan" - # Достаточно компактно для статус-строки. - if abs(fv) >= 1000 or (0 < abs(fv) < 0.01): - return f"{fv:.3g}" - return f"{fv:.3f}".rstrip("0").rstrip(".") - - parts = [f"{k}:{_fmt(v)}" for k, v in data.items()] - return " ".join(parts) - - -def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: - """Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров. - - Возвращает пару (low, high) или None для отключения. Допустимы значения 0..100, low < high. - Ключевые слова отключения: "off", "none", "no". - """ - if not spec: - return None - s = str(spec).strip().lower() - if s in ("off", "none", "no"): - return None - try: - p0, p1 = s.replace(";", ",").split(",") - low = float(p0) - high = float(p1) - if not (0.0 <= low < high <= 100.0): - return None - return (low, high) - except Exception: - return None - - -def try_open_pyserial(path: str, baud: int, timeout: float): - try: - import serial # type: ignore - except Exception: - return None - try: - ser = serial.Serial(path, baudrate=baud, timeout=timeout) - return ser - except Exception: - return None - - -class FDReader: - """Простой враппер чтения строк из файлового дескриптора TTY.""" - - def __init__(self, fd: int): - # Отдельно буферизуем для корректной readline() - self._fd = fd - raw = os.fdopen(fd, "rb", closefd=False) - self._file = raw - self._buf = io.BufferedReader(raw, buffer_size=65536) - - def fileno(self) -> int: - return self._fd - - def readline(self) -> bytes: - return self._buf.readline() - - def close(self): - try: - self._buf.close() - except Exception: - pass - - -def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: - """Открыть TTY без pyserial и настроить порт через termios. - - Возвращает FDReader или None при ошибке. - """ - try: - import termios - import tty - except Exception: - return None - - try: - fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) - except Exception: - return None - - try: - attrs = termios.tcgetattr(fd) - # Установим «сырое» состояние - tty.setraw(fd) - - # Скорость - baud_map = { - 9600: termios.B9600, - 19200: termios.B19200, - 38400: termios.B38400, - 57600: termios.B57600, - 115200: termios.B115200, - 230400: getattr(termios, "B230400", None), - 460800: getattr(termios, "B460800", None), - } - b = baud_map.get(baud) or termios.B115200 - - attrs[4] = b # ispeed - attrs[5] = b # ospeed - - # VMIN=1, VTIME=0 — блокирующее чтение по байту - cc = attrs[6] - cc[termios.VMIN] = 1 - cc[termios.VTIME] = 0 - attrs[6] = cc - - termios.tcsetattr(fd, termios.TCSANOW, attrs) - except Exception: - try: - os.close(fd) - except Exception: - pass - return None - - return FDReader(fd) - - -class SerialLineSource: - """Единый интерфейс для чтения строк из порта (pyserial или raw TTY).""" - - def __init__(self, path: str, baud: int, timeout: float = 1.0): - self._pyserial = try_open_pyserial(path, baud, timeout) - self._fdreader = None - self._using = "pyserial" if self._pyserial is not None else "raw" - if self._pyserial is None: - self._fdreader = open_raw_tty(path, baud) - if self._fdreader is None: - msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" - if sys.platform.startswith("win"): - msg += ". На Windows нужен pyserial: pip install pyserial" - raise RuntimeError(msg) - - def readline(self) -> bytes: - if self._pyserial is not None: - try: - return self._pyserial.readline() - except Exception: - return b"" - else: - try: - return self._fdreader.readline() # type: ignore[union-attr] - except Exception: - return b"" - - def close(self): - try: - if self._pyserial is not None: - self._pyserial.close() - elif self._fdreader is not None: - self._fdreader.close() - except Exception: - pass - - -class SerialChunkReader: - """Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера.""" - - def __init__(self, src: SerialLineSource): - self._src = src - self._ser = src._pyserial - self._fd: Optional[int] = None - if self._ser is not None: - # Неблокирующий режим для быстрой откачки - try: - self._ser.timeout = 0 - except Exception: - pass - else: - try: - self._fd = src._fdreader.fileno() # type: ignore[union-attr] - try: - os.set_blocking(self._fd, False) - except Exception: - pass - except Exception: - self._fd = None - - def read_available(self) -> bytes: - """Вернёт доступные байты (b"" если данных нет).""" - if self._ser is not None: - try: - n = int(getattr(self._ser, "in_waiting", 0)) - except Exception: - n = 0 - if n > 0: - try: - return self._ser.read(n) - except Exception: - return b"" - return b"" - if self._fd is None: - return b"" - out = bytearray() - while True: - try: - chunk = os.read(self._fd, 65536) - if not chunk: - break - out += chunk - if len(chunk) < 65536: - break - except BlockingIOError: - break - except Exception: - break - return bytes(out) - - -class SweepReader(threading.Thread): - """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" - - def __init__( - self, - port_path: str, - baud: int, - out_queue: Queue[SweepPacket], - stop_event: threading.Event, - fancy: bool = False, - ): - super().__init__(daemon=True) - self._port_path = port_path - self._baud = baud - self._q = out_queue - self._stop = stop_event - self._src: Optional[SerialLineSource] = None - self._fancy = bool(fancy) - self._max_width: int = 0 - self._sweep_idx: int = 0 - self._last_sweep_ts: Optional[float] = None - self._n_valid_hist = deque() - - def _finalize_current(self, xs, ys, channels: Optional[set[int]]): - if not xs: - return - ch_list = sorted(channels) if channels else [0] - ch_primary = ch_list[0] if ch_list else 0 - max_x = max(xs) - width = max_x + 1 - self._max_width = max(self._max_width, width) - target_width = self._max_width if self._fancy else width - # Быстрый векторизованный путь - sweep = np.full((target_width,), np.nan, dtype=np.float32) - try: - idx = np.asarray(xs, dtype=np.int64) - vals = np.asarray(ys, dtype=np.float32) - sweep[idx] = vals - except Exception: - # Запасной путь - for x, y in zip(xs, ys): - if 0 <= x < target_width: - sweep[x] = float(y) - # Метрики валидных точек до заполнения пропусков - finite_pre = np.isfinite(sweep) - n_valid_cur = int(np.count_nonzero(finite_pre)) - - # Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины - if self._fancy: - try: - known = ~np.isnan(sweep) - if np.any(known): - known_idx = np.nonzero(known)[0] - # Для каждой пары соседних известных индексов заполним промежуток средним значением - for i0, i1 in zip(known_idx[:-1], known_idx[1:]): - if i1 - i0 > 1: - avg = (sweep[i0] + sweep[i1]) * 0.5 - sweep[i0 + 1 : i1] = avg - first_idx = int(known_idx[0]) - last_idx = int(known_idx[-1]) - if first_idx > 0: - sweep[:first_idx] = sweep[first_idx] - if last_idx < sweep.size - 1: - sweep[last_idx + 1 :] = sweep[last_idx] - except Exception: - # В случае ошибки просто оставляем как есть - pass - # Инверсия данных при «отрицательном» уровне (среднее ниже порога) - try: - m = float(np.nanmean(sweep)) - if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD: - sweep *= -1.0 - except Exception: - pass - #sweep -= float(np.nanmean(sweep)) - - # Метрики для статусной строки (вид словаря: переменная -> значение) - self._sweep_idx += 1 - if len(ch_list) > 1: - sys.stderr.write( - f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n" - ) - now = time.time() - if self._last_sweep_ts is None: - dt_ms = float("nan") - else: - dt_ms = (now - self._last_sweep_ts) * 1000.0 - self._last_sweep_ts = now - self._n_valid_hist.append((now, n_valid_cur)) - while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0: - self._n_valid_hist.popleft() - if self._n_valid_hist: - n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist)) - else: - n_valid = float(n_valid_cur) - - if n_valid_cur > 0: - vmin = float(np.nanmin(sweep)) - vmax = float(np.nanmax(sweep)) - mean = float(np.nanmean(sweep)) - std = float(np.nanstd(sweep)) - else: - vmin = vmax = mean = std = float("nan") - info: SweepInfo = { - "sweep": self._sweep_idx, - "ch": ch_primary, - "chs": ch_list, - "n_valid": n_valid, - "min": vmin, - "max": vmax, - "mean": mean, - "std": std, - "dt_ms": dt_ms, - } - - # Кладём готовый свип (если очередь полна — выбрасываем самый старый) - try: - self._q.put_nowait((sweep, info)) - except Full: - try: - _ = self._q.get_nowait() - except Exception: - pass - try: - self._q.put_nowait((sweep, info)) - except Exception: - pass - - def run(self): - # Состояние текущего свипа - xs: list[int] = [] - ys: list[int] = [] - cur_channel: Optional[int] = None - cur_channels: set[int] = set() - - try: - self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) - sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") - except Exception as e: - sys.stderr.write(f"[error] {e}\n") - return - - try: - # Быстрый неблокирующий дренаж порта с разбором по байтам - chunk_reader = SerialChunkReader(self._src) - buf = bytearray() - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - # Короткая уступка CPU, если нет новых данных - time.sleep(0.0005) - continue - - # Обрабатываем все полные строки - while True: - nl = buf.find(b"\n") - if nl == -1: - break - line = bytes(buf[:nl]) - del buf[: nl + 1] - if line.endswith(b"\r"): - line = line[:-1] - if not line: - continue - - if line.startswith(b"Sweep_start"): - self._finalize_current(xs, ys, cur_channels) - xs.clear() - ys.clear() - cur_channel = None - cur_channels.clear() - continue - - # sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам. - if len(line) >= 3: - parts = line.split() - if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): - try: - if parts[0].lower() == b"s": - if len(parts) >= 4: - ch = int(parts[1], 10) - x = int(parts[2], 10) - y = int(parts[3], 10) # поддержка знака: "+…" и "-…" - else: - ch = 0 - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - else: - # формат вида "s0" - ch = int(parts[0][1:], 10) - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - except Exception: - continue - if cur_channel is None: - cur_channel = ch - cur_channels.add(ch) - xs.append(x) - ys.append(y) - - # Защита от переполнения буфера при отсутствии переводов строки - if len(buf) > 1_000_000: - del buf[:-262144] - finally: - try: - # Завершаем оставшийся свип - self._finalize_current(xs, ys, cur_channels) - except Exception: - pass - try: - if self._src is not None: - self._src.close() - except Exception: - pass - - -def main(): - parser = argparse.ArgumentParser( - description=( - "Читает свипы из виртуального COM-порта и рисует: " - "последний свип и водопад (реалтайм)." - ) - ) - parser.add_argument( - "port", - help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)", - ) - parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") - parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") - parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") - parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") - parser.add_argument( - "--spec-clip", - default="2,98", - help=( - "Процентильная обрезка уровней водопада спектров, % (min,max). " - "Напр. 2,98. 'off' — отключить" - ), - ) - parser.add_argument( - "--spec-mean-sec", - type=float, - default=0.0, - help=( - "Вычитание среднего по каждой частоте за последние N секунд " - "в водопаде спектров (0 — отключить)" - ), - ) - parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") - parser.add_argument( - "--fancy", - action="store_true", - help="Заполнять выпавшие точки средними значениями между соседними", - ) - parser.add_argument( - "--ylim", - type=str, - default=None, - help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", - ) - parser.add_argument( - "--backend", - choices=["auto", "pg", "mpl"], - default="auto", - help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto", - ) - - args = parser.parse_args() - - # Попробуем быстрый бэкенд (pyqtgraph) при auto/pg - if args.backend in ("pg"): - try: - return run_pyqtgraph(args) - except Exception as e: - if args.backend == "pg": - sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n") - sys.exit(1) - # При auto — тихо откатываемся на matplotlib - - try: - import matplotlib - import matplotlib.pyplot as plt - from matplotlib.animation import FuncAnimation - from matplotlib.widgets import Slider, CheckButtons - except Exception as e: - sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") - sys.exit(1) - - # Очередь завершённых свипов и поток чтения - q: Queue[SweepPacket] = Queue(maxsize=1000) - stop_event = threading.Event() - reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) - reader.start() - - # Графика - fig, axs = plt.subplots(2, 2, figsize=(12, 8)) - (ax_line, ax_img), (ax_fft, ax_spec) = axs - fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None - # Увеличим расстояния и оставим место справа под ползунки оси Y B-scan - fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) - - # Состояние для отображения - current_sweep_raw: Optional[np.ndarray] = None - current_sweep_norm: Optional[np.ndarray] = None - last_calib_sweep: Optional[np.ndarray] = None - current_info: Optional[SweepInfo] = None - x_shared: Optional[np.ndarray] = None - width: Optional[int] = None - max_sweeps = int(max(10, args.max_sweeps)) - ring = None # type: Optional[np.ndarray] - ring_time = None # type: Optional[np.ndarray] - head = 0 - # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. - # FFT состояние - fft_bins = FFT_LEN // 2 + 1 - ring_fft = None # type: Optional[np.ndarray] - y_min_fft, y_max_fft = None, None - freq_shared: Optional[np.ndarray] = None - # Параметры контраста водопада спектров - spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) - spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) - # Ползунки управления Y для B-scan и контрастом - ymin_slider = None - ymax_slider = None - contrast_slider = None - calib_enabled = False - cb = None - - # Статусная строка (внизу окна) - status_text = fig.text( - 0.01, - 0.01, - "", - ha="left", - va="bottom", - fontsize=8, - family="monospace", - ) - - # Линейный график последнего свипа - line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") - line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") - line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") - ax_line.set_title("Сырые данные", pad=1) - ax_line.set_xlabel("F") - ax_line.set_ylabel("") - channel_text = ax_line.text( - 0.98, - 0.98, - "", - transform=ax_line.transAxes, - ha="right", - va="top", - fontsize=9, - family="monospace", - ) - - # Линейный график спектра текущего свипа - fft_line_obj, = ax_fft.plot([], [], lw=1) - ax_fft.set_title("FFT", pad=1) - ax_fft.set_xlabel("X") - ax_fft.set_ylabel("Амплитуда, дБ") - - # Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения) - fixed_ylim: Optional[Tuple[float, float]] = None - # CLI переопределение при необходимости - if args.ylim: - try: - y0, y1 = args.ylim.split(",") - fixed_ylim = (float(y0), float(y1)) - except Exception: - sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n") - if fixed_ylim is not None: - ax_line.set_ylim(fixed_ylim) - - # Водопад (будет инициализирован при первом свипе) - img_obj = ax_img.imshow( - np.zeros((1, 1), dtype=np.float32), - aspect="auto", - interpolation="nearest", - origin="lower", - cmap=args.cmap, - ) - ax_img.set_title("Сырые данные", pad=12) - ax_img.set_xlabel("") - ax_img.set_ylabel("частота") - # Не показываем численные значения по времени на водопаде сырых данных - try: - ax_img.tick_params(axis="x", labelbottom=False) - except Exception: - pass - - # Водопад спектров - img_fft_obj = ax_spec.imshow( - np.zeros((1, 1), dtype=np.float32), - aspect="auto", - interpolation="nearest", - origin="lower", - cmap=args.cmap, - ) - ax_spec.set_title("B-scan (дБ)", pad=12) - ax_spec.set_xlabel("") - ax_spec.set_ylabel("расстояние") - # Не показываем численные значения по времени на B-scan - try: - ax_spec.tick_params(axis="x", labelbottom=False) - except Exception: - pass - def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - w = min(raw.size, calib.size) - if w <= 0: - return raw - out = np.full_like(raw, np.nan, dtype=np.float32) - with np.errstate(divide="ignore", invalid="ignore"): - out[:w] = raw[:w] / calib[:w] - out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) - return out - - def _set_calib_enabled(): - nonlocal calib_enabled, current_sweep_norm - try: - calib_enabled = bool(cb.get_status()[0]) if cb is not None else False - except Exception: - calib_enabled = False - if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) - else: - current_sweep_norm = None - - # Слайдеры для управления осью Y B-scan (мин/макс) и контрастом - try: - ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) - ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) - ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) - ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) - ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") - ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") - contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") - cb = CheckButtons(ax_cb, ["калибровка"], [False]) - - def _on_ylim_change(_val): - try: - y0 = int(min(ymin_slider.val, ymax_slider.val)) - y1 = int(max(ymin_slider.val, ymax_slider.val)) - ax_spec.set_ylim(y0, y1) - fig.canvas.draw_idle() - except Exception: - pass - - ymin_slider.on_changed(_on_ylim_change) - ymax_slider.on_changed(_on_ylim_change) - # Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона) - contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) - cb.on_clicked(lambda _v: _set_calib_enabled()) - except Exception: - pass - - # Для контроля частоты обновления - max_fps = max(1.0, float(args.max_fps)) - interval_ms = int(1000.0 / max_fps) - frames_since_ylim_update = 0 - - - def ensure_buffer(_w: int): - nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time - if ring is not None: - return - width = WF_WIDTH - x_shared = np.arange(width, dtype=np.int32) - ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) - ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) - head = 0 - # Обновляем изображение под новые размеры: время по X (горизонталь), X по Y - img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32)) - img_obj.set_extent((0, max_sweeps - 1, 0, width - 1 if width > 0 else 1)) - ax_img.set_xlim(0, max_sweeps - 1) - ax_img.set_ylim(0, max(1, width - 1)) - # FFT буферы: время по X, бин по Y - ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) - img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32)) - img_fft_obj.set_extent((0, max_sweeps - 1, 0, fft_bins - 1)) - ax_spec.set_xlim(0, max_sweeps - 1) - ax_spec.set_ylim(0, max(1, fft_bins - 1)) - freq_shared = np.arange(fft_bins, dtype=np.int32) - - def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по текущей видимой области imshow (без накопления по времени).""" - if data.size == 0: - return None - ny, nx = data.shape[0], data.shape[1] - try: - x0, x1 = axis.get_xlim() - y0, y1 = axis.get_ylim() - except Exception: - x0, x1 = 0.0, float(nx - 1) - y0, y1 = 0.0, float(ny - 1) - xmin, xmax = sorted((float(x0), float(x1))) - ymin, ymax = sorted((float(y0), float(y1))) - ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) - ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) - iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) - if ix1 < ix0: - ix1 = ix0 - if iy1 < iy0: - iy1 = iy0 - sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] - finite = np.isfinite(sub) - if not finite.any(): - return None - vals = sub[finite] - vmin = float(np.min(vals)) - vmax = float(np.max(vals)) - if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: - return None - return (vmin, vmax) - - def push_sweep(s: np.ndarray): - nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time - if s is None or s.size == 0 or ring is None: - return - # Нормализуем длину до фиксированной ширины - w = ring.shape[1] - row = np.full((w,), np.nan, dtype=np.float32) - take = min(w, s.size) - row[:take] = s[:take] - ring[head, :] = row - if ring_time is not None: - ring_time[head] = time.time() - head = (head + 1) % ring.shape[0] - # FFT строка (дБ) - if ring_fft is not None: - bins = ring_fft.shape[1] - # Подготовка входа FFT_LEN, замена NaN на 0 - take_fft = min(int(s.size), FFT_LEN) - if take_fft <= 0: - fft_row = np.full((bins,), np.nan, dtype=np.float32) - else: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = s[:take_fft] - if isinstance(seg, np.ndarray): - seg = np.nan_to_num(seg, nan=0.0).astype(np.float32, copy=False) - else: - seg = np.asarray(seg, dtype=np.float32) - seg = np.nan_to_num(seg, nan=0.0) - # Окно Хэннинга - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) - fft_row = 20.0 * np.log10(mag + 1e-9) - if fft_row.shape[0] != bins: - # rfft длиной FFT_LEN даёт bins == FFT_LEN//2+1 - fft_row = fft_row[:bins] - ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row - # Экстремумы для цветовой шкалы - fr_min = np.nanmin(fft_row) - fr_max = np.nanmax(fft_row) - fr_max = np.nanpercentile(fft_row, 90) - if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): - y_min_fft = float(fr_min) - if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): - y_max_fft = float(fr_max) - - def drain_queue(): - nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep - drained = 0 - while True: - try: - s, info = q.get_nowait() - except Empty: - break - drained += 1 - current_sweep_raw = s - current_info = info - ch = 0 - try: - ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 - except Exception: - ch = 0 - if ch == 0: - last_calib_sweep = s - current_sweep_norm = None - sweep_for_proc = s - else: - if calib_enabled and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(s, last_calib_sweep) - sweep_for_proc = current_sweep_norm - else: - current_sweep_norm = None - sweep_for_proc = s - ensure_buffer(s.size) - push_sweep(sweep_for_proc) - return drained - - def make_display_ring(): - # Возвращаем буфер с правильным порядком по времени (старые→новые) и осью времени по X - if ring is None: - return np.zeros((1, 1), dtype=np.float32) - base = ring if head == 0 else np.roll(ring, -head, axis=0) - return base.T # (width, time) - - def make_display_times(): - if ring_time is None: - return None - base_t = ring_time if head == 0 else np.roll(ring_time, -head) - return base_t - - def _subtract_recent_mean_fft(disp_fft: np.ndarray) -> np.ndarray: - """Вычесть среднее по каждой частоте за последние spec_mean_sec секунд.""" - if spec_mean_sec <= 0.0: - return disp_fft - disp_times = make_display_times() - if disp_times is None: - return disp_fft - now_t = time.time() - mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) - if not np.any(mask): - return disp_fft - try: - mean_spec = np.nanmean(disp_fft[:, mask], axis=1) - except Exception: - return disp_fft - mean_spec = np.nan_to_num(mean_spec, nan=0.0) - return disp_fft - mean_spec[:, None] - - def make_display_ring_fft(): - if ring_fft is None: - return np.zeros((1, 1), dtype=np.float32) - base = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) - return base.T # (bins, time) - - def update(_frame): - nonlocal frames_since_ylim_update - changed = drain_queue() > 0 - - # Обновление линии последнего свипа - if current_sweep_raw is not None: - if x_shared is not None and current_sweep_raw.size <= x_shared.size: - xs = x_shared[: current_sweep_raw.size] - else: - xs = np.arange(current_sweep_raw.size, dtype=np.int32) - line_obj.set_data(xs, current_sweep_raw) - if last_calib_sweep is not None: - line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep) - else: - line_calib_obj.set_data([], []) - if current_sweep_norm is not None: - line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm) - else: - line_norm_obj.set_data([], []) - # Лимиты по X постоянные под текущую ширину - ax_line.set_xlim(0, max(1, current_sweep_raw.size - 1)) - # Адаптивные Y-лимиты (если не задан --ylim) - if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep_raw)) - y1 = float(np.nanmax(current_sweep_raw)) - if np.isfinite(y0) and np.isfinite(y1): - if y0 == y1: - pad = max(1.0, abs(y0) * 0.05) - y0 -= pad - y1 += pad - else: - pad = 0.05 * (y1 - y0) - y0 -= pad - y1 += pad - ax_line.set_ylim(y0, y1) - - # Обновление спектра текущего свипа - sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw - take_fft = min(int(sweep_for_fft.size), FFT_LEN) - if take_fft > 0 and freq_shared is not None: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False) - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) - fft_vals = 20.0 * np.log10(mag + 1e-9) - xs_fft = freq_shared - if fft_vals.size > xs_fft.size: - fft_vals = fft_vals[: xs_fft.size] - fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) - # Авто-диапазон по Y для спектра - if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): - ax_fft.set_xlim(0, max(1, xs_fft.size - 1)) - ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) - - # Обновление водопада - if changed and ring is not None: - disp = make_display_ring() - # Новые данные справа: без реверса - img_obj.set_data(disp) - # Подписи времени не обновляем динамически (оставляем авто-тики) - # Авто-уровни: по видимой области (не накапливаем за всё время) - levels = _visible_levels_matplotlib(disp, ax_img) - if levels is not None: - img_obj.set_clim(vmin=levels[0], vmax=levels[1]) - - # Обновление водопада спектров - if changed and ring_fft is not None: - disp_fft = make_display_ring_fft() - disp_fft = _subtract_recent_mean_fft(disp_fft) - # Новые данные справа: без реверса - img_fft_obj.set_data(disp_fft) - # Подписи времени не обновляем динамически (оставляем авто-тики) - # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) - try: - # disp_fft имеет форму (bins, time); берём среднее по времени - mean_spec = np.nanmean(disp_fft, axis=1) - vmin_v = float(np.nanmin(mean_spec)) - vmax_v = float(np.nanmax(mean_spec)) - except Exception: - vmin_v = vmax_v = None - # Если средние не дают валидный диапазон — используем процентильную обрезку (если задана) - if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v: - if spec_clip is not None: - try: - vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) - vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) - except Exception: - vmin_v = vmax_v = None - # Фолбэк к отслеживаемым минимум/максимумам - if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v: - if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft: - vmin_v, vmax_v = y_min_fft, y_max_fft - if vmin_v is not None and vmax_v is not None and vmin_v != vmax_v: - # Применим скалирование контрастом (верхняя граница) - try: - c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0 - except Exception: - c = 1.0 - vmax_eff = vmin_v + c * (vmax_v - vmin_v) - img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff) - - if changed and current_info: - status_text.set_text(_format_status_kv(current_info)) - chs = current_info.get("chs") if isinstance(current_info, dict) else None - if chs is None: - chs = current_info.get("ch") if isinstance(current_info, dict) else None - if chs is None: - channel_text.set_text("") - else: - try: - if isinstance(chs, (list, tuple, set)): - ch_list = sorted(int(v) for v in chs) - ch_text_val = ", ".join(str(v) for v in ch_list) - else: - ch_text_val = str(int(chs)) - channel_text.set_text(f"chs {ch_text_val}") - except Exception: - channel_text.set_text(f"chs {chs}") - - # Возвращаем обновлённые артисты - return ( - line_obj, - line_calib_obj, - line_norm_obj, - img_obj, - fft_line_obj, - img_fft_obj, - status_text, - channel_text, - ) - - ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) - - plt.show() - # Нормальное завершение при закрытии окна - stop_event.set() - reader.join(timeout=1.0) - - -def run_pyqtgraph(args): - """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" - try: - import pyqtgraph as pg - from PyQt5 import QtCore, QtWidgets # noqa: F401 - except Exception: - # Возможно установлена PySide6 - try: - import pyqtgraph as pg - from PySide6 import QtCore, QtWidgets # noqa: F401 - except Exception as e: - raise RuntimeError( - "pyqtgraph/PyQt5(Pyside6) не найдены. Установите: pip install pyqtgraph PyQt5" - ) from e - - # Очередь завершённых свипов и поток чтения - q: Queue[SweepPacket] = Queue(maxsize=1000) - stop_event = threading.Event() - reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) - reader.start() - - # Настройки скорости - max_sweeps = int(max(10, args.max_sweeps)) - max_fps = max(1.0, float(args.max_fps)) - interval_ms = int(1000.0 / max_fps) - - # PyQtGraph настройки - pg.setConfigOptions(useOpenGL=True, antialias=False) - app = pg.mkQApp(args.title) - win = pg.GraphicsLayoutWidget(show=True, title=args.title) - win.resize(1200, 600) - - # Плот последнего свипа (слева-сверху) - p_line = win.addPlot(row=0, col=0, title="Сырые данные") - p_line.showGrid(x=True, y=True, alpha=0.3) - curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) - curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) - curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) - p_line.setLabel("bottom", "X") - p_line.setLabel("left", "Y") - ch_text = pg.TextItem("", anchor=(1, 1)) - ch_text.setZValue(10) - p_line.addItem(ch_text) - - # Водопад (справа-сверху) - p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад") - p_img.invertY(False) - p_img.showGrid(x=False, y=False) - p_img.setLabel("bottom", "Время, с (новое справа)") - try: - p_img.getAxis("bottom").setStyle(showValues=False) - except Exception: - pass - p_img.setLabel("left", "X (0 снизу)") - img = pg.ImageItem() - p_img.addItem(img) - - # FFT (слева-снизу) - p_fft = win.addPlot(row=1, col=0, title="FFT") - p_fft.showGrid(x=True, y=True, alpha=0.3) - curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) - p_fft.setLabel("bottom", "Бин") - p_fft.setLabel("left", "Амплитуда, дБ") - - # Водопад спектров (справа-снизу) - p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") - p_spec.invertY(True) - p_spec.showGrid(x=False, y=False) - p_spec.setLabel("bottom", "Время, с (новое справа)") - try: - p_spec.getAxis("bottom").setStyle(showValues=False) - except Exception: - pass - p_spec.setLabel("left", "Бин (0 снизу)") - img_fft = pg.ImageItem() - p_spec.addItem(img_fft) - - # Чекбокс калибровки - calib_cb = QtWidgets.QCheckBox("калибровка") - cb_proxy = QtWidgets.QGraphicsProxyWidget() - cb_proxy.setWidget(calib_cb) - win.addItem(cb_proxy, row=2, col=1) - - # Статусная строка (внизу окна) - status = pg.LabelItem(justify="left") - win.addItem(status, row=3, col=0, colspan=2) - - # Состояние - ring: Optional[np.ndarray] = None - ring_time: Optional[np.ndarray] = None - head = 0 - width: Optional[int] = None - x_shared: Optional[np.ndarray] = None - current_sweep_raw: Optional[np.ndarray] = None - current_sweep_norm: Optional[np.ndarray] = None - last_calib_sweep: Optional[np.ndarray] = None - current_info: Optional[SweepInfo] = None - # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. - # Для спектров - fft_bins = FFT_LEN // 2 + 1 - ring_fft: Optional[np.ndarray] = None - freq_shared: Optional[np.ndarray] = None - y_min_fft, y_max_fft = None, None - # Параметры контраста водопада спектров (процентильная обрезка) - spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) - spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) - calib_enabled = False - # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) - fixed_ylim: Optional[Tuple[float, float]] = None - if args.ylim: - try: - y0, y1 = args.ylim.split(",") - fixed_ylim = (float(y0), float(y1)) - except Exception: - pass - if fixed_ylim is not None: - p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) - - def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - w = min(raw.size, calib.size) - if w <= 0: - return raw - out = np.full_like(raw, np.nan, dtype=np.float32) - with np.errstate(divide="ignore", invalid="ignore"): - out[:w] = raw[:w] / calib[:w] - out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) - return out - - def _set_calib_enabled(): - nonlocal calib_enabled, current_sweep_norm - try: - calib_enabled = bool(calib_cb.isChecked()) - except Exception: - calib_enabled = False - if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) - else: - current_sweep_norm = None - - try: - calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled()) - except Exception: - pass - - def ensure_buffer(_w: int): - nonlocal ring, ring_time, head, width, x_shared, ring_fft, freq_shared - if ring is not None: - return - width = WF_WIDTH - x_shared = np.arange(width, dtype=np.int32) - ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) - ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) - head = 0 - # Водопад: время по оси X, X по оси Y - img.setImage(ring.T, autoLevels=False) - p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, width - 1)), padding=0) - p_line.setXRange(0, max(1, width - 1), padding=0) - # FFT: время по оси X, бин по оси Y - ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) - img_fft.setImage(ring_fft.T, autoLevels=False) - p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, fft_bins - 1)), padding=0) - p_fft.setXRange(0, max(1, fft_bins - 1), padding=0) - freq_shared = np.arange(fft_bins, dtype=np.int32) - - def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по текущей видимой области ImageItem (без накопления по времени).""" - if data.size == 0: - return None - ny, nx = data.shape[0], data.shape[1] - try: - (x0, x1), (y0, y1) = p_img.viewRange() - except Exception: - x0, x1 = 0.0, float(nx - 1) - y0, y1 = 0.0, float(ny - 1) - xmin, xmax = sorted((float(x0), float(x1))) - ymin, ymax = sorted((float(y0), float(y1))) - ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) - ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) - iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) - if ix1 < ix0: - ix1 = ix0 - if iy1 < iy0: - iy1 = iy0 - sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] - finite = np.isfinite(sub) - if not finite.any(): - return None - vals = sub[finite] - vmin = float(np.min(vals)) - vmax = float(np.max(vals)) - if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: - return None - return (vmin, vmax) - - def push_sweep(s: np.ndarray): - nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft - if s is None or s.size == 0 or ring is None: - return - w = ring.shape[1] - row = np.full((w,), np.nan, dtype=np.float32) - take = min(w, s.size) - row[:take] = s[:take] - ring[head, :] = row - if ring_time is not None: - ring_time[head] = time.time() - head = (head + 1) % ring.shape[0] - # FFT строка (дБ) - if ring_fft is not None: - bins = ring_fft.shape[1] - take_fft = min(int(s.size), FFT_LEN) - if take_fft > 0: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False) - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) - fft_row = 20.0 * np.log10(mag + 1e-9) - if fft_row.shape[0] != bins: - fft_row = fft_row[:bins] - else: - fft_row = np.full((bins,), np.nan, dtype=np.float32) - ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row - fr_min = np.nanmin(fft_row) - fr_max = np.nanmax(fft_row) - if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): - y_min_fft = float(fr_min) - if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): - y_max_fft = float(fr_max) - - def drain_queue(): - nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep - drained = 0 - while True: - try: - s, info = q.get_nowait() - except Empty: - break - drained += 1 - current_sweep_raw = s - current_info = info - ch = 0 - try: - ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 - except Exception: - ch = 0 - if ch == 0: - last_calib_sweep = s - current_sweep_norm = None - sweep_for_proc = s - else: - if calib_enabled and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(s, last_calib_sweep) - sweep_for_proc = current_sweep_norm - else: - current_sweep_norm = None - sweep_for_proc = s - ensure_buffer(s.size) - push_sweep(sweep_for_proc) - return drained - - # Попытка применить LUT из колормэпа (если доступен) - try: - cm_mod = getattr(pg, "colormap", None) - if cm_mod is not None: - cm = cm_mod.get(args.cmap) - img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256)) - except Exception: - pass - - def update(): - changed = drain_queue() > 0 - if current_sweep_raw is not None and x_shared is not None: - if current_sweep_raw.size <= x_shared.size: - xs = x_shared[: current_sweep_raw.size] - else: - xs = np.arange(current_sweep_raw.size) - curve.setData(xs, current_sweep_raw, autoDownsample=True) - if last_calib_sweep is not None: - curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True) - else: - curve_calib.setData([], []) - if current_sweep_norm is not None: - curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True) - else: - curve_norm.setData([], []) - if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep_raw)) - y1 = float(np.nanmax(current_sweep_raw)) - if np.isfinite(y0) and np.isfinite(y1): - margin = 0.05 * max(1.0, (y1 - y0)) - p_line.setYRange(y0 - margin, y1 + margin, padding=0) - - # Обновим спектр - sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw - take_fft = min(int(sweep_for_fft.size), FFT_LEN) - if take_fft > 0 and freq_shared is not None: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False) - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) - fft_vals = 20.0 * np.log10(mag + 1e-9) - xs_fft = freq_shared - if fft_vals.size > xs_fft.size: - fft_vals = fft_vals[: xs_fft.size] - curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) - p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) - - if changed and ring is not None: - disp = ring if head == 0 else np.roll(ring, -head, axis=0) - disp = disp.T[:, ::-1] # (width, time with newest at left) - levels = _visible_levels_pyqtgraph(disp) - if levels is not None: - img.setImage(disp, autoLevels=False, levels=levels) - else: - img.setImage(disp, autoLevels=False) - - if changed and current_info: - try: - status.setText(_format_status_kv(current_info)) - except Exception: - pass - try: - chs = current_info.get("chs") if isinstance(current_info, dict) else None - if chs is None: - chs = current_info.get("ch") if isinstance(current_info, dict) else None - if chs is None: - ch_text.setText("") - else: - if isinstance(chs, (list, tuple, set)): - ch_list = sorted(int(v) for v in chs) - ch_text_val = ", ".join(str(v) for v in ch_list) - else: - ch_text_val = str(int(chs)) - ch_text.setText(f"chs {ch_text_val}") - (x0, x1), (y0, y1) = p_line.viewRange() - dx = 0.01 * max(1.0, float(x1 - x0)) - dy = 0.01 * max(1.0, float(y1 - y0)) - ch_text.setPos(float(x1 - dx), float(y1 - dy)) - except Exception: - pass - - if changed and ring_fft is not None: - disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) - disp_fft = disp_fft.T[:, ::-1] - if spec_mean_sec > 0.0 and ring_time is not None: - disp_times = ring_time if head == 0 else np.roll(ring_time, -head) - disp_times = disp_times[::-1] - now_t = time.time() - mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) - if np.any(mask): - try: - mean_spec = np.nanmean(disp_fft[:, mask], axis=1) - mean_spec = np.nan_to_num(mean_spec, nan=0.0) - disp_fft = disp_fft - mean_spec[:, None] - except Exception: - pass - # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) - levels = None - try: - mean_spec = np.nanmean(disp_fft, axis=1) - vmin_v = float(np.nanmin(mean_spec)) - vmax_v = float(np.nanmax(mean_spec)) - if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: - levels = (vmin_v, vmax_v) - except Exception: - levels = None - # Процентильная обрезка как запасной вариант - if levels is None and spec_clip is not None: - try: - vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) - vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) - if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: - levels = (vmin_v, vmax_v) - except Exception: - levels = None - # Ещё один фолбэк — глобальные накопленные мин/макс - if levels is None and y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft: - levels = (y_min_fft, y_max_fft) - if levels is not None: - img_fft.setImage(disp_fft, autoLevels=False, levels=levels) - else: - img_fft.setImage(disp_fft, autoLevels=False) - - timer = pg.QtCore.QTimer() - timer.timeout.connect(update) - timer.start(interval_ms) - - def on_quit(): - stop_event.set() - reader.join(timeout=1.0) - - app.aboutToQuit.connect(on_quit) - win.show() - exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None) - exec_fn() - # На случай если aboutToQuit не сработал - on_quit() - - -if __name__ == "__main__": - main() diff --git a/background.npy b/background.npy new file mode 100644 index 0000000..b5c57ef Binary files /dev/null and b/background.npy differ diff --git a/calib_envelope.npy b/calib_envelope.npy new file mode 100644 index 0000000..bb4541f Binary files /dev/null and b/calib_envelope.npy differ diff --git a/replay_pty.py b/replay_pty.py new file mode 100644 index 0000000..00a0798 --- /dev/null +++ b/replay_pty.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +""" +Эмулятор серийного порта: воспроизводит лог-файл в цикле через PTY. + +Использование: + python3 replay_pty.py my_picocom_logfile.txt + python3 replay_pty.py my_picocom_logfile.txt --pty /tmp/ttyVIRT0 + python3 replay_pty.py my_picocom_logfile.txt --speed 2.0 # в 2 раза быстрее реального + python3 replay_pty.py my_picocom_logfile.txt --speed 0 # максимально быстро + +Затем в другом терминале: + python -m rfg_adc_plotter.main /tmp/ttyVIRT0 +""" + +import argparse +import os +import sys +import time + + +def main(): + parser = argparse.ArgumentParser( + description="Воспроизводит лог-файл через PTY как виртуальный серийный порт." + ) + parser.add_argument("file", help="Путь к лог-файлу (например my_picocom_logfile.txt)") + parser.add_argument( + "--pty", + default="/tmp/ttyVIRT0", + help="Путь симлинка PTY (по умолчанию /tmp/ttyVIRT0)", + ) + parser.add_argument( + "--speed", + type=float, + default=1.0, + help=( + "Множитель скорости воспроизведения: " + "1.0 = реальное время при --baud, " + "2.0 = вдвое быстрее, " + "0 = максимально быстро" + ), + ) + parser.add_argument( + "--baud", + type=int, + default=115200, + help="Скорость (бод) для расчёта задержек (по умолчанию 115200)", + ) + args = parser.parse_args() + + if not os.path.isfile(args.file): + sys.stderr.write(f"[error] Файл не найден: {args.file}\n") + sys.exit(1) + + # Открываем PTY-пару: master (мы пишем) / slave (GUI читает) + master_fd, slave_fd = os.openpty() + slave_path = os.ttyname(slave_fd) + os.close(slave_fd) # GUI откроет slave сам по симлинку + + # Симлинк с удобным именем + try: + os.unlink(args.pty) + except FileNotFoundError: + pass + os.symlink(slave_path, args.pty) + + print(f"PTY slave : {slave_path}") + print(f"Симлинк : {args.pty} → {slave_path}") + print(f"Запустите : python -m rfg_adc_plotter.main {args.pty}") + print("Ctrl+C для остановки.\n") + + # Задержка на байт: 10 бит (8N1) / скорость / множитель + if args.speed > 0: + bytes_per_sec = args.baud / 10.0 * args.speed + delay_per_byte = 1.0 / bytes_per_sec + else: + delay_per_byte = 0.0 + + loop = 0 + try: + while True: + loop += 1 + print(f"[loop {loop}] {args.file}") + with open(args.file, "rb") as f: + for line in f: + os.write(master_fd, line) + if delay_per_byte > 0: + time.sleep(delay_per_byte * len(line)) + except KeyboardInterrupt: + print("\nОстановлено.") + finally: + try: + os.unlink(args.pty) + except Exception: + pass + try: + os.close(master_fd) + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/rfg_adc_plotter/__init__.py b/rfg_adc_plotter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/constants.py b/rfg_adc_plotter/constants.py new file mode 100644 index 0000000..1ccfef8 --- /dev/null +++ b/rfg_adc_plotter/constants.py @@ -0,0 +1,13 @@ +WF_WIDTH = 1000 # максимальное число точек в ряду водопада +FFT_LEN = 2048 # длина БПФ для спектра/водопада спектров +# Порог для инверсии сырых данных: если среднее значение свипа ниже порога — +# считаем, что сигнал «меньше нуля» и домножаем свип на -1 +DATA_INVERSION_THRESHOLD = 10.0 + +# Параметры IFFT-спектра (временной профиль из спектра 3.2..14.3 ГГц) +# Двусторонний спектр формируется как: [нули -14.3..-3.2 | нули -3.2..+3.2 | данные +3.2..+14.3] +ZEROS_LOW = 758 # нули от -14.3 до -3.2 ГГц +ZEROS_MID = 437 # нули от -3.2 до +3.2 ГГц +SWEEP_LEN = 758 # ожидаемая длина свипа (3.2 → 14.3 ГГц) +FREQ_SPAN_GHZ = 28.6 # полная двусторонняя полоса (-14.3 .. +14.3 ГГц) +IFFT_LEN = ZEROS_LOW + ZEROS_MID + SWEEP_LEN # = 1953 diff --git a/rfg_adc_plotter/gui/__init__.py b/rfg_adc_plotter/gui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/gui/matplotlib_backend.py b/rfg_adc_plotter/gui/matplotlib_backend.py new file mode 100644 index 0000000..b768553 --- /dev/null +++ b/rfg_adc_plotter/gui/matplotlib_backend.py @@ -0,0 +1,353 @@ +"""Matplotlib-бэкенд реалтайм-плоттера свипов.""" + +import sys +import threading +from queue import Queue +from typing import Optional, Tuple + +import numpy as np + +from rfg_adc_plotter.constants import FFT_LEN, FREQ_SPAN_GHZ, IFFT_LEN + +_IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9) +from rfg_adc_plotter.io.sweep_reader import SweepReader +from rfg_adc_plotter.processing.normalizer import build_calib_envelopes +from rfg_adc_plotter.state.app_state import BACKGROUND_PATH, CALIB_ENVELOPE_PATH, AppState, format_status +from rfg_adc_plotter.state.ring_buffer import RingBuffer +from rfg_adc_plotter.types import SweepPacket + + +def _parse_ylim(ylim_str: Optional[str]) -> Optional[Tuple[float, float]]: + if not ylim_str: + return None + try: + y0, y1 = ylim_str.split(",") + return (float(y0), float(y1)) + except Exception: + sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n") + return None + + +def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: + if not spec: + return None + s = str(spec).strip().lower() + if s in ("off", "none", "no"): + return None + try: + p0, p1 = s.replace(";", ",").split(",") + low, high = float(p0), float(p1) + if not (0.0 <= low < high <= 100.0): + return None + return (low, high) + except Exception: + return None + + +def _visible_levels(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: + """(vmin, vmax) по текущей видимой области imshow.""" + if data.size == 0: + return None + ny, nx = data.shape[0], data.shape[1] + try: + x0, x1 = axis.get_xlim() + y0, y1 = axis.get_ylim() + except Exception: + x0, x1 = 0.0, float(nx - 1) + y0, y1 = 0.0, float(ny - 1) + xmin, xmax = sorted((float(x0), float(x1))) + ymin, ymax = sorted((float(y0), float(y1))) + ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) + ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) + iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) + iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) + if ix1 < ix0: + ix1 = ix0 + if iy1 < iy0: + iy1 = iy0 + sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] + finite = np.isfinite(sub) + if not finite.any(): + return None + vals = sub[finite] + vmin = float(np.min(vals)) + vmax = float(np.max(vals)) + if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: + return None + return (vmin, vmax) + + +def run_matplotlib(args): + try: + import matplotlib + import matplotlib.pyplot as plt + from matplotlib.animation import FuncAnimation + from matplotlib.widgets import CheckButtons, Slider + except Exception as e: + sys.stderr.write(f"[error] Нужны matplotlib и её зависимости: {e}\n") + sys.exit(1) + + q: Queue[SweepPacket] = Queue(maxsize=1000) + stop_event = threading.Event() + reader = SweepReader( + args.port, + args.baud, + q, + stop_event, + fancy=bool(args.fancy), + bin_mode=bool(getattr(args, "bin_mode", False)), + ) + reader.start() + + max_sweeps = int(max(10, args.max_sweeps)) + max_fps = max(1.0, float(args.max_fps)) + interval_ms = int(1000.0 / max_fps) + spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) + spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) + fixed_ylim = _parse_ylim(getattr(args, "ylim", None)) + norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() + + state = AppState(norm_type=norm_type) + ring = RingBuffer(max_sweeps) + + # --- Создание фигуры --- + fig, axs = plt.subplots(2, 2, figsize=(12, 8)) + (ax_line, ax_img), (ax_fft, ax_spec) = axs + if hasattr(fig.canvas.manager, "set_window_title"): + fig.canvas.manager.set_window_title(args.title) + fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) + + # Статусная строка + status_text = fig.text(0.01, 0.01, "", ha="left", va="bottom", fontsize=8, family="monospace") + + # График последнего свипа + line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") + line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") + line_env_lo, = ax_line.plot([], [], lw=1, color="tab:orange", linestyle="--", alpha=0.7) + line_env_hi, = ax_line.plot([], [], lw=1, color="tab:orange", linestyle="--", alpha=0.7) + ax_line.set_title("Сырые данные", pad=1) + ax_line.set_xlabel("Частота, ГГц") + channel_text = ax_line.text( + 0.98, 0.98, "", transform=ax_line.transAxes, + ha="right", va="top", fontsize=9, family="monospace", + ) + if fixed_ylim is not None: + ax_line.set_ylim(fixed_ylim) + + # График спектра + fft_line_obj, = ax_fft.plot([], [], lw=1) + ax_fft.set_title("FFT", pad=1) + ax_fft.set_xlabel("Время, нс") + ax_fft.set_ylabel("Мощность, дБ") + + # Водопад сырых данных + img_obj = ax_img.imshow( + np.zeros((1, 1), dtype=np.float32), + aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap, + ) + ax_img.set_title("Сырые данные", pad=12) + ax_img.set_ylabel("частота") + try: + ax_img.tick_params(axis="x", labelbottom=False) + except Exception: + pass + + # Водопад спектров + img_fft_obj = ax_spec.imshow( + np.zeros((1, 1), dtype=np.float32), + aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap, + ) + ax_spec.set_title("B-scan (дБ)", pad=12) + ax_spec.set_ylabel("Время, нс") + try: + ax_spec.tick_params(axis="x", labelbottom=False) + except Exception: + pass + + # Слайдеры и чекбокс + contrast_slider = None + try: + fft_bins = ring.fft_bins + ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) + ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) + ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) + ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) + ax_cb_file = fig.add_axes([0.92, 0.36, 0.08, 0.08]) + ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") + ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") + contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") + calib_cb = CheckButtons(ax_cb, ["калибровка"], [False]) + calib_file_cb = CheckButtons(ax_cb_file, ["из файла"], [False]) + + import os as _os + if not _os.path.isfile(CALIB_ENVELOPE_PATH): + ax_cb_file.set_visible(False) + + def _on_ylim_change(_val): + try: + y0 = int(min(ymin_slider.val, ymax_slider.val)) + y1 = int(max(ymin_slider.val, ymax_slider.val)) + ax_spec.set_ylim(y0, y1) + fig.canvas.draw_idle() + except Exception: + pass + + def _on_calib_file_clicked(_v): + use_file = bool(calib_file_cb.get_status()[0]) + if use_file: + ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH) + if ok: + state.set_calib_mode("file") + else: + calib_file_cb.set_active(0) # снять галочку + else: + state.set_calib_mode("live") + state.set_calib_enabled(bool(calib_cb.get_status()[0])) + + def _on_calib_clicked(_v): + import os as _os2 + if _os2.path.isfile(CALIB_ENVELOPE_PATH): + ax_cb_file.set_visible(True) + state.set_calib_enabled(bool(calib_cb.get_status()[0])) + fig.canvas.draw_idle() + + ax_btn_bg = fig.add_axes([0.92, 0.27, 0.08, 0.05]) + ax_cb_bg = fig.add_axes([0.92, 0.20, 0.08, 0.06]) + from matplotlib.widgets import Button as MplButton + save_bg_btn = MplButton(ax_btn_bg, "Сохр. фон") + bg_cb = CheckButtons(ax_cb_bg, ["вычет фона"], [False]) + + def _on_save_bg(_event): + ok = state.save_background() + if ok: + state.load_background() + fig.canvas.draw_idle() + + def _on_bg_clicked(_v): + state.set_background_enabled(bool(bg_cb.get_status()[0])) + + save_bg_btn.on_clicked(_on_save_bg) + bg_cb.on_clicked(_on_bg_clicked) + + ymin_slider.on_changed(_on_ylim_change) + ymax_slider.on_changed(_on_ylim_change) + contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) + calib_cb.on_clicked(_on_calib_clicked) + calib_file_cb.on_clicked(_on_calib_file_clicked) + except Exception: + calib_cb = None + + FREQ_MIN = 3.323 + FREQ_MAX = 14.323 + + # --- Инициализация imshow при первом свипе --- + def _init_imshow_extents(): + w = ring.width + ms = ring.max_sweeps + fb = ring.fft_bins + img_obj.set_data(np.zeros((w, ms), dtype=np.float32)) + img_obj.set_extent((0, ms - 1, FREQ_MIN, FREQ_MAX)) + ax_img.set_xlim(0, ms - 1) + ax_img.set_ylim(FREQ_MIN, FREQ_MAX) + img_fft_obj.set_data(np.zeros((fb, ms), dtype=np.float32)) + img_fft_obj.set_extent((0, ms - 1, 0.0, _IFFT_T_MAX_NS)) + ax_spec.set_xlim(0, ms - 1) + ax_spec.set_ylim(0.0, _IFFT_T_MAX_NS) + ax_fft.set_xlim(0.0, _IFFT_T_MAX_NS) + + _imshow_initialized = [False] + + def update(_frame): + changed = state.drain_queue(q, ring) > 0 + + if changed and not _imshow_initialized[0] and ring.is_ready: + _init_imshow_extents() + _imshow_initialized[0] = True + + # Линейный график свипа + if state.current_sweep_raw is not None: + raw = state.current_sweep_raw + if ring.x_shared is not None and raw.size <= ring.x_shared.size: + xs = ring.x_shared[: raw.size] + else: + xs = np.arange(raw.size, dtype=np.int32) + def _norm_to_max(data): + m = float(np.nanmax(np.abs(data))) + return data / m if m > 0.0 else data + line_obj.set_data(xs, _norm_to_max(raw)) + if state.calib_mode == "file" and state.calib_file_envelope is not None: + upper = state.calib_file_envelope + lower = -upper + m_env = float(np.nanmax(np.abs(upper))) + if m_env <= 0.0: + m_env = 1.0 + line_env_lo.set_data(xs[: upper.size], lower / m_env) + line_env_hi.set_data(xs[: upper.size], upper / m_env) + elif state.last_calib_sweep is not None: + calib = state.last_calib_sweep + m_calib = float(np.nanmax(np.abs(calib))) + if m_calib <= 0.0: + m_calib = 1.0 + lower, upper = build_calib_envelopes(calib) + line_env_lo.set_data(xs[: calib.size], lower / m_calib) + line_env_hi.set_data(xs[: calib.size], upper / m_calib) + else: + line_env_lo.set_data([], []) + line_env_hi.set_data([], []) + if state.current_sweep_norm is not None: + line_norm_obj.set_data(xs[: state.current_sweep_norm.size], _norm_to_max(state.current_sweep_norm)) + else: + line_norm_obj.set_data([], []) + ax_line.set_xlim(FREQ_MIN, FREQ_MAX) + if fixed_ylim is None: + ax_line.set_ylim(-1.05, 1.05) + ax_line.set_ylabel("/ max") + + # Спектр — используем уже вычисленный в ring IFFT (временной профиль) + if ring.last_fft_vals is not None and ring.fft_time_axis is not None: + fft_vals = ring.last_fft_vals + xs_fft = ring.fft_time_axis + n = min(fft_vals.size, xs_fft.size) + fft_line_obj.set_data(xs_fft[:n], fft_vals[:n]) + if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): + ax_fft.set_xlim(0, float(xs_fft[n - 1])) + ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) + + # Водопад сырых данных + if changed and ring.is_ready: + disp = ring.get_display_ring() + if ring.x_shared is not None: + n = ring.x_shared.size + disp = disp[:n, :] + img_obj.set_data(disp) + img_obj.set_extent((0, ring.max_sweeps - 1, FREQ_MIN, FREQ_MAX)) + ax_img.set_ylim(FREQ_MIN, FREQ_MAX) + levels = _visible_levels(disp, ax_img) + if levels is not None: + img_obj.set_clim(vmin=levels[0], vmax=levels[1]) + + # Водопад спектров + if changed and ring.is_ready: + disp_fft = ring.get_display_ring_fft() + disp_fft = ring.subtract_recent_mean_fft(disp_fft, spec_mean_sec) + img_fft_obj.set_data(disp_fft) + levels = ring.compute_fft_levels(disp_fft, spec_clip) + if levels is not None: + try: + c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0 + except Exception: + c = 1.0 + vmax_eff = levels[0] + c * (levels[1] - levels[0]) + img_fft_obj.set_clim(vmin=levels[0], vmax=vmax_eff) + + # Статус и подпись канала + if changed and state.current_info: + status_text.set_text(format_status(state.current_info)) + channel_text.set_text(state.format_channel_label()) + + return (line_obj, line_norm_obj, line_env_lo, line_env_hi, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text) + + ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) + plt.show() + stop_event.set() + reader.join(timeout=1.0) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py new file mode 100644 index 0000000..a9069b5 --- /dev/null +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -0,0 +1,385 @@ +"""PyQtGraph-бэкенд реалтайм-плоттера свипов.""" + +import sys +import threading +from queue import Queue +from typing import Optional, Tuple + +import numpy as np + +from rfg_adc_plotter.constants import FREQ_SPAN_GHZ, IFFT_LEN +from rfg_adc_plotter.io.sweep_reader import SweepReader +from rfg_adc_plotter.processing.normalizer import build_calib_envelopes +from rfg_adc_plotter.state.app_state import BACKGROUND_PATH, CALIB_ENVELOPE_PATH, AppState, format_status +from rfg_adc_plotter.state.ring_buffer import RingBuffer +from rfg_adc_plotter.types import SweepPacket + +# Максимальное значение временной оси IFFT в нс +_IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9) + + +def _parse_ylim(ylim_str: Optional[str]) -> Optional[Tuple[float, float]]: + if not ylim_str: + return None + try: + y0, y1 = ylim_str.split(",") + return (float(y0), float(y1)) + except Exception: + return None + + +def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: + if not spec: + return None + s = str(spec).strip().lower() + if s in ("off", "none", "no"): + return None + try: + p0, p1 = s.replace(";", ",").split(",") + low, high = float(p0), float(p1) + if not (0.0 <= low < high <= 100.0): + return None + return (low, high) + except Exception: + return None + + +def _visible_levels( + data: np.ndarray, + plot_item, + freq_min: Optional[float] = None, + freq_max: Optional[float] = None, +) -> Optional[Tuple[float, float]]: + """(vmin, vmax) по текущей видимой области ImageItem. + + Если freq_min/freq_max заданы, ось Y трактуется как частота [freq_min..freq_max] + и пересчитывается в индексы строк данных. + """ + if data.size == 0: + return None + ny, nx = data.shape[0], data.shape[1] + try: + (x0, x1), (y0, y1) = plot_item.viewRange() + except Exception: + x0, x1 = 0.0, float(nx - 1) + y0, y1 = 0.0, float(ny - 1) + xmin, xmax = sorted((float(x0), float(x1))) + ymin, ymax = sorted((float(y0), float(y1))) + ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) + ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) + if freq_min is not None and freq_max is not None and freq_max > freq_min: + span = freq_max - freq_min + iy0 = max(0, min(ny - 1, int(np.floor((ymin - freq_min) / span * ny)))) + iy1 = max(0, min(ny - 1, int(np.ceil((ymax - freq_min) / span * ny)))) + else: + iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) + iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) + if ix1 < ix0: + ix1 = ix0 + if iy1 < iy0: + iy1 = iy0 + sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] + finite = np.isfinite(sub) + if not finite.any(): + return None + vals = sub[finite] + vmin = float(np.min(vals)) + vmax = float(np.max(vals)) + if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: + return None + return (vmin, vmax) + + +def run_pyqtgraph(args): + """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" + try: + import pyqtgraph as pg + from PyQt5 import QtCore, QtWidgets # noqa: F401 + except Exception: + try: + import pyqtgraph as pg + from PySide6 import QtCore, QtWidgets # noqa: F401 + except Exception as e: + raise RuntimeError( + "pyqtgraph/PyQt5(PySide6) не найдены. Установите: pip install pyqtgraph PyQt5" + ) from e + + q: Queue[SweepPacket] = Queue(maxsize=1000) + stop_event = threading.Event() + reader = SweepReader( + args.port, + args.baud, + q, + stop_event, + fancy=bool(args.fancy), + bin_mode=bool(getattr(args, "bin_mode", False)), + ) + reader.start() + + max_sweeps = int(max(10, args.max_sweeps)) + max_fps = max(1.0, float(args.max_fps)) + interval_ms = int(1000.0 / max_fps) + spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) + spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) + fixed_ylim = _parse_ylim(getattr(args, "ylim", None)) + norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() + + state = AppState(norm_type=norm_type) + ring = RingBuffer(max_sweeps) + + # --- Создание окна --- + pg.setConfigOptions(useOpenGL=True, antialias=False) + app = pg.mkQApp(args.title) + win = pg.GraphicsLayoutWidget(show=True, title=args.title) + win.resize(1200, 600) + + # График последнего свипа (слева-сверху) + p_line = win.addPlot(row=0, col=0, title="Сырые данные") + p_line.showGrid(x=True, y=True, alpha=0.3) + curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) + curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) + curve_env_lo = p_line.plot(pen=pg.mkPen((255, 165, 0), width=1, style=QtCore.Qt.DashLine)) + curve_env_hi = p_line.plot(pen=pg.mkPen((255, 165, 0), width=1, style=QtCore.Qt.DashLine)) + p_line.setLabel("bottom", "Частота, ГГц") + p_line.setLabel("left", "Y") + p_line.setXRange(3.323, 14.323, padding=0) + p_line.enableAutoRange(axis="x", enable=False) + ch_text = pg.TextItem("", anchor=(1, 1)) + ch_text.setZValue(10) + p_line.addItem(ch_text) + if fixed_ylim is not None: + p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) + + # Водопад (справа-сверху) + p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад") + p_img.invertY(False) + p_img.showGrid(x=False, y=False) + p_img.setLabel("bottom", "Время (новое справа)") + try: + p_img.getAxis("bottom").setStyle(showValues=False) + except Exception: + pass + p_img.setLabel("left", "Частота, ГГц") + p_img.enableAutoRange(enable=False) + img = pg.ImageItem() + p_img.addItem(img) + + # Применяем LUT из цветовой карты + try: + cm = pg.colormap.get(args.cmap) + img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256)) + except Exception: + pass + + # FFT (слева-снизу) + p_fft = win.addPlot(row=1, col=0, title="FFT") + p_fft.showGrid(x=True, y=True, alpha=0.3) + curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) + p_fft.setLabel("bottom", "Время, нс") + p_fft.setLabel("left", "Мощность, дБ") + + # Водопад спектров (справа-снизу) + p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") + p_spec.invertY(True) + p_spec.showGrid(x=False, y=False) + p_spec.setLabel("bottom", "Время (новое справа)") + try: + p_spec.getAxis("bottom").setStyle(showValues=False) + except Exception: + pass + p_spec.setLabel("left", "Время, нс") + img_fft = pg.ImageItem() + p_spec.addItem(img_fft) + + # Чекбоксы калибровки — в одном контейнере + calib_widget = QtWidgets.QWidget() + calib_layout = QtWidgets.QHBoxLayout(calib_widget) + calib_layout.setContentsMargins(2, 2, 2, 2) + calib_layout.setSpacing(8) + + calib_cb = QtWidgets.QCheckBox("калибровка") + calib_file_cb = QtWidgets.QCheckBox("из файла") + calib_file_cb.setEnabled(False) # активируется только если файл существует + + calib_layout.addWidget(calib_cb) + calib_layout.addWidget(calib_file_cb) + + cb_container_proxy = QtWidgets.QGraphicsProxyWidget() + cb_container_proxy.setWidget(calib_widget) + win.addItem(cb_container_proxy, row=2, col=1) + + def _check_file_cb_available(): + import os + calib_file_cb.setEnabled(os.path.isfile(CALIB_ENVELOPE_PATH)) + + _check_file_cb_available() + + def _on_calib_file_toggled(checked): + if checked: + ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH) + if ok: + state.set_calib_mode("file") + else: + calib_file_cb.setChecked(False) + else: + state.set_calib_mode("live") + state.set_calib_enabled(calib_cb.isChecked()) + + def _on_calib_toggled(_v): + _check_file_cb_available() + state.set_calib_enabled(calib_cb.isChecked()) + + calib_cb.stateChanged.connect(_on_calib_toggled) + calib_file_cb.stateChanged.connect(lambda _v: _on_calib_file_toggled(calib_file_cb.isChecked())) + + # Кнопка сохранения фона + чекбокс вычета фона + bg_widget = QtWidgets.QWidget() + bg_layout = QtWidgets.QHBoxLayout(bg_widget) + bg_layout.setContentsMargins(2, 2, 2, 2) + bg_layout.setSpacing(8) + + save_bg_btn = QtWidgets.QPushButton("Сохр. фон") + bg_cb = QtWidgets.QCheckBox("вычет фона") + bg_cb.setEnabled(False) + + bg_layout.addWidget(save_bg_btn) + bg_layout.addWidget(bg_cb) + + bg_container_proxy = QtWidgets.QGraphicsProxyWidget() + bg_container_proxy.setWidget(bg_widget) + win.addItem(bg_container_proxy, row=2, col=0) + + def _on_save_bg(): + ok = state.save_background() + if ok: + state.load_background() + bg_cb.setEnabled(True) + + save_bg_btn.clicked.connect(_on_save_bg) + bg_cb.stateChanged.connect(lambda _v: state.set_background_enabled(bg_cb.isChecked())) + + # Статусная строка + status = pg.LabelItem(justify="left") + win.addItem(status, row=3, col=0, colspan=2) + + _imshow_initialized = [False] + + FREQ_MIN = 3.323 + FREQ_MAX = 14.323 + + def _init_imshow_extents(): + ms = ring.max_sweeps + fb = ring.fft_bins + img.setImage(ring.ring.T, autoLevels=False) + img.setRect(pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN)) + p_img.setRange(xRange=(0, ms - 1), yRange=(FREQ_MIN, FREQ_MAX), padding=0) + p_line.setXRange(FREQ_MIN, FREQ_MAX, padding=0) + img_fft.setImage(ring.ring_fft.T, autoLevels=False) + img_fft.setRect(pg.QtCore.QRectF(0.0, 0.0, float(ms), _IFFT_T_MAX_NS)) + p_spec.setRange(xRange=(0, ms - 1), yRange=(0.0, _IFFT_T_MAX_NS), padding=0) + p_fft.setXRange(0.0, _IFFT_T_MAX_NS, padding=0) + + def _img_rect(ms: int) -> "pg.QtCore.QRectF": + return pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN) + + def update(): + changed = state.drain_queue(q, ring) > 0 + + if changed and not _imshow_initialized[0] and ring.is_ready: + _init_imshow_extents() + _imshow_initialized[0] = True + + # Линейный график свипа + if state.current_sweep_raw is not None and ring.x_shared is not None: + raw = state.current_sweep_raw + xs = ring.x_shared[: raw.size] if raw.size <= ring.x_shared.size else np.arange(raw.size) + def _norm_to_max(data): + m = float(np.nanmax(np.abs(data))) + return data / m if m > 0.0 else data + curve.setData(xs, _norm_to_max(raw), autoDownsample=True) + if state.calib_mode == "file" and state.calib_file_envelope is not None: + upper = state.calib_file_envelope + lower = -upper + m_env = float(np.nanmax(np.abs(upper))) + if m_env <= 0.0: + m_env = 1.0 + curve_env_lo.setData(xs[: upper.size], lower / m_env, autoDownsample=True) + curve_env_hi.setData(xs[: upper.size], upper / m_env, autoDownsample=True) + elif state.last_calib_sweep is not None: + calib = state.last_calib_sweep + m_calib = float(np.nanmax(np.abs(calib))) + if m_calib <= 0.0: + m_calib = 1.0 + lower, upper = build_calib_envelopes(calib) + curve_env_lo.setData(xs[: calib.size], lower / m_calib, autoDownsample=True) + curve_env_hi.setData(xs[: calib.size], upper / m_calib, autoDownsample=True) + else: + curve_env_lo.setData([], []) + curve_env_hi.setData([], []) + if state.current_sweep_norm is not None: + curve_norm.setData(xs[: state.current_sweep_norm.size], _norm_to_max(state.current_sweep_norm), autoDownsample=True) + else: + curve_norm.setData([], []) + if fixed_ylim is None: + p_line.setYRange(-1.05, 1.05, padding=0) + p_line.setLabel("left", "/ max") + + # Спектр — используем уже вычисленный в ring IFFT (временной профиль) + if ring.last_fft_vals is not None and ring.fft_time_axis is not None: + fft_vals = ring.last_fft_vals + xs_fft = ring.fft_time_axis + n = min(fft_vals.size, xs_fft.size) + curve_fft.setData(xs_fft[:n], fft_vals[:n]) + p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) + + # Позиция подписи канала + try: + (x0, x1), (y0, y1) = p_line.viewRange() + dx = 0.01 * max(1.0, float(x1 - x0)) + dy = 0.01 * max(1.0, float(y1 - y0)) + ch_text.setPos(float(x1 - dx), float(y1 - dy)) + except Exception: + pass + + # Водопад сырых данных — новые данные справа (без реверса) + if changed and ring.is_ready: + disp = ring.get_display_ring() # (width, time), новые справа + levels = _visible_levels(disp, p_img, FREQ_MIN, FREQ_MAX) + if levels is not None: + img.setImage(disp, autoLevels=False, levels=levels) + else: + img.setImage(disp, autoLevels=False) + img.setRect(_img_rect(ring.max_sweeps)) + + # Статус и подпись канала + if changed and state.current_info: + try: + status.setText(format_status(state.current_info)) + except Exception: + pass + ch_text.setText(state.format_channel_label()) + + # Водопад спектров — новые данные справа (без реверса) + if changed and ring.is_ready: + disp_fft = ring.get_display_ring_fft() # (bins, time), новые справа + disp_fft = ring.subtract_recent_mean_fft(disp_fft, spec_mean_sec) + levels = ring.compute_fft_levels(disp_fft, spec_clip) + if levels is not None: + img_fft.setImage(disp_fft, autoLevels=False, levels=levels) + else: + img_fft.setImage(disp_fft, autoLevels=False) + img_fft.setRect(pg.QtCore.QRectF(0.0, 0.0, float(ring.max_sweeps), _IFFT_T_MAX_NS)) + + timer = pg.QtCore.QTimer() + timer.timeout.connect(update) + timer.start(interval_ms) + + def on_quit(): + stop_event.set() + reader.join(timeout=1.0) + + app.aboutToQuit.connect(on_quit) + win.show() + exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None) + exec_fn() + on_quit() diff --git a/rfg_adc_plotter/io/__init__.py b/rfg_adc_plotter/io/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/io/serial_source.py b/rfg_adc_plotter/io/serial_source.py new file mode 100644 index 0000000..31a2ba4 --- /dev/null +++ b/rfg_adc_plotter/io/serial_source.py @@ -0,0 +1,181 @@ +"""Источники последовательного ввода: обёртки над pyserial и raw TTY.""" + +import io +import os +import sys +from typing import Optional + + +def try_open_pyserial(path: str, baud: int, timeout: float): + try: + import serial # type: ignore + except Exception: + return None + try: + ser = serial.Serial(path, baudrate=baud, timeout=timeout) + return ser + except Exception: + return None + + +class FDReader: + """Простой враппер чтения строк из файлового дескриптора TTY.""" + + def __init__(self, fd: int): + self._fd = fd + raw = os.fdopen(fd, "rb", closefd=False) + self._file = raw + self._buf = io.BufferedReader(raw, buffer_size=65536) + + def fileno(self) -> int: + return self._fd + + def readline(self) -> bytes: + return self._buf.readline() + + def close(self): + try: + self._buf.close() + except Exception: + pass + + +def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: + """Открыть TTY без pyserial и настроить порт через termios. + + Возвращает FDReader или None при ошибке. + """ + try: + import termios + import tty + except Exception: + return None + + try: + fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) + except Exception: + return None + + try: + attrs = termios.tcgetattr(fd) + tty.setraw(fd) + + baud_map = { + 9600: termios.B9600, + 19200: termios.B19200, + 38400: termios.B38400, + 57600: termios.B57600, + 115200: termios.B115200, + 230400: getattr(termios, "B230400", None), + 460800: getattr(termios, "B460800", None), + } + b = baud_map.get(baud) or termios.B115200 + + attrs[4] = b # ispeed + attrs[5] = b # ospeed + + # VMIN=1, VTIME=0 — блокирующее чтение по байту + cc = attrs[6] + cc[termios.VMIN] = 1 + cc[termios.VTIME] = 0 + attrs[6] = cc + + termios.tcsetattr(fd, termios.TCSANOW, attrs) + except Exception: + try: + os.close(fd) + except Exception: + pass + return None + + return FDReader(fd) + + +class SerialLineSource: + """Единый интерфейс для чтения строк из порта (pyserial или raw TTY).""" + + def __init__(self, path: str, baud: int, timeout: float = 1.0): + self._pyserial = try_open_pyserial(path, baud, timeout) + self._fdreader = None + self._using = "pyserial" if self._pyserial is not None else "raw" + if self._pyserial is None: + self._fdreader = open_raw_tty(path, baud) + if self._fdreader is None: + msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" + if sys.platform.startswith("win"): + msg += ". На Windows нужен pyserial: pip install pyserial" + raise RuntimeError(msg) + + def readline(self) -> bytes: + if self._pyserial is not None: + try: + return self._pyserial.readline() + except Exception: + return b"" + else: + try: + return self._fdreader.readline() # type: ignore[union-attr] + except Exception: + return b"" + + def close(self): + try: + if self._pyserial is not None: + self._pyserial.close() + elif self._fdreader is not None: + self._fdreader.close() + except Exception: + pass + + +class SerialChunkReader: + """Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера.""" + + def __init__(self, src: SerialLineSource): + self._src = src + self._ser = src._pyserial + self._fd: Optional[int] = None + if self._ser is not None: + try: + self._ser.timeout = 0 + except Exception: + pass + else: + try: + self._fd = src._fdreader.fileno() # type: ignore[union-attr] + try: + os.set_blocking(self._fd, False) + except Exception: + pass + except Exception: + self._fd = None + + def read_available(self) -> bytes: + """Вернёт доступные байты (b"" если данных нет).""" + if self._ser is not None: + try: + n = int(getattr(self._ser, "in_waiting", 0)) + except Exception: + n = 0 + if n > 0: + try: + return self._ser.read(n) + except Exception: + return b"" + return b"" + if self._fd is None: + return b"" + out = bytearray() + while True: + try: + chunk = os.read(self._fd, 65536) + if not chunk: + break + out += chunk + if len(chunk) < 65536: + break + except BlockingIOError: + break + except Exception: + break + return bytes(out) diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py new file mode 100644 index 0000000..705983c --- /dev/null +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -0,0 +1,306 @@ +"""Фоновый поток чтения и парсинга свипов из последовательного порта.""" + +import sys +import threading +import time +from collections import deque +from queue import Full, Queue +from typing import Optional + +import numpy as np + +from rfg_adc_plotter.constants import DATA_INVERSION_THRESHOLD +from rfg_adc_plotter.io.serial_source import SerialChunkReader, SerialLineSource +from rfg_adc_plotter.types import SweepInfo, SweepPacket + + +class SweepReader(threading.Thread): + """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" + + def __init__( + self, + port_path: str, + baud: int, + out_queue: "Queue[SweepPacket]", + stop_event: threading.Event, + fancy: bool = False, + bin_mode: bool = False, + ): + super().__init__(daemon=True) + self._port_path = port_path + self._baud = baud + self._q = out_queue + self._stop = stop_event + self._src: Optional[SerialLineSource] = None + self._fancy = bool(fancy) + self._bin_mode = bool(bin_mode) + self._max_width: int = 0 + self._sweep_idx: int = 0 + self._last_sweep_ts: Optional[float] = None + self._n_valid_hist = deque() + + @staticmethod + def _u32_to_i32(v: int) -> int: + """Преобразование 32-bit слова в знаковое значение.""" + return v - 0x1_0000_0000 if (v & 0x8000_0000) else v + + def _finalize_current(self, xs, ys, channels: Optional[set]): + if not xs: + return + ch_list = sorted(channels) if channels else [0] + ch_primary = ch_list[0] if ch_list else 0 + max_x = max(xs) + width = max_x + 1 + self._max_width = max(self._max_width, width) + target_width = self._max_width if self._fancy else width + + sweep = np.full((target_width,), np.nan, dtype=np.float32) + try: + idx = np.asarray(xs, dtype=np.int64) + vals = np.asarray(ys, dtype=np.float32) + sweep[idx] = vals + except Exception: + for x, y in zip(xs, ys): + if 0 <= x < target_width: + sweep[x] = float(y) + + finite_pre = np.isfinite(sweep) + n_valid_cur = int(np.count_nonzero(finite_pre)) + + if self._fancy: + try: + known = ~np.isnan(sweep) + if np.any(known): + known_idx = np.nonzero(known)[0] + for i0, i1 in zip(known_idx[:-1], known_idx[1:]): + if i1 - i0 > 1: + avg = (sweep[i0] + sweep[i1]) * 0.5 + sweep[i0 + 1 : i1] = avg + first_idx = int(known_idx[0]) + last_idx = int(known_idx[-1]) + if first_idx > 0: + sweep[:first_idx] = sweep[first_idx] + if last_idx < sweep.size - 1: + sweep[last_idx + 1 :] = sweep[last_idx] + except Exception: + pass + + try: + m = float(np.nanmean(sweep)) + if np.isfinite(m) and m < DATA_INVERSION_THRESHOLD: + sweep *= -1.0 + except Exception: + pass + + self._sweep_idx += 1 + if len(ch_list) > 1: + sys.stderr.write( + f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n" + ) + now = time.time() + if self._last_sweep_ts is None: + dt_ms = float("nan") + else: + dt_ms = (now - self._last_sweep_ts) * 1000.0 + self._last_sweep_ts = now + self._n_valid_hist.append((now, n_valid_cur)) + while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0: + self._n_valid_hist.popleft() + if self._n_valid_hist: + n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist)) + else: + n_valid = float(n_valid_cur) + + if n_valid_cur > 0: + vmin = float(np.nanmin(sweep)) + vmax = float(np.nanmax(sweep)) + mean = float(np.nanmean(sweep)) + std = float(np.nanstd(sweep)) + else: + vmin = vmax = mean = std = float("nan") + info: SweepInfo = { + "sweep": self._sweep_idx, + "ch": ch_primary, + "chs": ch_list, + "n_valid": n_valid, + "min": vmin, + "max": vmax, + "mean": mean, + "std": std, + "dt_ms": dt_ms, + } + + try: + self._q.put_nowait((sweep, info)) + except Full: + try: + _ = self._q.get_nowait() + except Exception: + pass + try: + self._q.put_nowait((sweep, info)) + except Exception: + pass + + def _run_ascii_stream(self, chunk_reader: SerialChunkReader): + xs: list[int] = [] + ys: list[int] = [] + cur_channel: Optional[int] = None + cur_channels: set[int] = set() + + buf = bytearray() + while not self._stop.is_set(): + data = chunk_reader.read_available() + if data: + buf += data + else: + time.sleep(0.0005) + continue + + while True: + nl = buf.find(b"\n") + if nl == -1: + break + line = bytes(buf[:nl]) + del buf[: nl + 1] + if line.endswith(b"\r"): + line = line[:-1] + if not line: + continue + + if line.startswith(b"Sweep_start"): + self._finalize_current(xs, ys, cur_channels) + xs.clear() + ys.clear() + cur_channel = None + cur_channels.clear() + continue + + if len(line) >= 3: + parts = line.split() + if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): + try: + if parts[0].lower() == b"s": + if len(parts) >= 4: + ch = int(parts[1], 10) + x = int(parts[2], 10) + y = int(parts[3], 10) + else: + ch = 0 + x = int(parts[1], 10) + y = int(parts[2], 10) + else: + ch = int(parts[0][1:], 10) + x = int(parts[1], 10) + y = int(parts[2], 10) + except Exception: + continue + if cur_channel is None: + cur_channel = ch + cur_channels.add(ch) + xs.append(x) + ys.append(y) + + if len(buf) > 1_000_000: + del buf[:-262144] + + self._finalize_current(xs, ys, cur_channels) + + def _run_binary_stream(self, chunk_reader: SerialChunkReader): + xs: list[int] = [] + ys: list[int] = [] + cur_channel: Optional[int] = None + cur_channels: set[int] = set() + words = deque() + + buf = bytearray() + while not self._stop.is_set(): + data = chunk_reader.read_available() + if data: + buf += data + else: + time.sleep(0.0005) + continue + + usable = len(buf) & ~1 + if usable == 0: + continue + + i = 0 + while i < usable: + w = int(buf[i]) | (int(buf[i + 1]) << 8) + words.append(w) + i += 2 + + # Бинарный протокол: + # старт свипа (актуальный): 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A + # старт свипа (legacy): 0xFFFF, 0xFFFF, channel, 0x0A0A + # точка: step, value_hi, value_lo, 0x000A + while len(words) >= 4: + w0 = int(words[0]) + w1 = int(words[1]) + w2 = int(words[2]) + w3 = int(words[3]) + + if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A: + self._finalize_current(xs, ys, cur_channels) + xs.clear() + ys.clear() + cur_channels.clear() + cur_channel = (w3 >> 8) & 0x00FF + cur_channels.add(cur_channel) + for _ in range(4): + words.popleft() + continue + + if w0 == 0xFFFF and w1 == 0xFFFF and w3 == 0x0A0A: + self._finalize_current(xs, ys, cur_channels) + xs.clear() + ys.clear() + cur_channels.clear() + cur_channel = w2 + cur_channels.add(cur_channel) + for _ in range(4): + words.popleft() + continue + + if w3 == 0x000A: + if cur_channel is not None: + cur_channels.add(cur_channel) + xs.append(w0) + value_u32 = (w1 << 16) | w2 + ys.append(self._u32_to_i32(value_u32)) + for _ in range(4): + words.popleft() + continue + + # Поток может начаться с середины пакета; сдвигаемся по слову до ресинхронизации. + words.popleft() + + del buf[:usable] + if len(buf) > 1_000_000: + del buf[:-262144] + + self._finalize_current(xs, ys, cur_channels) + + def run(self): + + try: + self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) + sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") + except Exception as e: + sys.stderr.write(f"[error] {e}\n") + return + + try: + chunk_reader = SerialChunkReader(self._src) + if self._bin_mode: + self._run_binary_stream(chunk_reader) + else: + self._run_ascii_stream(chunk_reader) + finally: + try: + if self._src is not None: + self._src.close() + except Exception: + pass diff --git a/rfg_adc_plotter/main.py b/rfg_adc_plotter/main.py new file mode 100755 index 0000000..ff1feb6 --- /dev/null +++ b/rfg_adc_plotter/main.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +""" +Реалтайм-плоттер для свипов из виртуального COM-порта. + +Формат строк: + - "Sweep_start" — начало нового свипа (предыдущий считается завершённым) + - "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком + +Отрисовываются четыре графика: + - Сырые данные: последний полученный свип (Y vs X) + - Водопад сырых данных: последние N свипов + - FFT текущего свипа + - B-scan: водопад FFT-строк + +Зависимости: numpy. PySerial опционален — при его отсутствии +используется сырой доступ к TTY через termios. +GUI: matplotlib (совместимый) или pyqtgraph (быстрый). +""" + +import argparse +import sys + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=( + "Читает свипы из виртуального COM-порта и рисует: " + "последний свип и водопад (реалтайм)." + ) + ) + parser.add_argument( + "port", + help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)", + ) + parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") + parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") + parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") + parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") + parser.add_argument( + "--spec-clip", + default="2,98", + help=( + "Процентильная обрезка уровней водопада спектров, %% (min,max). " + "Напр. 2,98. 'off' — отключить" + ), + ) + parser.add_argument( + "--spec-mean-sec", + type=float, + default=0.0, + help=( + "Вычитание среднего по каждой частоте за последние N секунд " + "в водопаде спектров (0 — отключить)" + ), + ) + parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") + parser.add_argument( + "--fancy", + action="store_true", + help="Заполнять выпавшие точки средними значениями между соседними", + ) + parser.add_argument( + "--ylim", + type=str, + default=None, + help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", + ) + parser.add_argument( + "--backend", + choices=["auto", "pg", "mpl"], + default="auto", + help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto", + ) + parser.add_argument( + "--norm-type", + choices=["projector", "simple"], + default="projector", + help="Тип нормировки: projector (по огибающим в [-1000,+1000]) или simple (raw/calib)", + ) + parser.add_argument( + "--bin", + dest="bin_mode", + action="store_true", + help=( + "Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; " + "точки step,uint32(hi16,lo16),0x000A" + ), + ) + return parser + + +def main(): + args = build_parser().parse_args() + + if args.backend == "pg": + from rfg_adc_plotter.gui.pyqtgraph_backend import run_pyqtgraph + try: + run_pyqtgraph(args) + except Exception as e: + sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n") + sys.exit(1) + return + + if args.backend == "auto": + try: + from rfg_adc_plotter.gui.pyqtgraph_backend import run_pyqtgraph + run_pyqtgraph(args) + return + except Exception: + pass # Откатываемся на matplotlib + + from rfg_adc_plotter.gui.matplotlib_backend import run_matplotlib + run_matplotlib(args) + + +if __name__ == "__main__": + main() diff --git a/rfg_adc_plotter/processing/__init__.py b/rfg_adc_plotter/processing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/processing/normalizer.py b/rfg_adc_plotter/processing/normalizer.py new file mode 100644 index 0000000..0b68760 --- /dev/null +++ b/rfg_adc_plotter/processing/normalizer.py @@ -0,0 +1,149 @@ +"""Алгоритмы нормировки свипов по калибровочной кривой.""" + +from typing import Tuple + +import numpy as np + + +def normalize_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + """Простая нормировка: поэлементное деление raw/calib.""" + w = min(raw.size, calib.size) + if w <= 0: + return raw + out = np.full_like(raw, np.nan, dtype=np.float32) + with np.errstate(divide="ignore", invalid="ignore"): + out[:w] = raw[:w] / calib[:w] + out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) + return out + + +def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """Оценить огибающую по модулю сигнала. + + Возвращает (lower, upper) = (-envelope, +envelope), где envelope — + интерполяция через локальные максимумы |calib|. + """ + n = int(calib.size) + if n <= 0: + empty = np.zeros((0,), dtype=np.float32) + return empty, empty + + y = np.asarray(calib, dtype=np.float32) + finite = np.isfinite(y) + if not np.any(finite): + zeros = np.zeros_like(y, dtype=np.float32) + return zeros, zeros + + if not np.all(finite): + x = np.arange(n, dtype=np.float32) + y = y.copy() + y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32) + + a = np.abs(y) + + if n < 3: + env = a.copy() + return -env, env + + da = np.diff(a) + s = np.sign(da).astype(np.int8, copy=False) + + if np.any(s == 0): + for i in range(1, s.size): + if s[i] == 0: + s[i] = s[i - 1] + for i in range(s.size - 2, -1, -1): + if s[i] == 0: + s[i] = s[i + 1] + s[s == 0] = 1 + + max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1 + + x = np.arange(n, dtype=np.float32) + + if max_idx.size == 0: + idx = np.array([0, n - 1], dtype=np.int64) + else: + idx = np.unique(np.concatenate(([0], max_idx, [n - 1]))).astype(np.int64) + env = np.interp(x, idx.astype(np.float32), a[idx]).astype(np.float32) + + return -env, env + + +def normalize_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + """Нормировка через проекцию между огибающими калибровки в диапазон [-1000, +1000].""" + w = min(raw.size, calib.size) + if w <= 0: + return raw + + out = np.full_like(raw, np.nan, dtype=np.float32) + raw_seg = np.asarray(raw[:w], dtype=np.float32) + lower, upper = build_calib_envelopes(np.asarray(calib[:w], dtype=np.float32)) + span = upper - lower + + finite_span = span[np.isfinite(span) & (span > 0)] + if finite_span.size > 0: + eps = max(float(np.median(finite_span)) * 1e-6, 1e-9) + else: + eps = 1e-9 + + valid = ( + np.isfinite(raw_seg) + & np.isfinite(lower) + & np.isfinite(upper) + & (span > eps) + ) + if np.any(valid): + proj = np.empty_like(raw_seg, dtype=np.float32) + proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0 + proj[valid] = np.clip(proj[valid], -1000.0, 1000.0) + proj[~valid] = np.nan + out[:w] = proj + + return out + + +def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray: + """Нормировка свипа по выбранному алгоритму.""" + nt = str(norm_type).strip().lower() + if nt == "simple": + return normalize_simple(raw, calib) + return normalize_projector(raw, calib) + + +def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray: + """Нормировка свипа через проекцию на огибающую из файла. + + Воспроизводит логику normalize_projector: проецирует raw в [-1000, +1000] + используя готовую верхнюю огибающую (upper = envelope, lower = -envelope). + """ + w = min(raw.size, envelope.size) + if w <= 0: + return raw + + out = np.full_like(raw, np.nan, dtype=np.float32) + raw_seg = np.asarray(raw[:w], dtype=np.float32) + upper = np.asarray(envelope[:w], dtype=np.float32) + lower = -upper + span = upper - lower # = 2 * upper + + finite_span = span[np.isfinite(span) & (span > 0)] + if finite_span.size > 0: + eps = max(float(np.median(finite_span)) * 1e-6, 1e-9) + else: + eps = 1e-9 + + valid = ( + np.isfinite(raw_seg) + & np.isfinite(lower) + & np.isfinite(upper) + & (span > eps) + ) + if np.any(valid): + proj = np.empty_like(raw_seg, dtype=np.float32) + proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0 + proj[valid] = np.clip(proj[valid], -1000.0, 1000.0) + proj[~valid] = np.nan + out[:w] = proj + + return out diff --git a/rfg_adc_plotter/state/__init__.py b/rfg_adc_plotter/state/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rfg_adc_plotter/state/app_state.py b/rfg_adc_plotter/state/app_state.py new file mode 100644 index 0000000..0530fb4 --- /dev/null +++ b/rfg_adc_plotter/state/app_state.py @@ -0,0 +1,227 @@ +"""Состояние приложения: текущие свипы и настройки калибровки/нормировки.""" + +import os +from queue import Empty, Queue +from typing import Any, Dict, Mapping, Optional + +import numpy as np + +from rfg_adc_plotter.processing.normalizer import ( + build_calib_envelopes, + normalize_by_calib, + normalize_by_envelope, +) +from rfg_adc_plotter.state.ring_buffer import RingBuffer +from rfg_adc_plotter.types import SweepInfo, SweepPacket + +CALIB_ENVELOPE_PATH = "calib_envelope.npy" +BACKGROUND_PATH = "background.npy" + + +def format_status(data: Mapping[str, Any]) -> str: + """Преобразовать словарь метрик в одну строку 'k:v'.""" + + def _fmt(v: Any) -> str: + if v is None: + return "NA" + try: + fv = float(v) + except Exception: + return str(v) + if not np.isfinite(fv): + return "nan" + if abs(fv) >= 1000 or (0 < abs(fv) < 0.01): + return f"{fv:.3g}" + return f"{fv:.3f}".rstrip("0").rstrip(".") + + parts = [f"{k}:{_fmt(v)}" for k, v in data.items()] + return " ".join(parts) + + +class AppState: + """Весь изменяемый GUI-state: текущие данные, калибровка, настройки. + + Методы drain_queue и set_calib_enabled заменяют одноимённые closures + с nonlocal из оригинального кода. + """ + + def __init__(self, norm_type: str = "projector"): + self.current_sweep_raw: Optional[np.ndarray] = None + self.current_sweep_norm: Optional[np.ndarray] = None + self.last_calib_sweep: Optional[np.ndarray] = None + self.current_info: Optional[SweepInfo] = None + self.calib_enabled: bool = False + self.norm_type: str = norm_type + # "live" — нормировка по текущему ch0-свипу; "file" — по огибающей из файла + self.calib_mode: str = "live" + self.calib_file_envelope: Optional[np.ndarray] = None + # Вычет фона + self.background: Optional[np.ndarray] = None + self.background_enabled: bool = False + self._last_sweep_for_ring: Optional[np.ndarray] = None + + def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + if self.calib_mode == "file" and self.calib_file_envelope is not None: + return normalize_by_envelope(raw, self.calib_file_envelope) + return normalize_by_calib(raw, calib, self.norm_type) + + def save_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool: + """Вычислить огибающую из last_calib_sweep и сохранить в файл. + + Возвращает True при успехе. + """ + if self.last_calib_sweep is None: + return False + try: + _lower, upper = build_calib_envelopes(self.last_calib_sweep) + np.save(path, upper) + return True + except Exception as exc: + import sys + sys.stderr.write(f"[warn] Не удалось сохранить огибающую: {exc}\n") + return False + + def load_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool: + """Загрузить огибающую из файла. + + Возвращает True при успехе. + """ + if not os.path.isfile(path): + return False + try: + env = np.load(path) + self.calib_file_envelope = np.asarray(env, dtype=np.float32) + return True + except Exception as exc: + import sys + sys.stderr.write(f"[warn] Не удалось загрузить огибающую: {exc}\n") + return False + + def set_calib_mode(self, mode: str): + """Переключить режим калибровки: 'live' или 'file'.""" + self.calib_mode = mode + + def save_background(self, path: str = BACKGROUND_PATH) -> bool: + """Сохранить текущий sweep_for_ring как фоновый спектр. + + Сохраняет последний свип, который был записан в ринг-буфер + (нормированный, если калибровка включена, иначе сырой). + Возвращает True при успехе. + """ + if self._last_sweep_for_ring is None: + return False + try: + np.save(path, self._last_sweep_for_ring) + return True + except Exception as exc: + import sys + sys.stderr.write(f"[warn] Не удалось сохранить фон: {exc}\n") + return False + + def load_background(self, path: str = BACKGROUND_PATH) -> bool: + """Загрузить фоновый спектр из файла. + + Возвращает True при успехе. + """ + if not os.path.isfile(path): + return False + try: + bg = np.load(path) + self.background = np.asarray(bg, dtype=np.float32) + return True + except Exception as exc: + import sys + sys.stderr.write(f"[warn] Не удалось загрузить фон: {exc}\n") + return False + + def set_background_enabled(self, enabled: bool): + """Включить/выключить вычет фона.""" + self.background_enabled = enabled + + def set_calib_enabled(self, enabled: bool): + """Включить/выключить режим калибровки, пересчитать norm-свип.""" + self.calib_enabled = enabled + if self.calib_enabled and self.current_sweep_raw is not None: + if self.calib_mode == "file" and self.calib_file_envelope is not None: + self.current_sweep_norm = normalize_by_envelope( + self.current_sweep_raw, self.calib_file_envelope + ) + elif self.calib_mode == "live" and self.last_calib_sweep is not None: + self.current_sweep_norm = self._normalize( + self.current_sweep_raw, self.last_calib_sweep + ) + else: + self.current_sweep_norm = None + else: + self.current_sweep_norm = None + + def drain_queue(self, q: "Queue[SweepPacket]", ring: RingBuffer) -> int: + """Вытащить все ожидающие свипы из очереди, обновить state и ring. + + Возвращает количество обработанных свипов. + """ + drained = 0 + while True: + try: + s, info = q.get_nowait() + except Empty: + break + drained += 1 + self.current_sweep_raw = s + self.current_info = info + + ch = 0 + try: + ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 + except Exception: + ch = 0 + + # Канал 0 — опорный (калибровочный) свип + if ch == 0: + self.last_calib_sweep = s + self.save_calib_envelope() + self.current_sweep_norm = None + sweep_for_ring = s + self._last_sweep_for_ring = sweep_for_ring + else: + can_normalize = self.calib_enabled and ( + (self.calib_mode == "file" and self.calib_file_envelope is not None) + or (self.calib_mode == "live" and self.last_calib_sweep is not None) + ) + if can_normalize: + calib_ref = self.last_calib_sweep if self.last_calib_sweep is not None else s + self.current_sweep_norm = self._normalize(s, calib_ref) + sweep_for_ring = self.current_sweep_norm + else: + self.current_sweep_norm = None + sweep_for_ring = s + + # Вычет фона (в том же домене что и sweep_for_ring) + if self.background_enabled and self.background is not None and ch != 0: + w = min(sweep_for_ring.size, self.background.size) + sweep_for_ring = sweep_for_ring.copy() + sweep_for_ring[:w] -= self.background[:w] + self.current_sweep_norm = sweep_for_ring + + self._last_sweep_for_ring = sweep_for_ring + ring.ensure_init(s.size) + ring.push(sweep_for_ring) + return drained + + def format_channel_label(self) -> str: + """Строка с номерами каналов для подписи на графике.""" + if self.current_info is None: + return "" + info = self.current_info + chs = info.get("chs") if isinstance(info, dict) else None + if chs is None: + chs = info.get("ch") if isinstance(info, dict) else None + if chs is None: + return "" + try: + if isinstance(chs, (list, tuple, set)): + ch_list = sorted(int(v) for v in chs) + return "chs " + ", ".join(str(v) for v in ch_list) + return f"chs {int(chs)}" + except Exception: + return f"chs {chs}" diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py new file mode 100644 index 0000000..11f9f09 --- /dev/null +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -0,0 +1,187 @@ +"""Кольцевой буфер свипов и FFT-строк для водопадного отображения.""" + +import time +from typing import Optional, Tuple + +import numpy as np + +from rfg_adc_plotter.constants import ( + FFT_LEN, + FREQ_SPAN_GHZ, + IFFT_LEN, + SWEEP_LEN, + WF_WIDTH, + ZEROS_LOW, + ZEROS_MID, +) + + +class RingBuffer: + """Хранит последние N свипов и соответствующие FFT-строки. + + Все мутабельные поля водопада инкапсулированы здесь, + что устраняет необходимость nonlocal в GUI-коде. + """ + + def __init__(self, max_sweeps: int): + self.max_sweeps = max_sweeps + self.fft_bins = IFFT_LEN # = 1953 (полная длина IFFT-результата) + + # Инициализируются при первом свипе (ensure_init) + self.ring: Optional[np.ndarray] = None # (max_sweeps, WF_WIDTH) + self.ring_fft: Optional[np.ndarray] = None # (max_sweeps, fft_bins) + self.ring_time: Optional[np.ndarray] = None # (max_sweeps,) + self.head: int = 0 + self.width: Optional[int] = None + self.x_shared: Optional[np.ndarray] = None + self.fft_time_axis: Optional[np.ndarray] = None # временная ось IFFT в нс + self.y_min_fft: Optional[float] = None + self.y_max_fft: Optional[float] = None + # FFT последнего свипа (для отображения без повторного вычисления) + self.last_fft_vals: Optional[np.ndarray] = None + + @property + def is_ready(self) -> bool: + return self.ring is not None + + def ensure_init(self, sweep_width: int): + """Инициализировать буферы при первом свипе. Повторные вызовы — no-op (кроме x_shared).""" + if self.ring is None: + self.width = WF_WIDTH + self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32) + self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64) + self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32) + # Временная ось IFFT: шаг dt = 1/(FREQ_SPAN_GHZ*1e9), переведём в нс + self.fft_time_axis = ( + np.arange(IFFT_LEN, dtype=np.float64) / (FREQ_SPAN_GHZ * 1e9) * 1e9 + ).astype(np.float32) + self.head = 0 + # Обновляем x_shared если пришёл свип большего размера + if self.x_shared is None or sweep_width > self.x_shared.size: + self.x_shared = np.linspace(3.323, 14.323, sweep_width, dtype=np.float32) + + def push(self, s: np.ndarray): + """Добавить строку свипа в кольцевой буфер, вычислить FFT-строку.""" + if s is None or s.size == 0 or self.ring is None: + return + w = self.ring.shape[1] + row = np.full((w,), np.nan, dtype=np.float32) + take = min(w, s.size) + row[:take] = s[:take] + self.ring[self.head, :] = row + self.ring_time[self.head] = time.time() + self.head = (self.head + 1) % self.ring.shape[0] + + self._push_fft(s) + + def _push_fft(self, s: np.ndarray): + bins = self.ring_fft.shape[1] # = IFFT_LEN = 1953 + if s is None or s.size == 0: + fft_row = np.full((bins,), np.nan, dtype=np.float32) + else: + # 1. Взять первые SWEEP_LEN отсчётов (остаток — нули если свип короче) + sig = np.zeros(SWEEP_LEN, dtype=np.float32) + take = min(int(s.size), SWEEP_LEN) + seg = np.nan_to_num(s[:take], nan=0.0).astype(np.float32, copy=False) + sig[:take] = seg + + # 2. Собрать двусторонний спектр: + # [ZEROS_LOW нулей | ZEROS_MID нулей | SWEEP_LEN данных] + # = [-14.3..-3.2 ГГц | -3.2..+3.2 ГГц | +3.2..+14.3 ГГц] + data = np.zeros(IFFT_LEN, dtype=np.complex64) + data[ZEROS_LOW + ZEROS_MID:] = sig + + # 3. ifftshift + ifft → временной профиль + spec = np.fft.ifftshift(data) + result = np.fft.ifft(spec) + + # 4. Амплитуда в дБ + mag = np.abs(result).astype(np.float32) + fft_row = (20.0 * np.log10(mag + 1e-9)).astype(np.float32) + + prev_head = (self.head - 1) % self.ring_fft.shape[0] + self.ring_fft[prev_head, :] = fft_row + self.last_fft_vals = fft_row + + fr_min = np.nanmin(fft_row) + fr_max = float(np.nanpercentile(fft_row, 90)) + if self.y_min_fft is None or (not np.isnan(fr_min) and fr_min < self.y_min_fft): + self.y_min_fft = float(fr_min) + if self.y_max_fft is None or (not np.isnan(fr_max) and fr_max > self.y_max_fft): + self.y_max_fft = float(fr_max) + + def get_display_ring(self) -> np.ndarray: + """Кольцо в порядке от старого к новому, ось времени по X. Форма: (width, time).""" + if self.ring is None: + return np.zeros((1, 1), dtype=np.float32) + base = self.ring if self.head == 0 else np.roll(self.ring, -self.head, axis=0) + return base.T # (width, time) + + def get_display_ring_fft(self) -> np.ndarray: + """FFT-кольцо в порядке от старого к новому. Форма: (bins, time).""" + if self.ring_fft is None: + return np.zeros((1, 1), dtype=np.float32) + base = self.ring_fft if self.head == 0 else np.roll(self.ring_fft, -self.head, axis=0) + return base.T # (bins, time) + + def get_display_times(self) -> Optional[np.ndarray]: + """Временные метки строк в порядке от старого к новому.""" + if self.ring_time is None: + return None + return self.ring_time if self.head == 0 else np.roll(self.ring_time, -self.head) + + def subtract_recent_mean_fft( + self, disp_fft: np.ndarray, spec_mean_sec: float + ) -> np.ndarray: + """Вычесть среднее по каждой частоте за последние spec_mean_sec секунд.""" + if spec_mean_sec <= 0.0: + return disp_fft + disp_times = self.get_display_times() + if disp_times is None: + return disp_fft + now_t = time.time() + mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) + if not np.any(mask): + return disp_fft + try: + mean_spec = np.nanmean(disp_fft[:, mask], axis=1) + except Exception: + return disp_fft + mean_spec = np.nan_to_num(mean_spec, nan=0.0) + return disp_fft - mean_spec[:, None] + + def compute_fft_levels( + self, disp_fft: np.ndarray, spec_clip: Optional[Tuple[float, float]] + ) -> Optional[Tuple[float, float]]: + """Вычислить (vmin, vmax) для отображения водопада спектров.""" + # 1. По среднему спектру за видимое время + try: + mean_spec = np.nanmean(disp_fft, axis=1) + vmin_v = float(np.nanmin(mean_spec)) + vmax_v = float(np.nanmax(mean_spec)) + if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: + return (vmin_v, vmax_v) + except Exception: + pass + + # 2. Процентильная обрезка + if spec_clip is not None: + try: + vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) + vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) + if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: + return (vmin_v, vmax_v) + except Exception: + pass + + # 3. Глобальные накопленные мин/макс + if ( + self.y_min_fft is not None + and self.y_max_fft is not None + and np.isfinite(self.y_min_fft) + and np.isfinite(self.y_max_fft) + and self.y_min_fft != self.y_max_fft + ): + return (self.y_min_fft, self.y_max_fft) + + return None diff --git a/rfg_adc_plotter/types.py b/rfg_adc_plotter/types.py new file mode 100644 index 0000000..7b53fb3 --- /dev/null +++ b/rfg_adc_plotter/types.py @@ -0,0 +1,7 @@ +from typing import Any, Dict, Tuple, Union + +import numpy as np + +Number = Union[int, float] +SweepInfo = Dict[str, Any] +SweepPacket = Tuple[np.ndarray, SweepInfo] diff --git a/run_dataplotter b/run_dataplotter new file mode 100755 index 0000000..e1a3f58 --- /dev/null +++ b/run_dataplotter @@ -0,0 +1,2 @@ +#!/usr/bin/bash +python3 -m rfg_adc_plotter.main --bin --backend mpl $@