Files
RFG_stm32_ADC_receiver_GUI/RFG_ADC_dataplotter.py

2672 lines
105 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Реалтайм-плоттер для свипов из виртуального COM-порта.
Формат строк:
- "Sweep_start" — начало нового свипа (предыдущий считается завершённым)
- "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком
Отрисовываются два графика:
- Левый: последний полученный свип (Y vs X)
- Правый: водопад (последние N свипов во времени)
Оптимизации для скорости:
- Парсинг и чтение в фоновой нити
- Анимация с обновлением только данных (без лишнего пересоздания фигур)
- Кольцевой буфер под водопад с фиксированным числом свипов
Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии
используется сырой доступ к TTY через termios.
"""
import argparse
import io
import os
import signal
import sys
import threading
import time
from collections import deque
from queue import Queue, Empty, Full
from typing import Any, Dict, Mapping, Optional, Tuple, Union
import numpy as np
WF_WIDTH = 1000 # максимальное число точек в ряду водопада
FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров
LOG_BASE = 10.0
LOG_SCALER = 0.001 # int32 значения приходят в fixed-point лог-шкале с шагом 1e-3
LOG_POSTSCALER = 1000
LOG_EXP_LIMIT = 300.0 # запас до переполнения float64 при возведении LOG_BASE в степень
C_M_S = 299_792_458.0
# Порог для инверсии сырых данных: если среднее значение свипа ниже порога —
# считаем, что сигнал «меньше нуля» и домножаем свип на -1
DATA_INVERSION_THRASHOLD = 10.0
Number = Union[int, float]
SweepInfo = Dict[str, Any]
SweepData = Dict[str, np.ndarray]
SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]]
SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves]
CALIBRATION_C = np.asarray([0.0, 1.0, 0.025], dtype=np.float64)
def _format_status_kv(data: Mapping[str, Any]) -> str:
"""Преобразовать словарь метрик в одну строку 'k:v'."""
def _fmt(v: Any) -> str:
if v is None:
return "NA"
try:
fv = float(v)
except Exception:
return str(v)
if not np.isfinite(fv):
return "nan"
# Достаточно компактно для статус-строки.
if abs(fv) >= 1000 or (0 < abs(fv) < 0.01):
return f"{fv:.3g}"
return f"{fv:.3f}".rstrip("0").rstrip(".")
parts = [f"{k}:{_fmt(v)}" for k, v in data.items()]
return " ".join(parts)
def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
"""Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров.
Возвращает пару (low, high) или None для отключения. Допустимы значения 0..100, low < high.
Ключевые слова отключения: "off", "none", "no".
"""
if not spec:
return None
s = str(spec).strip().lower()
if s in ("off", "none", "no"):
return None
try:
p0, p1 = s.replace(";", ",").split(",")
low = float(p0)
high = float(p1)
if not (0.0 <= low < high <= 100.0):
return None
return (low, high)
except Exception:
return None
def _log_value_to_linear(value: int) -> float:
"""Преобразовать fixed-point логарифмическое значение в линейную шкалу."""
exponent = max(-LOG_EXP_LIMIT, min(LOG_EXP_LIMIT, float(value) * LOG_SCALER))
return float(LOG_BASE ** exponent)
def _log_pair_to_sweep(avg_1: int, avg_2: int) -> float:
"""Разность двух логарифмических усреднений в линейной шкале."""
return (_log_value_to_linear(avg_1) - _log_value_to_linear(avg_2))*LOG_POSTSCALER
def _compute_auto_ylim(*series_list: Optional[np.ndarray]) -> Optional[Tuple[float, float]]:
"""Общий Y-диапазон по всем переданным кривым с небольшим запасом."""
y_min: Optional[float] = None
y_max: Optional[float] = None
for series in series_list:
if series is None:
continue
arr = np.asarray(series)
if arr.size == 0:
continue
finite = arr[np.isfinite(arr)]
if finite.size == 0:
continue
cur_min = float(np.min(finite))
cur_max = float(np.max(finite))
y_min = cur_min if y_min is None else min(y_min, cur_min)
y_max = cur_max if y_max is None else max(y_max, cur_max)
if y_min is None or y_max is None:
return None
if y_min == y_max:
pad = max(1.0, abs(y_min) * 0.05)
else:
pad = 0.05 * (y_max - y_min)
return (y_min - pad, y_max + pad)
def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData:
"""Вернуть копию свипа для будущей калибровки частотной оси."""
F = np.asarray(sweep["F"], dtype=np.float64).copy()
I = np.asarray(sweep["I"], dtype=np.float64).copy()
tmp = []
C = np.asarray(CALIBRATION_C, dtype=np.float64)
for f in F:
val = C[0] + (f**1) * C[1] + (f**2) * C[2]
tmp.append(val)
F = np.asanyarray(tmp, dtype=np.float64)
if F.size >= 2:
F_cal = np.linspace(float(F[0]), float(F[-1]), F.size, dtype=np.float64)
I_cal = np.interp(F_cal, F, I).astype(np.float64)
else:
F_cal = F.copy()
I_cal = I.copy()
return {
"F": F_cal,
"I": I_cal,
}
def _prepare_fft_segment(
sweep: np.ndarray,
freqs: Optional[np.ndarray],
fft_len: int = FFT_LEN,
) -> Optional[Tuple[np.ndarray, int]]:
"""Подготовить свип к FFT, пересэмплируя его на равномерную сетку по частоте."""
take_fft = min(int(sweep.size), int(fft_len))
if take_fft <= 0:
return None
sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32)
fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(np.float32, copy=False)
if freqs is None:
return fallback, take_fft
freq_arr = np.asarray(freqs)
if freq_arr.size < take_fft:
return fallback, take_fft
freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64)
valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg)
if int(np.count_nonzero(valid)) < 2:
return fallback, take_fft
x_valid = freq_seg[valid]
y_valid = sweep_seg[valid]
order = np.argsort(x_valid, kind="mergesort")
x_valid = x_valid[order]
y_valid = y_valid[order]
x_unique, unique_idx = np.unique(x_valid, return_index=True)
y_unique = y_valid[unique_idx]
if x_unique.size < 2 or x_unique[-1] <= x_unique[0]:
return fallback, take_fft
x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64)
resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32)
return resampled, take_fft
def _fft_mag_to_db(mag: np.ndarray) -> np.ndarray:
"""Перевод модуля спектра в дБ с отсечкой отрицательных значений после вычитания фона."""
mag_arr = np.asarray(mag, dtype=np.float32)
safe_mag = np.maximum(mag_arr, 0.0)
return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False)
def _compute_fft_mag_row(
sweep: np.ndarray,
freqs: Optional[np.ndarray],
bins: int,
) -> np.ndarray:
"""Посчитать линейный модуль FFT-строки, используя калиброванную частотную ось."""
if bins <= 0:
return np.zeros((0,), dtype=np.float32)
prepared = _prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN)
if prepared is None:
return np.full((bins,), np.nan, dtype=np.float32)
fft_seg, take_fft = prepared
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = fft_seg * win
spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32)
if mag.shape[0] != bins:
mag = mag[:bins]
return mag
def _compute_fft_row(
sweep: np.ndarray,
freqs: Optional[np.ndarray],
bins: int,
) -> np.ndarray:
"""Посчитать FFT-строку в дБ."""
return _fft_mag_to_db(_compute_fft_mag_row(sweep, freqs, bins))
def _compute_distance_axis(freqs: Optional[np.ndarray], bins: int) -> np.ndarray:
"""Рассчитать ось расстояния для IFFT по равномерной частотной сетке."""
if bins <= 0:
return np.zeros((0,), dtype=np.float64)
if freqs is None:
return np.arange(bins, dtype=np.float64)
freq_arr = np.asarray(freqs, dtype=np.float64)
finite = freq_arr[np.isfinite(freq_arr)]
if finite.size < 2:
return np.arange(bins, dtype=np.float64)
df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1))
df_hz = abs(df_ghz) * 1e9
if not np.isfinite(df_hz) or df_hz <= 0.0:
return np.arange(bins, dtype=np.float64)
step_m = C_M_S / (2.0 * FFT_LEN * df_hz)
return np.arange(bins, dtype=np.float64) * step_m
def _find_peak_width_markers(xs: np.ndarray, ys: np.ndarray) -> Optional[Dict[str, float]]:
"""Найти главный ненулевой пик и его ширину по уровню половины высоты над фоном."""
x_arr = np.asarray(xs, dtype=np.float64)
y_arr = np.asarray(ys, dtype=np.float64)
valid = np.isfinite(x_arr) & np.isfinite(y_arr) & (x_arr > 0.0)
if int(np.count_nonzero(valid)) < 3:
return None
x = x_arr[valid]
y = y_arr[valid]
x_min = float(x[0])
x_max = float(x[-1])
x_span = x_max - x_min
central_mask = (x >= (x_min + 0.25 * x_span)) & (x <= (x_min + 0.75 * x_span))
if int(np.count_nonzero(central_mask)) > 0:
central_idx = np.flatnonzero(central_mask)
peak_idx = int(central_idx[int(np.argmax(y[central_mask]))])
else:
peak_idx = int(np.argmax(y))
peak_y = float(y[peak_idx])
shoulder_gap = max(1, min(8, y.size // 64 if y.size > 0 else 1))
shoulder_width = max(4, min(32, y.size // 16 if y.size > 0 else 4))
left_lo = max(0, peak_idx - shoulder_gap - shoulder_width)
left_hi = max(0, peak_idx - shoulder_gap)
right_lo = min(y.size, peak_idx + shoulder_gap + 1)
right_hi = min(y.size, right_lo + shoulder_width)
background_parts = []
if left_hi > left_lo:
background_parts.append(float(np.nanmedian(y[left_lo:left_hi])))
if right_hi > right_lo:
background_parts.append(float(np.nanmedian(y[right_lo:right_hi])))
if background_parts:
background = float(np.mean(background_parts))
else:
background = float(np.nanpercentile(y, 10))
if not np.isfinite(peak_y) or not np.isfinite(background) or peak_y <= background:
return None
half_level = background + 0.5 * (peak_y - background)
def _interp_cross(x0: float, y0: float, x1: float, y1: float) -> float:
if not (np.isfinite(x0) and np.isfinite(y0) and np.isfinite(x1) and np.isfinite(y1)):
return x1
dy = y1 - y0
if dy == 0.0:
return x1
t = (half_level - y0) / dy
t = min(1.0, max(0.0, t))
return x0 + t * (x1 - x0)
left_x = float(x[0])
for i in range(peak_idx, 0, -1):
if y[i - 1] <= half_level <= y[i]:
left_x = _interp_cross(float(x[i - 1]), float(y[i - 1]), float(x[i]), float(y[i]))
break
else:
left_x = float(x[0])
right_x = float(x[-1])
for i in range(peak_idx, x.size - 1):
if y[i] >= half_level >= y[i + 1]:
right_x = _interp_cross(float(x[i]), float(y[i]), float(x[i + 1]), float(y[i + 1]))
break
else:
right_x = float(x[-1])
width = right_x - left_x
if not np.isfinite(width) or width <= 0.0:
return None
return {
"background": background,
"left": left_x,
"right": right_x,
"width": width,
}
def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
"""Простая нормировка: поэлементное деление raw/calib."""
w = min(raw.size, calib.size)
if w <= 0:
return raw
out = np.full_like(raw, np.nan, dtype=np.float32)
with np.errstate(divide="ignore", invalid="ignore"):
out[:w] = raw[:w] / calib[:w]
out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
return out
def _build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Оценить нижнюю/верхнюю огибающие калибровочной кривой."""
n = int(calib.size)
if n <= 0:
empty = np.zeros((0,), dtype=np.float32)
return empty, empty
y = np.asarray(calib, dtype=np.float32)
finite = np.isfinite(y)
if not np.any(finite):
zeros = np.zeros_like(y, dtype=np.float32)
return zeros, zeros
if not np.all(finite):
x = np.arange(n, dtype=np.float32)
y = y.copy()
y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32)
if n < 3:
return y.copy(), y.copy()
dy = np.diff(y)
s = np.sign(dy).astype(np.int8, copy=False)
if np.any(s == 0):
for i in range(1, s.size):
if s[i] == 0:
s[i] = s[i - 1]
for i in range(s.size - 2, -1, -1):
if s[i] == 0:
s[i] = s[i + 1]
s[s == 0] = 1
max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1
min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1
x = np.arange(n, dtype=np.float32)
def _interp_nodes(nodes: np.ndarray) -> np.ndarray:
if nodes.size == 0:
idx = np.array([0, n - 1], dtype=np.int64)
else:
idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64)
return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32)
upper = _interp_nodes(max_idx)
lower = _interp_nodes(min_idx)
swap = lower > upper
if np.any(swap):
tmp = upper[swap].copy()
upper[swap] = lower[swap]
lower[swap] = tmp
return lower, upper
def _normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
"""Нормировка через проекцию между огибающими калибровки в диапазон [-1, +1]."""
w = min(raw.size, calib.size)
if w <= 0:
return raw
out = np.full_like(raw, np.nan, dtype=np.float32)
raw_seg = np.asarray(raw[:w], dtype=np.float32)
lower, upper = _build_calib_envelopes(np.asarray(calib[:w], dtype=np.float32))
span = upper - lower
finite_span = span[np.isfinite(span) & (span > 0)]
if finite_span.size > 0:
eps = max(float(np.median(finite_span)) * 1e-6, 1e-9)
else:
eps = 1e-9
valid = (
np.isfinite(raw_seg)
& np.isfinite(lower)
& np.isfinite(upper)
& (span > eps)
)
if np.any(valid):
proj = np.empty_like(raw_seg, dtype=np.float32)
proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0
proj[valid] = np.clip(proj[valid], -1000.0, 1000.0)
proj[~valid] = np.nan
out[:w] = proj
return out
def _normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray:
"""Нормировка свипа по выбранному алгоритму."""
nt = str(norm_type).strip().lower()
if nt == "simple":
return _normalize_sweep_simple(raw, calib)
return _normalize_sweep_projector(raw, calib)
def try_open_pyserial(path: str, baud: int, timeout: float):
try:
import serial # type: ignore
except Exception:
return None
try:
ser = serial.Serial(path, baudrate=baud, timeout=timeout)
return ser
except Exception:
return None
class FDReader:
"""Простой враппер чтения строк из файлового дескриптора TTY."""
def __init__(self, fd: int):
# Отдельно буферизуем для корректной readline()
self._fd = fd
raw = os.fdopen(fd, "rb", closefd=False)
self._file = raw
self._buf = io.BufferedReader(raw, buffer_size=65536)
def fileno(self) -> int:
return self._fd
def readline(self) -> bytes:
return self._buf.readline()
def close(self):
try:
self._buf.close()
except Exception:
pass
def open_raw_tty(path: str, baud: int) -> Optional[FDReader]:
"""Открыть TTY без pyserial и настроить порт через termios.
Возвращает FDReader или None при ошибке.
"""
try:
import termios
import tty
except Exception:
return None
try:
fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
except Exception:
return None
try:
attrs = termios.tcgetattr(fd)
# Установим «сырое» состояние
tty.setraw(fd)
# Скорость
baud_map = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
230400: getattr(termios, "B230400", None),
460800: getattr(termios, "B460800", None),
}
b = baud_map.get(baud) or termios.B115200
attrs[4] = b # ispeed
attrs[5] = b # ospeed
# VMIN=1, VTIME=0 — блокирующее чтение по байту
cc = attrs[6]
cc[termios.VMIN] = 1
cc[termios.VTIME] = 0
attrs[6] = cc
termios.tcsetattr(fd, termios.TCSANOW, attrs)
except Exception:
try:
os.close(fd)
except Exception:
pass
return None
return FDReader(fd)
class SerialLineSource:
"""Единый интерфейс для чтения строк из порта (pyserial или raw TTY)."""
def __init__(self, path: str, baud: int, timeout: float = 1.0):
self._pyserial = try_open_pyserial(path, baud, timeout)
self._fdreader = None
self._using = "pyserial" if self._pyserial is not None else "raw"
if self._pyserial is None:
self._fdreader = open_raw_tty(path, baud)
if self._fdreader is None:
msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)"
if sys.platform.startswith("win"):
msg += ". На Windows нужен pyserial: pip install pyserial"
raise RuntimeError(msg)
def readline(self) -> bytes:
if self._pyserial is not None:
try:
return self._pyserial.readline()
except Exception:
return b""
else:
try:
return self._fdreader.readline() # type: ignore[union-attr]
except Exception:
return b""
def close(self):
try:
if self._pyserial is not None:
self._pyserial.close()
elif self._fdreader is not None:
self._fdreader.close()
except Exception:
pass
class SerialChunkReader:
"""Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера."""
def __init__(self, src: SerialLineSource):
self._src = src
self._ser = src._pyserial
self._fd: Optional[int] = None
if self._ser is not None:
# Неблокирующий режим для быстрой откачки
try:
self._ser.timeout = 0
except Exception:
pass
else:
try:
self._fd = src._fdreader.fileno() # type: ignore[union-attr]
try:
os.set_blocking(self._fd, False)
except Exception:
pass
except Exception:
self._fd = None
def read_available(self) -> bytes:
"""Вернёт доступные байты (b"" если данных нет)."""
if self._ser is not None:
try:
n = int(getattr(self._ser, "in_waiting", 0))
except Exception:
n = 0
if n > 0:
try:
return self._ser.read(n)
except Exception:
return b""
return b""
if self._fd is None:
return b""
out = bytearray()
while True:
try:
chunk = os.read(self._fd, 65536)
if not chunk:
break
out += chunk
if len(chunk) < 65536:
break
except BlockingIOError:
break
except Exception:
break
return bytes(out)
class SweepReader(threading.Thread):
"""Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь."""
def __init__(
self,
port_path: str,
baud: int,
out_queue: Queue[SweepPacket],
stop_event: threading.Event,
fancy: bool = False,
bin_mode: bool = False,
logscale: bool = False,
parser_16_bit_x2: bool = False,
):
super().__init__(daemon=True)
self._port_path = port_path
self._baud = baud
self._q = out_queue
self._stop = stop_event
self._src: Optional[SerialLineSource] = None
self._fancy = bool(fancy)
self._bin_mode = bool(bin_mode)
self._logscale = bool(logscale)
self._parser_16_bit_x2 = bool(parser_16_bit_x2)
self._max_width: int = 0
self._sweep_idx: int = 0
self._last_sweep_ts: Optional[float] = None
self._n_valid_hist = deque()
@staticmethod
def _u32_to_i32(v: int) -> int:
"""Преобразование 32-bit слова в знаковое значение."""
return v - 0x1_0000_0000 if (v & 0x8000_0000) else v
@staticmethod
def _u16_to_i16(v: int) -> int:
"""Преобразование 16-bit слова в знаковое значение."""
return v - 0x1_0000 if (v & 0x8000) else v
def _finalize_current(
self,
xs,
ys,
channels: Optional[set[int]],
raw_curves: Optional[Tuple[list[int], list[int]]] = None,
apply_inversion: bool = True,
):
if not xs:
return
ch_list = sorted(channels) if channels else [0]
ch_primary = ch_list[0] if ch_list else 0
max_x = max(xs)
width = max_x + 1
self._max_width = max(self._max_width, width)
target_width = self._max_width if self._fancy else width
def _scatter(values, dtype) -> np.ndarray:
series = np.full((target_width,), np.nan, dtype=dtype)
try:
idx = np.asarray(xs, dtype=np.int64)
vals = np.asarray(values, dtype=dtype)
series[idx] = vals
except Exception:
for x, y in zip(xs, values):
if 0 <= x < target_width:
series[x] = y
return series
def _fill_missing(series: np.ndarray):
known = ~np.isnan(series)
if not np.any(known):
return
known_idx = np.nonzero(known)[0]
for i0, i1 in zip(known_idx[:-1], known_idx[1:]):
if i1 - i0 > 1:
avg = (series[i0] + series[i1]) * 0.5
series[i0 + 1 : i1] = avg
first_idx = int(known_idx[0])
last_idx = int(known_idx[-1])
if first_idx > 0:
series[:first_idx] = series[first_idx]
if last_idx < series.size - 1:
series[last_idx + 1 :] = series[last_idx]
# Быстрый векторизованный путь
sweep = _scatter(ys, np.float32)
aux_curves: SweepAuxCurves = None
if raw_curves is not None:
aux_curves = (
_scatter(raw_curves[0], np.float32),
_scatter(raw_curves[1], np.float32),
)
# Метрики валидных точек до заполнения пропусков
finite_pre = np.isfinite(sweep)
n_valid_cur = int(np.count_nonzero(finite_pre))
# Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины
if self._fancy:
try:
_fill_missing(sweep)
if aux_curves is not None:
_fill_missing(aux_curves[0])
_fill_missing(aux_curves[1])
except Exception:
# В случае ошибки просто оставляем как есть
pass
# Инверсия данных при «отрицательном» уровне (среднее ниже порога)
if apply_inversion:
try:
m = float(np.nanmean(sweep))
if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD:
sweep *= -1.0
except Exception:
pass
#sweep = np.abs(sweep)
#sweep -= float(np.nanmean(sweep))
# Метрики для статусной строки (вид словаря: переменная -> значение)
self._sweep_idx += 1
if len(ch_list) > 1:
sys.stderr.write(
f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n"
)
now = time.time()
if self._last_sweep_ts is None:
dt_ms = float("nan")
else:
dt_ms = (now - self._last_sweep_ts) * 1000.0
self._last_sweep_ts = now
self._n_valid_hist.append((now, n_valid_cur))
while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0:
self._n_valid_hist.popleft()
if self._n_valid_hist:
n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist))
else:
n_valid = float(n_valid_cur)
if n_valid_cur > 0:
vmin = float(np.nanmin(sweep))
vmax = float(np.nanmax(sweep))
mean = float(np.nanmean(sweep))
std = float(np.nanstd(sweep))
else:
vmin = vmax = mean = std = float("nan")
info: SweepInfo = {
"sweep": self._sweep_idx,
"ch": ch_primary,
"chs": ch_list,
"n_valid": n_valid,
"min": vmin,
"max": vmax,
"mean": mean,
"std": std,
"dt_ms": dt_ms,
}
# Кладём готовый свип (если очередь полна — выбрасываем самый старый)
try:
self._q.put_nowait((sweep, info, aux_curves))
except Full:
try:
_ = self._q.get_nowait()
except Exception:
pass
try:
self._q.put_nowait((sweep, info, aux_curves))
except Exception:
pass
def _run_ascii_stream(self, chunk_reader: SerialChunkReader):
xs: list[int] = []
ys: list[int] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
time.sleep(0.0005)
continue
while True:
nl = buf.find(b"\n")
if nl == -1:
break
line = bytes(buf[:nl])
del buf[: nl + 1]
if line.endswith(b"\r"):
line = line[:-1]
if not line:
continue
if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channel = None
cur_channels.clear()
continue
# sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам.
if len(line) >= 3:
parts = line.split()
if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")):
try:
if parts[0].lower() == b"s":
if len(parts) >= 4:
ch = int(parts[1], 10)
x = int(parts[2], 10)
y = int(parts[3], 10) # поддержка знака: "+…" и "-…"
else:
ch = 0
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
else:
# формат вида "s0"
ch = int(parts[0][1:], 10)
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
except Exception:
continue
if cur_channel is None:
cur_channel = ch
cur_channels.add(ch)
xs.append(x)
ys.append(y)
if len(buf) > 1_000_000:
del buf[:-262144]
self._finalize_current(xs, ys, cur_channels)
def _run_binary_stream(self, chunk_reader: SerialChunkReader):
xs: list[int] = []
ys: list[int] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
words = deque()
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
time.sleep(0.0005)
continue
usable = len(buf) & ~1
if usable == 0:
continue
i = 0
while i < usable:
w = int(buf[i]) | (int(buf[i + 1]) << 8)
words.append(w)
i += 2
# Бинарный протокол:
# старт свипа (актуальный): 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
# старт свипа (legacy): 0xFFFF, 0xFFFF, channel, 0x0A0A
# точка: step, value_hi, value_lo, 0x000A
while len(words) >= 4:
w0 = int(words[0])
w1 = int(words[1])
w2 = int(words[2])
w3 = int(words[3])
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A:
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channels.clear()
cur_channel = (w3 >> 8) & 0x00FF
cur_channels.add(cur_channel)
for _ in range(4):
words.popleft()
continue
if w0 == 0xFFFF and w1 == 0xFFFF and w3 == 0x0A0A:
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channels.clear()
cur_channel = w2
cur_channels.add(cur_channel)
for _ in range(4):
words.popleft()
continue
if w3 == 0x000A:
if cur_channel is not None:
cur_channels.add(cur_channel)
xs.append(w0)
value_u32 = (w1 << 16) | w2
ys.append(self._u32_to_i32(value_u32))
for _ in range(4):
words.popleft()
continue
# Поток может начаться с середины пакета; сдвигаемся по слову до ресинхронизации.
words.popleft()
del buf[:usable]
if len(buf) > 1_000_000:
del buf[:-262144]
self._finalize_current(xs, ys, cur_channels)
def _run_logscale_binary_stream(self, chunk_reader: SerialChunkReader):
xs: list[int] = []
ys: list[float] = []
avg_1_vals: list[int] = []
avg_2_vals: list[int] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
words = deque()
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
time.sleep(0.0005)
continue
usable = len(buf) & ~1
if usable == 0:
continue
i = 0
while i < usable:
w = int(buf[i]) | (int(buf[i + 1]) << 8)
words.append(w)
i += 2
# Бинарный logscale-протокол:
# старт свипа: 0xFFFF x5, затем (ch<<8)|0x0A
# точка: step, avg1_hi, avg1_lo, avg2_hi, avg2_lo, 0x000A
while len(words) >= 6:
w0 = int(words[0])
w1 = int(words[1])
w2 = int(words[2])
w3 = int(words[3])
w4 = int(words[4])
w5 = int(words[5])
if (
w0 == 0xFFFF
and w1 == 0xFFFF
and w2 == 0xFFFF
and w3 == 0xFFFF
and w4 == 0xFFFF
and (w5 & 0x00FF) == 0x000A
):
self._finalize_current(
xs,
ys,
cur_channels,
raw_curves=(avg_1_vals, avg_2_vals),
apply_inversion=False,
)
xs.clear()
ys.clear()
avg_1_vals.clear()
avg_2_vals.clear()
cur_channels.clear()
cur_channel = (w5 >> 8) & 0x00FF
cur_channels.add(cur_channel)
for _ in range(6):
words.popleft()
continue
if w5 == 0x000A:
if cur_channel is not None:
cur_channels.add(cur_channel)
avg_1 = self._u32_to_i32((w1 << 16) | w2)
avg_2 = self._u32_to_i32((w3 << 16) | w4)
xs.append(w0)
avg_1_vals.append(avg_1)
avg_2_vals.append(avg_2)
ys.append(_log_pair_to_sweep(avg_1, avg_2))
#ys.append(LOG_BASE**(avg_1/LOG_SCALER) - LOG_BASE**(avg_2/LOG_SCALER))
for _ in range(6):
words.popleft()
continue
words.popleft()
del buf[:usable]
if len(buf) > 1_000_000:
del buf[:-262144]
self._finalize_current(
xs,
ys,
cur_channels,
raw_curves=(avg_1_vals, avg_2_vals),
apply_inversion=False,
)
def _run_logscale_16_bit_x2_binary_stream(self, chunk_reader: SerialChunkReader):
xs: list[int] = []
ys: list[float] = []
avg_1_vals: list[int] = []
avg_2_vals: list[int] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
words = deque()
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
time.sleep(0.0005)
continue
usable = len(buf) & ~1
if usable == 0:
continue
i = 0
while i < usable:
w = int(buf[i]) | (int(buf[i + 1]) << 8)
words.append(w)
i += 2
# Бинарный logscale-протокол (16-bit x2):
# старт свипа (новый): 0xFFFF, 0xFFFF, (ch<<8)|0x0A
# старт свипа (fallback): 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
# точка: step, avg1_lo16, avg2_lo16, 0x000A
while len(words) >= 3:
w0 = int(words[0])
w1 = int(words[1])
w2 = int(words[2])
if w0 == 0xFFFF and w1 == 0xFFFF and (w2 & 0x00FF) == 0x000A:
self._finalize_current(
xs,
ys,
cur_channels,
raw_curves=(avg_1_vals, avg_2_vals),
apply_inversion=False,
)
xs.clear()
ys.clear()
avg_1_vals.clear()
avg_2_vals.clear()
cur_channels.clear()
cur_channel = (w2 >> 8) & 0x00FF
cur_channels.add(cur_channel)
for _ in range(3):
words.popleft()
continue
if len(words) < 4:
break
w3 = int(words[3])
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A:
self._finalize_current(
xs,
ys,
cur_channels,
raw_curves=(avg_1_vals, avg_2_vals),
apply_inversion=False,
)
xs.clear()
ys.clear()
avg_1_vals.clear()
avg_2_vals.clear()
cur_channels.clear()
cur_channel = (w3 >> 8) & 0x00FF
cur_channels.add(cur_channel)
for _ in range(4):
words.popleft()
continue
if w3 == 0x000A:
if cur_channel is not None:
cur_channels.add(cur_channel)
avg_1 = self._u16_to_i16(w1)
avg_2 = self._u16_to_i16(w2)
xs.append(w0)
avg_1_vals.append(avg_1)
avg_2_vals.append(avg_2)
ys.append(_log_pair_to_sweep(avg_1, avg_2))
for _ in range(4):
words.popleft()
continue
words.popleft()
del buf[:usable]
if len(buf) > 1_000_000:
del buf[:-262144]
self._finalize_current(
xs,
ys,
cur_channels,
raw_curves=(avg_1_vals, avg_2_vals),
apply_inversion=False,
)
def run(self):
try:
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n")
except Exception as e:
sys.stderr.write(f"[error] {e}\n")
return
try:
chunk_reader = SerialChunkReader(self._src)
if self._parser_16_bit_x2:
self._run_logscale_16_bit_x2_binary_stream(chunk_reader)
elif self._logscale:
self._run_logscale_binary_stream(chunk_reader)
elif self._bin_mode:
self._run_binary_stream(chunk_reader)
else:
self._run_ascii_stream(chunk_reader)
finally:
try:
if self._src is not None:
self._src.close()
except Exception:
pass
def main():
parser = argparse.ArgumentParser(
description=(
"Читает свипы из виртуального COM-порта и рисует: "
"последний свип и водопад (реалтайм)."
)
)
parser.add_argument(
"port",
help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)",
)
parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)")
parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде")
parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с")
parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада")
parser.add_argument(
"--spec-clip",
default="2,98",
help=(
"Процентильная обрезка уровней водопада спектров, % (min,max). "
"Напр. 2,98. 'off' — отключить"
),
)
parser.add_argument(
"--spec-mean-sec",
type=float,
default=0.0,
help=(
"Вычитание среднего по каждой частоте за последние N секунд "
"в водопаде спектров (0 — отключить)"
),
)
parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна")
parser.add_argument(
"--fancy",
action="store_true",
help="Заполнять выпавшие точки средними значениями между соседними",
)
parser.add_argument(
"--ylim",
type=str,
default=None,
help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто",
)
parser.add_argument(
"--backend",
choices=["auto", "pg", "mpl"],
default="pg",
help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию pg",
)
parser.add_argument(
"--norm-type",
choices=["projector", "simple"],
default="projector",
help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)",
)
parser.add_argument(
"--bin",
dest="bin_mode",
action="store_true",
help=(
"Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; "
"точки step,uint32(hi16,lo16),0x000A"
),
)
parser.add_argument(
"--logscale",
action="store_true",
default=True,
help=(
"Новый бинарный протокол: точка несёт пару int32 (avg_1, avg_2), "
"а свип считается как 10**(avg_1*0.001) - 10**(avg_2*0.001)"
),
)
parser.add_argument(
"--parser_16_bit_x2",
action="store_true",
help=(
"Бинарный logscale-протокол c парой int16 (avg_1, avg_2): "
"старт 0xFFFF,0xFFFF,(CH<<8)|0x0A; точка step,avg1_lo16,avg2_lo16,0x000A"
),
)
parser.add_argument(
"--calibrate",
action="store_true",
help=(
"Режим измерения ширины главного пика FFT: рисует красные маркеры "
"границ и фона и выводит ширину пика в статус"
),
)
args = parser.parse_args()
# Попробуем быстрый бэкенд (pyqtgraph) при auto/pg
if args.backend == "pg":
try:
return run_pyqtgraph(args)
except Exception as e:
sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n")
sys.exit(1)
if args.backend == "auto":
try:
return run_pyqtgraph(args)
except Exception:
pass # При auto — тихо откатываемся на matplotlib
try:
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.widgets import Slider, CheckButtons, TextBox
except Exception as e:
sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n")
sys.exit(1)
# Очередь завершённых свипов и поток чтения
q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(
args.port,
args.baud,
q,
stop_event,
fancy=bool(args.fancy),
bin_mode=bool(args.bin_mode),
logscale=bool(args.logscale),
parser_16_bit_x2=bool(args.parser_16_bit_x2),
)
reader.start()
# Графика
fig, axs = plt.subplots(2, 2, figsize=(12, 8))
(ax_line, ax_img), (ax_fft, ax_spec) = axs
fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None
# Увеличим расстояния и оставим место справа под ползунки оси Y B-scan
fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08)
# Состояние для отображения
current_freqs: Optional[np.ndarray] = None
current_distances: Optional[np.ndarray] = None
current_sweep_raw: Optional[np.ndarray] = None
current_aux_curves: SweepAuxCurves = None
current_sweep_norm: Optional[np.ndarray] = None
last_calib_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None
x_shared: Optional[np.ndarray] = None
width: Optional[int] = None
max_sweeps = int(max(10, args.max_sweeps))
ring = None # type: Optional[np.ndarray]
ring_time = None # type: Optional[np.ndarray]
head = 0
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
# FFT состояние
fft_bins = FFT_LEN // 2 + 1
ring_fft = None # type: Optional[np.ndarray]
y_min_fft, y_max_fft = None, None
distance_shared: Optional[np.ndarray] = None
# Параметры контраста водопада спектров
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
# Ползунки управления Y для B-scan и контрастом
ymin_slider = None
ymax_slider = None
contrast_slider = None
calib_enabled = False
bg_subtract_enabled = False
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
current_peak_width: Optional[float] = None
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
cb = None
c_boxes = []
# Статусная строка (внизу окна)
status_text = fig.text(
0.01,
0.01,
"",
ha="left",
va="bottom",
fontsize=8,
family="monospace",
)
# Линейный график последнего свипа
line_avg1_obj, = ax_line.plot([], [], lw=1, color="0.65")
line_avg2_obj, = ax_line.plot([], [], lw=1, color="0.45")
line_obj, = ax_line.plot([], [], lw=1, color="tab:blue")
line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red")
line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green")
ax_line.set_title("Сырые данные", pad=1)
ax_line.set_xlabel("ГГц")
ax_line.set_ylabel("")
channel_text = ax_line.text(
0.98,
0.98,
"",
transform=ax_line.transAxes,
ha="right",
va="top",
fontsize=9,
family="monospace",
)
# Линейный график спектра текущего свипа
fft_line_obj, = ax_fft.plot([], [], lw=1)
fft_bg_obj = ax_fft.axhline(0.0, lw=1, color="red", visible=False)
fft_left_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False)
fft_right_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False)
ax_fft.set_title("FFT", pad=1)
ax_fft.set_xlabel("Расстояние, м")
ax_fft.set_ylabel("дБ")
# Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None
# CLI переопределение при необходимости
if args.ylim:
try:
y0, y1 = args.ylim.split(",")
fixed_ylim = (float(y0), float(y1))
except Exception:
sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n")
if fixed_ylim is not None:
ax_line.set_ylim(fixed_ylim)
# Водопад (будет инициализирован при первом свипе)
img_obj = ax_img.imshow(
np.zeros((1, 1), dtype=np.float32),
aspect="auto",
interpolation="nearest",
origin="lower",
cmap=args.cmap,
)
ax_img.set_title("Сырые данные", pad=12)
ax_img.set_xlabel("")
ax_img.set_ylabel("ГГц")
# Не показываем численные значения по времени на водопаде сырых данных
try:
ax_img.tick_params(axis="x", labelbottom=False)
except Exception:
pass
# Водопад спектров
img_fft_obj = ax_spec.imshow(
np.zeros((1, 1), dtype=np.float32),
aspect="auto",
interpolation="nearest",
origin="lower",
cmap=args.cmap,
)
ax_spec.set_title("B-scan (дБ)", pad=12)
ax_spec.set_xlabel("")
ax_spec.set_ylabel("Расстояние, м")
# Не показываем численные значения по времени на B-scan
try:
ax_spec.tick_params(axis="x", labelbottom=False)
except Exception:
pass
spec_left_obj = ax_spec.axhline(0.0, lw=1, color="red", visible=False)
spec_right_obj = ax_spec.axhline(0.0, lw=1, color="red", visible=False)
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
return _normalize_by_calib(raw, calib, norm_type=norm_type)
def _sync_checkbox_states():
nonlocal calib_enabled, bg_subtract_enabled, current_sweep_norm
try:
states = cb.get_status() if cb is not None else ()
calib_enabled = bool(states[0]) if len(states) > 0 else False
bg_subtract_enabled = bool(states[1]) if len(states) > 1 else False
except Exception:
calib_enabled = False
bg_subtract_enabled = False
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
else:
current_sweep_norm = None
# Слайдеры для управления осью Y B-scan (мин/макс) и контрастом
try:
ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35])
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
ax_cb = fig.add_axes([0.90, 0.40, 0.10, 0.14])
ymin_slider = Slider(ax_smin, "R min", 0.0, 1.0, valinit=0.0, orientation="vertical")
ymax_slider = Slider(ax_smax, "R max", 0.0, 1.0, valinit=1.0, orientation="vertical")
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
cb = CheckButtons(ax_cb, ["нормировка", "вычет фона"], [False, False])
def _on_ylim_change(_val):
try:
y0 = float(min(ymin_slider.val, ymax_slider.val))
y1 = float(max(ymin_slider.val, ymax_slider.val))
ax_spec.set_ylim(y0, y1)
fig.canvas.draw_idle()
except Exception:
pass
ymin_slider.on_changed(_on_ylim_change)
ymax_slider.on_changed(_on_ylim_change)
# Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона)
contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle())
cb.on_clicked(lambda _v: _sync_checkbox_states())
except Exception:
pass
if peak_calibrate_mode:
try:
def _set_c_value(idx: int, text: str):
global CALIBRATION_C
try:
CALIBRATION_C[idx] = float(text.strip())
except Exception:
pass
for idx, ypos in enumerate((0.36, 0.30, 0.24)):
ax_c = fig.add_axes([0.92, ypos, 0.08, 0.045])
tb = TextBox(ax_c, f"C{idx}", initial=f"{float(CALIBRATION_C[idx]):.6g}")
tb.on_submit(lambda text, i=idx: _set_c_value(i, text))
c_boxes.append(tb)
except Exception:
pass
# Для контроля частоты обновления
max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps)
frames_since_ylim_update = 0
spec_slider_initialized = False
def ensure_buffer(_w: int):
nonlocal ring, width, head, x_shared, ring_fft, distance_shared, ring_time
if ring is not None:
return
width = WF_WIDTH
x_shared = np.linspace(3.3, 14.3, width, dtype=np.float32)
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64)
head = 0
# Обновляем изображение под новые размеры: время по X (горизонталь), X по Y
img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32))
img_obj.set_extent((0, max_sweeps - 1, 3.3, 14.3))
ax_img.set_xlim(0, max_sweeps - 1)
ax_img.set_ylim(3.3, 14.3)
# FFT буферы: время по X, бин по Y
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32))
img_fft_obj.set_extent((0, max_sweeps - 1, 0.0, 1.0))
ax_spec.set_xlim(0, max_sweeps - 1)
ax_spec.set_ylim(0.0, 1.0)
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
def _update_physical_axes():
nonlocal distance_shared, spec_slider_initialized
if current_freqs is not None and current_freqs.size > 0:
finite_f = current_freqs[np.isfinite(current_freqs)]
if finite_f.size > 0:
f_min = float(np.min(finite_f))
f_max = float(np.max(finite_f))
if f_max <= f_min:
f_max = f_min + 1.0
img_obj.set_extent((0, max_sweeps - 1, f_min, f_max))
ax_img.set_ylim(f_min, f_max)
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
if distance_shared.size > 0:
d_min = float(distance_shared[0])
d_max = float(distance_shared[-1]) if distance_shared.size > 1 else float(distance_shared[0] + 1.0)
if d_max <= d_min:
d_max = d_min + 1.0
img_fft_obj.set_extent((0, max_sweeps - 1, d_min, d_max))
ax_spec.set_ylim(d_min, d_max)
if ymin_slider is not None and ymax_slider is not None:
try:
ymin_slider.valmin = d_min
ymin_slider.valmax = d_max
ymax_slider.valmin = d_min
ymax_slider.valmax = d_max
ymin_slider.ax.set_ylim(d_min, d_max)
ymax_slider.ax.set_ylim(d_min, d_max)
if (not spec_slider_initialized) or (not (d_min <= ymin_slider.val <= d_max)):
ymin_slider.set_val(d_min)
if (not spec_slider_initialized) or (not (d_min <= ymax_slider.val <= d_max)):
ymax_slider.set_val(d_max)
spec_slider_initialized = True
except Exception:
pass
def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по центральным 90% значений в видимой области imshow."""
if data.size == 0:
return None
ny, nx = data.shape[0], data.shape[1]
try:
x0, x1 = axis.get_xlim()
y0, y1 = axis.get_ylim()
except Exception:
x0, x1 = 0.0, float(nx - 1)
y0, y1 = 0.0, float(ny - 1)
xmin, xmax = sorted((float(x0), float(x1)))
ymin, ymax = sorted((float(y0), float(y1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
if ix1 < ix0:
ix1 = ix0
if iy1 < iy0:
iy1 = iy0
sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1]
finite = np.isfinite(sub)
if not finite.any():
return None
vals = sub[finite]
vmin = float(np.nanpercentile(vals, 5))
vmax = float(np.nanpercentile(vals, 95))
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
return None
return (vmin, vmax)
def push_sweep(s: np.ndarray, freqs: Optional[np.ndarray] = None):
nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time
if s is None or s.size == 0 or ring is None:
return
# Нормализуем длину до фиксированной ширины
w = ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
ring[head, :] = row
if ring_time is not None:
ring_time[head] = time.time()
head = (head + 1) % ring.shape[0]
# FFT строка (линейный модуль; перевод в дБ делаем при отображении)
if ring_fft is not None:
bins = ring_fft.shape[1]
fft_mag = _compute_fft_mag_row(s, freqs, bins)
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_mag
fft_row = _fft_mag_to_db(fft_mag)
# Экстремумы для цветовой шкалы
fr_min = np.nanmin(fft_row)
fr_max = np.nanmax(fft_row)
fr_max = np.nanpercentile(fft_row, 90)
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
y_min_fft = float(fr_min)
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
y_max_fft = float(fr_max)
def drain_queue():
nonlocal current_freqs, current_distances, current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep
drained = 0
while True:
try:
s, info, aux_curves = q.get_nowait()
except Empty:
break
drained += 1
calibrated = calibrate_freqs(
{
"F": np.linspace(3.3, 14.3, s.size, dtype=np.float64),
"I": s,
}
)
current_freqs = calibrated["F"]
current_distances = _compute_distance_axis(current_freqs, fft_bins)
s = calibrated["I"]
current_sweep_raw = s
current_aux_curves = aux_curves
current_info = info
ch = 0
try:
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
except Exception:
ch = 0
if ch == 0:
last_calib_sweep = s
current_sweep_norm = None
sweep_for_proc = s
else:
if calib_enabled and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(s, last_calib_sweep)
sweep_for_proc = current_sweep_norm
else:
current_sweep_norm = None
sweep_for_proc = s
ensure_buffer(s.size)
_update_physical_axes()
push_sweep(sweep_for_proc, current_freqs)
return drained
def make_display_ring():
# Возвращаем буфер с правильным порядком по времени (старые→новые) и осью времени по X
if ring is None:
return np.zeros((1, 1), dtype=np.float32)
base = ring if head == 0 else np.roll(ring, -head, axis=0)
return base.T # (width, time)
def make_display_times():
if ring_time is None:
return None
base_t = ring_time if head == 0 else np.roll(ring_time, -head)
return base_t
def _subtract_recent_mean_fft(disp_fft: np.ndarray) -> np.ndarray:
"""Вычесть среднее по каждой дальности за последние spec_mean_sec секунд в линейной области."""
if spec_mean_sec <= 0.0:
return disp_fft
disp_times = make_display_times()
if disp_times is None:
return disp_fft
now_t = time.time()
mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec))
if not np.any(mask):
return disp_fft
try:
mean_spec = np.nanmean(disp_fft[:, mask], axis=1)
except Exception:
return disp_fft
mean_spec = np.nan_to_num(mean_spec, nan=0.0)
return disp_fft - mean_spec[:, None]
def _visible_bg_fft(disp_fft: np.ndarray) -> Optional[np.ndarray]:
"""Оценка фона по медиане в текущем видимом по времени окне B-scan."""
if not bg_subtract_enabled or disp_fft.size == 0:
return None
ny, nx = disp_fft.shape
if ny <= 0 or nx <= 0:
return None
try:
x0, x1 = ax_spec.get_xlim()
except Exception:
x0, x1 = 0.0, float(nx - 1)
xmin, xmax = sorted((float(x0), float(x1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
if ix1 < ix0:
ix1 = ix0
window = disp_fft[:, ix0 : ix1 + 1]
if window.size == 0:
return None
try:
bg_spec = np.nanmedian(window, axis=1)
except Exception:
return None
if not np.any(np.isfinite(bg_spec)):
return None
return np.nan_to_num(bg_spec, nan=0.0).astype(np.float32, copy=False)
def make_display_ring_fft():
if ring_fft is None:
return np.zeros((1, 1), dtype=np.float32)
base = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
return base.T # (bins, time)
def update(_frame):
nonlocal frames_since_ylim_update, current_peak_width
if peak_calibrate_mode and any(getattr(tb, "capturekeystrokes", False) for tb in c_boxes):
return (
line_obj,
line_avg1_obj,
line_avg2_obj,
line_calib_obj,
line_norm_obj,
img_obj,
fft_line_obj,
fft_bg_obj,
fft_left_obj,
fft_right_obj,
img_fft_obj,
spec_left_obj,
spec_right_obj,
status_text,
channel_text,
)
changed = drain_queue() > 0
# Обновление линии последнего свипа
if current_sweep_raw is not None:
if current_freqs is not None and current_freqs.size == current_sweep_raw.size:
xs = current_freqs
elif x_shared is not None and current_sweep_raw.size <= x_shared.size:
xs = x_shared[: current_sweep_raw.size]
else:
xs = np.arange(current_sweep_raw.size, dtype=np.int32)
line_obj.set_data(xs, current_sweep_raw)
if current_aux_curves is not None:
avg_1_curve, avg_2_curve = current_aux_curves
line_avg1_obj.set_data(xs[: avg_1_curve.size], avg_1_curve)
line_avg2_obj.set_data(xs[: avg_2_curve.size], avg_2_curve)
else:
line_avg1_obj.set_data([], [])
line_avg2_obj.set_data([], [])
if last_calib_sweep is not None:
line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep)
else:
line_calib_obj.set_data([], [])
if current_sweep_norm is not None:
line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm)
else:
line_norm_obj.set_data([], [])
# Лимиты по X: 3.3 ГГц .. 14.3 ГГц
if isinstance(xs, np.ndarray) and xs.size > 0 and np.isfinite(xs[0]) and np.isfinite(xs[-1]):
ax_line.set_xlim(float(np.nanmin(xs)), float(np.nanmax(xs)))
else:
ax_line.set_xlim(3.3, 14.3)
# Адаптивные Y-лимиты (если не задан --ylim)
if fixed_ylim is None:
y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm]
if current_aux_curves is not None:
y_series.extend(current_aux_curves)
y_limits = _compute_auto_ylim(*y_series)
if y_limits is not None:
ax_line.set_ylim(y_limits[0], y_limits[1])
# Обновление спектра текущего свипа
sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
if sweep_for_fft.size > 0 and distance_shared is not None:
fft_vals = _compute_fft_row(sweep_for_fft, current_freqs, distance_shared.size)
xs_fft = current_distances if current_distances is not None else distance_shared
if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size]
fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals)
# Авто-диапазон по Y для спектра
if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)):
finite_x = xs_fft[: fft_vals.size][np.isfinite(xs_fft[: fft_vals.size])]
if finite_x.size > 0:
ax_fft.set_xlim(float(np.min(finite_x)), float(np.max(finite_x)))
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
if peak_calibrate_mode:
markers = _find_peak_width_markers(xs_fft[: fft_vals.size], fft_vals)
if markers is not None:
fft_bg_obj.set_ydata([markers["background"], markers["background"]])
fft_left_obj.set_xdata([markers["left"], markers["left"]])
fft_right_obj.set_xdata([markers["right"], markers["right"]])
spec_left_obj.set_ydata([markers["left"], markers["left"]])
spec_right_obj.set_ydata([markers["right"], markers["right"]])
fft_bg_obj.set_visible(True)
fft_left_obj.set_visible(True)
fft_right_obj.set_visible(True)
spec_left_obj.set_visible(True)
spec_right_obj.set_visible(True)
current_peak_width = markers["width"]
else:
fft_bg_obj.set_visible(False)
fft_left_obj.set_visible(False)
fft_right_obj.set_visible(False)
spec_left_obj.set_visible(False)
spec_right_obj.set_visible(False)
current_peak_width = None
else:
fft_bg_obj.set_visible(False)
fft_left_obj.set_visible(False)
fft_right_obj.set_visible(False)
spec_left_obj.set_visible(False)
spec_right_obj.set_visible(False)
current_peak_width = None
# Обновление водопада
if changed and ring is not None:
disp = make_display_ring()
# Новые данные справа: без реверса
img_obj.set_data(disp)
# Подписи времени не обновляем динамически (оставляем авто-тики)
# Авто-уровни: по видимой области (не накапливаем за всё время)
levels = _visible_levels_matplotlib(disp, ax_img)
if levels is not None:
img_obj.set_clim(vmin=levels[0], vmax=levels[1])
# Обновление водопада спектров
if changed and ring_fft is not None:
disp_fft_lin = make_display_ring_fft()
disp_fft_lin = _subtract_recent_mean_fft(disp_fft_lin)
bg_spec = _visible_bg_fft(disp_fft_lin)
if bg_spec is not None:
num = np.maximum(disp_fft_lin, 0.0).astype(np.float32, copy=False) + 1e-9
den = bg_spec[:, None] + 1e-9
disp_fft = (20.0 * np.log10(num / den)).astype(np.float32, copy=False)
else:
disp_fft = _fft_mag_to_db(disp_fft_lin)
# Новые данные справа: без реверса
img_fft_obj.set_data(disp_fft)
# Подписи времени не обновляем динамически (оставляем авто-тики)
# Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии)
if bg_spec is not None:
try:
p5 = float(np.nanpercentile(disp_fft, 5))
p95 = float(np.nanpercentile(disp_fft, 95))
span = max(abs(p5), abs(p95))
except Exception:
span = float("nan")
if np.isfinite(span) and span > 0.0:
try:
c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0
except Exception:
c = 1.0
span_eff = max(span * c, 1e-6)
img_fft_obj.set_clim(vmin=-span_eff, vmax=span_eff)
else:
try:
# disp_fft имеет форму (bins, time); берём среднее по времени
mean_spec = np.nanmean(disp_fft, axis=1)
vmin_v = float(np.nanmin(mean_spec))
vmax_v = float(np.nanmax(mean_spec))
except Exception:
vmin_v = vmax_v = None
# Если средние не дают валидный диапазон — используем процентильную обрезку (если задана)
if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v:
if spec_clip is not None:
try:
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
except Exception:
vmin_v = vmax_v = None
# Фолбэк к отслеживаемым минимум/максимумам
if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v:
if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft:
vmin_v, vmax_v = y_min_fft, y_max_fft
if vmin_v is not None and vmax_v is not None and vmin_v != vmax_v:
# Применим скалирование контрастом (верхняя граница)
try:
c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0
except Exception:
c = 1.0
vmax_eff = vmin_v + c * (vmax_v - vmin_v)
img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff)
if changed and current_info:
status_payload = dict(current_info)
if peak_calibrate_mode and current_peak_width is not None:
status_payload["peak_w"] = current_peak_width
status_text.set_text(_format_status_kv(status_payload))
chs = current_info.get("chs") if isinstance(current_info, dict) else None
if chs is None:
chs = current_info.get("ch") if isinstance(current_info, dict) else None
if chs is None:
channel_text.set_text("")
else:
try:
if isinstance(chs, (list, tuple, set)):
ch_list = sorted(int(v) for v in chs)
ch_text_val = ", ".join(str(v) for v in ch_list)
else:
ch_text_val = str(int(chs))
channel_text.set_text(f"chs {ch_text_val}")
except Exception:
channel_text.set_text(f"chs {chs}")
# Возвращаем обновлённые артисты
return (
line_obj,
line_avg1_obj,
line_avg2_obj,
line_calib_obj,
line_norm_obj,
img_obj,
fft_line_obj,
fft_bg_obj,
fft_left_obj,
fft_right_obj,
img_fft_obj,
spec_left_obj,
spec_right_obj,
status_text,
channel_text,
)
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
cleanup_done = False
def _cleanup():
nonlocal cleanup_done
if cleanup_done:
return
cleanup_done = True
stop_event.set()
reader.join(timeout=1.0)
def _handle_sigint(_signum, _frame):
_cleanup()
try:
plt.close(fig)
except Exception:
pass
prev_sigint = signal.getsignal(signal.SIGINT)
try:
fig.canvas.mpl_connect("close_event", lambda _evt: _cleanup())
except Exception:
pass
try:
signal.signal(signal.SIGINT, _handle_sigint)
except Exception:
prev_sigint = None
try:
plt.show()
finally:
_cleanup()
if prev_sigint is not None:
try:
signal.signal(signal.SIGINT, prev_sigint)
except Exception:
pass
def run_pyqtgraph(args):
"""Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6."""
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
try:
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore
except Exception as e:
raise RuntimeError(
"pyqtgraph и совместимый Qt backend не найдены. Установите: pip install pyqtgraph PyQt5"
) from e
# Очередь завершённых свипов и поток чтения
q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(
args.port,
args.baud,
q,
stop_event,
fancy=bool(args.fancy),
bin_mode=bool(args.bin_mode),
logscale=bool(args.logscale),
parser_16_bit_x2=bool(args.parser_16_bit_x2),
)
reader.start()
# Настройки скорости
max_sweeps = int(max(10, args.max_sweeps))
max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps)
# PyQtGraph настройки
pg.setConfigOptions(
useOpenGL=not peak_calibrate_mode,
antialias=False,
imageAxisOrder="row-major",
)
app = QtWidgets.QApplication.instance()
if app is None:
app = QtWidgets.QApplication([])
try:
app.setApplicationName(str(args.title))
except Exception:
pass
try:
app.setQuitOnLastWindowClosed(True)
except Exception:
pass
main_window = QtWidgets.QWidget()
try:
main_window.setWindowTitle(str(args.title))
except Exception:
pass
main_layout = QtWidgets.QHBoxLayout(main_window)
main_layout.setContentsMargins(6, 6, 6, 6)
main_layout.setSpacing(6)
win = pg.GraphicsLayoutWidget(show=False, title=args.title)
main_layout.addWidget(win)
settings_widget = QtWidgets.QWidget()
settings_layout = QtWidgets.QVBoxLayout(settings_widget)
settings_layout.setContentsMargins(6, 6, 6, 6)
settings_layout.setSpacing(8)
try:
settings_widget.setMinimumWidth(170)
except Exception:
pass
main_layout.addWidget(settings_widget)
# Плот последнего свипа (слева-сверху)
p_line = win.addPlot(row=0, col=0, title="Сырые данные")
p_line.showGrid(x=True, y=True, alpha=0.3)
curve_avg1 = p_line.plot(pen=pg.mkPen((170, 170, 170), width=1))
curve_avg2 = p_line.plot(pen=pg.mkPen((110, 110, 110), width=1))
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1))
curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1))
p_line.setLabel("bottom", "ГГц")
p_line.setLabel("left", "Y")
ch_text = pg.TextItem("", anchor=(1, 1))
ch_text.setZValue(10)
p_line.addItem(ch_text)
# Водопад (справа-сверху)
p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад")
p_img.invertY(False)
p_img.showGrid(x=False, y=False)
p_img.setLabel("bottom", "Время, с (новое справа)")
try:
p_img.getAxis("bottom").setStyle(showValues=False)
except Exception:
pass
p_img.setLabel("left", "ГГц")
img = pg.ImageItem()
p_img.addItem(img)
# FFT (слева-снизу)
p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
peak_pen = pg.mkPen((255, 0, 0), width=1)
fft_bg_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
fft_left_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen)
fft_right_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen)
p_fft.addItem(fft_bg_line)
p_fft.addItem(fft_left_line)
p_fft.addItem(fft_right_line)
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
p_fft.setLabel("bottom", "Расстояние, м")
p_fft.setLabel("left", "дБ")
# Водопад спектров (справа-снизу)
p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)")
p_spec.invertY(False)
p_spec.showGrid(x=False, y=False)
p_spec.setLabel("bottom", "Время, с (новое справа)")
try:
p_spec.getAxis("bottom").setStyle(showValues=False)
except Exception:
pass
p_spec.setLabel("left", "Расстояние, м")
img_fft = pg.ImageItem()
p_spec.addItem(img_fft)
spec_left_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
spec_right_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
p_spec.addItem(spec_left_line)
p_spec.addItem(spec_right_line)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
# Правая панель настроек внутри основного окна.
calib_cb = QtWidgets.QCheckBox("нормировка")
bg_subtract_cb = QtWidgets.QCheckBox("вычет фона")
try:
settings_title = QtWidgets.QLabel("Настройки")
settings_layout.addWidget(settings_title)
except Exception:
pass
settings_layout.addWidget(calib_cb)
settings_layout.addWidget(bg_subtract_cb)
calib_window = None
# Статусная строка (внизу окна)
status = pg.LabelItem(justify="left")
win.addItem(status, row=3, col=0, colspan=2)
# Состояние
ring: Optional[np.ndarray] = None
ring_time: Optional[np.ndarray] = None
head = 0
width: Optional[int] = None
x_shared: Optional[np.ndarray] = None
current_freqs: Optional[np.ndarray] = None
current_distances: Optional[np.ndarray] = None
current_sweep_raw: Optional[np.ndarray] = None
current_aux_curves: SweepAuxCurves = None
current_sweep_norm: Optional[np.ndarray] = None
last_calib_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
# Для спектров
fft_bins = FFT_LEN // 2 + 1
ring_fft: Optional[np.ndarray] = None
distance_shared: Optional[np.ndarray] = None
y_min_fft, y_max_fft = None, None
# Параметры контраста водопада спектров (процентильная обрезка)
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
calib_enabled = False
bg_subtract_enabled = False
current_peak_width: Optional[float] = None
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
c_edits = []
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim:
try:
y0, y1 = args.ylim.split(",")
fixed_ylim = (float(y0), float(y1))
except Exception:
pass
if fixed_ylim is not None:
p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0)
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
return _normalize_by_calib(raw, calib, norm_type=norm_type)
def _set_calib_enabled():
nonlocal calib_enabled, current_sweep_norm
try:
calib_enabled = bool(calib_cb.isChecked())
except Exception:
calib_enabled = False
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
else:
current_sweep_norm = None
def _set_bg_subtract_enabled():
nonlocal bg_subtract_enabled
try:
bg_subtract_enabled = bool(bg_subtract_cb.isChecked())
except Exception:
bg_subtract_enabled = False
try:
calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled())
except Exception:
pass
try:
bg_subtract_cb.stateChanged.connect(lambda _v: _set_bg_subtract_enabled())
except Exception:
pass
if peak_calibrate_mode:
try:
calib_window = QtWidgets.QWidget()
try:
calib_window.setWindowTitle(f"{args.title} freq calibration")
except Exception:
pass
calib_layout = QtWidgets.QFormLayout(calib_window)
calib_layout.setContentsMargins(8, 8, 8, 8)
def _apply_c_value(idx: int, edit):
global CALIBRATION_C
try:
CALIBRATION_C[idx] = float(edit.text().strip())
except Exception:
try:
edit.setText(f"{float(CALIBRATION_C[idx]):.6g}")
except Exception:
pass
for idx in range(3):
edit = QtWidgets.QLineEdit(f"{float(CALIBRATION_C[idx]):.6g}")
try:
edit.setMaximumWidth(120)
except Exception:
pass
try:
edit.editingFinished.connect(lambda i=idx, e=edit: _apply_c_value(i, e))
except Exception:
pass
calib_layout.addRow(f"C{idx}", edit)
c_edits.append(edit)
try:
calib_window.show()
except Exception:
pass
except Exception:
pass
try:
settings_layout.addStretch(1)
except Exception:
pass
def ensure_buffer(_w: int):
nonlocal ring, ring_time, head, width, x_shared, ring_fft, distance_shared
if ring is not None:
return
width = WF_WIDTH
x_shared = np.linspace(3.3, 14.3, width, dtype=np.float32)
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64)
head = 0
# Водопад: время по оси X, X по оси Y (ось Y: 3.3..14.3 ГГц)
img.setImage(ring.T, autoLevels=False)
img.setRect(0, 3.3, max_sweeps, 14.3 - 3.3)
p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(3.3, 14.3), padding=0)
p_line.setXRange(3.3, 14.3, padding=0)
# FFT: время по оси X, бин по оси Y
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft.setImage(ring_fft.T, autoLevels=False)
img_fft.setRect(0, 0.0, max_sweeps, 1.0)
p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0.0, 1.0), padding=0)
p_fft.setXRange(0.0, 1.0, padding=0)
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
def _update_physical_axes():
nonlocal distance_shared
if current_freqs is not None and current_freqs.size > 0:
finite_f = current_freqs[np.isfinite(current_freqs)]
if finite_f.size > 0:
f_min = float(np.min(finite_f))
f_max = float(np.max(finite_f))
if f_max <= f_min:
f_max = f_min + 1.0
img.setRect(0, f_min, max_sweeps, f_max - f_min)
p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0)
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
if distance_shared.size > 0:
d_min = float(distance_shared[0])
d_max = float(distance_shared[-1]) if distance_shared.size > 1 else float(distance_shared[0] + 1.0)
if d_max <= d_min:
d_max = d_min + 1.0
img_fft.setRect(0, d_min, max_sweeps, d_max - d_min)
p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0)
def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по центральным 90% значений в видимой области ImageItem."""
if data.size == 0:
return None
ny, nx = data.shape[0], data.shape[1]
try:
(x0, x1), (y0, y1) = p_img.viewRange()
except Exception:
x0, x1 = 0.0, float(nx - 1)
y0, y1 = 0.0, float(ny - 1)
xmin, xmax = sorted((float(x0), float(x1)))
ymin, ymax = sorted((float(y0), float(y1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
if ix1 < ix0:
ix1 = ix0
if iy1 < iy0:
iy1 = iy0
sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1]
finite = np.isfinite(sub)
if not finite.any():
return None
vals = sub[finite]
vmin = float(np.nanpercentile(vals, 5))
vmax = float(np.nanpercentile(vals, 95))
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
return None
return (vmin, vmax)
def _visible_bg_fft(disp_fft: np.ndarray) -> Optional[np.ndarray]:
"""Оценка фона по медиане в текущем видимом по времени окне B-scan."""
if not bg_subtract_enabled or disp_fft.size == 0:
return None
ny, nx = disp_fft.shape
if ny <= 0 or nx <= 0:
return None
try:
(x0, x1), _ = p_spec.viewRange()
except Exception:
x0, x1 = 0.0, float(nx - 1)
xmin, xmax = sorted((float(x0), float(x1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
if ix1 < ix0:
ix1 = ix0
window = disp_fft[:, ix0 : ix1 + 1]
if window.size == 0:
return None
try:
bg_spec = np.nanmedian(window, axis=1)
except Exception:
return None
if not np.any(np.isfinite(bg_spec)):
return None
return np.nan_to_num(bg_spec, nan=0.0).astype(np.float32, copy=False)
def push_sweep(s: np.ndarray, freqs: Optional[np.ndarray] = None):
nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft
if s is None or s.size == 0 or ring is None:
return
w = ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
ring[head, :] = row
if ring_time is not None:
ring_time[head] = time.time()
head = (head + 1) % ring.shape[0]
# FFT строка (линейный модуль; перевод в дБ делаем при отображении)
if ring_fft is not None:
bins = ring_fft.shape[1]
fft_mag = _compute_fft_mag_row(s, freqs, bins)
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_mag
fft_row = _fft_mag_to_db(fft_mag)
fr_min = np.nanmin(fft_row)
fr_max = np.nanmax(fft_row)
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
y_min_fft = float(fr_min)
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
y_max_fft = float(fr_max)
def drain_queue():
nonlocal current_freqs, current_distances, current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep
drained = 0
while True:
try:
s, info, aux_curves = q.get_nowait()
except Empty:
break
drained += 1
calibrated = calibrate_freqs(
{
"F": np.linspace(3.3, 14.3, s.size, dtype=np.float64),
"I": s,
}
)
current_freqs = calibrated["F"]
current_distances = _compute_distance_axis(current_freqs, fft_bins)
s = calibrated["I"]
current_sweep_raw = s
current_aux_curves = aux_curves
current_info = info
ch = 0
try:
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
except Exception:
ch = 0
if ch == 0:
last_calib_sweep = s
current_sweep_norm = None
sweep_for_proc = s
else:
if calib_enabled and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(s, last_calib_sweep)
sweep_for_proc = current_sweep_norm
else:
current_sweep_norm = None
sweep_for_proc = s
ensure_buffer(s.size)
_update_physical_axes()
push_sweep(sweep_for_proc, current_freqs)
return drained
# Попытка применить LUT из колормэпа (если доступен)
try:
cm_mod = getattr(pg, "colormap", None)
if cm_mod is not None:
cm = cm_mod.get(args.cmap)
lut = cm.getLookupTable(0.0, 1.0, 256)
img.setLookupTable(lut)
img_fft.setLookupTable(lut)
except Exception:
pass
def update():
nonlocal current_peak_width
if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits):
return
changed = drain_queue() > 0
if current_sweep_raw is not None and x_shared is not None:
if current_freqs is not None and current_freqs.size == current_sweep_raw.size:
xs = current_freqs
elif current_sweep_raw.size <= x_shared.size:
xs = x_shared[: current_sweep_raw.size]
else:
xs = np.arange(current_sweep_raw.size)
curve.setData(xs, current_sweep_raw, autoDownsample=True)
if current_aux_curves is not None:
avg_1_curve, avg_2_curve = current_aux_curves
curve_avg1.setData(xs[: avg_1_curve.size], avg_1_curve, autoDownsample=True)
curve_avg2.setData(xs[: avg_2_curve.size], avg_2_curve, autoDownsample=True)
else:
curve_avg1.setData([], [])
curve_avg2.setData([], [])
if last_calib_sweep is not None:
curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True)
else:
curve_calib.setData([], [])
if current_sweep_norm is not None:
curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True)
else:
curve_norm.setData([], [])
if fixed_ylim is None:
y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm]
if current_aux_curves is not None:
y_series.extend(current_aux_curves)
y_limits = _compute_auto_ylim(*y_series)
if y_limits is not None:
p_line.setYRange(y_limits[0], y_limits[1], padding=0)
if isinstance(xs, np.ndarray) and xs.size > 0:
finite_x = xs[np.isfinite(xs)]
if finite_x.size > 0:
p_line.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
# Обновим спектр
sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
if sweep_for_fft.size > 0 and distance_shared is not None:
fft_vals = _compute_fft_row(sweep_for_fft, current_freqs, distance_shared.size)
xs_fft = current_distances if current_distances is not None else distance_shared
if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size]
curve_fft.setData(xs_fft[: fft_vals.size], fft_vals)
finite_x = xs_fft[: fft_vals.size][np.isfinite(xs_fft[: fft_vals.size])]
if finite_x.size > 0:
p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
if peak_calibrate_mode:
markers = _find_peak_width_markers(xs_fft[: fft_vals.size], fft_vals)
if markers is not None:
fft_bg_line.setValue(markers["background"])
fft_left_line.setValue(markers["left"])
fft_right_line.setValue(markers["right"])
spec_left_line.setValue(markers["left"])
spec_right_line.setValue(markers["right"])
fft_bg_line.setVisible(True)
fft_left_line.setVisible(True)
fft_right_line.setVisible(True)
spec_left_line.setVisible(True)
spec_right_line.setVisible(True)
current_peak_width = markers["width"]
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
current_peak_width = None
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
current_peak_width = None
if changed and ring is not None:
disp = ring if head == 0 else np.roll(ring, -head, axis=0)
disp = disp.T # (width, time with newest at right)
levels = _visible_levels_pyqtgraph(disp)
if levels is not None:
img.setImage(disp, autoLevels=False, levels=levels)
else:
img.setImage(disp, autoLevels=False)
if changed and current_info:
try:
status_payload = dict(current_info)
if peak_calibrate_mode and current_peak_width is not None:
status_payload["peak_w"] = current_peak_width
status.setText(_format_status_kv(status_payload))
except Exception:
pass
try:
chs = current_info.get("chs") if isinstance(current_info, dict) else None
if chs is None:
chs = current_info.get("ch") if isinstance(current_info, dict) else None
if chs is None:
ch_text.setText("")
else:
if isinstance(chs, (list, tuple, set)):
ch_list = sorted(int(v) for v in chs)
ch_text_val = ", ".join(str(v) for v in ch_list)
else:
ch_text_val = str(int(chs))
ch_text.setText(f"chs {ch_text_val}")
(x0, x1), (y0, y1) = p_line.viewRange()
dx = 0.01 * max(1.0, float(x1 - x0))
dy = 0.01 * max(1.0, float(y1 - y0))
ch_text.setPos(float(x1 - dx), float(y1 - dy))
except Exception:
pass
if changed and ring_fft is not None:
disp_fft_lin = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
disp_fft_lin = disp_fft_lin.T
if spec_mean_sec > 0.0 and ring_time is not None:
disp_times = ring_time if head == 0 else np.roll(ring_time, -head)
now_t = time.time()
mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec))
if np.any(mask):
try:
mean_spec = np.nanmean(disp_fft_lin[:, mask], axis=1)
mean_spec = np.nan_to_num(mean_spec, nan=0.0)
disp_fft_lin = disp_fft_lin - mean_spec[:, None]
except Exception:
pass
bg_spec = _visible_bg_fft(disp_fft_lin)
if bg_spec is not None:
num = np.maximum(disp_fft_lin, 0.0).astype(np.float32, copy=False) + 1e-9
den = bg_spec[:, None] + 1e-9
disp_fft = (20.0 * np.log10(num / den)).astype(np.float32, copy=False)
else:
disp_fft = _fft_mag_to_db(disp_fft_lin)
# Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии)
levels = None
if bg_spec is not None:
try:
p5 = float(np.nanpercentile(disp_fft, 5))
p95 = float(np.nanpercentile(disp_fft, 95))
span = max(abs(p5), abs(p95))
if np.isfinite(span) and span > 0.0:
levels = (-span, span)
except Exception:
levels = None
else:
try:
mean_spec = np.nanmean(disp_fft, axis=1)
vmin_v = float(np.nanmin(mean_spec))
vmax_v = float(np.nanmax(mean_spec))
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
levels = (vmin_v, vmax_v)
except Exception:
levels = None
# Процентильная обрезка как запасной вариант
if levels is None and spec_clip is not None:
try:
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
levels = (vmin_v, vmax_v)
except Exception:
levels = None
# Ещё один фолбэк — глобальные накопленные мин/макс
if levels is None and y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft:
levels = (y_min_fft, y_max_fft)
if levels is not None:
img_fft.setImage(disp_fft, autoLevels=False, levels=levels)
else:
img_fft.setImage(disp_fft, autoLevels=False)
timer = pg.QtCore.QTimer()
timer.timeout.connect(update)
timer.start(interval_ms)
sigint_requested = threading.Event()
sigint_timer = pg.QtCore.QTimer()
sigint_timer.setInterval(50)
sigint_timer.timeout.connect(lambda: app.quit() if sigint_requested.is_set() else None)
sigint_timer.start()
cleanup_done = False
def on_quit():
nonlocal cleanup_done
if cleanup_done:
return
cleanup_done = True
try:
timer.stop()
except Exception:
pass
try:
sigint_timer.stop()
except Exception:
pass
stop_event.set()
reader.join(timeout=1.0)
try:
main_window.close()
except Exception:
pass
if calib_window is not None:
try:
calib_window.close()
except Exception:
pass
def _handle_sigint(_signum, _frame):
sigint_requested.set()
prev_sigint = signal.getsignal(signal.SIGINT)
try:
signal.signal(signal.SIGINT, _handle_sigint)
except Exception:
prev_sigint = None
orig_close_event = getattr(main_window, "closeEvent", None)
def _close_event(event):
try:
if callable(orig_close_event):
orig_close_event(event)
else:
event.accept()
except Exception:
try:
event.accept()
except Exception:
pass
try:
app.quit()
except Exception:
pass
try:
main_window.closeEvent = _close_event # type: ignore[method-assign]
except Exception:
pass
app.aboutToQuit.connect(on_quit)
try:
main_window.resize(1200, 680)
except Exception:
pass
main_window.show()
exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None)
try:
exec_fn()
finally:
on_quit()
if prev_sigint is not None:
try:
signal.signal(signal.SIGINT, prev_sigint)
except Exception:
pass
if __name__ == "__main__":
main()