Files
RFG_stm32_ADC_receiver_GUI/RFG_ADC_dataplotter.py
2025-12-21 14:39:27 +03:00

953 lines
39 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Реалтайм-плоттер для свипов из виртуального COM-порта.
Формат строк:
- "Sweep_start" — начало нового свипа (предыдущий считается завершённым)
- "stp X Y" — точка (индекс X, значение Y), все целые со знаком
Отрисовываются два графика:
- Левый: последний полученный свип (Y vs X)
- Правый: водопад (последние N свипов во времени)
Оптимизации для скорости:
- Парсинг и чтение в фоновой нити
- Анимация с обновлением только данных (без лишнего пересоздания фигур)
- Кольцевой буфер под водопад с фиксированным числом свипов
Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии
используется сырой доступ к TTY через termios.
"""
import argparse
import io
import os
import sys
import threading
import time
from queue import Queue, Empty, Full
from typing import Optional, Tuple
import numpy as np
WF_WIDTH = 1000 # максимальное число точек в ряду водопада
FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров
def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
"""Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров.
Возвращает пару (low, high) или None для отключения. Допустимы значения 0..100, low < high.
Ключевые слова отключения: "off", "none", "no".
"""
if not spec:
return None
s = str(spec).strip().lower()
if s in ("off", "none", "no"):
return None
try:
p0, p1 = s.replace(";", ",").split(",")
low = float(p0)
high = float(p1)
if not (0.0 <= low < high <= 100.0):
return None
return (low, high)
except Exception:
return None
def try_open_pyserial(path: str, baud: int, timeout: float):
try:
import serial # type: ignore
except Exception:
return None
try:
ser = serial.Serial(path, baudrate=baud, timeout=timeout)
return ser
except Exception:
return None
class FDReader:
"""Простой враппер чтения строк из файлового дескриптора TTY."""
def __init__(self, fd: int):
# Отдельно буферизуем для корректной readline()
self._fd = fd
raw = os.fdopen(fd, "rb", closefd=False)
self._file = raw
self._buf = io.BufferedReader(raw, buffer_size=65536)
def fileno(self) -> int:
return self._fd
def readline(self) -> bytes:
return self._buf.readline()
def close(self):
try:
self._buf.close()
except Exception:
pass
def open_raw_tty(path: str, baud: int) -> Optional[FDReader]:
"""Открыть TTY без pyserial и настроить порт через termios.
Возвращает FDReader или None при ошибке.
"""
try:
import termios
import tty
except Exception:
return None
try:
fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
except Exception:
return None
try:
attrs = termios.tcgetattr(fd)
# Установим «сырое» состояние
tty.setraw(fd)
# Скорость
baud_map = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
230400: getattr(termios, "B230400", None),
460800: getattr(termios, "B460800", None),
}
b = baud_map.get(baud) or termios.B115200
attrs[4] = b # ispeed
attrs[5] = b # ospeed
# VMIN=1, VTIME=0 — блокирующее чтение по байту
cc = attrs[6]
cc[termios.VMIN] = 1
cc[termios.VTIME] = 0
attrs[6] = cc
termios.tcsetattr(fd, termios.TCSANOW, attrs)
except Exception:
try:
os.close(fd)
except Exception:
pass
return None
return FDReader(fd)
class SerialLineSource:
"""Единый интерфейс для чтения строк из порта (pyserial или raw TTY)."""
def __init__(self, path: str, baud: int, timeout: float = 1.0):
self._pyserial = try_open_pyserial(path, baud, timeout)
self._fdreader = None
self._using = "pyserial" if self._pyserial is not None else "raw"
if self._pyserial is None:
self._fdreader = open_raw_tty(path, baud)
if self._fdreader is None:
raise RuntimeError(
f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)"
)
def readline(self) -> bytes:
if self._pyserial is not None:
try:
return self._pyserial.readline()
except Exception:
return b""
else:
try:
return self._fdreader.readline() # type: ignore[union-attr]
except Exception:
return b""
def close(self):
try:
if self._pyserial is not None:
self._pyserial.close()
elif self._fdreader is not None:
self._fdreader.close()
except Exception:
pass
class SerialChunkReader:
"""Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера."""
def __init__(self, src: SerialLineSource):
self._src = src
self._ser = src._pyserial
self._fd: Optional[int] = None
if self._ser is not None:
# Неблокирующий режим для быстрой откачки
try:
self._ser.timeout = 0
except Exception:
pass
else:
try:
self._fd = src._fdreader.fileno() # type: ignore[union-attr]
try:
os.set_blocking(self._fd, False)
except Exception:
pass
except Exception:
self._fd = None
def read_available(self) -> bytes:
"""Вернёт доступные байты (b"" если данных нет)."""
if self._ser is not None:
try:
n = int(getattr(self._ser, "in_waiting", 0))
except Exception:
n = 0
if n > 0:
try:
return self._ser.read(n)
except Exception:
return b""
return b""
if self._fd is None:
return b""
out = bytearray()
while True:
try:
chunk = os.read(self._fd, 65536)
if not chunk:
break
out += chunk
if len(chunk) < 65536:
break
except BlockingIOError:
break
except Exception:
break
return bytes(out)
class SweepReader(threading.Thread):
"""Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь."""
def __init__(self, port_path: str, baud: int, out_queue: Queue, stop_event: threading.Event, fancy: bool = False):
super().__init__(daemon=True)
self._port_path = port_path
self._baud = baud
self._q = out_queue
self._stop = stop_event
self._src: Optional[SerialLineSource] = None
self._fancy = bool(fancy)
def _finalize_current(self, xs, ys):
if not xs:
return
max_x = max(xs)
width = max_x + 1
# Быстрый векторизованный путь
sweep = np.full((width,), np.nan, dtype=np.float32)
try:
idx = np.asarray(xs, dtype=np.int64)
vals = np.asarray(ys, dtype=np.float32)
sweep[idx] = vals
except Exception:
# Запасной путь
for x, y in zip(xs, ys):
if 0 <= x < width:
sweep[x] = float(y)
# Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы
if self._fancy:
try:
known = ~np.isnan(sweep)
if np.any(known):
known_idx = np.nonzero(known)[0]
# Для каждой пары соседних известных индексов заполним промежуток средним значением
for i0, i1 in zip(known_idx[:-1], known_idx[1:]):
if i1 - i0 > 1:
avg = (sweep[i0] + sweep[i1]) * 0.5
sweep[i0 + 1 : i1] = avg
except Exception:
# В случае ошибки просто оставляем как есть
pass
# Кладём готовый свип (если очередь полна — выбрасываем самый старый)
try:
self._q.put_nowait(sweep)
except Full:
try:
_ = self._q.get_nowait()
except Exception:
pass
try:
self._q.put_nowait(sweep)
except Exception:
pass
def run(self):
# Состояние текущего свипа
xs: list[int] = []
ys: list[int] = []
try:
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n")
except Exception as e:
sys.stderr.write(f"[error] {e}\n")
return
try:
# Быстрый неблокирующий дренаж порта с разбором по байтам
chunk_reader = SerialChunkReader(self._src)
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
# Короткая уступка CPU, если нет новых данных
time.sleep(0.0005)
continue
# Обрабатываем все полные строки
while True:
nl = buf.find(b"\n")
if nl == -1:
break
line = bytes(buf[:nl])
del buf[: nl + 1]
if line.endswith(b"\r"):
line = line[:-1]
if not line:
continue
if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys)
xs.clear()
ys.clear()
continue
# stp X Y (оба целые со знаком). Разделяем по любым пробелам/табам.
if len(line) >= 3:
parts = line.split()
if len(parts) >= 3 and parts[0].lower() == b"stp":
try:
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
except Exception:
continue
xs.append(x)
ys.append(y)
# Защита от переполнения буфера при отсутствии переводов строки
if len(buf) > 1_000_000:
del buf[:-262144]
finally:
try:
# Завершаем оставшийся свип
self._finalize_current(xs, ys)
except Exception:
pass
try:
if self._src is not None:
self._src.close()
except Exception:
pass
def main():
parser = argparse.ArgumentParser(
description=(
"Читает свипы из виртуального COM-порта и рисует: "
"последний свип и водопад (реалтайм)."
)
)
parser.add_argument("port", help="Путь к порту, например /dev/ttyACM1")
parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)")
parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде")
parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с")
parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада")
parser.add_argument(
"--spec-clip",
default="2,98",
help=(
"Процентильная обрезка уровней водопада спектров, % (min,max). "
"Напр. 2,98. 'off' — отключить"
),
)
parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна")
parser.add_argument(
"--fancy",
action="store_true",
help="Заполнять выпавшие точки средними значениями между соседними",
)
parser.add_argument(
"--ylim",
type=str,
default=None,
help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто",
)
parser.add_argument(
"--backend",
choices=["auto", "pg", "mpl"],
default="auto",
help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto",
)
args = parser.parse_args()
# Попробуем быстрый бэкенд (pyqtgraph) при auto/pg
if args.backend in ("auto", "pg"):
try:
return run_pyqtgraph(args)
except Exception as e:
if args.backend == "pg":
sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n")
sys.exit(1)
# При auto — тихо откатываемся на matplotlib
try:
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
except Exception as e:
sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n")
sys.exit(1)
# Очередь завершённых свипов и поток чтения
q: Queue[np.ndarray] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy))
reader.start()
# Графика
fig, axs = plt.subplots(2, 2, figsize=(12, 8))
(ax_line, ax_img), (ax_fft, ax_spec) = axs
fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None
fig.tight_layout()
# Состояние для отображения
current_sweep: Optional[np.ndarray] = None
x_shared: Optional[np.ndarray] = None
width: Optional[int] = None
max_sweeps = int(max(10, args.max_sweeps))
ring = None # type: Optional[np.ndarray]
head = 0
y_min, y_max = None, None
# FFT состояние
fft_bins = FFT_LEN // 2 + 1
ring_fft = None # type: Optional[np.ndarray]
y_min_fft, y_max_fft = None, None
freq_shared: Optional[np.ndarray] = None
# Параметры контраста водопада спектров
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
# Линейный график последнего свипа
line_obj, = ax_line.plot([], [], lw=1)
ax_line.set_title("Сырые данные")
ax_line.set_xlabel("X")
ax_line.set_ylabel("Y")
# Линейный график спектра текущего свипа
fft_line_obj, = ax_fft.plot([], [], lw=1)
ax_fft.set_title("FFT")
ax_fft.set_xlabel("Бин")
ax_fft.set_ylabel("Амплитуда, дБ")
# Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None
# CLI переопределение при необходимости
if args.ylim:
try:
y0, y1 = args.ylim.split(",")
fixed_ylim = (float(y0), float(y1))
except Exception:
sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n")
if fixed_ylim is not None:
ax_line.set_ylim(fixed_ylim)
# Водопад (будет инициализирован при первом свипе)
img_obj = ax_img.imshow(
np.zeros((1, 1), dtype=np.float32),
aspect="auto",
interpolation="nearest",
origin="upper",
cmap=args.cmap,
)
ax_img.set_title("Сырые данные водопад")
ax_img.set_xlabel("X")
ax_img.set_ylabel("Время →")
# Водопад спектров
img_fft_obj = ax_spec.imshow(
np.zeros((1, 1), dtype=np.float32),
aspect="auto",
interpolation="nearest",
origin="upper",
cmap=args.cmap,
)
ax_spec.set_title("B-scan (дБ)")
ax_spec.set_xlabel("Бин")
ax_spec.set_ylabel("Время →")
# Для контроля частоты обновления
max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps)
frames_since_ylim_update = 0
def ensure_buffer(_w: int):
nonlocal ring, width, head, x_shared, ring_fft, freq_shared
if ring is not None:
return
width = WF_WIDTH
x_shared = np.arange(width, dtype=np.int32)
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
head = 0
# Обновляем изображение под новые размеры
img_obj.set_data(ring)
img_obj.set_extent((0, width - 1 if width > 0 else 1, 0, max_sweeps - 1))
ax_img.set_xlim(0, max(1, width - 1))
ax_img.set_ylim(max_sweeps - 1, 0)
# FFT буферы
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft_obj.set_data(ring_fft)
img_fft_obj.set_extent((0, fft_bins - 1, 0, max_sweeps - 1))
ax_spec.set_xlim(0, max(1, fft_bins - 1))
ax_spec.set_ylim(max_sweeps - 1, 0)
freq_shared = np.arange(fft_bins, dtype=np.int32)
def push_sweep(s: np.ndarray):
nonlocal ring, head, y_min, y_max, ring_fft, y_min_fft, y_max_fft
if s is None or s.size == 0 or ring is None:
return
# Нормализуем длину до фиксированной ширины
w = ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
ring[head, :] = row
head = (head + 1) % ring.shape[0]
# Обновляем мин/макс по данным (игнорим NaN)
sv_min = np.nanmin(row)
sv_max = np.nanmax(row)
if y_min is None or (not np.isnan(sv_min) and sv_min < y_min):
y_min = float(sv_min)
if y_max is None or (not np.isnan(sv_max) and sv_max > y_max):
y_max = float(sv_max)
# FFT строка (дБ)
if ring_fft is not None:
bins = ring_fft.shape[1]
# Подготовка входа FFT_LEN, замена NaN на 0
take_fft = min(int(s.size), FFT_LEN)
if take_fft <= 0:
fft_row = np.full((bins,), np.nan, dtype=np.float32)
else:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = s[:take_fft]
if isinstance(seg, np.ndarray):
seg = np.nan_to_num(seg, nan=0.0).astype(np.float32, copy=False)
else:
seg = np.asarray(seg, dtype=np.float32)
seg = np.nan_to_num(seg, nan=0.0)
# Окно Хэннинга
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins:
# rfft длиной FFT_LEN даёт bins == FFT_LEN//2+1
fft_row = fft_row[:bins]
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row
# Экстремумы для цветовой шкалы
fr_min = np.nanmin(fft_row)
fr_max = np.nanmax(fft_row)
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
y_min_fft = float(fr_min)
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
y_max_fft = float(fr_max)
def drain_queue():
nonlocal current_sweep
drained = 0
while True:
try:
s = q.get_nowait()
except Empty:
break
drained += 1
current_sweep = s
ensure_buffer(s.size)
push_sweep(s)
return drained
def make_display_ring():
# Возвращаем буфер с правильным порядком строк (старые сверху, новые снизу)
if ring is None:
return np.zeros((1, 1), dtype=np.float32)
if head == 0:
return ring
return np.roll(ring, -head, axis=0)
def make_display_ring_fft():
if ring_fft is None:
return np.zeros((1, 1), dtype=np.float32)
if head == 0:
return ring_fft
return np.roll(ring_fft, -head, axis=0)
def update(_frame):
nonlocal frames_since_ylim_update
changed = drain_queue() > 0
# Обновление линии последнего свипа
if current_sweep is not None:
if x_shared is not None and current_sweep.size <= x_shared.size:
xs = x_shared[: current_sweep.size]
else:
xs = np.arange(current_sweep.size, dtype=np.int32)
line_obj.set_data(xs, current_sweep)
# Лимиты по X постоянные под текущую ширину
ax_line.set_xlim(0, max(1, current_sweep.size - 1))
# Адаптивные Y-лимиты (если не задан --ylim)
if fixed_ylim is None:
y0 = float(np.nanmin(current_sweep))
y1 = float(np.nanmax(current_sweep))
if np.isfinite(y0) and np.isfinite(y1):
if y0 == y1:
pad = max(1.0, abs(y0) * 0.05)
y0 -= pad
y1 += pad
else:
pad = 0.05 * (y1 - y0)
y0 -= pad
y1 += pad
ax_line.set_ylim(y0, y1)
# Обновление спектра текущего свипа
take_fft = min(int(current_sweep.size), FFT_LEN)
if take_fft > 0 and freq_shared is not None:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_vals = 20.0 * np.log10(mag + 1e-9)
xs_fft = freq_shared
if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size]
fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals)
# Авто-диапазон по Y для спектра
if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)):
ax_fft.set_xlim(0, max(1, xs_fft.size - 1))
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
# Обновление водопада
if changed and ring is not None:
disp = make_display_ring()
img_obj.set_data(disp)
# Актуализируем цветовую шкалу только при расширении экстремумов
if y_min is not None and y_max is not None and np.isfinite(y_min) and np.isfinite(y_max):
if y_min != y_max:
img_obj.set_clim(vmin=y_min, vmax=y_max)
# Обновление водопада спектров
if changed and ring_fft is not None:
disp_fft = make_display_ring_fft()
img_fft_obj.set_data(disp_fft)
# Автодиапазон по средним (по видимым данным): берём средний спектр по времени
try:
mean_spec = np.nanmean(disp_fft, axis=0)
vmin_v = float(np.nanmin(mean_spec))
vmax_v = float(np.nanmax(mean_spec))
except Exception:
vmin_v = vmax_v = None
# Если средние не дают валидный диапазон — используем процентильную обрезку (если задана)
if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v:
if spec_clip is not None:
try:
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
except Exception:
vmin_v = vmax_v = None
# Фолбэк к отслеживаемым минимум/максимумам
if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v:
if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft:
vmin_v, vmax_v = y_min_fft, y_max_fft
if vmin_v is not None and vmax_v is not None and vmin_v != vmax_v:
img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_v)
# Возвращаем обновлённые артисты
return (line_obj, img_obj, fft_line_obj, img_fft_obj)
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
plt.show()
# Нормальное завершение при закрытии окна
stop_event.set()
reader.join(timeout=1.0)
def run_pyqtgraph(args):
"""Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6."""
try:
import pyqtgraph as pg
from PyQt5 import QtCore, QtWidgets # noqa: F401
except Exception:
# Возможно установлена PySide6
try:
import pyqtgraph as pg
from PySide6 import QtCore, QtWidgets # noqa: F401
except Exception as e:
raise RuntimeError(
"pyqtgraph/PyQt5(Pyside6) не найдены. Установите: pip install pyqtgraph PyQt5"
) from e
# Очередь завершённых свипов и поток чтения
q: Queue[np.ndarray] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy))
reader.start()
# Настройки скорости
max_sweeps = int(max(10, args.max_sweeps))
max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps)
# PyQtGraph настройки
pg.setConfigOptions(useOpenGL=True, antialias=False)
app = pg.mkQApp(args.title)
win = pg.GraphicsLayoutWidget(show=True, title=args.title)
win.resize(1200, 600)
# Плот последнего свипа (слева-сверху)
p_line = win.addPlot(row=0, col=0, title="Сырые данные")
p_line.showGrid(x=True, y=True, alpha=0.3)
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
p_line.setLabel("bottom", "X")
p_line.setLabel("left", "Y")
# Водопад (справа-сверху)
p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад")
p_img.invertY(True) # 0 сверху, новые снизу
p_img.showGrid(x=False, y=False)
p_img.setLabel("bottom", "X")
p_img.setLabel("left", "Время →")
img = pg.ImageItem()
p_img.addItem(img)
# FFT (слева-снизу)
p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
p_fft.setLabel("bottom", "Бин")
p_fft.setLabel("left", "Амплитуда, дБ")
# Водопад спектров (справа-снизу)
p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)")
p_spec.invertY(True)
p_spec.showGrid(x=False, y=False)
p_spec.setLabel("bottom", "Бин")
p_spec.setLabel("left", "Время →")
img_fft = pg.ImageItem()
p_spec.addItem(img_fft)
# Состояние
ring: Optional[np.ndarray] = None
head = 0
width: Optional[int] = None
x_shared: Optional[np.ndarray] = None
current_sweep: Optional[np.ndarray] = None
y_min, y_max = None, None
# Для спектров
fft_bins = FFT_LEN // 2 + 1
ring_fft: Optional[np.ndarray] = None
freq_shared: Optional[np.ndarray] = None
y_min_fft, y_max_fft = None, None
# Параметры контраста водопада спектров (процентильная обрезка)
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim:
try:
y0, y1 = args.ylim.split(",")
fixed_ylim = (float(y0), float(y1))
except Exception:
pass
if fixed_ylim is not None:
p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0)
def ensure_buffer(_w: int):
nonlocal ring, head, width, x_shared, ring_fft, freq_shared
if ring is not None:
return
width = WF_WIDTH
x_shared = np.arange(width, dtype=np.int32)
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
head = 0
img.setImage(ring, autoLevels=False)
p_img.setRange(xRange=(0, max(1, width - 1)), yRange=(0, max_sweeps - 1), padding=0)
p_line.setXRange(0, max(1, width - 1), padding=0)
# FFT
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft.setImage(ring_fft, autoLevels=False)
p_spec.setRange(xRange=(0, max(1, fft_bins - 1)), yRange=(0, max_sweeps - 1), padding=0)
p_fft.setXRange(0, max(1, fft_bins - 1), padding=0)
freq_shared = np.arange(fft_bins, dtype=np.int32)
def push_sweep(s: np.ndarray):
nonlocal ring, head, y_min, y_max, ring_fft, y_min_fft, y_max_fft
if s is None or s.size == 0 or ring is None:
return
w = ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
ring[head, :] = row
head = (head + 1) % ring.shape[0]
# Обновление экстремумов
sv_min = np.nanmin(row)
sv_max = np.nanmax(row)
if y_min is None or (not np.isnan(sv_min) and sv_min < y_min):
y_min = float(sv_min)
if y_max is None or (not np.isnan(sv_max) and sv_max > y_max):
y_max = float(sv_max)
# FFT строка (дБ)
if ring_fft is not None:
bins = ring_fft.shape[1]
take_fft = min(int(s.size), FFT_LEN)
if take_fft > 0:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins:
fft_row = fft_row[:bins]
else:
fft_row = np.full((bins,), np.nan, dtype=np.float32)
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row
fr_min = np.nanmin(fft_row)
fr_max = np.nanmax(fft_row)
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
y_min_fft = float(fr_min)
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
y_max_fft = float(fr_max)
def drain_queue():
nonlocal current_sweep
drained = 0
while True:
try:
s = q.get_nowait()
except Empty:
break
drained += 1
current_sweep = s
ensure_buffer(s.size)
push_sweep(s)
return drained
# Попытка применить LUT из колормэпа (если доступен)
try:
cm_mod = getattr(pg, "colormap", None)
if cm_mod is not None:
cm = cm_mod.get(args.cmap)
img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256))
except Exception:
pass
def update():
changed = drain_queue() > 0
if current_sweep is not None and x_shared is not None:
if current_sweep.size <= x_shared.size:
xs = x_shared[: current_sweep.size]
else:
xs = np.arange(current_sweep.size)
curve.setData(xs, current_sweep, autoDownsample=True)
if fixed_ylim is None:
y0 = float(np.nanmin(current_sweep))
y1 = float(np.nanmax(current_sweep))
if np.isfinite(y0) and np.isfinite(y1):
margin = 0.05 * max(1.0, (y1 - y0))
p_line.setYRange(y0 - margin, y1 + margin, padding=0)
# Обновим спектр
take_fft = min(int(current_sweep.size), FFT_LEN)
if take_fft > 0 and freq_shared is not None:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_vals = 20.0 * np.log10(mag + 1e-9)
xs_fft = freq_shared
if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size]
curve_fft.setData(xs_fft[: fft_vals.size], fft_vals)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
if changed and ring is not None:
disp = ring if head == 0 else np.roll(ring, -head, axis=0)
if y_min is not None and y_max is not None and y_min != y_max and np.isfinite(y_min) and np.isfinite(y_max):
img.setImage(disp, autoLevels=False, levels=(y_min, y_max))
else:
img.setImage(disp, autoLevels=False)
if changed and ring_fft is not None:
disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
# Автодиапазон по среднему спектру за видимый интервал
levels = None
try:
mean_spec = np.nanmean(disp_fft, axis=0)
vmin_v = float(np.nanmin(mean_spec))
vmax_v = float(np.nanmax(mean_spec))
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
levels = (vmin_v, vmax_v)
except Exception:
levels = None
# Процентильная обрезка как запасной вариант
if levels is None and spec_clip is not None:
try:
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
levels = (vmin_v, vmax_v)
except Exception:
levels = None
# Ещё один фолбэк — глобальные накопленные мин/макс
if levels is None and y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft:
levels = (y_min_fft, y_max_fft)
if levels is not None:
img_fft.setImage(disp_fft, autoLevels=False, levels=levels)
else:
img_fft.setImage(disp_fft, autoLevels=False)
timer = pg.QtCore.QTimer()
timer.timeout.connect(update)
timer.start(interval_ms)
def on_quit():
stop_event.set()
reader.join(timeout=1.0)
app.aboutToQuit.connect(on_quit)
win.show()
exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None)
exec_fn()
# На случай если aboutToQuit не сработал
on_quit()
if __name__ == "__main__":
main()