2981 lines
117 KiB
Python
Executable File
2981 lines
117 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Реалтайм-плоттер для свипов из виртуального COM-порта.
|
||
|
||
Формат строк:
|
||
- "Sweep_start" — начало нового свипа (предыдущий считается завершённым)
|
||
- "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком
|
||
|
||
Отрисовываются два графика:
|
||
- Левый: последний полученный свип (Y vs X)
|
||
- Правый: водопад (последние N свипов во времени)
|
||
|
||
Оптимизации для скорости:
|
||
- Парсинг и чтение в фоновой нити
|
||
- Анимация с обновлением только данных (без лишнего пересоздания фигур)
|
||
- Кольцевой буфер под водопад с фиксированным числом свипов
|
||
|
||
Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии
|
||
используется сырой доступ к TTY через termios.
|
||
"""
|
||
|
||
import argparse
|
||
import io
|
||
import os
|
||
import signal
|
||
import sys
|
||
import threading
|
||
import time
|
||
from collections import deque
|
||
from queue import Queue, Empty, Full
|
||
from typing import Any, Dict, Mapping, Optional, Tuple, Union
|
||
|
||
import numpy as np
|
||
|
||
WF_WIDTH = 1000 # максимальное число точек в ряду водопада
|
||
FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров
|
||
SWEEP_FREQ_MIN_GHZ = 3.3
|
||
SWEEP_FREQ_MAX_GHZ = 14.3
|
||
LOG_BASE = 10.0
|
||
LOG_SCALER = 0.001 # int32 значения приходят в fixed-point лог-шкале с шагом 1e-3
|
||
LOG_POSTSCALER = 10
|
||
LOG_EXP_LIMIT = 300.0 # запас до переполнения float64 при возведении LOG_BASE в степень
|
||
C_M_S = 299_792_458.0
|
||
# Порог для инверсии сырых данных: если среднее значение свипа ниже порога —
|
||
# считаем, что сигнал «меньше нуля» и домножаем свип на -1
|
||
DATA_INVERSION_THRASHOLD = 10.0
|
||
|
||
Number = Union[int, float]
|
||
SweepInfo = Dict[str, Any]
|
||
SweepData = Dict[str, np.ndarray]
|
||
SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]]
|
||
SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves]
|
||
|
||
|
||
def recalculate_calibration_c(
|
||
base_coeffs: np.ndarray,
|
||
f_min: float = SWEEP_FREQ_MIN_GHZ,
|
||
f_max: float = SWEEP_FREQ_MAX_GHZ,
|
||
) -> np.ndarray:
|
||
"""Пересчитать коэффициенты так, чтобы калибровка сохраняла края диапазона свипа."""
|
||
coeffs = np.asarray(base_coeffs, dtype=np.float64).reshape(-1)
|
||
if coeffs.size < 3:
|
||
out = np.zeros((3,), dtype=np.float64)
|
||
out[: coeffs.size] = coeffs
|
||
coeffs = out
|
||
c0, c1, c2 = float(coeffs[0]), float(coeffs[1]), float(coeffs[2])
|
||
x0 = float(f_min)
|
||
x1 = float(f_max)
|
||
y0 = c0 + c1 * x0 + c2 * (x0 ** 2)
|
||
y1 = c0 + c1 * x1 + c2 * (x1 ** 2)
|
||
if not (np.isfinite(y0) and np.isfinite(y1)) or y1 == y0:
|
||
return np.asarray([c0, c1, c2], dtype=np.float64)
|
||
scale = (x1 - x0) / (y1 - y0)
|
||
shift = x0 - scale * y0
|
||
return np.asarray(
|
||
[
|
||
shift + scale * c0,
|
||
scale * c1,
|
||
scale * c2,
|
||
],
|
||
dtype=np.float64,
|
||
)
|
||
|
||
|
||
CALIBRATION_C_BASE = np.asarray([0.0, 1.0, 0.025], dtype=np.float64)
|
||
CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE)
|
||
|
||
|
||
|
||
def _format_status_kv(data: Mapping[str, Any]) -> str:
|
||
"""Преобразовать словарь метрик в одну строку 'k:v'."""
|
||
|
||
def _fmt(v: Any) -> str:
|
||
if v is None:
|
||
return "NA"
|
||
try:
|
||
fv = float(v)
|
||
except Exception:
|
||
return str(v)
|
||
if not np.isfinite(fv):
|
||
return "nan"
|
||
# Достаточно компактно для статус-строки.
|
||
if abs(fv) >= 1000 or (0 < abs(fv) < 0.01):
|
||
return f"{fv:.3g}"
|
||
return f"{fv:.3f}".rstrip("0").rstrip(".")
|
||
|
||
parts = [f"{k}:{_fmt(v)}" for k, v in data.items()]
|
||
return " ".join(parts)
|
||
|
||
|
||
def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
|
||
"""Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров.
|
||
|
||
Возвращает пару (low, high) или None для отключения. Допустимы значения 0..100, low < high.
|
||
Ключевые слова отключения: "off", "none", "no".
|
||
"""
|
||
if not spec:
|
||
return None
|
||
s = str(spec).strip().lower()
|
||
if s in ("off", "none", "no"):
|
||
return None
|
||
try:
|
||
p0, p1 = s.replace(";", ",").split(",")
|
||
low = float(p0)
|
||
high = float(p1)
|
||
if not (0.0 <= low < high <= 100.0):
|
||
return None
|
||
return (low, high)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _log_value_to_linear(value: int) -> float:
|
||
"""Преобразовать fixed-point логарифмическое значение в линейную шкалу."""
|
||
exponent = max(-LOG_EXP_LIMIT, min(LOG_EXP_LIMIT, float(value) * LOG_SCALER))
|
||
return float(LOG_BASE ** exponent)
|
||
|
||
|
||
def _log_pair_to_sweep(avg_1: int, avg_2: int) -> float:
|
||
"""Разность двух логарифмических усреднений в линейной шкале."""
|
||
return (_log_value_to_linear(avg_1) - _log_value_to_linear(avg_2))*LOG_POSTSCALER
|
||
|
||
|
||
def _compute_auto_ylim(*series_list: Optional[np.ndarray]) -> Optional[Tuple[float, float]]:
|
||
"""Общий Y-диапазон по всем переданным кривым с небольшим запасом."""
|
||
y_min: Optional[float] = None
|
||
y_max: Optional[float] = None
|
||
for series in series_list:
|
||
if series is None:
|
||
continue
|
||
arr = np.asarray(series)
|
||
if arr.size == 0:
|
||
continue
|
||
finite = arr[np.isfinite(arr)]
|
||
if finite.size == 0:
|
||
continue
|
||
cur_min = float(np.min(finite))
|
||
cur_max = float(np.max(finite))
|
||
y_min = cur_min if y_min is None else min(y_min, cur_min)
|
||
y_max = cur_max if y_max is None else max(y_max, cur_max)
|
||
|
||
if y_min is None or y_max is None:
|
||
return None
|
||
if y_min == y_max:
|
||
pad = max(1.0, abs(y_min) * 0.05)
|
||
else:
|
||
pad = 0.05 * (y_max - y_min)
|
||
return (y_min - pad, y_max + pad)
|
||
|
||
|
||
def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData:
|
||
"""Вернуть копию свипа с применённой калибровкой частотной оси."""
|
||
F = np.asarray(sweep["F"], dtype=np.float64).copy()
|
||
I = np.asarray(sweep["I"], dtype=np.float64).copy()
|
||
C = np.asarray(CALIBRATION_C, dtype=np.float64)
|
||
if F.size > 0:
|
||
F = C[0] + C[1] * F + C[2] * (F * F)
|
||
|
||
if F.size >= 2:
|
||
F_cal = np.linspace(float(F[0]), float(F[-1]), F.size, dtype=np.float64)
|
||
I_cal = np.interp(F_cal, F, I).astype(np.float64)
|
||
else:
|
||
F_cal = F.copy()
|
||
I_cal = I.copy()
|
||
|
||
return {
|
||
"F": F_cal,
|
||
"I": I_cal,
|
||
}
|
||
|
||
def _prepare_fft_segment(
|
||
sweep: np.ndarray,
|
||
freqs: Optional[np.ndarray],
|
||
fft_len: int = FFT_LEN,
|
||
) -> Optional[Tuple[np.ndarray, int]]:
|
||
"""Подготовить свип к FFT, пересэмплируя его на равномерную сетку по частоте."""
|
||
take_fft = min(int(sweep.size), int(fft_len))
|
||
if take_fft <= 0:
|
||
return None
|
||
|
||
sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32)
|
||
fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(np.float32, copy=False)
|
||
if freqs is None:
|
||
return fallback, take_fft
|
||
|
||
freq_arr = np.asarray(freqs)
|
||
if freq_arr.size < take_fft:
|
||
return fallback, take_fft
|
||
|
||
freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64)
|
||
valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg)
|
||
if int(np.count_nonzero(valid)) < 2:
|
||
return fallback, take_fft
|
||
|
||
x_valid = freq_seg[valid]
|
||
y_valid = sweep_seg[valid]
|
||
order = np.argsort(x_valid, kind="mergesort")
|
||
x_valid = x_valid[order]
|
||
y_valid = y_valid[order]
|
||
x_unique, unique_idx = np.unique(x_valid, return_index=True)
|
||
y_unique = y_valid[unique_idx]
|
||
if x_unique.size < 2 or x_unique[-1] <= x_unique[0]:
|
||
return fallback, take_fft
|
||
|
||
x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64)
|
||
resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32)
|
||
return resampled, take_fft
|
||
|
||
|
||
def _fft_mag_to_db(mag: np.ndarray) -> np.ndarray:
|
||
"""Перевод модуля спектра в дБ с отсечкой отрицательных значений после вычитания фона."""
|
||
mag_arr = np.asarray(mag, dtype=np.float32)
|
||
safe_mag = np.maximum(mag_arr, 0.0)
|
||
return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False)
|
||
|
||
|
||
def _compute_fft_mag_row(
|
||
sweep: np.ndarray,
|
||
freqs: Optional[np.ndarray],
|
||
bins: int,
|
||
) -> np.ndarray:
|
||
"""Посчитать линейный модуль FFT-строки, используя калиброванную частотную ось."""
|
||
if bins <= 0:
|
||
return np.zeros((0,), dtype=np.float32)
|
||
|
||
prepared = _prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN)
|
||
if prepared is None:
|
||
return np.full((bins,), np.nan, dtype=np.float32)
|
||
|
||
fft_seg, take_fft = prepared
|
||
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
|
||
win = np.hanning(take_fft).astype(np.float32)
|
||
fft_in[:take_fft] = fft_seg * win
|
||
spec = np.fft.ifft(fft_in)
|
||
mag = np.abs(spec).astype(np.float32)
|
||
if mag.shape[0] != bins:
|
||
mag = mag[:bins]
|
||
return mag
|
||
|
||
|
||
def _compute_fft_row(
|
||
sweep: np.ndarray,
|
||
freqs: Optional[np.ndarray],
|
||
bins: int,
|
||
) -> np.ndarray:
|
||
"""Посчитать FFT-строку в дБ."""
|
||
return _fft_mag_to_db(_compute_fft_mag_row(sweep, freqs, bins))
|
||
|
||
|
||
def _compute_distance_axis(freqs: Optional[np.ndarray], bins: int) -> np.ndarray:
|
||
"""Рассчитать ось расстояния для IFFT по равномерной частотной сетке."""
|
||
if bins <= 0:
|
||
return np.zeros((0,), dtype=np.float64)
|
||
|
||
if freqs is None:
|
||
return np.arange(bins, dtype=np.float64)
|
||
|
||
freq_arr = np.asarray(freqs, dtype=np.float64)
|
||
finite = freq_arr[np.isfinite(freq_arr)]
|
||
if finite.size < 2:
|
||
return np.arange(bins, dtype=np.float64)
|
||
|
||
df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1))
|
||
df_hz = abs(df_ghz) * 1e9
|
||
if not np.isfinite(df_hz) or df_hz <= 0.0:
|
||
return np.arange(bins, dtype=np.float64)
|
||
|
||
step_m = C_M_S / (2.0 * FFT_LEN * df_hz)
|
||
return np.arange(bins, dtype=np.float64) * step_m
|
||
|
||
|
||
def _find_peak_width_markers(xs: np.ndarray, ys: np.ndarray) -> Optional[Dict[str, float]]:
|
||
"""Найти главный ненулевой пик и его ширину по уровню половины высоты над фоном."""
|
||
x_arr = np.asarray(xs, dtype=np.float64)
|
||
y_arr = np.asarray(ys, dtype=np.float64)
|
||
valid = np.isfinite(x_arr) & np.isfinite(y_arr) & (x_arr > 0.0)
|
||
if int(np.count_nonzero(valid)) < 3:
|
||
return None
|
||
|
||
x = x_arr[valid]
|
||
y = y_arr[valid]
|
||
x_min = float(x[0])
|
||
x_max = float(x[-1])
|
||
x_span = x_max - x_min
|
||
central_mask = (x >= (x_min + 0.25 * x_span)) & (x <= (x_min + 0.75 * x_span))
|
||
if int(np.count_nonzero(central_mask)) > 0:
|
||
central_idx = np.flatnonzero(central_mask)
|
||
peak_idx = int(central_idx[int(np.argmax(y[central_mask]))])
|
||
else:
|
||
peak_idx = int(np.argmax(y))
|
||
peak_y = float(y[peak_idx])
|
||
shoulder_gap = max(1, min(8, y.size // 64 if y.size > 0 else 1))
|
||
shoulder_width = max(4, min(32, y.size // 16 if y.size > 0 else 4))
|
||
left_lo = max(0, peak_idx - shoulder_gap - shoulder_width)
|
||
left_hi = max(0, peak_idx - shoulder_gap)
|
||
right_lo = min(y.size, peak_idx + shoulder_gap + 1)
|
||
right_hi = min(y.size, right_lo + shoulder_width)
|
||
background_parts = []
|
||
if left_hi > left_lo:
|
||
background_parts.append(float(np.nanmedian(y[left_lo:left_hi])))
|
||
if right_hi > right_lo:
|
||
background_parts.append(float(np.nanmedian(y[right_lo:right_hi])))
|
||
if background_parts:
|
||
background = float(np.mean(background_parts))
|
||
else:
|
||
background = float(np.nanpercentile(y, 10))
|
||
if not np.isfinite(peak_y) or not np.isfinite(background) or peak_y <= background:
|
||
return None
|
||
|
||
half_level = background + 0.5 * (peak_y - background)
|
||
|
||
def _interp_cross(x0: float, y0: float, x1: float, y1: float) -> float:
|
||
if not (np.isfinite(x0) and np.isfinite(y0) and np.isfinite(x1) and np.isfinite(y1)):
|
||
return x1
|
||
dy = y1 - y0
|
||
if dy == 0.0:
|
||
return x1
|
||
t = (half_level - y0) / dy
|
||
t = min(1.0, max(0.0, t))
|
||
return x0 + t * (x1 - x0)
|
||
|
||
left_x = float(x[0])
|
||
for i in range(peak_idx, 0, -1):
|
||
if y[i - 1] <= half_level <= y[i]:
|
||
left_x = _interp_cross(float(x[i - 1]), float(y[i - 1]), float(x[i]), float(y[i]))
|
||
break
|
||
else:
|
||
left_x = float(x[0])
|
||
|
||
right_x = float(x[-1])
|
||
for i in range(peak_idx, x.size - 1):
|
||
if y[i] >= half_level >= y[i + 1]:
|
||
right_x = _interp_cross(float(x[i]), float(y[i]), float(x[i + 1]), float(y[i + 1]))
|
||
break
|
||
else:
|
||
right_x = float(x[-1])
|
||
|
||
width = right_x - left_x
|
||
if not np.isfinite(width) or width <= 0.0:
|
||
return None
|
||
|
||
return {
|
||
"background": background,
|
||
"left": left_x,
|
||
"right": right_x,
|
||
"width": width,
|
||
"amplitude": peak_y,
|
||
}
|
||
|
||
|
||
def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||
"""Простая нормировка: поэлементное деление raw/calib."""
|
||
w = min(raw.size, calib.size)
|
||
if w <= 0:
|
||
return raw
|
||
out = np.full_like(raw, np.nan, dtype=np.float32)
|
||
with np.errstate(divide="ignore", invalid="ignore"):
|
||
out[:w] = raw[:w] / calib[:w]
|
||
out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
|
||
return out
|
||
|
||
|
||
def _build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
||
"""Оценить нижнюю/верхнюю огибающие калибровочной кривой."""
|
||
n = int(calib.size)
|
||
if n <= 0:
|
||
empty = np.zeros((0,), dtype=np.float32)
|
||
return empty, empty
|
||
|
||
y = np.asarray(calib, dtype=np.float32)
|
||
finite = np.isfinite(y)
|
||
if not np.any(finite):
|
||
zeros = np.zeros_like(y, dtype=np.float32)
|
||
return zeros, zeros
|
||
|
||
if not np.all(finite):
|
||
x = np.arange(n, dtype=np.float32)
|
||
y = y.copy()
|
||
y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32)
|
||
|
||
if n < 3:
|
||
return y.copy(), y.copy()
|
||
|
||
dy = np.diff(y)
|
||
s = np.sign(dy).astype(np.int8, copy=False)
|
||
|
||
if np.any(s == 0):
|
||
for i in range(1, s.size):
|
||
if s[i] == 0:
|
||
s[i] = s[i - 1]
|
||
for i in range(s.size - 2, -1, -1):
|
||
if s[i] == 0:
|
||
s[i] = s[i + 1]
|
||
s[s == 0] = 1
|
||
|
||
max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1
|
||
min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1
|
||
|
||
x = np.arange(n, dtype=np.float32)
|
||
|
||
def _interp_nodes(nodes: np.ndarray) -> np.ndarray:
|
||
if nodes.size == 0:
|
||
idx = np.array([0, n - 1], dtype=np.int64)
|
||
else:
|
||
idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64)
|
||
return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32)
|
||
|
||
upper = _interp_nodes(max_idx)
|
||
lower = _interp_nodes(min_idx)
|
||
|
||
swap = lower > upper
|
||
if np.any(swap):
|
||
tmp = upper[swap].copy()
|
||
upper[swap] = lower[swap]
|
||
lower[swap] = tmp
|
||
|
||
return lower, upper
|
||
|
||
|
||
def _normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||
"""Нормировка через проекцию между огибающими калибровки в диапазон [-1, +1]."""
|
||
w = min(raw.size, calib.size)
|
||
if w <= 0:
|
||
return raw
|
||
|
||
out = np.full_like(raw, np.nan, dtype=np.float32)
|
||
raw_seg = np.asarray(raw[:w], dtype=np.float32)
|
||
lower, upper = _build_calib_envelopes(np.asarray(calib[:w], dtype=np.float32))
|
||
span = upper - lower
|
||
|
||
finite_span = span[np.isfinite(span) & (span > 0)]
|
||
if finite_span.size > 0:
|
||
eps = max(float(np.median(finite_span)) * 1e-6, 1e-9)
|
||
else:
|
||
eps = 1e-9
|
||
|
||
valid = (
|
||
np.isfinite(raw_seg)
|
||
& np.isfinite(lower)
|
||
& np.isfinite(upper)
|
||
& (span > eps)
|
||
)
|
||
if np.any(valid):
|
||
proj = np.empty_like(raw_seg, dtype=np.float32)
|
||
proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0
|
||
proj[valid] = np.clip(proj[valid], -1000.0, 1000.0)
|
||
proj[~valid] = np.nan
|
||
out[:w] = proj
|
||
|
||
return out
|
||
|
||
|
||
def _normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray:
|
||
"""Нормировка свипа по выбранному алгоритму."""
|
||
nt = str(norm_type).strip().lower()
|
||
if nt == "simple":
|
||
return _normalize_sweep_simple(raw, calib)
|
||
return _normalize_sweep_projector(raw, calib)
|
||
|
||
|
||
def try_open_pyserial(path: str, baud: int, timeout: float):
|
||
try:
|
||
import serial # type: ignore
|
||
except Exception:
|
||
return None
|
||
try:
|
||
ser = serial.Serial(path, baudrate=baud, timeout=timeout)
|
||
return ser
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
class FDReader:
|
||
"""Простой враппер чтения строк из файлового дескриптора TTY."""
|
||
|
||
def __init__(self, fd: int):
|
||
# Отдельно буферизуем для корректной readline()
|
||
self._fd = fd
|
||
raw = os.fdopen(fd, "rb", closefd=False)
|
||
self._file = raw
|
||
self._buf = io.BufferedReader(raw, buffer_size=65536)
|
||
|
||
def fileno(self) -> int:
|
||
return self._fd
|
||
|
||
def readline(self) -> bytes:
|
||
return self._buf.readline()
|
||
|
||
def close(self):
|
||
try:
|
||
self._buf.close()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def open_raw_tty(path: str, baud: int) -> Optional[FDReader]:
|
||
"""Открыть TTY без pyserial и настроить порт через termios.
|
||
|
||
Возвращает FDReader или None при ошибке.
|
||
"""
|
||
try:
|
||
import termios
|
||
import tty
|
||
except Exception:
|
||
return None
|
||
|
||
try:
|
||
fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
|
||
except Exception:
|
||
return None
|
||
|
||
try:
|
||
attrs = termios.tcgetattr(fd)
|
||
# Установим «сырое» состояние
|
||
tty.setraw(fd)
|
||
|
||
# Скорость
|
||
baud_map = {
|
||
9600: termios.B9600,
|
||
19200: termios.B19200,
|
||
38400: termios.B38400,
|
||
57600: termios.B57600,
|
||
115200: termios.B115200,
|
||
230400: getattr(termios, "B230400", None),
|
||
460800: getattr(termios, "B460800", None),
|
||
}
|
||
b = baud_map.get(baud) or termios.B115200
|
||
|
||
attrs[4] = b # ispeed
|
||
attrs[5] = b # ospeed
|
||
|
||
# VMIN=1, VTIME=0 — блокирующее чтение по байту
|
||
cc = attrs[6]
|
||
cc[termios.VMIN] = 1
|
||
cc[termios.VTIME] = 0
|
||
attrs[6] = cc
|
||
|
||
termios.tcsetattr(fd, termios.TCSANOW, attrs)
|
||
except Exception:
|
||
try:
|
||
os.close(fd)
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
return FDReader(fd)
|
||
|
||
|
||
class SerialLineSource:
|
||
"""Единый интерфейс для чтения строк из порта (pyserial или raw TTY)."""
|
||
|
||
def __init__(self, path: str, baud: int, timeout: float = 1.0):
|
||
self._pyserial = try_open_pyserial(path, baud, timeout)
|
||
#self._pyserial = None
|
||
self._fdreader = None
|
||
self._using = "pyserial" if self._pyserial is not None else "raw"
|
||
if self._pyserial is None:
|
||
self._fdreader = open_raw_tty(path, baud)
|
||
if self._fdreader is None:
|
||
msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)"
|
||
if sys.platform.startswith("win"):
|
||
msg += ". На Windows нужен pyserial: pip install pyserial"
|
||
raise RuntimeError(msg)
|
||
|
||
def readline(self) -> bytes:
|
||
if self._pyserial is not None:
|
||
try:
|
||
return self._pyserial.readline()
|
||
except Exception:
|
||
return b""
|
||
else:
|
||
try:
|
||
return self._fdreader.readline() # type: ignore[union-attr]
|
||
except Exception:
|
||
return b""
|
||
|
||
def close(self):
|
||
try:
|
||
if self._pyserial is not None:
|
||
self._pyserial.close()
|
||
elif self._fdreader is not None:
|
||
self._fdreader.close()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
class SerialChunkReader:
|
||
"""Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера."""
|
||
|
||
def __init__(self, src: SerialLineSource):
|
||
self._src = src
|
||
self._ser = src._pyserial
|
||
self._fd: Optional[int] = None
|
||
if self._ser is not None:
|
||
# Неблокирующий режим для быстрой откачки
|
||
try:
|
||
self._ser.timeout = 0
|
||
except Exception:
|
||
pass
|
||
else:
|
||
try:
|
||
self._fd = src._fdreader.fileno() # type: ignore[union-attr]
|
||
try:
|
||
os.set_blocking(self._fd, False)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
self._fd = None
|
||
|
||
def read_available(self) -> bytes:
|
||
"""Вернёт доступные байты (b"" если данных нет)."""
|
||
if self._ser is not None:
|
||
try:
|
||
n = int(getattr(self._ser, "in_waiting", 0))
|
||
except Exception:
|
||
n = 0
|
||
if n > 0:
|
||
try:
|
||
return self._ser.read(n)
|
||
except Exception:
|
||
return b""
|
||
return b""
|
||
if self._fd is None:
|
||
return b""
|
||
out = bytearray()
|
||
while True:
|
||
try:
|
||
chunk = os.read(self._fd, 65536)
|
||
if not chunk:
|
||
break
|
||
out += chunk
|
||
if len(chunk) < 65536:
|
||
break
|
||
except BlockingIOError:
|
||
break
|
||
except Exception:
|
||
break
|
||
return bytes(out)
|
||
|
||
|
||
class SweepReader(threading.Thread):
|
||
"""Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь."""
|
||
|
||
def __init__(
|
||
self,
|
||
port_path: str,
|
||
baud: int,
|
||
out_queue: Queue[SweepPacket],
|
||
stop_event: threading.Event,
|
||
fancy: bool = False,
|
||
bin_mode: bool = False,
|
||
logscale: bool = False,
|
||
parser_16_bit_x2: bool = False,
|
||
parser_test: bool = False,
|
||
):
|
||
super().__init__(daemon=True)
|
||
self._port_path = port_path
|
||
self._baud = baud
|
||
self._q = out_queue
|
||
self._stop = stop_event
|
||
self._src: Optional[SerialLineSource] = None
|
||
self._fancy = bool(fancy)
|
||
self._bin_mode = bool(bin_mode)
|
||
self._logscale = bool(logscale)
|
||
self._parser_16_bit_x2 = bool(parser_16_bit_x2)
|
||
self._parser_test = bool(parser_test)
|
||
self._max_width: int = 0
|
||
self._sweep_idx: int = 0
|
||
self._last_sweep_ts: Optional[float] = None
|
||
self._n_valid_hist = deque()
|
||
|
||
@staticmethod
|
||
def _u32_to_i32(v: int) -> int:
|
||
"""Преобразование 32-bit слова в знаковое значение."""
|
||
return v - 0x1_0000_0000 if (v & 0x8000_0000) else v
|
||
|
||
@staticmethod
|
||
def _u16_to_i16(v: int) -> int:
|
||
"""Преобразование 16-bit слова в знаковое значение."""
|
||
return v - 0x1_0000 if (v & 0x8000) else v
|
||
|
||
def _finalize_current(
|
||
self,
|
||
xs,
|
||
ys,
|
||
channels: Optional[set[int]],
|
||
raw_curves: Optional[Tuple[list[int], list[int]]] = None,
|
||
apply_inversion: bool = True,
|
||
):
|
||
if not xs:
|
||
return
|
||
ch_list = sorted(channels) if channels else [0]
|
||
ch_primary = ch_list[0] if ch_list else 0
|
||
max_x = max(xs)
|
||
width = max_x + 1
|
||
self._max_width = max(self._max_width, width)
|
||
target_width = self._max_width if self._fancy else width
|
||
|
||
def _scatter(values, dtype) -> np.ndarray:
|
||
series = np.full((target_width,), np.nan, dtype=dtype)
|
||
try:
|
||
idx = np.asarray(xs, dtype=np.int64)
|
||
vals = np.asarray(values, dtype=dtype)
|
||
series[idx] = vals
|
||
except Exception:
|
||
for x, y in zip(xs, values):
|
||
if 0 <= x < target_width:
|
||
series[x] = y
|
||
return series
|
||
|
||
def _fill_missing(series: np.ndarray):
|
||
known = ~np.isnan(series)
|
||
if not np.any(known):
|
||
return
|
||
known_idx = np.nonzero(known)[0]
|
||
for i0, i1 in zip(known_idx[:-1], known_idx[1:]):
|
||
if i1 - i0 > 1:
|
||
avg = (series[i0] + series[i1]) * 0.5
|
||
series[i0 + 1 : i1] = avg
|
||
first_idx = int(known_idx[0])
|
||
last_idx = int(known_idx[-1])
|
||
if first_idx > 0:
|
||
series[:first_idx] = series[first_idx]
|
||
if last_idx < series.size - 1:
|
||
series[last_idx + 1 :] = series[last_idx]
|
||
|
||
# Быстрый векторизованный путь
|
||
sweep = _scatter(ys, np.float32)
|
||
aux_curves: SweepAuxCurves = None
|
||
if raw_curves is not None:
|
||
aux_curves = (
|
||
_scatter(raw_curves[0], np.float32),
|
||
_scatter(raw_curves[1], np.float32),
|
||
)
|
||
# Метрики валидных точек до заполнения пропусков
|
||
finite_pre = np.isfinite(sweep)
|
||
n_valid_cur = int(np.count_nonzero(finite_pre))
|
||
|
||
# Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины
|
||
if self._fancy:
|
||
try:
|
||
_fill_missing(sweep)
|
||
if aux_curves is not None:
|
||
_fill_missing(aux_curves[0])
|
||
_fill_missing(aux_curves[1])
|
||
except Exception:
|
||
# В случае ошибки просто оставляем как есть
|
||
pass
|
||
# Инверсия данных при «отрицательном» уровне (среднее ниже порога)
|
||
if apply_inversion:
|
||
try:
|
||
m = float(np.nanmean(sweep))
|
||
if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD:
|
||
sweep *= -1.0
|
||
except Exception:
|
||
pass
|
||
#sweep = np.abs(sweep)
|
||
#sweep -= float(np.nanmean(sweep))
|
||
|
||
# Метрики для статусной строки (вид словаря: переменная -> значение)
|
||
self._sweep_idx += 1
|
||
if len(ch_list) > 1:
|
||
sys.stderr.write(
|
||
f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n"
|
||
)
|
||
now = time.time()
|
||
if self._last_sweep_ts is None:
|
||
dt_ms = float("nan")
|
||
else:
|
||
dt_ms = (now - self._last_sweep_ts) * 1000.0
|
||
self._last_sweep_ts = now
|
||
self._n_valid_hist.append((now, n_valid_cur))
|
||
while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0:
|
||
self._n_valid_hist.popleft()
|
||
if self._n_valid_hist:
|
||
n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist))
|
||
else:
|
||
n_valid = float(n_valid_cur)
|
||
|
||
if n_valid_cur > 0:
|
||
vmin = float(np.nanmin(sweep))
|
||
vmax = float(np.nanmax(sweep))
|
||
mean = float(np.nanmean(sweep))
|
||
std = float(np.nanstd(sweep))
|
||
else:
|
||
vmin = vmax = mean = std = float("nan")
|
||
info: SweepInfo = {
|
||
"sweep": self._sweep_idx,
|
||
"ch": ch_primary,
|
||
"chs": ch_list,
|
||
"n_valid": n_valid,
|
||
"min": vmin,
|
||
"max": vmax,
|
||
"mean": mean,
|
||
"std": std,
|
||
"dt_ms": dt_ms,
|
||
}
|
||
|
||
# Кладём готовый свип (если очередь полна — выбрасываем самый старый)
|
||
try:
|
||
self._q.put_nowait((sweep, info, aux_curves))
|
||
except Full:
|
||
try:
|
||
_ = self._q.get_nowait()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
self._q.put_nowait((sweep, info, aux_curves))
|
||
except Exception:
|
||
pass
|
||
|
||
def _run_ascii_stream(self, chunk_reader: SerialChunkReader):
|
||
xs: list[int] = []
|
||
ys: list[int] = []
|
||
cur_channel: Optional[int] = None
|
||
cur_channels: set[int] = set()
|
||
buf = bytearray()
|
||
while not self._stop.is_set():
|
||
data = chunk_reader.read_available()
|
||
if data:
|
||
buf += data
|
||
else:
|
||
time.sleep(0.0005)
|
||
continue
|
||
|
||
while True:
|
||
nl = buf.find(b"\n")
|
||
if nl == -1:
|
||
break
|
||
line = bytes(buf[:nl])
|
||
del buf[: nl + 1]
|
||
if line.endswith(b"\r"):
|
||
line = line[:-1]
|
||
if not line:
|
||
continue
|
||
|
||
if line.startswith(b"Sweep_start"):
|
||
self._finalize_current(xs, ys, cur_channels)
|
||
xs.clear()
|
||
ys.clear()
|
||
cur_channel = None
|
||
cur_channels.clear()
|
||
continue
|
||
|
||
# sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам.
|
||
if len(line) >= 3:
|
||
parts = line.split()
|
||
if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")):
|
||
try:
|
||
if parts[0].lower() == b"s":
|
||
if len(parts) >= 4:
|
||
ch = int(parts[1], 10)
|
||
x = int(parts[2], 10)
|
||
y = int(parts[3], 10) # поддержка знака: "+…" и "-…"
|
||
else:
|
||
ch = 0
|
||
x = int(parts[1], 10)
|
||
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
|
||
else:
|
||
# формат вида "s0"
|
||
ch = int(parts[0][1:], 10)
|
||
x = int(parts[1], 10)
|
||
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
|
||
except Exception:
|
||
continue
|
||
if cur_channel is None:
|
||
cur_channel = ch
|
||
cur_channels.add(ch)
|
||
xs.append(x)
|
||
ys.append(y)
|
||
|
||
if len(buf) > 1_000_000:
|
||
del buf[:-262144]
|
||
|
||
self._finalize_current(xs, ys, cur_channels)
|
||
|
||
def _run_binary_stream(self, chunk_reader: SerialChunkReader):
|
||
xs: list[int] = []
|
||
ys: list[int] = []
|
||
cur_channel: Optional[int] = None
|
||
cur_channels: set[int] = set()
|
||
expected_step: Optional[int] = None
|
||
syncing = False
|
||
words = deque()
|
||
|
||
buf = bytearray()
|
||
while not self._stop.is_set():
|
||
data = chunk_reader.read_available()
|
||
if data:
|
||
buf += data
|
||
else:
|
||
time.sleep(0.0005)
|
||
continue
|
||
|
||
usable = len(buf) & ~1
|
||
if usable == 0:
|
||
continue
|
||
|
||
i = 0
|
||
while i < usable:
|
||
w = int(buf[i]) | (int(buf[i + 1]) << 8)
|
||
words.append(w)
|
||
i += 2
|
||
|
||
|
||
|
||
|
||
|
||
# Бинарный протокол:
|
||
# старт свипа: 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
|
||
# точка: step, value_hi, value_lo, 0x000A
|
||
while len(words) >= 4:
|
||
w0 = int(words[0])
|
||
w1 = int(words[1])
|
||
w2 = int(words[2])
|
||
w3 = int(words[3])
|
||
|
||
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A:
|
||
if len(words) < 5:
|
||
break
|
||
if int(words[4]) != 1:
|
||
words.popleft()
|
||
continue
|
||
self._finalize_current(xs, ys, cur_channels)
|
||
xs.clear()
|
||
ys.clear()
|
||
cur_channels.clear()
|
||
cur_channel = (w3 >> 8) & 0x00FF
|
||
cur_channels.add(cur_channel)
|
||
expected_step = 1
|
||
syncing = False
|
||
for _ in range(4):
|
||
words.popleft()
|
||
continue
|
||
|
||
if syncing:
|
||
if w0 == 0x000A:
|
||
syncing = False
|
||
expected_step = None
|
||
words.popleft()
|
||
continue
|
||
|
||
if w3 != 0x000A:
|
||
syncing = True
|
||
continue
|
||
|
||
if w0 <= 0:
|
||
syncing = True
|
||
continue
|
||
|
||
if expected_step is not None and w0 < expected_step:
|
||
syncing = True
|
||
continue
|
||
|
||
if cur_channel is not None:
|
||
cur_channels.add(cur_channel)
|
||
xs.append(w0)
|
||
value_u32 = (w1 << 16) | w2
|
||
ys.append(self._u32_to_i32(value_u32))
|
||
expected_step = w0 + 1
|
||
for _ in range(4):
|
||
words.popleft()
|
||
continue
|
||
|
||
# Поток может начаться с середины пакета; сдвигаемся по слову до ресинхронизации.
|
||
words.popleft()
|
||
|
||
del buf[:usable]
|
||
if len(buf) > 1_000_000:
|
||
del buf[:-262144]
|
||
|
||
self._finalize_current(xs, ys, cur_channels)
|
||
|
||
def _run_logscale_binary_stream(self, chunk_reader: SerialChunkReader):
|
||
xs: list[int] = []
|
||
ys: list[float] = []
|
||
avg_1_vals: list[int] = []
|
||
avg_2_vals: list[int] = []
|
||
cur_channel: Optional[int] = None
|
||
cur_channels: set[int] = set()
|
||
expected_step: Optional[int] = None
|
||
syncing = False
|
||
words = deque()
|
||
|
||
buf = bytearray()
|
||
while not self._stop.is_set():
|
||
data = chunk_reader.read_available()
|
||
if data:
|
||
buf += data
|
||
else:
|
||
time.sleep(0.0005)
|
||
continue
|
||
|
||
usable = len(buf) & ~1
|
||
if usable == 0:
|
||
continue
|
||
|
||
i = 0
|
||
while i < usable:
|
||
w = int(buf[i]) | (int(buf[i + 1]) << 8)
|
||
words.append(w)
|
||
i += 2
|
||
|
||
# Бинарный logscale-протокол:
|
||
# старт свипа: 0xFFFF x5, затем (ch<<8)|0x0A
|
||
# точка: step, avg1_hi, avg1_lo, avg2_hi, avg2_lo, 0x000A
|
||
while len(words) >= 6:
|
||
w0 = int(words[0])
|
||
w1 = int(words[1])
|
||
w2 = int(words[2])
|
||
w3 = int(words[3])
|
||
w4 = int(words[4])
|
||
w5 = int(words[5])
|
||
|
||
if (
|
||
w0 == 0xFFFF
|
||
and w1 == 0xFFFF
|
||
and w2 == 0xFFFF
|
||
and w3 == 0xFFFF
|
||
and w4 == 0xFFFF
|
||
and (w5 & 0x00FF) == 0x000A
|
||
):
|
||
if len(words) < 7:
|
||
break
|
||
if int(words[6]) != 1:
|
||
words.popleft()
|
||
continue
|
||
self._finalize_current(
|
||
xs,
|
||
ys,
|
||
cur_channels,
|
||
raw_curves=(avg_1_vals, avg_2_vals),
|
||
apply_inversion=False,
|
||
)
|
||
xs.clear()
|
||
ys.clear()
|
||
avg_1_vals.clear()
|
||
avg_2_vals.clear()
|
||
cur_channels.clear()
|
||
cur_channel = (w5 >> 8) & 0x00FF
|
||
cur_channels.add(cur_channel)
|
||
expected_step = 1
|
||
syncing = False
|
||
for _ in range(6):
|
||
words.popleft()
|
||
continue
|
||
|
||
if syncing:
|
||
if w0 == 0x000A:
|
||
syncing = False
|
||
expected_step = None
|
||
words.popleft()
|
||
continue
|
||
|
||
if w5 != 0x000A:
|
||
syncing = True
|
||
continue
|
||
|
||
if w0 <= 0:
|
||
syncing = True
|
||
continue
|
||
|
||
if expected_step is not None and w0 < expected_step:
|
||
syncing = True
|
||
continue
|
||
|
||
if cur_channel is not None:
|
||
cur_channels.add(cur_channel)
|
||
avg_1 = self._u32_to_i32((w1 << 16) | w2)
|
||
avg_2 = self._u32_to_i32((w3 << 16) | w4)
|
||
xs.append(w0)
|
||
avg_1_vals.append(avg_1)
|
||
avg_2_vals.append(avg_2)
|
||
ys.append(_log_pair_to_sweep(avg_1, avg_2))
|
||
expected_step = w0 + 1
|
||
#ys.append(LOG_BASE**(avg_1/LOG_SCALER) - LOG_BASE**(avg_2/LOG_SCALER))
|
||
for _ in range(6):
|
||
words.popleft()
|
||
continue
|
||
|
||
words.popleft()
|
||
|
||
del buf[:usable]
|
||
if len(buf) > 1_000_000:
|
||
del buf[:-262144]
|
||
|
||
self._finalize_current(
|
||
xs,
|
||
ys,
|
||
cur_channels,
|
||
raw_curves=(avg_1_vals, avg_2_vals),
|
||
apply_inversion=False,
|
||
)
|
||
|
||
def _run_logscale_16_bit_x2_binary_stream(self, chunk_reader: SerialChunkReader):
|
||
xs: list[int] = []
|
||
ys: list[float] = []
|
||
avg_1_vals: list[int] = []
|
||
avg_2_vals: list[int] = []
|
||
cur_channel: Optional[int] = None
|
||
cur_channels: set[int] = set()
|
||
expected_step: Optional[int] = None
|
||
syncing = False
|
||
words = deque()
|
||
|
||
buf = bytearray()
|
||
while not self._stop.is_set():
|
||
data = chunk_reader.read_available()
|
||
if data:
|
||
buf += data
|
||
else:
|
||
time.sleep(0.0005)
|
||
continue
|
||
|
||
usable = len(buf) & ~1
|
||
if usable == 0:
|
||
continue
|
||
|
||
i = 0
|
||
while i < usable:
|
||
w = int(buf[i]) | (int(buf[i + 1]) << 8)
|
||
words.append(w)
|
||
i += 2
|
||
#print(i)
|
||
#print(words)
|
||
# Бинарный logscale-протокол (16-bit x2):
|
||
# старт свипа: 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
|
||
# точка: step, avg1_lo16, avg2_lo16, 0xFFFF
|
||
while len(words) >= 4:
|
||
w0 = int(words[0])
|
||
w1 = int(words[1])
|
||
w2 = int(words[2])
|
||
w3 = int(words[3])
|
||
|
||
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A:
|
||
if len(words) < 5:
|
||
break
|
||
if int(words[4]) != 1:
|
||
words.popleft()
|
||
continue
|
||
self._finalize_current(
|
||
xs,
|
||
ys,
|
||
cur_channels,
|
||
raw_curves=(avg_1_vals, avg_2_vals),
|
||
apply_inversion=False,
|
||
)
|
||
xs.clear()
|
||
ys.clear()
|
||
avg_1_vals.clear()
|
||
avg_2_vals.clear()
|
||
cur_channels.clear()
|
||
cur_channel = (w3 >> 8) & 0x00FF
|
||
cur_channels.add(cur_channel)
|
||
expected_step = 1
|
||
syncing = False
|
||
for _ in range(4):
|
||
words.popleft()
|
||
continue
|
||
|
||
if syncing:
|
||
if w0 == 0xFFFF:
|
||
syncing = False
|
||
expected_step = None
|
||
words.popleft()
|
||
continue
|
||
|
||
if w3 != 0xFFFF:
|
||
syncing = True
|
||
continue
|
||
|
||
if w0 <= 0:
|
||
syncing = True
|
||
continue
|
||
|
||
if expected_step is not None and w0 < expected_step:
|
||
syncing = True
|
||
continue
|
||
|
||
if cur_channel is not None:
|
||
cur_channels.add(cur_channel)
|
||
avg_1 = self._u16_to_i16(w1)
|
||
avg_2 = self._u16_to_i16(w2)
|
||
xs.append(w0)
|
||
avg_1_vals.append(avg_1)
|
||
avg_2_vals.append(avg_2)
|
||
ys.append(_log_pair_to_sweep(avg_1, avg_2))
|
||
expected_step = w0 + 1
|
||
for _ in range(4):
|
||
words.popleft()
|
||
continue
|
||
|
||
words.popleft()
|
||
|
||
del buf[:usable]
|
||
if len(buf) > 1_000_000:
|
||
del buf[:-262144]
|
||
|
||
self._finalize_current(
|
||
xs,
|
||
ys,
|
||
cur_channels,
|
||
raw_curves=(avg_1_vals, avg_2_vals),
|
||
apply_inversion=False,
|
||
)
|
||
|
||
def _run_parser_test_stream(self, chunk_reader: SerialChunkReader):
|
||
"""Тестовый парсер 16-bit x2:
|
||
- разделитель точки: одиночный 0xFFFF
|
||
- маркер старта свипа: серия 0xFFFF (>=2), затем (ch<<8)|0x0A
|
||
"""
|
||
xs: list[int] = []
|
||
ys: list[float] = []
|
||
avg_1_vals: list[int] = []
|
||
avg_2_vals: list[int] = []
|
||
cur_channel: Optional[int] = None
|
||
cur_channels: set[int] = set()
|
||
expected_step: Optional[int] = None
|
||
in_sweep = False
|
||
point_buf: list[int] = []
|
||
ffff_run = 0
|
||
local_resync = False
|
||
|
||
def _finalize_sweep():
|
||
self._finalize_current(
|
||
xs,
|
||
ys,
|
||
cur_channels,
|
||
raw_curves=(avg_1_vals, avg_2_vals),
|
||
apply_inversion=False,
|
||
)
|
||
xs.clear()
|
||
ys.clear()
|
||
avg_1_vals.clear()
|
||
avg_2_vals.clear()
|
||
cur_channels.clear()
|
||
|
||
def _consume_point() -> bool:
|
||
nonlocal expected_step
|
||
if len(point_buf) != 3:
|
||
return False
|
||
step = int(point_buf[0])
|
||
if step <= 0:
|
||
return False
|
||
if expected_step is not None and step < expected_step:
|
||
return False
|
||
if cur_channel is not None:
|
||
cur_channels.add(cur_channel)
|
||
avg_1 = self._u16_to_i16(int(point_buf[1]))
|
||
avg_2 = self._u16_to_i16(int(point_buf[2]))
|
||
xs.append(step)
|
||
avg_1_vals.append(avg_1)
|
||
avg_2_vals.append(avg_2)
|
||
ys.append(_log_pair_to_sweep(avg_1, avg_2))
|
||
expected_step = step + 1
|
||
return True
|
||
|
||
buf = bytearray()
|
||
buf_pos = 0
|
||
while not self._stop.is_set():
|
||
data = chunk_reader.read_available()
|
||
if data:
|
||
buf += data
|
||
else:
|
||
time.sleep(0.000005)
|
||
continue
|
||
|
||
# Обрабатываем поток строго по одному слову (16 бит) за шаг.
|
||
while (buf_pos + 1) < len(buf):
|
||
w = int(buf[buf_pos]) | (int(buf[buf_pos + 1]) << 8)
|
||
buf_pos += 2
|
||
|
||
if w == 0xFFFF:
|
||
ffff_run += 1
|
||
continue
|
||
|
||
if ffff_run > 0:
|
||
bad_point_on_this_delim = False
|
||
if in_sweep and point_buf and (not local_resync):
|
||
if not _consume_point():
|
||
local_resync = True
|
||
bad_point_on_this_delim = True
|
||
point_buf.clear()
|
||
|
||
if ffff_run >= 2:
|
||
if (w & 0x00FF) == 0x000A:
|
||
_finalize_sweep()
|
||
cur_channel = (w >> 8) & 0x00FF
|
||
cur_channels.add(cur_channel)
|
||
in_sweep = True
|
||
expected_step = 1
|
||
local_resync = False
|
||
point_buf.clear()
|
||
ffff_run = 0
|
||
continue
|
||
# Некорректная серия FFFF: остаёмся в текущем свипе, ждём
|
||
# следующего одиночного разделителя точки.
|
||
if in_sweep:
|
||
local_resync = True
|
||
ffff_run = 0
|
||
continue
|
||
|
||
# ffff_run == 1: это просто разделитель точки
|
||
if local_resync and (not bad_point_on_this_delim):
|
||
# Нашли следующий одиночный разделитель: выходим из локального resync.
|
||
local_resync = False
|
||
point_buf.clear()
|
||
ffff_run = 0
|
||
|
||
if in_sweep and (not local_resync):
|
||
point_buf.append(w)
|
||
if len(point_buf) > 3:
|
||
point_buf.clear()
|
||
local_resync = True
|
||
|
||
# Периодически уплотняем буфер, сохраняя возможный хвост из 1 байта.
|
||
if buf_pos >= 262144:
|
||
del buf[:buf_pos]
|
||
buf_pos = 0
|
||
# Аварийный лимит на остаток неразобранных данных.
|
||
if (len(buf) - buf_pos) > 1_000_000:
|
||
tail = buf[buf_pos:]
|
||
if len(tail) > 262144:
|
||
tail = tail[-262144:]
|
||
buf = bytearray(tail)
|
||
buf_pos = 0
|
||
|
||
# Попробуем завершить последнюю точку, если поток закончился ровно на разделителе.
|
||
if in_sweep and (not local_resync) and ffff_run == 1 and point_buf:
|
||
_consume_point()
|
||
point_buf.clear()
|
||
_finalize_sweep()
|
||
|
||
def run(self):
|
||
try:
|
||
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
|
||
sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n")
|
||
except Exception as e:
|
||
sys.stderr.write(f"[error] {e}\n")
|
||
return
|
||
|
||
try:
|
||
chunk_reader = SerialChunkReader(self._src)
|
||
if self._parser_test:
|
||
self._run_parser_test_stream(chunk_reader)
|
||
elif self._parser_16_bit_x2:
|
||
self._run_logscale_16_bit_x2_binary_stream(chunk_reader)
|
||
elif self._logscale:
|
||
self._run_logscale_binary_stream(chunk_reader)
|
||
elif self._bin_mode:
|
||
self._run_binary_stream(chunk_reader)
|
||
else:
|
||
self._run_ascii_stream(chunk_reader)
|
||
finally:
|
||
try:
|
||
if self._src is not None:
|
||
self._src.close()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(
|
||
description=(
|
||
"Читает свипы из виртуального COM-порта и рисует: "
|
||
"последний свип и водопад (реалтайм)."
|
||
)
|
||
)
|
||
parser.add_argument(
|
||
"port",
|
||
help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)",
|
||
)
|
||
parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)")
|
||
parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде")
|
||
parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с")
|
||
parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада")
|
||
parser.add_argument(
|
||
"--spec-clip",
|
||
default="2,98",
|
||
help=(
|
||
"Процентильная обрезка уровней водопада спектров, % (min,max). "
|
||
"Напр. 2,98. 'off' — отключить"
|
||
),
|
||
)
|
||
parser.add_argument(
|
||
"--spec-mean-sec",
|
||
type=float,
|
||
default=0.0,
|
||
help=(
|
||
"Вычитание среднего по каждой частоте за последние N секунд "
|
||
"в водопаде спектров (0 — отключить)"
|
||
),
|
||
)
|
||
parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна")
|
||
parser.add_argument(
|
||
"--fancy",
|
||
action="store_true",
|
||
help="Заполнять выпавшие точки средними значениями между соседними",
|
||
)
|
||
parser.add_argument(
|
||
"--ylim",
|
||
type=str,
|
||
default=None,
|
||
help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто",
|
||
)
|
||
parser.add_argument(
|
||
"--backend",
|
||
choices=["auto", "pg", "mpl"],
|
||
default="pg",
|
||
help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию pg",
|
||
)
|
||
parser.add_argument(
|
||
"--norm-type",
|
||
choices=["projector", "simple"],
|
||
default="projector",
|
||
help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)",
|
||
)
|
||
parser.add_argument(
|
||
"--bin",
|
||
dest="bin_mode",
|
||
action="store_true",
|
||
help=(
|
||
"Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; "
|
||
"точки step,uint32(hi16,lo16),0x000A"
|
||
),
|
||
)
|
||
parser.add_argument(
|
||
"--logscale",
|
||
action="store_true",
|
||
default=True,
|
||
help=(
|
||
"Новый бинарный протокол: точка несёт пару int32 (avg_1, avg_2), "
|
||
"а свип считается как 10**(avg_1*0.001) - 10**(avg_2*0.001)"
|
||
),
|
||
)
|
||
parser.add_argument(
|
||
"--parser_16_bit_x2",
|
||
action="store_true",
|
||
help=(
|
||
"Бинарный logscale-протокол c парой int16 (avg_1, avg_2): "
|
||
"старт 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; точка step,avg1_lo16,avg2_lo16,0xFFFF"
|
||
),
|
||
)
|
||
parser.add_argument(
|
||
"--parser_test",
|
||
action="store_true",
|
||
help=(
|
||
"Тестовый парсер для формата 16-bit x2: "
|
||
"одиночный 0xFFFF завершает точку, серия 0xFFFF начинает новый свип"
|
||
),
|
||
)
|
||
parser.add_argument(
|
||
"--calibrate",
|
||
action="store_true",
|
||
help=(
|
||
"Режим измерения ширины главного пика FFT: рисует красные маркеры "
|
||
"границ и фона и выводит ширину пика в статус"
|
||
),
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Попробуем быстрый бэкенд (pyqtgraph) при auto/pg
|
||
if args.backend == "pg":
|
||
try:
|
||
return run_pyqtgraph(args)
|
||
except Exception as e:
|
||
sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n")
|
||
sys.exit(1)
|
||
|
||
if args.backend == "auto":
|
||
try:
|
||
return run_pyqtgraph(args)
|
||
except Exception:
|
||
pass # При auto — тихо откатываемся на matplotlib
|
||
|
||
try:
|
||
import matplotlib
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib.animation import FuncAnimation
|
||
from matplotlib.widgets import Slider, CheckButtons, TextBox
|
||
except Exception as e:
|
||
sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n")
|
||
sys.exit(1)
|
||
|
||
# Очередь завершённых свипов и поток чтения
|
||
q: Queue[SweepPacket] = Queue(maxsize=1000)
|
||
stop_event = threading.Event()
|
||
reader = SweepReader(
|
||
args.port,
|
||
args.baud,
|
||
q,
|
||
stop_event,
|
||
fancy=bool(args.fancy),
|
||
bin_mode=bool(args.bin_mode),
|
||
logscale=bool(args.logscale),
|
||
parser_16_bit_x2=bool(args.parser_16_bit_x2),
|
||
parser_test=bool(args.parser_test),
|
||
)
|
||
reader.start()
|
||
|
||
# Графика
|
||
fig, axs = plt.subplots(2, 2, figsize=(12, 8))
|
||
(ax_line, ax_img), (ax_fft, ax_spec) = axs
|
||
fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None
|
||
# Увеличим расстояния и оставим место справа под ползунки оси Y B-scan
|
||
fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08)
|
||
|
||
# Состояние для отображения
|
||
current_freqs: Optional[np.ndarray] = None
|
||
current_distances: Optional[np.ndarray] = None
|
||
current_sweep_raw: Optional[np.ndarray] = None
|
||
current_aux_curves: SweepAuxCurves = None
|
||
current_sweep_norm: Optional[np.ndarray] = None
|
||
current_fft_db: Optional[np.ndarray] = None
|
||
last_calib_sweep: Optional[np.ndarray] = None
|
||
current_info: Optional[SweepInfo] = None
|
||
x_shared: Optional[np.ndarray] = None
|
||
width: Optional[int] = None
|
||
max_sweeps = int(max(10, args.max_sweeps))
|
||
ring = None # type: Optional[np.ndarray]
|
||
ring_time = None # type: Optional[np.ndarray]
|
||
head = 0
|
||
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
|
||
# FFT состояние
|
||
fft_bins = FFT_LEN // 2 + 1
|
||
ring_fft = None # type: Optional[np.ndarray]
|
||
y_min_fft, y_max_fft = None, None
|
||
distance_shared: Optional[np.ndarray] = None
|
||
# Параметры контраста водопада спектров
|
||
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
|
||
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
|
||
# Ползунки управления Y для B-scan и контрастом
|
||
ymin_slider = None
|
||
ymax_slider = None
|
||
contrast_slider = None
|
||
calib_enabled = False
|
||
bg_subtract_enabled = False
|
||
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
|
||
current_peak_width: Optional[float] = None
|
||
current_peak_amplitude: Optional[float] = None
|
||
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
|
||
cb = None
|
||
c_boxes = []
|
||
c_values_text = None
|
||
|
||
# Статусная строка (внизу окна)
|
||
status_text = fig.text(
|
||
0.01,
|
||
0.01,
|
||
"",
|
||
ha="left",
|
||
va="bottom",
|
||
fontsize=8,
|
||
family="monospace",
|
||
)
|
||
|
||
# Линейный график последнего свипа
|
||
line_avg1_obj, = ax_line.plot([], [], lw=1, color="0.65")
|
||
line_avg2_obj, = ax_line.plot([], [], lw=1, color="0.45")
|
||
line_obj, = ax_line.plot([], [], lw=1, color="tab:blue")
|
||
line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red")
|
||
line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green")
|
||
ax_line.set_title("Сырые данные", pad=1)
|
||
ax_line.set_xlabel("ГГц")
|
||
ax_line.set_ylabel("")
|
||
channel_text = ax_line.text(
|
||
0.98,
|
||
0.98,
|
||
"",
|
||
transform=ax_line.transAxes,
|
||
ha="right",
|
||
va="top",
|
||
fontsize=9,
|
||
family="monospace",
|
||
)
|
||
|
||
# Линейный график спектра текущего свипа
|
||
fft_line_obj, = ax_fft.plot([], [], lw=1)
|
||
fft_bg_obj = ax_fft.axhline(0.0, lw=1, color="red", visible=False)
|
||
fft_left_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False)
|
||
fft_right_obj = ax_fft.axvline(0.0, lw=1, color="red", visible=False)
|
||
ax_fft.set_title("FFT", pad=1)
|
||
ax_fft.set_xlabel("Расстояние, м")
|
||
ax_fft.set_ylabel("дБ")
|
||
|
||
# Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения)
|
||
fixed_ylim: Optional[Tuple[float, float]] = None
|
||
# CLI переопределение при необходимости
|
||
if args.ylim:
|
||
try:
|
||
y0, y1 = args.ylim.split(",")
|
||
fixed_ylim = (float(y0), float(y1))
|
||
except Exception:
|
||
sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n")
|
||
if fixed_ylim is not None:
|
||
ax_line.set_ylim(fixed_ylim)
|
||
|
||
# Водопад (будет инициализирован при первом свипе)
|
||
img_obj = ax_img.imshow(
|
||
np.zeros((1, 1), dtype=np.float32),
|
||
aspect="auto",
|
||
interpolation="nearest",
|
||
origin="lower",
|
||
cmap=args.cmap,
|
||
)
|
||
ax_img.set_title("Сырые данные", pad=12)
|
||
ax_img.set_xlabel("")
|
||
ax_img.set_ylabel("ГГц")
|
||
# Не показываем численные значения по времени на водопаде сырых данных
|
||
try:
|
||
ax_img.tick_params(axis="x", labelbottom=False)
|
||
except Exception:
|
||
pass
|
||
|
||
# Водопад спектров
|
||
img_fft_obj = ax_spec.imshow(
|
||
np.zeros((1, 1), dtype=np.float32),
|
||
aspect="auto",
|
||
interpolation="nearest",
|
||
origin="lower",
|
||
cmap=args.cmap,
|
||
)
|
||
ax_spec.set_title("B-scan (дБ)", pad=12)
|
||
ax_spec.set_xlabel("")
|
||
ax_spec.set_ylabel("Расстояние, м")
|
||
# Не показываем численные значения по времени на B-scan
|
||
try:
|
||
ax_spec.tick_params(axis="x", labelbottom=False)
|
||
except Exception:
|
||
pass
|
||
spec_left_obj = ax_spec.axhline(0.0, lw=1, color="red", visible=False)
|
||
spec_right_obj = ax_spec.axhline(0.0, lw=1, color="red", visible=False)
|
||
|
||
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||
return _normalize_by_calib(raw, calib, norm_type=norm_type)
|
||
|
||
def _sync_checkbox_states():
|
||
nonlocal calib_enabled, bg_subtract_enabled, current_sweep_norm
|
||
try:
|
||
states = cb.get_status() if cb is not None else ()
|
||
calib_enabled = bool(states[0]) if len(states) > 0 else False
|
||
bg_subtract_enabled = bool(states[1]) if len(states) > 1 else False
|
||
except Exception:
|
||
calib_enabled = False
|
||
bg_subtract_enabled = False
|
||
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
|
||
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
|
||
else:
|
||
current_sweep_norm = None
|
||
|
||
# Слайдеры для управления осью Y B-scan (мин/макс) и контрастом
|
||
try:
|
||
ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35])
|
||
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
|
||
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
|
||
ax_cb = fig.add_axes([0.90, 0.40, 0.10, 0.14])
|
||
ymin_slider = Slider(ax_smin, "R min", 0.0, 1.0, valinit=0.0, orientation="vertical")
|
||
ymax_slider = Slider(ax_smax, "R max", 0.0, 1.0, valinit=1.0, orientation="vertical")
|
||
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
|
||
cb = CheckButtons(ax_cb, ["нормировка", "вычет фона"], [False, False])
|
||
|
||
def _on_ylim_change(_val):
|
||
try:
|
||
y0 = float(min(ymin_slider.val, ymax_slider.val))
|
||
y1 = float(max(ymin_slider.val, ymax_slider.val))
|
||
ax_spec.set_ylim(y0, y1)
|
||
fig.canvas.draw_idle()
|
||
except Exception:
|
||
pass
|
||
|
||
ymin_slider.on_changed(_on_ylim_change)
|
||
ymax_slider.on_changed(_on_ylim_change)
|
||
# Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона)
|
||
contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle())
|
||
cb.on_clicked(lambda _v: _sync_checkbox_states())
|
||
except Exception:
|
||
pass
|
||
|
||
if peak_calibrate_mode:
|
||
try:
|
||
def _refresh_c_values_text():
|
||
if c_values_text is None:
|
||
return
|
||
try:
|
||
c_values_text.set_text(
|
||
"\n".join(
|
||
f"C*{idx}={float(CALIBRATION_C[idx]):.6g}" for idx in range(3)
|
||
)
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
def _set_c_value(idx: int, text: str):
|
||
global CALIBRATION_C, CALIBRATION_C_BASE
|
||
try:
|
||
CALIBRATION_C_BASE[idx] = float(text.strip())
|
||
CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE)
|
||
except Exception:
|
||
pass
|
||
_refresh_c_values_text()
|
||
|
||
for idx, ypos in enumerate((0.36, 0.30, 0.24)):
|
||
ax_c = fig.add_axes([0.92, ypos, 0.08, 0.045])
|
||
tb = TextBox(ax_c, f"C{idx}", initial=f"{float(CALIBRATION_C_BASE[idx]):.6g}")
|
||
tb.on_submit(lambda text, i=idx: _set_c_value(i, text))
|
||
c_boxes.append(tb)
|
||
ax_c_info = fig.add_axes([0.90, 0.14, 0.10, 0.08])
|
||
ax_c_info.axis("off")
|
||
c_values_text = ax_c_info.text(
|
||
0.0,
|
||
1.0,
|
||
"",
|
||
ha="left",
|
||
va="top",
|
||
fontsize=7,
|
||
family="monospace",
|
||
transform=ax_c_info.transAxes,
|
||
)
|
||
_refresh_c_values_text()
|
||
except Exception:
|
||
pass
|
||
|
||
# Для контроля частоты обновления
|
||
max_fps = max(1.0, float(args.max_fps))
|
||
interval_ms = int(1000.0 / max_fps)
|
||
frames_since_ylim_update = 0
|
||
spec_slider_initialized = False
|
||
|
||
|
||
def ensure_buffer(_w: int):
|
||
nonlocal ring, width, head, x_shared, ring_fft, distance_shared, ring_time
|
||
if ring is not None:
|
||
return
|
||
width = WF_WIDTH
|
||
x_shared = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, width, dtype=np.float32)
|
||
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
|
||
ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64)
|
||
head = 0
|
||
# Обновляем изображение под новые размеры: время по X (горизонталь), X по Y
|
||
img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32))
|
||
img_obj.set_extent((0, max_sweeps - 1, SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ))
|
||
ax_img.set_xlim(0, max_sweeps - 1)
|
||
ax_img.set_ylim(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ)
|
||
# FFT буферы: время по X, бин по Y
|
||
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
|
||
img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32))
|
||
img_fft_obj.set_extent((0, max_sweeps - 1, 0.0, 1.0))
|
||
ax_spec.set_xlim(0, max_sweeps - 1)
|
||
ax_spec.set_ylim(0.0, 1.0)
|
||
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
|
||
|
||
def _update_physical_axes():
|
||
nonlocal distance_shared, spec_slider_initialized
|
||
if current_freqs is not None and current_freqs.size > 0:
|
||
finite_f = current_freqs[np.isfinite(current_freqs)]
|
||
if finite_f.size > 0:
|
||
f_min = float(np.min(finite_f))
|
||
f_max = float(np.max(finite_f))
|
||
if f_max <= f_min:
|
||
f_max = f_min + 1.0
|
||
img_obj.set_extent((0, max_sweeps - 1, f_min, f_max))
|
||
ax_img.set_ylim(f_min, f_max)
|
||
|
||
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
|
||
if distance_shared.size > 0:
|
||
d_min = float(distance_shared[0])
|
||
d_max = float(distance_shared[-1]) if distance_shared.size > 1 else float(distance_shared[0] + 1.0)
|
||
if d_max <= d_min:
|
||
d_max = d_min + 1.0
|
||
img_fft_obj.set_extent((0, max_sweeps - 1, d_min, d_max))
|
||
ax_spec.set_ylim(d_min, d_max)
|
||
if ymin_slider is not None and ymax_slider is not None:
|
||
try:
|
||
ymin_slider.valmin = d_min
|
||
ymin_slider.valmax = d_max
|
||
ymax_slider.valmin = d_min
|
||
ymax_slider.valmax = d_max
|
||
ymin_slider.ax.set_ylim(d_min, d_max)
|
||
ymax_slider.ax.set_ylim(d_min, d_max)
|
||
if (not spec_slider_initialized) or (not (d_min <= ymin_slider.val <= d_max)):
|
||
ymin_slider.set_val(d_min)
|
||
if (not spec_slider_initialized) or (not (d_min <= ymax_slider.val <= d_max)):
|
||
ymax_slider.set_val(d_max)
|
||
spec_slider_initialized = True
|
||
except Exception:
|
||
pass
|
||
|
||
def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]:
|
||
"""(vmin, vmax) по центральным 90% значений в видимой области imshow."""
|
||
if data.size == 0:
|
||
return None
|
||
ny, nx = data.shape[0], data.shape[1]
|
||
try:
|
||
x0, x1 = axis.get_xlim()
|
||
y0, y1 = axis.get_ylim()
|
||
except Exception:
|
||
x0, x1 = 0.0, float(nx - 1)
|
||
y0, y1 = 0.0, float(ny - 1)
|
||
xmin, xmax = sorted((float(x0), float(x1)))
|
||
ymin, ymax = sorted((float(y0), float(y1)))
|
||
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
|
||
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
|
||
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
|
||
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
|
||
if ix1 < ix0:
|
||
ix1 = ix0
|
||
if iy1 < iy0:
|
||
iy1 = iy0
|
||
sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1]
|
||
finite = np.isfinite(sub)
|
||
if not finite.any():
|
||
return None
|
||
vals = sub[finite]
|
||
vmin = float(np.nanpercentile(vals, 5))
|
||
vmax = float(np.nanpercentile(vals, 95))
|
||
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
|
||
return None
|
||
return (vmin, vmax)
|
||
|
||
def push_sweep(s: np.ndarray, freqs: Optional[np.ndarray] = None):
|
||
nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time
|
||
if s is None or s.size == 0 or ring is None:
|
||
return
|
||
# Нормализуем длину до фиксированной ширины
|
||
w = ring.shape[1]
|
||
row = np.full((w,), np.nan, dtype=np.float32)
|
||
take = min(w, s.size)
|
||
row[:take] = s[:take]
|
||
ring[head, :] = row
|
||
if ring_time is not None:
|
||
ring_time[head] = time.time()
|
||
head = (head + 1) % ring.shape[0]
|
||
# FFT строка (линейный модуль; перевод в дБ делаем при отображении)
|
||
if ring_fft is not None:
|
||
bins = ring_fft.shape[1]
|
||
fft_mag = _compute_fft_mag_row(s, freqs, bins)
|
||
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_mag
|
||
fft_row = _fft_mag_to_db(fft_mag)
|
||
# Экстремумы для цветовой шкалы
|
||
fr_min = np.nanmin(fft_row)
|
||
fr_max = np.nanmax(fft_row)
|
||
fr_max = np.nanpercentile(fft_row, 90)
|
||
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
|
||
y_min_fft = float(fr_min)
|
||
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
|
||
y_max_fft = float(fr_max)
|
||
|
||
def drain_queue():
|
||
nonlocal current_freqs, current_distances, current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep
|
||
drained = 0
|
||
while True:
|
||
try:
|
||
s, info, aux_curves = q.get_nowait()
|
||
except Empty:
|
||
break
|
||
drained += 1
|
||
calibrated = calibrate_freqs(
|
||
{
|
||
"F": np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, s.size, dtype=np.float64),
|
||
"I": s,
|
||
}
|
||
)
|
||
current_freqs = calibrated["F"]
|
||
current_distances = _compute_distance_axis(current_freqs, fft_bins)
|
||
s = calibrated["I"]
|
||
current_sweep_raw = s
|
||
current_aux_curves = aux_curves
|
||
current_info = info
|
||
ch = 0
|
||
try:
|
||
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
|
||
except Exception:
|
||
ch = 0
|
||
if ch == 0:
|
||
last_calib_sweep = s
|
||
current_sweep_norm = None
|
||
sweep_for_proc = s
|
||
else:
|
||
if calib_enabled and last_calib_sweep is not None:
|
||
current_sweep_norm = _normalize_sweep(s, last_calib_sweep)
|
||
sweep_for_proc = current_sweep_norm
|
||
else:
|
||
current_sweep_norm = None
|
||
sweep_for_proc = s
|
||
ensure_buffer(s.size)
|
||
_update_physical_axes()
|
||
push_sweep(sweep_for_proc, current_freqs)
|
||
return drained
|
||
|
||
def make_display_ring():
|
||
# Возвращаем буфер с правильным порядком по времени (старые→новые) и осью времени по X
|
||
if ring is None:
|
||
return np.zeros((1, 1), dtype=np.float32)
|
||
base = ring if head == 0 else np.roll(ring, -head, axis=0)
|
||
return base.T # (width, time)
|
||
|
||
def make_display_times():
|
||
if ring_time is None:
|
||
return None
|
||
base_t = ring_time if head == 0 else np.roll(ring_time, -head)
|
||
return base_t
|
||
|
||
def _subtract_recent_mean_fft(disp_fft: np.ndarray) -> np.ndarray:
|
||
"""Вычесть среднее по каждой дальности за последние spec_mean_sec секунд в линейной области."""
|
||
if spec_mean_sec <= 0.0:
|
||
return disp_fft
|
||
disp_times = make_display_times()
|
||
if disp_times is None:
|
||
return disp_fft
|
||
now_t = time.time()
|
||
mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec))
|
||
if not np.any(mask):
|
||
return disp_fft
|
||
try:
|
||
mean_spec = np.nanmean(disp_fft[:, mask], axis=1)
|
||
except Exception:
|
||
return disp_fft
|
||
mean_spec = np.nan_to_num(mean_spec, nan=0.0)
|
||
return disp_fft - mean_spec[:, None]
|
||
|
||
def _visible_bg_fft(disp_fft: np.ndarray) -> Optional[np.ndarray]:
|
||
"""Оценка фона по медиане в текущем видимом по времени окне B-scan."""
|
||
if not bg_subtract_enabled or disp_fft.size == 0:
|
||
return None
|
||
ny, nx = disp_fft.shape
|
||
if ny <= 0 or nx <= 0:
|
||
return None
|
||
try:
|
||
x0, x1 = ax_spec.get_xlim()
|
||
except Exception:
|
||
x0, x1 = 0.0, float(nx - 1)
|
||
xmin, xmax = sorted((float(x0), float(x1)))
|
||
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
|
||
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
|
||
if ix1 < ix0:
|
||
ix1 = ix0
|
||
window = disp_fft[:, ix0 : ix1 + 1]
|
||
if window.size == 0:
|
||
return None
|
||
try:
|
||
bg_spec = np.nanmedian(window, axis=1)
|
||
except Exception:
|
||
return None
|
||
if not np.any(np.isfinite(bg_spec)):
|
||
return None
|
||
return np.nan_to_num(bg_spec, nan=0.0).astype(np.float32, copy=False)
|
||
|
||
def make_display_ring_fft():
|
||
if ring_fft is None:
|
||
return np.zeros((1, 1), dtype=np.float32)
|
||
base = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
|
||
return base.T # (bins, time)
|
||
|
||
def update(_frame):
|
||
nonlocal frames_since_ylim_update, current_peak_width, current_peak_amplitude
|
||
if peak_calibrate_mode and any(getattr(tb, "capturekeystrokes", False) for tb in c_boxes):
|
||
return (
|
||
line_obj,
|
||
line_avg1_obj,
|
||
line_avg2_obj,
|
||
line_calib_obj,
|
||
line_norm_obj,
|
||
img_obj,
|
||
fft_line_obj,
|
||
fft_bg_obj,
|
||
fft_left_obj,
|
||
fft_right_obj,
|
||
img_fft_obj,
|
||
spec_left_obj,
|
||
spec_right_obj,
|
||
status_text,
|
||
channel_text,
|
||
)
|
||
changed = drain_queue() > 0
|
||
|
||
# Обновление линии последнего свипа
|
||
if current_sweep_raw is not None:
|
||
if current_freqs is not None and current_freqs.size == current_sweep_raw.size:
|
||
xs = current_freqs
|
||
elif x_shared is not None and current_sweep_raw.size <= x_shared.size:
|
||
xs = x_shared[: current_sweep_raw.size]
|
||
else:
|
||
xs = np.arange(current_sweep_raw.size, dtype=np.int32)
|
||
line_obj.set_data(xs, current_sweep_raw)
|
||
if current_aux_curves is not None:
|
||
avg_1_curve, avg_2_curve = current_aux_curves
|
||
line_avg1_obj.set_data(xs[: avg_1_curve.size], avg_1_curve)
|
||
line_avg2_obj.set_data(xs[: avg_2_curve.size], avg_2_curve)
|
||
else:
|
||
line_avg1_obj.set_data([], [])
|
||
line_avg2_obj.set_data([], [])
|
||
if last_calib_sweep is not None:
|
||
line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep)
|
||
else:
|
||
line_calib_obj.set_data([], [])
|
||
if current_sweep_norm is not None:
|
||
line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm)
|
||
else:
|
||
line_norm_obj.set_data([], [])
|
||
# Лимиты по X: номинальный диапазон свипа
|
||
if isinstance(xs, np.ndarray) and xs.size > 0 and np.isfinite(xs[0]) and np.isfinite(xs[-1]):
|
||
ax_line.set_xlim(float(np.nanmin(xs)), float(np.nanmax(xs)))
|
||
else:
|
||
ax_line.set_xlim(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ)
|
||
# Адаптивные Y-лимиты (если не задан --ylim)
|
||
if fixed_ylim is None:
|
||
y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm]
|
||
if current_aux_curves is not None:
|
||
y_series.extend(current_aux_curves)
|
||
y_limits = _compute_auto_ylim(*y_series)
|
||
if y_limits is not None:
|
||
ax_line.set_ylim(y_limits[0], y_limits[1])
|
||
|
||
# Обновление спектра текущего свипа
|
||
sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
|
||
if sweep_for_fft.size > 0 and distance_shared is not None:
|
||
fft_vals = _compute_fft_row(sweep_for_fft, current_freqs, distance_shared.size)
|
||
xs_fft = current_distances if current_distances is not None else distance_shared
|
||
if fft_vals.size > xs_fft.size:
|
||
fft_vals = fft_vals[: xs_fft.size]
|
||
fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals)
|
||
# Авто-диапазон по Y для спектра
|
||
if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)):
|
||
finite_x = xs_fft[: fft_vals.size][np.isfinite(xs_fft[: fft_vals.size])]
|
||
if finite_x.size > 0:
|
||
ax_fft.set_xlim(float(np.min(finite_x)), float(np.max(finite_x)))
|
||
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
|
||
if peak_calibrate_mode:
|
||
markers = _find_peak_width_markers(xs_fft[: fft_vals.size], fft_vals)
|
||
if markers is not None:
|
||
fft_bg_obj.set_ydata([markers["background"], markers["background"]])
|
||
fft_left_obj.set_xdata([markers["left"], markers["left"]])
|
||
fft_right_obj.set_xdata([markers["right"], markers["right"]])
|
||
spec_left_obj.set_ydata([markers["left"], markers["left"]])
|
||
spec_right_obj.set_ydata([markers["right"], markers["right"]])
|
||
fft_bg_obj.set_visible(True)
|
||
fft_left_obj.set_visible(True)
|
||
fft_right_obj.set_visible(True)
|
||
spec_left_obj.set_visible(True)
|
||
spec_right_obj.set_visible(True)
|
||
current_peak_width = markers["width"]
|
||
current_peak_amplitude = markers["amplitude"]
|
||
else:
|
||
fft_bg_obj.set_visible(False)
|
||
fft_left_obj.set_visible(False)
|
||
fft_right_obj.set_visible(False)
|
||
spec_left_obj.set_visible(False)
|
||
spec_right_obj.set_visible(False)
|
||
current_peak_width = None
|
||
current_peak_amplitude = None
|
||
else:
|
||
fft_bg_obj.set_visible(False)
|
||
fft_left_obj.set_visible(False)
|
||
fft_right_obj.set_visible(False)
|
||
spec_left_obj.set_visible(False)
|
||
spec_right_obj.set_visible(False)
|
||
current_peak_width = None
|
||
current_peak_amplitude = None
|
||
|
||
# Обновление водопада
|
||
if changed and ring is not None:
|
||
disp = make_display_ring()
|
||
# Новые данные справа: без реверса
|
||
img_obj.set_data(disp)
|
||
# Подписи времени не обновляем динамически (оставляем авто-тики)
|
||
# Авто-уровни: по видимой области (не накапливаем за всё время)
|
||
levels = _visible_levels_matplotlib(disp, ax_img)
|
||
if levels is not None:
|
||
img_obj.set_clim(vmin=levels[0], vmax=levels[1])
|
||
|
||
# Обновление водопада спектров
|
||
if changed and ring_fft is not None:
|
||
disp_fft_lin = make_display_ring_fft()
|
||
disp_fft_lin = _subtract_recent_mean_fft(disp_fft_lin)
|
||
bg_spec = _visible_bg_fft(disp_fft_lin)
|
||
if bg_spec is not None:
|
||
num = np.maximum(disp_fft_lin, 0.0).astype(np.float32, copy=False) + 1e-9
|
||
den = bg_spec[:, None] + 1e-9
|
||
disp_fft = (20.0 * np.log10(num / den)).astype(np.float32, copy=False)
|
||
else:
|
||
disp_fft = _fft_mag_to_db(disp_fft_lin)
|
||
# Новые данные справа: без реверса
|
||
img_fft_obj.set_data(disp_fft)
|
||
# Подписи времени не обновляем динамически (оставляем авто-тики)
|
||
# Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии)
|
||
if bg_spec is not None:
|
||
try:
|
||
p5 = float(np.nanpercentile(disp_fft, 5))
|
||
p95 = float(np.nanpercentile(disp_fft, 95))
|
||
span = max(abs(p5), abs(p95))
|
||
except Exception:
|
||
span = float("nan")
|
||
if np.isfinite(span) and span > 0.0:
|
||
try:
|
||
c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0
|
||
except Exception:
|
||
c = 1.0
|
||
span_eff = max(span * c, 1e-6)
|
||
img_fft_obj.set_clim(vmin=-span_eff, vmax=span_eff)
|
||
else:
|
||
try:
|
||
# disp_fft имеет форму (bins, time); берём среднее по времени
|
||
mean_spec = np.nanmean(disp_fft, axis=1)
|
||
vmin_v = float(np.nanmin(mean_spec))
|
||
vmax_v = float(np.nanmax(mean_spec))
|
||
except Exception:
|
||
vmin_v = vmax_v = None
|
||
# Если средние не дают валидный диапазон — используем процентильную обрезку (если задана)
|
||
if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v:
|
||
if spec_clip is not None:
|
||
try:
|
||
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
|
||
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
|
||
except Exception:
|
||
vmin_v = vmax_v = None
|
||
# Фолбэк к отслеживаемым минимум/максимумам
|
||
if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v:
|
||
if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft:
|
||
vmin_v, vmax_v = y_min_fft, y_max_fft
|
||
if vmin_v is not None and vmax_v is not None and vmin_v != vmax_v:
|
||
# Применим скалирование контрастом (верхняя граница)
|
||
try:
|
||
c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0
|
||
except Exception:
|
||
c = 1.0
|
||
vmax_eff = vmin_v + c * (vmax_v - vmin_v)
|
||
img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff)
|
||
|
||
if changed and current_info:
|
||
status_payload = dict(current_info)
|
||
if peak_calibrate_mode and current_peak_width is not None:
|
||
status_payload["peak_w"] = current_peak_width
|
||
if peak_calibrate_mode and current_peak_amplitude is not None:
|
||
status_payload["peak_a"] = current_peak_amplitude
|
||
status_text.set_text(_format_status_kv(status_payload))
|
||
chs = current_info.get("chs") if isinstance(current_info, dict) else None
|
||
if chs is None:
|
||
chs = current_info.get("ch") if isinstance(current_info, dict) else None
|
||
if chs is None:
|
||
channel_text.set_text("")
|
||
else:
|
||
try:
|
||
if isinstance(chs, (list, tuple, set)):
|
||
ch_list = sorted(int(v) for v in chs)
|
||
ch_text_val = ", ".join(str(v) for v in ch_list)
|
||
else:
|
||
ch_text_val = str(int(chs))
|
||
channel_text.set_text(f"chs {ch_text_val}")
|
||
except Exception:
|
||
channel_text.set_text(f"chs {chs}")
|
||
|
||
# Возвращаем обновлённые артисты
|
||
return (
|
||
line_obj,
|
||
line_avg1_obj,
|
||
line_avg2_obj,
|
||
line_calib_obj,
|
||
line_norm_obj,
|
||
img_obj,
|
||
fft_line_obj,
|
||
fft_bg_obj,
|
||
fft_left_obj,
|
||
fft_right_obj,
|
||
img_fft_obj,
|
||
spec_left_obj,
|
||
spec_right_obj,
|
||
status_text,
|
||
channel_text,
|
||
)
|
||
|
||
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
|
||
cleanup_done = False
|
||
|
||
def _cleanup():
|
||
nonlocal cleanup_done
|
||
if cleanup_done:
|
||
return
|
||
cleanup_done = True
|
||
stop_event.set()
|
||
reader.join(timeout=1.0)
|
||
|
||
def _handle_sigint(_signum, _frame):
|
||
_cleanup()
|
||
try:
|
||
plt.close(fig)
|
||
except Exception:
|
||
pass
|
||
|
||
prev_sigint = signal.getsignal(signal.SIGINT)
|
||
try:
|
||
fig.canvas.mpl_connect("close_event", lambda _evt: _cleanup())
|
||
except Exception:
|
||
pass
|
||
try:
|
||
signal.signal(signal.SIGINT, _handle_sigint)
|
||
except Exception:
|
||
prev_sigint = None
|
||
|
||
try:
|
||
plt.show()
|
||
finally:
|
||
_cleanup()
|
||
if prev_sigint is not None:
|
||
try:
|
||
signal.signal(signal.SIGINT, prev_sigint)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def run_pyqtgraph(args):
|
||
"""Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6."""
|
||
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
|
||
try:
|
||
import pyqtgraph as pg
|
||
from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore
|
||
except Exception as e:
|
||
raise RuntimeError(
|
||
"pyqtgraph и совместимый Qt backend не найдены. Установите: pip install pyqtgraph PyQt5"
|
||
) from e
|
||
|
||
# Очередь завершённых свипов и поток чтения
|
||
q: Queue[SweepPacket] = Queue(maxsize=1000)
|
||
stop_event = threading.Event()
|
||
reader = SweepReader(
|
||
args.port,
|
||
args.baud,
|
||
q,
|
||
stop_event,
|
||
fancy=bool(args.fancy),
|
||
bin_mode=bool(args.bin_mode),
|
||
logscale=bool(args.logscale),
|
||
parser_16_bit_x2=bool(args.parser_16_bit_x2),
|
||
parser_test=bool(args.parser_test),
|
||
)
|
||
reader.start()
|
||
|
||
# Настройки скорости
|
||
max_sweeps = int(max(10, args.max_sweeps))
|
||
max_fps = max(1.0, float(args.max_fps))
|
||
interval_ms = int(1000.0 / max_fps)
|
||
|
||
# PyQtGraph настройки
|
||
pg.setConfigOptions(
|
||
useOpenGL=not peak_calibrate_mode,
|
||
antialias=False,
|
||
imageAxisOrder="row-major",
|
||
)
|
||
app = QtWidgets.QApplication.instance()
|
||
if app is None:
|
||
app = QtWidgets.QApplication([])
|
||
try:
|
||
app.setApplicationName(str(args.title))
|
||
except Exception:
|
||
pass
|
||
try:
|
||
app.setQuitOnLastWindowClosed(True)
|
||
except Exception:
|
||
pass
|
||
main_window = QtWidgets.QWidget()
|
||
try:
|
||
main_window.setWindowTitle(str(args.title))
|
||
except Exception:
|
||
pass
|
||
main_layout = QtWidgets.QHBoxLayout(main_window)
|
||
main_layout.setContentsMargins(6, 6, 6, 6)
|
||
main_layout.setSpacing(6)
|
||
win = pg.GraphicsLayoutWidget(show=False, title=args.title)
|
||
main_layout.addWidget(win)
|
||
settings_widget = QtWidgets.QWidget()
|
||
settings_layout = QtWidgets.QVBoxLayout(settings_widget)
|
||
settings_layout.setContentsMargins(6, 6, 6, 6)
|
||
settings_layout.setSpacing(8)
|
||
try:
|
||
settings_widget.setMinimumWidth(170)
|
||
except Exception:
|
||
pass
|
||
main_layout.addWidget(settings_widget)
|
||
|
||
# Плот последнего свипа (слева-сверху)
|
||
p_line = win.addPlot(row=0, col=0, title="Сырые данные")
|
||
p_line.showGrid(x=True, y=True, alpha=0.3)
|
||
curve_avg1 = p_line.plot(pen=pg.mkPen((170, 170, 170), width=1))
|
||
curve_avg2 = p_line.plot(pen=pg.mkPen((110, 110, 110), width=1))
|
||
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
|
||
curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1))
|
||
curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1))
|
||
p_line.setLabel("bottom", "ГГц")
|
||
p_line.setLabel("left", "Y")
|
||
ch_text = pg.TextItem("", anchor=(1, 1))
|
||
ch_text.setZValue(10)
|
||
p_line.addItem(ch_text)
|
||
|
||
# Водопад (справа-сверху)
|
||
p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад")
|
||
p_img.invertY(False)
|
||
p_img.showGrid(x=False, y=False)
|
||
p_img.setLabel("bottom", "Время, с (новое справа)")
|
||
try:
|
||
p_img.getAxis("bottom").setStyle(showValues=False)
|
||
except Exception:
|
||
pass
|
||
p_img.setLabel("left", "ГГц")
|
||
img = pg.ImageItem()
|
||
p_img.addItem(img)
|
||
|
||
# FFT (слева-снизу)
|
||
p_fft = win.addPlot(row=1, col=0, title="FFT")
|
||
p_fft.showGrid(x=True, y=True, alpha=0.3)
|
||
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
|
||
peak_pen = pg.mkPen((255, 0, 0), width=1)
|
||
fft_bg_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
|
||
fft_left_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen)
|
||
fft_right_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen)
|
||
p_fft.addItem(fft_bg_line)
|
||
p_fft.addItem(fft_left_line)
|
||
p_fft.addItem(fft_right_line)
|
||
fft_bg_line.setVisible(False)
|
||
fft_left_line.setVisible(False)
|
||
fft_right_line.setVisible(False)
|
||
p_fft.setLabel("bottom", "Расстояние, м")
|
||
p_fft.setLabel("left", "дБ")
|
||
|
||
# Водопад спектров (справа-снизу)
|
||
p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)")
|
||
p_spec.invertY(False)
|
||
p_spec.showGrid(x=False, y=False)
|
||
p_spec.setLabel("bottom", "Время, с (новое справа)")
|
||
try:
|
||
p_spec.getAxis("bottom").setStyle(showValues=False)
|
||
except Exception:
|
||
pass
|
||
p_spec.setLabel("left", "Расстояние, м")
|
||
img_fft = pg.ImageItem()
|
||
p_spec.addItem(img_fft)
|
||
spec_left_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
|
||
spec_right_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen)
|
||
p_spec.addItem(spec_left_line)
|
||
p_spec.addItem(spec_right_line)
|
||
spec_left_line.setVisible(False)
|
||
spec_right_line.setVisible(False)
|
||
|
||
# Правая панель настроек внутри основного окна.
|
||
calib_cb = QtWidgets.QCheckBox("нормировка")
|
||
bg_subtract_cb = QtWidgets.QCheckBox("вычет фона")
|
||
try:
|
||
settings_title = QtWidgets.QLabel("Настройки")
|
||
settings_layout.addWidget(settings_title)
|
||
except Exception:
|
||
pass
|
||
settings_layout.addWidget(calib_cb)
|
||
settings_layout.addWidget(bg_subtract_cb)
|
||
calib_window = None
|
||
|
||
# Статусная строка (внизу окна)
|
||
status = pg.LabelItem(justify="left")
|
||
win.addItem(status, row=3, col=0, colspan=2)
|
||
|
||
# Состояние
|
||
ring: Optional[np.ndarray] = None
|
||
ring_time: Optional[np.ndarray] = None
|
||
head = 0
|
||
width: Optional[int] = None
|
||
x_shared: Optional[np.ndarray] = None
|
||
current_freqs: Optional[np.ndarray] = None
|
||
current_distances: Optional[np.ndarray] = None
|
||
current_sweep_raw: Optional[np.ndarray] = None
|
||
current_aux_curves: SweepAuxCurves = None
|
||
current_sweep_norm: Optional[np.ndarray] = None
|
||
current_fft_db: Optional[np.ndarray] = None
|
||
last_calib_sweep: Optional[np.ndarray] = None
|
||
current_info: Optional[SweepInfo] = None
|
||
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
|
||
# Для спектров
|
||
fft_bins = FFT_LEN // 2 + 1
|
||
ring_fft: Optional[np.ndarray] = None
|
||
distance_shared: Optional[np.ndarray] = None
|
||
y_min_fft, y_max_fft = None, None
|
||
# Параметры контраста водопада спектров (процентильная обрезка)
|
||
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
|
||
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
|
||
calib_enabled = False
|
||
bg_subtract_enabled = False
|
||
current_peak_width: Optional[float] = None
|
||
current_peak_amplitude: Optional[float] = None
|
||
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
|
||
c_edits = []
|
||
c_value_labels = []
|
||
plot_dirty = False
|
||
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
|
||
fixed_ylim: Optional[Tuple[float, float]] = None
|
||
if args.ylim:
|
||
try:
|
||
y0, y1 = args.ylim.split(",")
|
||
fixed_ylim = (float(y0), float(y1))
|
||
except Exception:
|
||
pass
|
||
if fixed_ylim is not None:
|
||
p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0)
|
||
|
||
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||
return _normalize_by_calib(raw, calib, norm_type=norm_type)
|
||
|
||
def _set_calib_enabled():
|
||
nonlocal calib_enabled, current_sweep_norm, plot_dirty
|
||
try:
|
||
calib_enabled = bool(calib_cb.isChecked())
|
||
except Exception:
|
||
calib_enabled = False
|
||
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
|
||
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
|
||
else:
|
||
current_sweep_norm = None
|
||
plot_dirty = True
|
||
|
||
def _set_bg_subtract_enabled():
|
||
nonlocal bg_subtract_enabled, plot_dirty
|
||
try:
|
||
bg_subtract_enabled = bool(bg_subtract_cb.isChecked())
|
||
except Exception:
|
||
bg_subtract_enabled = False
|
||
plot_dirty = True
|
||
|
||
try:
|
||
calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled())
|
||
except Exception:
|
||
pass
|
||
try:
|
||
bg_subtract_cb.stateChanged.connect(lambda _v: _set_bg_subtract_enabled())
|
||
except Exception:
|
||
pass
|
||
|
||
if peak_calibrate_mode:
|
||
try:
|
||
calib_window = QtWidgets.QWidget()
|
||
try:
|
||
calib_window.setWindowTitle(f"{args.title} freq calibration")
|
||
except Exception:
|
||
pass
|
||
calib_layout = QtWidgets.QFormLayout(calib_window)
|
||
calib_layout.setContentsMargins(8, 8, 8, 8)
|
||
|
||
def _refresh_c_value_labels():
|
||
for idx, label in enumerate(c_value_labels):
|
||
try:
|
||
label.setText(f"{float(CALIBRATION_C[idx]):.6g}")
|
||
except Exception:
|
||
pass
|
||
|
||
def _apply_c_value(idx: int, edit):
|
||
global CALIBRATION_C, CALIBRATION_C_BASE
|
||
try:
|
||
CALIBRATION_C_BASE[idx] = float(edit.text().strip())
|
||
CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE)
|
||
except Exception:
|
||
try:
|
||
edit.setText(f"{float(CALIBRATION_C_BASE[idx]):.6g}")
|
||
except Exception:
|
||
pass
|
||
_refresh_c_value_labels()
|
||
|
||
def _apply_all_c_values():
|
||
for idx, edit in enumerate(c_edits):
|
||
_apply_c_value(idx, edit)
|
||
|
||
for idx in range(3):
|
||
edit = QtWidgets.QLineEdit(f"{float(CALIBRATION_C_BASE[idx]):.6g}")
|
||
try:
|
||
edit.setMaximumWidth(120)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
edit.editingFinished.connect(lambda i=idx, e=edit: _apply_c_value(i, e))
|
||
except Exception:
|
||
pass
|
||
calib_layout.addRow(f"C{idx}", edit)
|
||
c_edits.append(edit)
|
||
try:
|
||
update_btn = QtWidgets.QPushButton("Update")
|
||
update_btn.clicked.connect(lambda _checked=False: _apply_all_c_values())
|
||
calib_layout.addRow(update_btn)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
calib_layout.addRow(QtWidgets.QLabel("Working C"), QtWidgets.QLabel(""))
|
||
except Exception:
|
||
pass
|
||
for idx in range(3):
|
||
label = QtWidgets.QLabel(f"{float(CALIBRATION_C[idx]):.6g}")
|
||
calib_layout.addRow(f"C*{idx}", label)
|
||
c_value_labels.append(label)
|
||
_refresh_c_value_labels()
|
||
try:
|
||
calib_window.show()
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
try:
|
||
settings_layout.addStretch(1)
|
||
except Exception:
|
||
pass
|
||
|
||
def ensure_buffer(_w: int):
|
||
nonlocal ring, ring_time, head, width, x_shared, ring_fft, distance_shared
|
||
if ring is not None:
|
||
return
|
||
width = WF_WIDTH
|
||
x_shared = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, width, dtype=np.float32)
|
||
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
|
||
ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64)
|
||
head = 0
|
||
# Водопад: время по оси X, X по оси Y (ось Y: номинальный диапазон свипа)
|
||
img.setImage(ring.T, autoLevels=False)
|
||
img.setRect(0, SWEEP_FREQ_MIN_GHZ, max_sweeps, SWEEP_FREQ_MAX_GHZ - SWEEP_FREQ_MIN_GHZ)
|
||
p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ), padding=0)
|
||
p_line.setXRange(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, padding=0)
|
||
# FFT: время по оси X, бин по оси Y
|
||
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
|
||
img_fft.setImage(ring_fft.T, autoLevels=False)
|
||
img_fft.setRect(0, 0.0, max_sweeps, 1.0)
|
||
p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0.0, 1.0), padding=0)
|
||
p_fft.setXRange(0.0, 1.0, padding=0)
|
||
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
|
||
|
||
def _update_physical_axes():
|
||
nonlocal distance_shared
|
||
if current_freqs is not None and current_freqs.size > 0:
|
||
finite_f = current_freqs[np.isfinite(current_freqs)]
|
||
if finite_f.size > 0:
|
||
f_min = float(np.min(finite_f))
|
||
f_max = float(np.max(finite_f))
|
||
if f_max <= f_min:
|
||
f_max = f_min + 1.0
|
||
img.setRect(0, f_min, max_sweeps, f_max - f_min)
|
||
p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0)
|
||
|
||
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
|
||
if distance_shared.size > 0:
|
||
d_min = float(distance_shared[0])
|
||
d_max = float(distance_shared[-1]) if distance_shared.size > 1 else float(distance_shared[0] + 1.0)
|
||
if d_max <= d_min:
|
||
d_max = d_min + 1.0
|
||
img_fft.setRect(0, d_min, max_sweeps, d_max - d_min)
|
||
p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0)
|
||
|
||
def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]:
|
||
"""(vmin, vmax) по центральным 90% значений в видимой области ImageItem."""
|
||
if data.size == 0:
|
||
return None
|
||
ny, nx = data.shape[0], data.shape[1]
|
||
try:
|
||
(x0, x1), (y0, y1) = p_img.viewRange()
|
||
except Exception:
|
||
x0, x1 = 0.0, float(nx - 1)
|
||
y0, y1 = 0.0, float(ny - 1)
|
||
xmin, xmax = sorted((float(x0), float(x1)))
|
||
ymin, ymax = sorted((float(y0), float(y1)))
|
||
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
|
||
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
|
||
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
|
||
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
|
||
if ix1 < ix0:
|
||
ix1 = ix0
|
||
if iy1 < iy0:
|
||
iy1 = iy0
|
||
sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1]
|
||
finite = np.isfinite(sub)
|
||
if not finite.any():
|
||
return None
|
||
vals = sub[finite]
|
||
vmin = float(np.nanpercentile(vals, 5))
|
||
vmax = float(np.nanpercentile(vals, 95))
|
||
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
|
||
return None
|
||
return (vmin, vmax)
|
||
|
||
def _visible_bg_fft(disp_fft: np.ndarray) -> Optional[np.ndarray]:
|
||
"""Оценка фона по медиане в текущем видимом по времени окне B-scan."""
|
||
if not bg_subtract_enabled or disp_fft.size == 0:
|
||
return None
|
||
ny, nx = disp_fft.shape
|
||
if ny <= 0 or nx <= 0:
|
||
return None
|
||
try:
|
||
(x0, x1), _ = p_spec.viewRange()
|
||
except Exception:
|
||
x0, x1 = 0.0, float(nx - 1)
|
||
xmin, xmax = sorted((float(x0), float(x1)))
|
||
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
|
||
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
|
||
if ix1 < ix0:
|
||
ix1 = ix0
|
||
window = disp_fft[:, ix0 : ix1 + 1]
|
||
if window.size == 0:
|
||
return None
|
||
try:
|
||
bg_spec = np.nanmedian(window, axis=1)
|
||
except Exception:
|
||
return None
|
||
if not np.any(np.isfinite(bg_spec)):
|
||
return None
|
||
return np.nan_to_num(bg_spec, nan=0.0).astype(np.float32, copy=False)
|
||
|
||
def push_sweep(s: np.ndarray, freqs: Optional[np.ndarray] = None):
|
||
nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft, current_fft_db
|
||
if s is None or s.size == 0 or ring is None:
|
||
return
|
||
w = ring.shape[1]
|
||
row = np.full((w,), np.nan, dtype=np.float32)
|
||
take = min(w, s.size)
|
||
row[:take] = s[:take]
|
||
ring[head, :] = row
|
||
if ring_time is not None:
|
||
ring_time[head] = time.time()
|
||
head = (head + 1) % ring.shape[0]
|
||
# FFT строка (линейный модуль; перевод в дБ делаем при отображении)
|
||
if ring_fft is not None:
|
||
bins = ring_fft.shape[1]
|
||
fft_mag = _compute_fft_mag_row(s, freqs, bins)
|
||
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_mag
|
||
fft_row = _fft_mag_to_db(fft_mag)
|
||
current_fft_db = fft_row
|
||
fr_min = np.nanmin(fft_row)
|
||
fr_max = np.nanmax(fft_row)
|
||
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
|
||
y_min_fft = float(fr_min)
|
||
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
|
||
y_max_fft = float(fr_max)
|
||
|
||
def drain_queue():
|
||
nonlocal current_freqs, current_distances, current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep
|
||
drained = 0
|
||
while True:
|
||
try:
|
||
s, info, aux_curves = q.get_nowait()
|
||
except Empty:
|
||
break
|
||
drained += 1
|
||
calibrated = calibrate_freqs(
|
||
{
|
||
"F": np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, s.size, dtype=np.float64),
|
||
"I": s,
|
||
}
|
||
)
|
||
current_freqs = calibrated["F"]
|
||
current_distances = _compute_distance_axis(current_freqs, fft_bins)
|
||
s = calibrated["I"]
|
||
current_sweep_raw = s
|
||
current_aux_curves = aux_curves
|
||
current_info = info
|
||
ch = 0
|
||
try:
|
||
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
|
||
except Exception:
|
||
ch = 0
|
||
if ch == 0:
|
||
last_calib_sweep = s
|
||
current_sweep_norm = None
|
||
sweep_for_proc = s
|
||
else:
|
||
if calib_enabled and last_calib_sweep is not None:
|
||
current_sweep_norm = _normalize_sweep(s, last_calib_sweep)
|
||
sweep_for_proc = current_sweep_norm
|
||
else:
|
||
current_sweep_norm = None
|
||
sweep_for_proc = s
|
||
ensure_buffer(s.size)
|
||
push_sweep(sweep_for_proc, current_freqs)
|
||
if drained > 0:
|
||
_update_physical_axes()
|
||
return drained
|
||
|
||
# Попытка применить LUT из колормэпа (если доступен)
|
||
try:
|
||
cm_mod = getattr(pg, "colormap", None)
|
||
if cm_mod is not None:
|
||
cm = cm_mod.get(args.cmap)
|
||
lut = cm.getLookupTable(0.0, 1.0, 256)
|
||
img.setLookupTable(lut)
|
||
img_fft.setLookupTable(lut)
|
||
except Exception:
|
||
pass
|
||
|
||
def update():
|
||
nonlocal current_peak_width, current_peak_amplitude, plot_dirty, current_fft_db
|
||
if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits):
|
||
return
|
||
changed = drain_queue() > 0
|
||
redraw_needed = changed or plot_dirty
|
||
if redraw_needed and current_sweep_raw is not None and x_shared is not None:
|
||
if current_freqs is not None and current_freqs.size == current_sweep_raw.size:
|
||
xs = current_freqs
|
||
elif current_sweep_raw.size <= x_shared.size:
|
||
xs = x_shared[: current_sweep_raw.size]
|
||
else:
|
||
xs = np.arange(current_sweep_raw.size)
|
||
curve.setData(xs, current_sweep_raw, autoDownsample=True)
|
||
if current_aux_curves is not None:
|
||
avg_1_curve, avg_2_curve = current_aux_curves
|
||
curve_avg1.setData(xs[: avg_1_curve.size], avg_1_curve, autoDownsample=True)
|
||
curve_avg2.setData(xs[: avg_2_curve.size], avg_2_curve, autoDownsample=True)
|
||
else:
|
||
curve_avg1.setData([], [])
|
||
curve_avg2.setData([], [])
|
||
if last_calib_sweep is not None:
|
||
curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True)
|
||
else:
|
||
curve_calib.setData([], [])
|
||
if current_sweep_norm is not None:
|
||
curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True)
|
||
else:
|
||
curve_norm.setData([], [])
|
||
if fixed_ylim is None:
|
||
y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm]
|
||
if current_aux_curves is not None:
|
||
y_series.extend(current_aux_curves)
|
||
y_limits = _compute_auto_ylim(*y_series)
|
||
if y_limits is not None:
|
||
p_line.setYRange(y_limits[0], y_limits[1], padding=0)
|
||
if isinstance(xs, np.ndarray) and xs.size > 0:
|
||
finite_x = xs[np.isfinite(xs)]
|
||
if finite_x.size > 0:
|
||
p_line.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
|
||
|
||
# Обновим спектр
|
||
sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
|
||
if sweep_for_fft.size > 0 and distance_shared is not None:
|
||
if (
|
||
current_fft_db is None
|
||
or current_fft_db.size != distance_shared.size
|
||
or plot_dirty
|
||
):
|
||
current_fft_db = _compute_fft_row(sweep_for_fft, current_freqs, distance_shared.size)
|
||
fft_vals = current_fft_db
|
||
xs_fft = current_distances if current_distances is not None else distance_shared
|
||
if fft_vals.size > xs_fft.size:
|
||
fft_vals = fft_vals[: xs_fft.size]
|
||
curve_fft.setData(xs_fft[: fft_vals.size], fft_vals)
|
||
finite_x = xs_fft[: fft_vals.size][np.isfinite(xs_fft[: fft_vals.size])]
|
||
if finite_x.size > 0:
|
||
p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
|
||
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
|
||
if peak_calibrate_mode:
|
||
markers = _find_peak_width_markers(xs_fft[: fft_vals.size], fft_vals)
|
||
if markers is not None:
|
||
fft_bg_line.setValue(markers["background"])
|
||
fft_left_line.setValue(markers["left"])
|
||
fft_right_line.setValue(markers["right"])
|
||
spec_left_line.setValue(markers["left"])
|
||
spec_right_line.setValue(markers["right"])
|
||
fft_bg_line.setVisible(True)
|
||
fft_left_line.setVisible(True)
|
||
fft_right_line.setVisible(True)
|
||
spec_left_line.setVisible(True)
|
||
spec_right_line.setVisible(True)
|
||
current_peak_width = markers["width"]
|
||
current_peak_amplitude = markers["amplitude"]
|
||
else:
|
||
fft_bg_line.setVisible(False)
|
||
fft_left_line.setVisible(False)
|
||
fft_right_line.setVisible(False)
|
||
spec_left_line.setVisible(False)
|
||
spec_right_line.setVisible(False)
|
||
current_peak_width = None
|
||
current_peak_amplitude = None
|
||
else:
|
||
fft_bg_line.setVisible(False)
|
||
fft_left_line.setVisible(False)
|
||
fft_right_line.setVisible(False)
|
||
spec_left_line.setVisible(False)
|
||
spec_right_line.setVisible(False)
|
||
current_peak_width = None
|
||
current_peak_amplitude = None
|
||
plot_dirty = False
|
||
|
||
if changed and ring is not None:
|
||
disp = ring if head == 0 else np.roll(ring, -head, axis=0)
|
||
disp = disp.T # (width, time with newest at right)
|
||
levels = _visible_levels_pyqtgraph(disp)
|
||
if levels is not None:
|
||
img.setImage(disp, autoLevels=False, levels=levels)
|
||
else:
|
||
img.setImage(disp, autoLevels=False)
|
||
|
||
if changed and current_info:
|
||
try:
|
||
status_payload = dict(current_info)
|
||
if peak_calibrate_mode and current_peak_width is not None:
|
||
status_payload["peak_w"] = current_peak_width
|
||
if peak_calibrate_mode and current_peak_amplitude is not None:
|
||
status_payload["peak_a"] = current_peak_amplitude
|
||
status.setText(_format_status_kv(status_payload))
|
||
except Exception:
|
||
pass
|
||
try:
|
||
chs = current_info.get("chs") if isinstance(current_info, dict) else None
|
||
if chs is None:
|
||
chs = current_info.get("ch") if isinstance(current_info, dict) else None
|
||
if chs is None:
|
||
ch_text.setText("")
|
||
else:
|
||
if isinstance(chs, (list, tuple, set)):
|
||
ch_list = sorted(int(v) for v in chs)
|
||
ch_text_val = ", ".join(str(v) for v in ch_list)
|
||
else:
|
||
ch_text_val = str(int(chs))
|
||
ch_text.setText(f"chs {ch_text_val}")
|
||
(x0, x1), (y0, y1) = p_line.viewRange()
|
||
dx = 0.01 * max(1.0, float(x1 - x0))
|
||
dy = 0.01 * max(1.0, float(y1 - y0))
|
||
ch_text.setPos(float(x1 - dx), float(y1 - dy))
|
||
except Exception:
|
||
pass
|
||
|
||
if changed and ring_fft is not None:
|
||
disp_fft_lin = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
|
||
disp_fft_lin = disp_fft_lin.T
|
||
if spec_mean_sec > 0.0 and ring_time is not None:
|
||
disp_times = ring_time if head == 0 else np.roll(ring_time, -head)
|
||
now_t = time.time()
|
||
mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec))
|
||
if np.any(mask):
|
||
try:
|
||
mean_spec = np.nanmean(disp_fft_lin[:, mask], axis=1)
|
||
mean_spec = np.nan_to_num(mean_spec, nan=0.0)
|
||
disp_fft_lin = disp_fft_lin - mean_spec[:, None]
|
||
except Exception:
|
||
pass
|
||
bg_spec = _visible_bg_fft(disp_fft_lin)
|
||
if bg_spec is not None:
|
||
num = np.maximum(disp_fft_lin, 0.0).astype(np.float32, copy=False) + 1e-9
|
||
den = bg_spec[:, None] + 1e-9
|
||
disp_fft = (20.0 * np.log10(num / den)).astype(np.float32, copy=False)
|
||
else:
|
||
disp_fft = _fft_mag_to_db(disp_fft_lin)
|
||
# Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии)
|
||
levels = None
|
||
if bg_spec is not None:
|
||
try:
|
||
p5 = float(np.nanpercentile(disp_fft, 5))
|
||
p95 = float(np.nanpercentile(disp_fft, 95))
|
||
span = max(abs(p5), abs(p95))
|
||
if np.isfinite(span) and span > 0.0:
|
||
levels = (-span, span)
|
||
except Exception:
|
||
levels = None
|
||
else:
|
||
try:
|
||
mean_spec = np.nanmean(disp_fft, axis=1)
|
||
vmin_v = float(np.nanmin(mean_spec))
|
||
vmax_v = float(np.nanmax(mean_spec))
|
||
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
|
||
levels = (vmin_v, vmax_v)
|
||
except Exception:
|
||
levels = None
|
||
# Процентильная обрезка как запасной вариант
|
||
if levels is None and spec_clip is not None:
|
||
try:
|
||
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
|
||
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
|
||
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
|
||
levels = (vmin_v, vmax_v)
|
||
except Exception:
|
||
levels = None
|
||
# Ещё один фолбэк — глобальные накопленные мин/макс
|
||
if levels is None and y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft:
|
||
levels = (y_min_fft, y_max_fft)
|
||
if levels is not None:
|
||
img_fft.setImage(disp_fft, autoLevels=False, levels=levels)
|
||
else:
|
||
img_fft.setImage(disp_fft, autoLevels=False)
|
||
|
||
timer = pg.QtCore.QTimer()
|
||
timer.timeout.connect(update)
|
||
timer.start(interval_ms)
|
||
sigint_requested = threading.Event()
|
||
sigint_timer = pg.QtCore.QTimer()
|
||
sigint_timer.setInterval(50)
|
||
sigint_timer.timeout.connect(lambda: app.quit() if sigint_requested.is_set() else None)
|
||
sigint_timer.start()
|
||
cleanup_done = False
|
||
|
||
def on_quit():
|
||
nonlocal cleanup_done
|
||
if cleanup_done:
|
||
return
|
||
cleanup_done = True
|
||
try:
|
||
timer.stop()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
sigint_timer.stop()
|
||
except Exception:
|
||
pass
|
||
stop_event.set()
|
||
reader.join(timeout=1.0)
|
||
try:
|
||
main_window.close()
|
||
except Exception:
|
||
pass
|
||
if calib_window is not None:
|
||
try:
|
||
calib_window.close()
|
||
except Exception:
|
||
pass
|
||
|
||
def _handle_sigint(_signum, _frame):
|
||
sigint_requested.set()
|
||
|
||
prev_sigint = signal.getsignal(signal.SIGINT)
|
||
try:
|
||
signal.signal(signal.SIGINT, _handle_sigint)
|
||
except Exception:
|
||
prev_sigint = None
|
||
|
||
orig_close_event = getattr(main_window, "closeEvent", None)
|
||
|
||
def _close_event(event):
|
||
try:
|
||
if callable(orig_close_event):
|
||
orig_close_event(event)
|
||
else:
|
||
event.accept()
|
||
except Exception:
|
||
try:
|
||
event.accept()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
app.quit()
|
||
except Exception:
|
||
pass
|
||
|
||
try:
|
||
main_window.closeEvent = _close_event # type: ignore[method-assign]
|
||
except Exception:
|
||
pass
|
||
|
||
app.aboutToQuit.connect(on_quit)
|
||
try:
|
||
main_window.resize(1200, 680)
|
||
except Exception:
|
||
pass
|
||
main_window.show()
|
||
exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None)
|
||
try:
|
||
exec_fn()
|
||
finally:
|
||
on_quit()
|
||
if prev_sigint is not None:
|
||
try:
|
||
signal.signal(signal.SIGINT, prev_sigint)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|