diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py deleted file mode 100755 index d4cade9..0000000 --- a/RFG_ADC_dataplotter.py +++ /dev/null @@ -1,1544 +0,0 @@ -#!/usr/bin/env python3 -""" -Реалтайм-плоттер для свипов из виртуального COM-порта. - -Формат строк: - - "Sweep_start" — начало нового свипа (предыдущий считается завершённым) - - "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком - -Отрисовываются два графика: - - Левый: последний полученный свип (Y vs X) - - Правый: водопад (последние N свипов во времени) - -Оптимизации для скорости: - - Парсинг и чтение в фоновой нити - - Анимация с обновлением только данных (без лишнего пересоздания фигур) - - Кольцевой буфер под водопад с фиксированным числом свипов - -Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии -используется сырой доступ к TTY через termios. -""" - -import argparse -import io -import os -import sys -import threading -import time -from collections import deque -from queue import Queue, Empty, Full -from typing import Any, Dict, Mapping, Optional, Tuple, Union - -import numpy as np - -WF_WIDTH = 1000 # максимальное число точек в ряду водопада -FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров -# Порог для инверсии сырых данных: если среднее значение свипа ниже порога — -# считаем, что сигнал «меньше нуля» и домножаем свип на -1 -DATA_INVERSION_THRASHOLD = 10.0 - -Number = Union[int, float] -SweepInfo = Dict[str, Any] -SweepPacket = Tuple[np.ndarray, SweepInfo] - - -def _format_status_kv(data: Mapping[str, Any]) -> str: - """Преобразовать словарь метрик в одну строку 'k:v'.""" - - def _fmt(v: Any) -> str: - if v is None: - return "NA" - try: - fv = float(v) - except Exception: - return str(v) - if not np.isfinite(fv): - return "nan" - # Достаточно компактно для статус-строки. - if abs(fv) >= 1000 or (0 < abs(fv) < 0.01): - return f"{fv:.3g}" - return f"{fv:.3f}".rstrip("0").rstrip(".") - - parts = [f"{k}:{_fmt(v)}" for k, v in data.items()] - return " ".join(parts) - - -def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: - """Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров. - - Возвращает пару (low, high) или None для отключения. Допустимы значения 0..100, low < high. - Ключевые слова отключения: "off", "none", "no". - """ - if not spec: - return None - s = str(spec).strip().lower() - if s in ("off", "none", "no"): - return None - try: - p0, p1 = s.replace(";", ",").split(",") - low = float(p0) - high = float(p1) - if not (0.0 <= low < high <= 100.0): - return None - return (low, high) - except Exception: - return None - - -def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - """Простая нормировка: поэлементное деление raw/calib.""" - w = min(raw.size, calib.size) - if w <= 0: - return raw - out = np.full_like(raw, np.nan, dtype=np.float32) - with np.errstate(divide="ignore", invalid="ignore"): - out[:w] = raw[:w] / calib[:w] - out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) - return out - - -def _build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: - """Оценить нижнюю/верхнюю огибающие калибровочной кривой.""" - n = int(calib.size) - if n <= 0: - empty = np.zeros((0,), dtype=np.float32) - return empty, empty - - y = np.asarray(calib, dtype=np.float32) - finite = np.isfinite(y) - if not np.any(finite): - zeros = np.zeros_like(y, dtype=np.float32) - return zeros, zeros - - if not np.all(finite): - x = np.arange(n, dtype=np.float32) - y = y.copy() - y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32) - - if n < 3: - return y.copy(), y.copy() - - dy = np.diff(y) - s = np.sign(dy).astype(np.int8, copy=False) - - if np.any(s == 0): - for i in range(1, s.size): - if s[i] == 0: - s[i] = s[i - 1] - for i in range(s.size - 2, -1, -1): - if s[i] == 0: - s[i] = s[i + 1] - s[s == 0] = 1 - - max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1 - min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1 - - x = np.arange(n, dtype=np.float32) - - def _interp_nodes(nodes: np.ndarray) -> np.ndarray: - if nodes.size == 0: - idx = np.array([0, n - 1], dtype=np.int64) - else: - idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64) - return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32) - - upper = _interp_nodes(max_idx) - lower = _interp_nodes(min_idx) - - swap = lower > upper - if np.any(swap): - tmp = upper[swap].copy() - upper[swap] = lower[swap] - lower[swap] = tmp - - return lower, upper - - -def _normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - """Нормировка через проекцию между огибающими калибровки в диапазон [-1, +1].""" - w = min(raw.size, calib.size) - if w <= 0: - return raw - - out = np.full_like(raw, np.nan, dtype=np.float32) - raw_seg = np.asarray(raw[:w], dtype=np.float32) - lower, upper = _build_calib_envelopes(np.asarray(calib[:w], dtype=np.float32)) - span = upper - lower - - finite_span = span[np.isfinite(span) & (span > 0)] - if finite_span.size > 0: - eps = max(float(np.median(finite_span)) * 1e-6, 1e-9) - else: - eps = 1e-9 - - valid = ( - np.isfinite(raw_seg) - & np.isfinite(lower) - & np.isfinite(upper) - & (span > eps) - ) - if np.any(valid): - proj = np.empty_like(raw_seg, dtype=np.float32) - proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0 - proj[valid] = np.clip(proj[valid], -1000.0, 1000.0) - proj[~valid] = np.nan - out[:w] = proj - - return out - - -def _normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray: - """Нормировка свипа по выбранному алгоритму.""" - nt = str(norm_type).strip().lower() - if nt == "simple": - return _normalize_sweep_simple(raw, calib) - return _normalize_sweep_projector(raw, calib) - - -def try_open_pyserial(path: str, baud: int, timeout: float): - try: - import serial # type: ignore - except Exception: - return None - try: - ser = serial.Serial(path, baudrate=baud, timeout=timeout) - return ser - except Exception: - return None - - -class FDReader: - """Простой враппер чтения строк из файлового дескриптора TTY.""" - - def __init__(self, fd: int): - # Отдельно буферизуем для корректной readline() - self._fd = fd - raw = os.fdopen(fd, "rb", closefd=False) - self._file = raw - self._buf = io.BufferedReader(raw, buffer_size=65536) - - def fileno(self) -> int: - return self._fd - - def readline(self) -> bytes: - return self._buf.readline() - - def close(self): - try: - self._buf.close() - except Exception: - pass - - -def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: - """Открыть TTY без pyserial и настроить порт через termios. - - Возвращает FDReader или None при ошибке. - """ - try: - import termios - import tty - except Exception: - return None - - try: - fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) - except Exception: - return None - - try: - attrs = termios.tcgetattr(fd) - # Установим «сырое» состояние - tty.setraw(fd) - - # Скорость - baud_map = { - 9600: termios.B9600, - 19200: termios.B19200, - 38400: termios.B38400, - 57600: termios.B57600, - 115200: termios.B115200, - 230400: getattr(termios, "B230400", None), - 460800: getattr(termios, "B460800", None), - } - b = baud_map.get(baud) or termios.B115200 - - attrs[4] = b # ispeed - attrs[5] = b # ospeed - - # VMIN=1, VTIME=0 — блокирующее чтение по байту - cc = attrs[6] - cc[termios.VMIN] = 1 - cc[termios.VTIME] = 0 - attrs[6] = cc - - termios.tcsetattr(fd, termios.TCSANOW, attrs) - except Exception: - try: - os.close(fd) - except Exception: - pass - return None - - return FDReader(fd) - - -class SerialLineSource: - """Единый интерфейс для чтения строк из порта (pyserial или raw TTY).""" - - def __init__(self, path: str, baud: int, timeout: float = 1.0): - self._pyserial = try_open_pyserial(path, baud, timeout) - self._fdreader = None - self._using = "pyserial" if self._pyserial is not None else "raw" - if self._pyserial is None: - self._fdreader = open_raw_tty(path, baud) - if self._fdreader is None: - msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" - if sys.platform.startswith("win"): - msg += ". На Windows нужен pyserial: pip install pyserial" - raise RuntimeError(msg) - - def readline(self) -> bytes: - if self._pyserial is not None: - try: - return self._pyserial.readline() - except Exception: - return b"" - else: - try: - return self._fdreader.readline() # type: ignore[union-attr] - except Exception: - return b"" - - def close(self): - try: - if self._pyserial is not None: - self._pyserial.close() - elif self._fdreader is not None: - self._fdreader.close() - except Exception: - pass - - -class SerialChunkReader: - """Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера.""" - - def __init__(self, src: SerialLineSource): - self._src = src - self._ser = src._pyserial - self._fd: Optional[int] = None - if self._ser is not None: - # Неблокирующий режим для быстрой откачки - try: - self._ser.timeout = 0 - except Exception: - pass - else: - try: - self._fd = src._fdreader.fileno() # type: ignore[union-attr] - try: - os.set_blocking(self._fd, False) - except Exception: - pass - except Exception: - self._fd = None - - def read_available(self) -> bytes: - """Вернёт доступные байты (b"" если данных нет).""" - if self._ser is not None: - try: - n = int(getattr(self._ser, "in_waiting", 0)) - except Exception: - n = 0 - if n > 0: - try: - return self._ser.read(n) - except Exception: - return b"" - return b"" - if self._fd is None: - return b"" - out = bytearray() - while True: - try: - chunk = os.read(self._fd, 65536) - if not chunk: - break - out += chunk - if len(chunk) < 65536: - break - except BlockingIOError: - break - except Exception: - break - return bytes(out) - - -class SweepReader(threading.Thread): - """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" - - def __init__( - self, - port_path: str, - baud: int, - out_queue: Queue[SweepPacket], - stop_event: threading.Event, - fancy: bool = False, - ): - super().__init__(daemon=True) - self._port_path = port_path - self._baud = baud - self._q = out_queue - self._stop = stop_event - self._src: Optional[SerialLineSource] = None - self._fancy = bool(fancy) - self._max_width: int = 0 - self._sweep_idx: int = 0 - self._last_sweep_ts: Optional[float] = None - self._n_valid_hist = deque() - - def _finalize_current(self, xs, ys, channels: Optional[set[int]]): - if not xs: - return - ch_list = sorted(channels) if channels else [0] - ch_primary = ch_list[0] if ch_list else 0 - max_x = max(xs) - width = max_x + 1 - self._max_width = max(self._max_width, width) - target_width = self._max_width if self._fancy else width - # Быстрый векторизованный путь - sweep = np.full((target_width,), np.nan, dtype=np.float32) - try: - idx = np.asarray(xs, dtype=np.int64) - vals = np.asarray(ys, dtype=np.float32) - sweep[idx] = vals - except Exception: - # Запасной путь - for x, y in zip(xs, ys): - if 0 <= x < target_width: - sweep[x] = float(y) - # Метрики валидных точек до заполнения пропусков - finite_pre = np.isfinite(sweep) - n_valid_cur = int(np.count_nonzero(finite_pre)) - - # Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины - if self._fancy: - try: - known = ~np.isnan(sweep) - if np.any(known): - known_idx = np.nonzero(known)[0] - # Для каждой пары соседних известных индексов заполним промежуток средним значением - for i0, i1 in zip(known_idx[:-1], known_idx[1:]): - if i1 - i0 > 1: - avg = (sweep[i0] + sweep[i1]) * 0.5 - sweep[i0 + 1 : i1] = avg - first_idx = int(known_idx[0]) - last_idx = int(known_idx[-1]) - if first_idx > 0: - sweep[:first_idx] = sweep[first_idx] - if last_idx < sweep.size - 1: - sweep[last_idx + 1 :] = sweep[last_idx] - except Exception: - # В случае ошибки просто оставляем как есть - pass - # Инверсия данных при «отрицательном» уровне (среднее ниже порога) - try: - m = float(np.nanmean(sweep)) - if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD: - sweep *= -1.0 - except Exception: - pass - #sweep -= float(np.nanmean(sweep)) - - # Метрики для статусной строки (вид словаря: переменная -> значение) - self._sweep_idx += 1 - if len(ch_list) > 1: - sys.stderr.write( - f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n" - ) - now = time.time() - if self._last_sweep_ts is None: - dt_ms = float("nan") - else: - dt_ms = (now - self._last_sweep_ts) * 1000.0 - self._last_sweep_ts = now - self._n_valid_hist.append((now, n_valid_cur)) - while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0: - self._n_valid_hist.popleft() - if self._n_valid_hist: - n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist)) - else: - n_valid = float(n_valid_cur) - - if n_valid_cur > 0: - vmin = float(np.nanmin(sweep)) - vmax = float(np.nanmax(sweep)) - mean = float(np.nanmean(sweep)) - std = float(np.nanstd(sweep)) - else: - vmin = vmax = mean = std = float("nan") - info: SweepInfo = { - "sweep": self._sweep_idx, - "ch": ch_primary, - "chs": ch_list, - "n_valid": n_valid, - "min": vmin, - "max": vmax, - "mean": mean, - "std": std, - "dt_ms": dt_ms, - } - - # Кладём готовый свип (если очередь полна — выбрасываем самый старый) - try: - self._q.put_nowait((sweep, info)) - except Full: - try: - _ = self._q.get_nowait() - except Exception: - pass - try: - self._q.put_nowait((sweep, info)) - except Exception: - pass - - def run(self): - # Состояние текущего свипа - xs: list[int] = [] - ys: list[int] = [] - cur_channel: Optional[int] = None - cur_channels: set[int] = set() - - try: - self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) - sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") - except Exception as e: - sys.stderr.write(f"[error] {e}\n") - return - - try: - # Быстрый неблокирующий дренаж порта с разбором по байтам - chunk_reader = SerialChunkReader(self._src) - buf = bytearray() - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - # Короткая уступка CPU, если нет новых данных - time.sleep(0.0005) - continue - - # Обрабатываем все полные строки - while True: - nl = buf.find(b"\n") - if nl == -1: - break - line = bytes(buf[:nl]) - del buf[: nl + 1] - if line.endswith(b"\r"): - line = line[:-1] - if not line: - continue - - if line.startswith(b"Sweep_start"): - self._finalize_current(xs, ys, cur_channels) - xs.clear() - ys.clear() - cur_channel = None - cur_channels.clear() - continue - - # sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам. - if len(line) >= 3: - parts = line.split() - if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): - try: - if parts[0].lower() == b"s": - if len(parts) >= 4: - ch = int(parts[1], 10) - x = int(parts[2], 10) - y = int(parts[3], 10) # поддержка знака: "+…" и "-…" - else: - ch = 0 - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - else: - # формат вида "s0" - ch = int(parts[0][1:], 10) - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - except Exception: - continue - if cur_channel is None: - cur_channel = ch - cur_channels.add(ch) - xs.append(x) - ys.append(y) - - # Защита от переполнения буфера при отсутствии переводов строки - if len(buf) > 1_000_000: - del buf[:-262144] - finally: - try: - # Завершаем оставшийся свип - self._finalize_current(xs, ys, cur_channels) - except Exception: - pass - try: - if self._src is not None: - self._src.close() - except Exception: - pass - - -def main(): - parser = argparse.ArgumentParser( - description=( - "Читает свипы из виртуального COM-порта и рисует: " - "последний свип и водопад (реалтайм)." - ) - ) - parser.add_argument( - "port", - help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)", - ) - parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") - parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") - parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") - parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") - parser.add_argument( - "--spec-clip", - default="2,98", - help=( - "Процентильная обрезка уровней водопада спектров, % (min,max). " - "Напр. 2,98. 'off' — отключить" - ), - ) - parser.add_argument( - "--spec-mean-sec", - type=float, - default=0.0, - help=( - "Вычитание среднего по каждой частоте за последние N секунд " - "в водопаде спектров (0 — отключить)" - ), - ) - parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") - parser.add_argument( - "--fancy", - action="store_true", - help="Заполнять выпавшие точки средними значениями между соседними", - ) - parser.add_argument( - "--ylim", - type=str, - default=None, - help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", - ) - parser.add_argument( - "--backend", - choices=["auto", "pg", "mpl"], - default="auto", - help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto", - ) - parser.add_argument( - "--norm-type", - choices=["projector", "simple"], - default="projector", - help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)", - ) - - args = parser.parse_args() - - # Попробуем быстрый бэкенд (pyqtgraph) при auto/pg - if args.backend in ("pg"): - try: - return run_pyqtgraph(args) - except Exception as e: - if args.backend == "pg": - sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n") - sys.exit(1) - # При auto — тихо откатываемся на matplotlib - - try: - import matplotlib - import matplotlib.pyplot as plt - from matplotlib.animation import FuncAnimation - from matplotlib.widgets import Slider, CheckButtons - except Exception as e: - sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") - sys.exit(1) - - # Очередь завершённых свипов и поток чтения - q: Queue[SweepPacket] = Queue(maxsize=1000) - stop_event = threading.Event() - reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) - reader.start() - - # Графика - fig, axs = plt.subplots(2, 2, figsize=(12, 8)) - (ax_line, ax_img), (ax_fft, ax_spec) = axs - fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None - # Увеличим расстояния и оставим место справа под ползунки оси Y B-scan - fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) - - # Состояние для отображения - current_sweep_raw: Optional[np.ndarray] = None - current_sweep_norm: Optional[np.ndarray] = None - last_calib_sweep: Optional[np.ndarray] = None - current_info: Optional[SweepInfo] = None - x_shared: Optional[np.ndarray] = None - width: Optional[int] = None - max_sweeps = int(max(10, args.max_sweeps)) - ring = None # type: Optional[np.ndarray] - ring_time = None # type: Optional[np.ndarray] - head = 0 - # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. - # FFT состояние - fft_bins = FFT_LEN // 2 + 1 - ring_fft = None # type: Optional[np.ndarray] - y_min_fft, y_max_fft = None, None - freq_shared: Optional[np.ndarray] = None - # Параметры контраста водопада спектров - spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) - spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) - # Ползунки управления Y для B-scan и контрастом - ymin_slider = None - ymax_slider = None - contrast_slider = None - calib_enabled = False - norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() - cb = None - - # Статусная строка (внизу окна) - status_text = fig.text( - 0.01, - 0.01, - "", - ha="left", - va="bottom", - fontsize=8, - family="monospace", - ) - - # Линейный график последнего свипа - line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") - line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") - line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") - ax_line.set_title("Сырые данные", pad=1) - ax_line.set_xlabel("F") - ax_line.set_ylabel("") - channel_text = ax_line.text( - 0.98, - 0.98, - "", - transform=ax_line.transAxes, - ha="right", - va="top", - fontsize=9, - family="monospace", - ) - - # Линейный график спектра текущего свипа - fft_line_obj, = ax_fft.plot([], [], lw=1) - ax_fft.set_title("FFT", pad=1) - ax_fft.set_xlabel("X") - ax_fft.set_ylabel("Амплитуда, дБ") - - # Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения) - fixed_ylim: Optional[Tuple[float, float]] = None - # CLI переопределение при необходимости - if args.ylim: - try: - y0, y1 = args.ylim.split(",") - fixed_ylim = (float(y0), float(y1)) - except Exception: - sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n") - if fixed_ylim is not None: - ax_line.set_ylim(fixed_ylim) - - # Водопад (будет инициализирован при первом свипе) - img_obj = ax_img.imshow( - np.zeros((1, 1), dtype=np.float32), - aspect="auto", - interpolation="nearest", - origin="lower", - cmap=args.cmap, - ) - ax_img.set_title("Сырые данные", pad=12) - ax_img.set_xlabel("") - ax_img.set_ylabel("частота") - # Не показываем численные значения по времени на водопаде сырых данных - try: - ax_img.tick_params(axis="x", labelbottom=False) - except Exception: - pass - - # Водопад спектров - img_fft_obj = ax_spec.imshow( - np.zeros((1, 1), dtype=np.float32), - aspect="auto", - interpolation="nearest", - origin="lower", - cmap=args.cmap, - ) - ax_spec.set_title("B-scan (дБ)", pad=12) - ax_spec.set_xlabel("") - ax_spec.set_ylabel("расстояние") - # Не показываем численные значения по времени на B-scan - try: - ax_spec.tick_params(axis="x", labelbottom=False) - except Exception: - pass - - def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - return _normalize_by_calib(raw, calib, norm_type=norm_type) - - def _set_calib_enabled(): - nonlocal calib_enabled, current_sweep_norm - try: - calib_enabled = bool(cb.get_status()[0]) if cb is not None else False - except Exception: - calib_enabled = False - if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) - else: - current_sweep_norm = None - - # Слайдеры для управления осью Y B-scan (мин/макс) и контрастом - try: - ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) - ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) - ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) - ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) - ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") - ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") - contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") - cb = CheckButtons(ax_cb, ["калибровка"], [False]) - - def _on_ylim_change(_val): - try: - y0 = int(min(ymin_slider.val, ymax_slider.val)) - y1 = int(max(ymin_slider.val, ymax_slider.val)) - ax_spec.set_ylim(y0, y1) - fig.canvas.draw_idle() - except Exception: - pass - - ymin_slider.on_changed(_on_ylim_change) - ymax_slider.on_changed(_on_ylim_change) - # Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона) - contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) - cb.on_clicked(lambda _v: _set_calib_enabled()) - except Exception: - pass - - # Для контроля частоты обновления - max_fps = max(1.0, float(args.max_fps)) - interval_ms = int(1000.0 / max_fps) - frames_since_ylim_update = 0 - - - def ensure_buffer(_w: int): - nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time - if ring is not None: - return - width = WF_WIDTH - x_shared = np.arange(width, dtype=np.int32) - ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) - ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) - head = 0 - # Обновляем изображение под новые размеры: время по X (горизонталь), X по Y - img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32)) - img_obj.set_extent((0, max_sweeps - 1, 0, width - 1 if width > 0 else 1)) - ax_img.set_xlim(0, max_sweeps - 1) - ax_img.set_ylim(0, max(1, width - 1)) - # FFT буферы: время по X, бин по Y - ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) - img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32)) - img_fft_obj.set_extent((0, max_sweeps - 1, 0, fft_bins - 1)) - ax_spec.set_xlim(0, max_sweeps - 1) - ax_spec.set_ylim(0, max(1, fft_bins - 1)) - freq_shared = np.arange(fft_bins, dtype=np.int32) - - def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по текущей видимой области imshow (без накопления по времени).""" - if data.size == 0: - return None - ny, nx = data.shape[0], data.shape[1] - try: - x0, x1 = axis.get_xlim() - y0, y1 = axis.get_ylim() - except Exception: - x0, x1 = 0.0, float(nx - 1) - y0, y1 = 0.0, float(ny - 1) - xmin, xmax = sorted((float(x0), float(x1))) - ymin, ymax = sorted((float(y0), float(y1))) - ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) - ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) - iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) - if ix1 < ix0: - ix1 = ix0 - if iy1 < iy0: - iy1 = iy0 - sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] - finite = np.isfinite(sub) - if not finite.any(): - return None - vals = sub[finite] - vmin = float(np.min(vals)) - vmax = float(np.max(vals)) - if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: - return None - return (vmin, vmax) - - def push_sweep(s: np.ndarray): - nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time - if s is None or s.size == 0 or ring is None: - return - # Нормализуем длину до фиксированной ширины - w = ring.shape[1] - row = np.full((w,), np.nan, dtype=np.float32) - take = min(w, s.size) - row[:take] = s[:take] - ring[head, :] = row - if ring_time is not None: - ring_time[head] = time.time() - head = (head + 1) % ring.shape[0] - # FFT строка (дБ) - if ring_fft is not None: - bins = ring_fft.shape[1] - # Подготовка входа FFT_LEN, замена NaN на 0 - take_fft = min(int(s.size), FFT_LEN) - if take_fft <= 0: - fft_row = np.full((bins,), np.nan, dtype=np.float32) - else: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = s[:take_fft] - if isinstance(seg, np.ndarray): - seg = np.nan_to_num(seg, nan=0.0).astype(np.float32, copy=False) - else: - seg = np.asarray(seg, dtype=np.float32) - seg = np.nan_to_num(seg, nan=0.0) - # Окно Хэннинга - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) - fft_row = 20.0 * np.log10(mag + 1e-9) - if fft_row.shape[0] != bins: - # rfft длиной FFT_LEN даёт bins == FFT_LEN//2+1 - fft_row = fft_row[:bins] - ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row - # Экстремумы для цветовой шкалы - fr_min = np.nanmin(fft_row) - fr_max = np.nanmax(fft_row) - fr_max = np.nanpercentile(fft_row, 90) - if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): - y_min_fft = float(fr_min) - if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): - y_max_fft = float(fr_max) - - def drain_queue(): - nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep - drained = 0 - while True: - try: - s, info = q.get_nowait() - except Empty: - break - drained += 1 - current_sweep_raw = s - current_info = info - ch = 0 - try: - ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 - except Exception: - ch = 0 - if ch == 0: - last_calib_sweep = s - current_sweep_norm = None - sweep_for_proc = s - else: - if calib_enabled and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(s, last_calib_sweep) - sweep_for_proc = current_sweep_norm - else: - current_sweep_norm = None - sweep_for_proc = s - ensure_buffer(s.size) - push_sweep(sweep_for_proc) - return drained - - def make_display_ring(): - # Возвращаем буфер с правильным порядком по времени (старые→новые) и осью времени по X - if ring is None: - return np.zeros((1, 1), dtype=np.float32) - base = ring if head == 0 else np.roll(ring, -head, axis=0) - return base.T # (width, time) - - def make_display_times(): - if ring_time is None: - return None - base_t = ring_time if head == 0 else np.roll(ring_time, -head) - return base_t - - def _subtract_recent_mean_fft(disp_fft: np.ndarray) -> np.ndarray: - """Вычесть среднее по каждой частоте за последние spec_mean_sec секунд.""" - if spec_mean_sec <= 0.0: - return disp_fft - disp_times = make_display_times() - if disp_times is None: - return disp_fft - now_t = time.time() - mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) - if not np.any(mask): - return disp_fft - try: - mean_spec = np.nanmean(disp_fft[:, mask], axis=1) - except Exception: - return disp_fft - mean_spec = np.nan_to_num(mean_spec, nan=0.0) - return disp_fft - mean_spec[:, None] - - def make_display_ring_fft(): - if ring_fft is None: - return np.zeros((1, 1), dtype=np.float32) - base = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) - return base.T # (bins, time) - - def update(_frame): - nonlocal frames_since_ylim_update - changed = drain_queue() > 0 - - # Обновление линии последнего свипа - if current_sweep_raw is not None: - if x_shared is not None and current_sweep_raw.size <= x_shared.size: - xs = x_shared[: current_sweep_raw.size] - else: - xs = np.arange(current_sweep_raw.size, dtype=np.int32) - def _norm_to_max(data): - m = float(np.nanmax(np.abs(data))) - return data / m if m > 0.0 else data - line_obj.set_data(xs, _norm_to_max(current_sweep_raw)) - if last_calib_sweep is not None: - line_calib_obj.set_data(xs[: last_calib_sweep.size], _norm_to_max(last_calib_sweep)) - else: - line_calib_obj.set_data([], []) - if current_sweep_norm is not None: - line_norm_obj.set_data(xs[: current_sweep_norm.size], _norm_to_max(current_sweep_norm)) - else: - line_norm_obj.set_data([], []) - # Лимиты по X постоянные под текущую ширину - ax_line.set_xlim(0, max(1, current_sweep_raw.size - 1)) - # Фиксированные Y-лимиты после нормировки на максимум - if fixed_ylim is None: - ax_line.set_ylim(-1.05, 1.05) - ax_line.set_ylabel("/ max") - - # Обновление спектра текущего свипа - sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw - take_fft = min(int(sweep_for_fft.size), FFT_LEN) - if take_fft > 0 and freq_shared is not None: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False) - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) - fft_vals = 20.0 * np.log10(mag + 1e-9) - xs_fft = freq_shared - if fft_vals.size > xs_fft.size: - fft_vals = fft_vals[: xs_fft.size] - fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) - # Авто-диапазон по Y для спектра - if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): - ax_fft.set_xlim(0, max(1, xs_fft.size - 1)) - ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) - - # Обновление водопада - if changed and ring is not None: - disp = make_display_ring() - # Новые данные справа: без реверса - img_obj.set_data(disp) - # Подписи времени не обновляем динамически (оставляем авто-тики) - # Авто-уровни: по видимой области (не накапливаем за всё время) - levels = _visible_levels_matplotlib(disp, ax_img) - if levels is not None: - img_obj.set_clim(vmin=levels[0], vmax=levels[1]) - - # Обновление водопада спектров - if changed and ring_fft is not None: - disp_fft = make_display_ring_fft() - disp_fft = _subtract_recent_mean_fft(disp_fft) - # Новые данные справа: без реверса - img_fft_obj.set_data(disp_fft) - # Подписи времени не обновляем динамически (оставляем авто-тики) - # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) - try: - # disp_fft имеет форму (bins, time); берём среднее по времени - mean_spec = np.nanmean(disp_fft, axis=1) - vmin_v = float(np.nanmin(mean_spec)) - vmax_v = float(np.nanmax(mean_spec)) - except Exception: - vmin_v = vmax_v = None - # Если средние не дают валидный диапазон — используем процентильную обрезку (если задана) - if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v: - if spec_clip is not None: - try: - vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) - vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) - except Exception: - vmin_v = vmax_v = None - # Фолбэк к отслеживаемым минимум/максимумам - if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v: - if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft: - vmin_v, vmax_v = y_min_fft, y_max_fft - if vmin_v is not None and vmax_v is not None and vmin_v != vmax_v: - # Применим скалирование контрастом (верхняя граница) - try: - c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0 - except Exception: - c = 1.0 - vmax_eff = vmin_v + c * (vmax_v - vmin_v) - img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff) - - if changed and current_info: - status_text.set_text(_format_status_kv(current_info)) - chs = current_info.get("chs") if isinstance(current_info, dict) else None - if chs is None: - chs = current_info.get("ch") if isinstance(current_info, dict) else None - if chs is None: - channel_text.set_text("") - else: - try: - if isinstance(chs, (list, tuple, set)): - ch_list = sorted(int(v) for v in chs) - ch_text_val = ", ".join(str(v) for v in ch_list) - else: - ch_text_val = str(int(chs)) - channel_text.set_text(f"chs {ch_text_val}") - except Exception: - channel_text.set_text(f"chs {chs}") - - # Возвращаем обновлённые артисты - return ( - line_obj, - line_calib_obj, - line_norm_obj, - img_obj, - fft_line_obj, - img_fft_obj, - status_text, - channel_text, - ) - - ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) - - plt.show() - # Нормальное завершение при закрытии окна - stop_event.set() - reader.join(timeout=1.0) - - -def run_pyqtgraph(args): - """Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6.""" - try: - import pyqtgraph as pg - from PyQt5 import QtCore, QtWidgets # noqa: F401 - except Exception: - # Возможно установлена PySide6 - try: - import pyqtgraph as pg - from PySide6 import QtCore, QtWidgets # noqa: F401 - except Exception as e: - raise RuntimeError( - "pyqtgraph/PyQt5(Pyside6) не найдены. Установите: pip install pyqtgraph PyQt5" - ) from e - - # Очередь завершённых свипов и поток чтения - q: Queue[SweepPacket] = Queue(maxsize=1000) - stop_event = threading.Event() - reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) - reader.start() - - # Настройки скорости - max_sweeps = int(max(10, args.max_sweeps)) - max_fps = max(1.0, float(args.max_fps)) - interval_ms = int(1000.0 / max_fps) - - # PyQtGraph настройки - pg.setConfigOptions(useOpenGL=True, antialias=False) - app = pg.mkQApp(args.title) - win = pg.GraphicsLayoutWidget(show=True, title=args.title) - win.resize(1200, 600) - - # Плот последнего свипа (слева-сверху) - p_line = win.addPlot(row=0, col=0, title="Сырые данные") - p_line.showGrid(x=True, y=True, alpha=0.3) - curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) - curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) - curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) - p_line.setLabel("bottom", "X") - p_line.setLabel("left", "Y") - ch_text = pg.TextItem("", anchor=(1, 1)) - ch_text.setZValue(10) - p_line.addItem(ch_text) - - # Водопад (справа-сверху) - p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад") - p_img.invertY(False) - p_img.showGrid(x=False, y=False) - p_img.setLabel("bottom", "Время, с (новое справа)") - try: - p_img.getAxis("bottom").setStyle(showValues=False) - except Exception: - pass - p_img.setLabel("left", "X (0 снизу)") - img = pg.ImageItem() - p_img.addItem(img) - - # FFT (слева-снизу) - p_fft = win.addPlot(row=1, col=0, title="FFT") - p_fft.showGrid(x=True, y=True, alpha=0.3) - curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) - p_fft.setLabel("bottom", "Бин") - p_fft.setLabel("left", "Амплитуда, дБ") - - # Водопад спектров (справа-снизу) - p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") - p_spec.invertY(True) - p_spec.showGrid(x=False, y=False) - p_spec.setLabel("bottom", "Время, с (новое справа)") - try: - p_spec.getAxis("bottom").setStyle(showValues=False) - except Exception: - pass - p_spec.setLabel("left", "Бин (0 снизу)") - img_fft = pg.ImageItem() - p_spec.addItem(img_fft) - - # Чекбокс калибровки - calib_cb = QtWidgets.QCheckBox("калибровка") - cb_proxy = QtWidgets.QGraphicsProxyWidget() - cb_proxy.setWidget(calib_cb) - win.addItem(cb_proxy, row=2, col=1) - - # Статусная строка (внизу окна) - status = pg.LabelItem(justify="left") - win.addItem(status, row=3, col=0, colspan=2) - - # Состояние - ring: Optional[np.ndarray] = None - ring_time: Optional[np.ndarray] = None - head = 0 - width: Optional[int] = None - x_shared: Optional[np.ndarray] = None - current_sweep_raw: Optional[np.ndarray] = None - current_sweep_norm: Optional[np.ndarray] = None - last_calib_sweep: Optional[np.ndarray] = None - current_info: Optional[SweepInfo] = None - # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. - # Для спектров - fft_bins = FFT_LEN // 2 + 1 - ring_fft: Optional[np.ndarray] = None - freq_shared: Optional[np.ndarray] = None - y_min_fft, y_max_fft = None, None - # Параметры контраста водопада спектров (процентильная обрезка) - spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) - spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) - calib_enabled = False - norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() - # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) - fixed_ylim: Optional[Tuple[float, float]] = None - if args.ylim: - try: - y0, y1 = args.ylim.split(",") - fixed_ylim = (float(y0), float(y1)) - except Exception: - pass - if fixed_ylim is not None: - p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) - - def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - return _normalize_by_calib(raw, calib, norm_type=norm_type) - - def _set_calib_enabled(): - nonlocal calib_enabled, current_sweep_norm - try: - calib_enabled = bool(calib_cb.isChecked()) - except Exception: - calib_enabled = False - if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) - else: - current_sweep_norm = None - - try: - calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled()) - except Exception: - pass - - def ensure_buffer(_w: int): - nonlocal ring, ring_time, head, width, x_shared, ring_fft, freq_shared - if ring is not None: - return - width = WF_WIDTH - x_shared = np.arange(width, dtype=np.int32) - ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) - ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) - head = 0 - # Водопад: время по оси X, X по оси Y - img.setImage(ring.T, autoLevels=False) - p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, width - 1)), padding=0) - p_line.setXRange(0, max(1, width - 1), padding=0) - # FFT: время по оси X, бин по оси Y - ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) - img_fft.setImage(ring_fft.T, autoLevels=False) - p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, fft_bins - 1)), padding=0) - p_fft.setXRange(0, max(1, fft_bins - 1), padding=0) - freq_shared = np.arange(fft_bins, dtype=np.int32) - - def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по текущей видимой области ImageItem (без накопления по времени).""" - if data.size == 0: - return None - ny, nx = data.shape[0], data.shape[1] - try: - (x0, x1), (y0, y1) = p_img.viewRange() - except Exception: - x0, x1 = 0.0, float(nx - 1) - y0, y1 = 0.0, float(ny - 1) - xmin, xmax = sorted((float(x0), float(x1))) - ymin, ymax = sorted((float(y0), float(y1))) - ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) - ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) - iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) - if ix1 < ix0: - ix1 = ix0 - if iy1 < iy0: - iy1 = iy0 - sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] - finite = np.isfinite(sub) - if not finite.any(): - return None - vals = sub[finite] - vmin = float(np.min(vals)) - vmax = float(np.max(vals)) - if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: - return None - return (vmin, vmax) - - def push_sweep(s: np.ndarray): - nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft - if s is None or s.size == 0 or ring is None: - return - w = ring.shape[1] - row = np.full((w,), np.nan, dtype=np.float32) - take = min(w, s.size) - row[:take] = s[:take] - ring[head, :] = row - if ring_time is not None: - ring_time[head] = time.time() - head = (head + 1) % ring.shape[0] - # FFT строка (дБ) - if ring_fft is not None: - bins = ring_fft.shape[1] - take_fft = min(int(s.size), FFT_LEN) - if take_fft > 0: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False) - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) - fft_row = 20.0 * np.log10(mag + 1e-9) - if fft_row.shape[0] != bins: - fft_row = fft_row[:bins] - else: - fft_row = np.full((bins,), np.nan, dtype=np.float32) - ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row - fr_min = np.nanmin(fft_row) - fr_max = np.nanmax(fft_row) - if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): - y_min_fft = float(fr_min) - if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): - y_max_fft = float(fr_max) - - def drain_queue(): - nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep - drained = 0 - while True: - try: - s, info = q.get_nowait() - except Empty: - break - drained += 1 - current_sweep_raw = s - current_info = info - ch = 0 - try: - ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 - except Exception: - ch = 0 - if ch == 0: - last_calib_sweep = s - current_sweep_norm = None - sweep_for_proc = s - else: - if calib_enabled and last_calib_sweep is not None: - current_sweep_norm = _normalize_sweep(s, last_calib_sweep) - sweep_for_proc = current_sweep_norm - else: - current_sweep_norm = None - sweep_for_proc = s - ensure_buffer(s.size) - push_sweep(sweep_for_proc) - return drained - - # Попытка применить LUT из колормэпа (если доступен) - try: - cm_mod = getattr(pg, "colormap", None) - if cm_mod is not None: - cm = cm_mod.get(args.cmap) - img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256)) - except Exception: - pass - - def update(): - changed = drain_queue() > 0 - if current_sweep_raw is not None and x_shared is not None: - if current_sweep_raw.size <= x_shared.size: - xs = x_shared[: current_sweep_raw.size] - else: - xs = np.arange(current_sweep_raw.size) - def _norm_to_max(data): - m = float(np.nanmax(np.abs(data))) - return data / m if m > 0.0 else data - curve.setData(xs, _norm_to_max(current_sweep_raw), autoDownsample=True) - if last_calib_sweep is not None: - curve_calib.setData(xs[: last_calib_sweep.size], _norm_to_max(last_calib_sweep), autoDownsample=True) - else: - curve_calib.setData([], []) - if current_sweep_norm is not None: - curve_norm.setData(xs[: current_sweep_norm.size], _norm_to_max(current_sweep_norm), autoDownsample=True) - else: - curve_norm.setData([], []) - if fixed_ylim is None: - p_line.setYRange(-1.05, 1.05, padding=0) - p_line.setLabel("left", "/ max") - - # Обновим спектр - sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw - take_fft = min(int(sweep_for_fft.size), FFT_LEN) - if take_fft > 0 and freq_shared is not None: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False) - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) - fft_vals = 20.0 * np.log10(mag + 1e-9) - xs_fft = freq_shared - if fft_vals.size > xs_fft.size: - fft_vals = fft_vals[: xs_fft.size] - curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) - p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) - - if changed and ring is not None: - disp = ring if head == 0 else np.roll(ring, -head, axis=0) - disp = disp.T[:, ::-1] # (width, time with newest at left) - levels = _visible_levels_pyqtgraph(disp) - if levels is not None: - img.setImage(disp, autoLevels=False, levels=levels) - else: - img.setImage(disp, autoLevels=False) - - if changed and current_info: - try: - status.setText(_format_status_kv(current_info)) - except Exception: - pass - try: - chs = current_info.get("chs") if isinstance(current_info, dict) else None - if chs is None: - chs = current_info.get("ch") if isinstance(current_info, dict) else None - if chs is None: - ch_text.setText("") - else: - if isinstance(chs, (list, tuple, set)): - ch_list = sorted(int(v) for v in chs) - ch_text_val = ", ".join(str(v) for v in ch_list) - else: - ch_text_val = str(int(chs)) - ch_text.setText(f"chs {ch_text_val}") - (x0, x1), (y0, y1) = p_line.viewRange() - dx = 0.01 * max(1.0, float(x1 - x0)) - dy = 0.01 * max(1.0, float(y1 - y0)) - ch_text.setPos(float(x1 - dx), float(y1 - dy)) - except Exception: - pass - - if changed and ring_fft is not None: - disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) - disp_fft = disp_fft.T[:, ::-1] - if spec_mean_sec > 0.0 and ring_time is not None: - disp_times = ring_time if head == 0 else np.roll(ring_time, -head) - disp_times = disp_times[::-1] - now_t = time.time() - mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) - if np.any(mask): - try: - mean_spec = np.nanmean(disp_fft[:, mask], axis=1) - mean_spec = np.nan_to_num(mean_spec, nan=0.0) - disp_fft = disp_fft - mean_spec[:, None] - except Exception: - pass - # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) - levels = None - try: - mean_spec = np.nanmean(disp_fft, axis=1) - vmin_v = float(np.nanmin(mean_spec)) - vmax_v = float(np.nanmax(mean_spec)) - if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: - levels = (vmin_v, vmax_v) - except Exception: - levels = None - # Процентильная обрезка как запасной вариант - if levels is None and spec_clip is not None: - try: - vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) - vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) - if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: - levels = (vmin_v, vmax_v) - except Exception: - levels = None - # Ещё один фолбэк — глобальные накопленные мин/макс - if levels is None and y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft: - levels = (y_min_fft, y_max_fft) - if levels is not None: - img_fft.setImage(disp_fft, autoLevels=False, levels=levels) - else: - img_fft.setImage(disp_fft, autoLevels=False) - - timer = pg.QtCore.QTimer() - timer.timeout.connect(update) - timer.start(interval_ms) - - def on_quit(): - stop_event.set() - reader.join(timeout=1.0) - - app.aboutToQuit.connect(on_quit) - win.show() - exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None) - exec_fn() - # На случай если aboutToQuit не сработал - on_quit() - - -if __name__ == "__main__": - main() diff --git a/rfg_adc_plotter/constants.py b/rfg_adc_plotter/constants.py index 47ca55c..1ccfef8 100644 --- a/rfg_adc_plotter/constants.py +++ b/rfg_adc_plotter/constants.py @@ -1,5 +1,13 @@ WF_WIDTH = 1000 # максимальное число точек в ряду водопада -FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров +FFT_LEN = 2048 # длина БПФ для спектра/водопада спектров # Порог для инверсии сырых данных: если среднее значение свипа ниже порога — # считаем, что сигнал «меньше нуля» и домножаем свип на -1 DATA_INVERSION_THRESHOLD = 10.0 + +# Параметры IFFT-спектра (временной профиль из спектра 3.2..14.3 ГГц) +# Двусторонний спектр формируется как: [нули -14.3..-3.2 | нули -3.2..+3.2 | данные +3.2..+14.3] +ZEROS_LOW = 758 # нули от -14.3 до -3.2 ГГц +ZEROS_MID = 437 # нули от -3.2 до +3.2 ГГц +SWEEP_LEN = 758 # ожидаемая длина свипа (3.2 → 14.3 ГГц) +FREQ_SPAN_GHZ = 28.6 # полная двусторонняя полоса (-14.3 .. +14.3 ГГц) +IFFT_LEN = ZEROS_LOW + ZEROS_MID + SWEEP_LEN # = 1953 diff --git a/rfg_adc_plotter/gui/matplotlib_backend.py b/rfg_adc_plotter/gui/matplotlib_backend.py index 1f80a8a..93eb0a4 100644 --- a/rfg_adc_plotter/gui/matplotlib_backend.py +++ b/rfg_adc_plotter/gui/matplotlib_backend.py @@ -243,15 +243,14 @@ def run_matplotlib(args): ax_line.set_ylim(-1.05, 1.05) ax_line.set_ylabel("/ max") - # Спектр — используем уже вычисленный в ring FFT - if ring.last_fft_vals is not None and ring.freq_shared is not None: + # Спектр — используем уже вычисленный в ring IFFT (временной профиль) + if ring.last_fft_vals is not None and ring.fft_time_axis is not None: fft_vals = ring.last_fft_vals - xs_fft = ring.freq_shared - if fft_vals.size > xs_fft.size: - fft_vals = fft_vals[: xs_fft.size] - fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) + xs_fft = ring.fft_time_axis + n = min(fft_vals.size, xs_fft.size) + fft_line_obj.set_data(xs_fft[:n], fft_vals[:n]) if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): - ax_fft.set_xlim(0, max(1, xs_fft.size - 1)) + ax_fft.set_xlim(0, float(xs_fft[n - 1])) ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) # Водопад сырых данных diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 54136be..d2a9ac1 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -7,12 +7,16 @@ from typing import Optional, Tuple import numpy as np +from rfg_adc_plotter.constants import FREQ_SPAN_GHZ, IFFT_LEN from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.processing.normalizer import build_calib_envelopes from rfg_adc_plotter.state.app_state import AppState, format_status from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepPacket +# Максимальное значение временной оси IFFT в нс +_IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9) + def _parse_ylim(ylim_str: Optional[str]) -> Optional[Tuple[float, float]]: if not ylim_str: @@ -164,8 +168,8 @@ def run_pyqtgraph(args): p_fft = win.addPlot(row=1, col=0, title="FFT") p_fft.showGrid(x=True, y=True, alpha=0.3) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) - p_fft.setLabel("bottom", "Бин") - p_fft.setLabel("left", "Амплитуда, дБ") + p_fft.setLabel("bottom", "Время, нс") + p_fft.setLabel("left", "Мощность, дБ") # Водопад спектров (справа-снизу) p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") @@ -176,7 +180,7 @@ def run_pyqtgraph(args): p_spec.getAxis("bottom").setStyle(showValues=False) except Exception: pass - p_spec.setLabel("left", "Бин (0 снизу)") + p_spec.setLabel("left", "Время, нс") img_fft = pg.ImageItem() p_spec.addItem(img_fft) @@ -197,7 +201,6 @@ def run_pyqtgraph(args): FREQ_MAX = 14.323 def _init_imshow_extents(): - w = ring.width ms = ring.max_sweeps fb = ring.fft_bins img.setImage(ring.ring.T, autoLevels=False) @@ -205,8 +208,9 @@ def run_pyqtgraph(args): p_img.setRange(xRange=(0, ms - 1), yRange=(FREQ_MIN, FREQ_MAX), padding=0) p_line.setXRange(FREQ_MIN, FREQ_MAX, padding=0) img_fft.setImage(ring.ring_fft.T, autoLevels=False) - p_spec.setRange(xRange=(0, ms - 1), yRange=(0, max(1, fb - 1)), padding=0) - p_fft.setXRange(0, max(1, fb - 1), padding=0) + img_fft.setRect(pg.QtCore.QRectF(0.0, 0.0, float(ms), _IFFT_T_MAX_NS)) + p_spec.setRange(xRange=(0, ms - 1), yRange=(0.0, _IFFT_T_MAX_NS), padding=0) + p_fft.setXRange(0.0, _IFFT_T_MAX_NS, padding=0) def _img_rect(ms: int) -> "pg.QtCore.QRectF": return pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN) @@ -245,13 +249,12 @@ def run_pyqtgraph(args): p_line.setYRange(-1.05, 1.05, padding=0) p_line.setLabel("left", "/ max") - # Спектр — используем уже вычисленный в ring FFT - if ring.last_fft_vals is not None and ring.freq_shared is not None: + # Спектр — используем уже вычисленный в ring IFFT (временной профиль) + if ring.last_fft_vals is not None and ring.fft_time_axis is not None: fft_vals = ring.last_fft_vals - xs_fft = ring.freq_shared - if fft_vals.size > xs_fft.size: - fft_vals = fft_vals[: xs_fft.size] - curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) + xs_fft = ring.fft_time_axis + n = min(fft_vals.size, xs_fft.size) + curve_fft.setData(xs_fft[:n], fft_vals[:n]) p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) # Позиция подписи канала @@ -290,6 +293,7 @@ def run_pyqtgraph(args): img_fft.setImage(disp_fft, autoLevels=False, levels=levels) else: img_fft.setImage(disp_fft, autoLevels=False) + img_fft.setRect(pg.QtCore.QRectF(0.0, 0.0, float(ring.max_sweeps), _IFFT_T_MAX_NS)) timer = pg.QtCore.QTimer() timer.timeout.connect(update) diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py index 1b9ecd5..11f9f09 100644 --- a/rfg_adc_plotter/state/ring_buffer.py +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -5,7 +5,15 @@ from typing import Optional, Tuple import numpy as np -from rfg_adc_plotter.constants import FFT_LEN, WF_WIDTH +from rfg_adc_plotter.constants import ( + FFT_LEN, + FREQ_SPAN_GHZ, + IFFT_LEN, + SWEEP_LEN, + WF_WIDTH, + ZEROS_LOW, + ZEROS_MID, +) class RingBuffer: @@ -17,7 +25,7 @@ class RingBuffer: def __init__(self, max_sweeps: int): self.max_sweeps = max_sweeps - self.fft_bins = FFT_LEN // 2 + 1 + self.fft_bins = IFFT_LEN # = 1953 (полная длина IFFT-результата) # Инициализируются при первом свипе (ensure_init) self.ring: Optional[np.ndarray] = None # (max_sweeps, WF_WIDTH) @@ -26,7 +34,7 @@ class RingBuffer: self.head: int = 0 self.width: Optional[int] = None self.x_shared: Optional[np.ndarray] = None - self.freq_shared: Optional[np.ndarray] = None + self.fft_time_axis: Optional[np.ndarray] = None # временная ось IFFT в нс self.y_min_fft: Optional[float] = None self.y_max_fft: Optional[float] = None # FFT последнего свипа (для отображения без повторного вычисления) @@ -43,7 +51,10 @@ class RingBuffer: self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32) self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64) self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32) - self.freq_shared = np.arange(self.fft_bins, dtype=np.int32) + # Временная ось IFFT: шаг dt = 1/(FREQ_SPAN_GHZ*1e9), переведём в нс + self.fft_time_axis = ( + np.arange(IFFT_LEN, dtype=np.float64) / (FREQ_SPAN_GHZ * 1e9) * 1e9 + ).astype(np.float32) self.head = 0 # Обновляем x_shared если пришёл свип большего размера if self.x_shared is None or sweep_width > self.x_shared.size: @@ -64,20 +75,29 @@ class RingBuffer: self._push_fft(s) def _push_fft(self, s: np.ndarray): - bins = self.ring_fft.shape[1] - take_fft = min(int(s.size), FFT_LEN) - if take_fft <= 0: + bins = self.ring_fft.shape[1] # = IFFT_LEN = 1953 + if s is None or s.size == 0: fft_row = np.full((bins,), np.nan, dtype=np.float32) else: - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False) - win = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = seg * win - spec = np.fft.rfft(fft_in) - mag = np.abs(spec).astype(np.float32) + # 1. Взять первые SWEEP_LEN отсчётов (остаток — нули если свип короче) + sig = np.zeros(SWEEP_LEN, dtype=np.float32) + take = min(int(s.size), SWEEP_LEN) + seg = np.nan_to_num(s[:take], nan=0.0).astype(np.float32, copy=False) + sig[:take] = seg + + # 2. Собрать двусторонний спектр: + # [ZEROS_LOW нулей | ZEROS_MID нулей | SWEEP_LEN данных] + # = [-14.3..-3.2 ГГц | -3.2..+3.2 ГГц | +3.2..+14.3 ГГц] + data = np.zeros(IFFT_LEN, dtype=np.complex64) + data[ZEROS_LOW + ZEROS_MID:] = sig + + # 3. ifftshift + ifft → временной профиль + spec = np.fft.ifftshift(data) + result = np.fft.ifft(spec) + + # 4. Амплитуда в дБ + mag = np.abs(result).astype(np.float32) fft_row = (20.0 * np.log10(mag + 1e-9)).astype(np.float32) - if fft_row.shape[0] != bins: - fft_row = fft_row[:bins] prev_head = (self.head - 1) % self.ring_fft.shape[0] self.ring_fft[prev_head, :] = fft_row