4 Commits

View File

@ -25,8 +25,9 @@ import os
import sys
import threading
import time
from collections import deque
from queue import Queue, Empty, Full
from typing import Optional, Tuple
from typing import Any, Dict, Mapping, Optional, Tuple, Union
import numpy as np
@ -36,6 +37,31 @@ FFT_LEN = 1024 # длина БПФ для спектра/водопада сп
# считаем, что сигнал «меньше нуля» и домножаем свип на -1
DATA_INVERSION_THRASHOLD = 10.0
Number = Union[int, float]
SweepInfo = Dict[str, Number]
SweepPacket = Tuple[np.ndarray, SweepInfo]
def _format_status_kv(data: Mapping[str, Any]) -> str:
"""Преобразовать словарь метрик в одну строку 'k:v'."""
def _fmt(v: Any) -> str:
if v is None:
return "NA"
try:
fv = float(v)
except Exception:
return str(v)
if not np.isfinite(fv):
return "nan"
# Достаточно компактно для статус-строки.
if abs(fv) >= 1000 or (0 < abs(fv) < 0.01):
return f"{fv:.3g}"
return f"{fv:.3f}".rstrip("0").rstrip(".")
parts = [f"{k}:{_fmt(v)}" for k, v in data.items()]
return " ".join(parts)
def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
"""Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров.
@ -157,9 +183,10 @@ class SerialLineSource:
if self._pyserial is None:
self._fdreader = open_raw_tty(path, baud)
if self._fdreader is None:
raise RuntimeError(
f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)"
)
msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)"
if sys.platform.startswith("win"):
msg += ". На Windows нужен pyserial: pip install pyserial"
raise RuntimeError(msg)
def readline(self) -> bytes:
if self._pyserial is not None:
@ -240,7 +267,14 @@ class SerialChunkReader:
class SweepReader(threading.Thread):
"""Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь."""
def __init__(self, port_path: str, baud: int, out_queue: Queue, stop_event: threading.Event, fancy: bool = False):
def __init__(
self,
port_path: str,
baud: int,
out_queue: Queue[SweepPacket],
stop_event: threading.Event,
fancy: bool = False,
):
super().__init__(daemon=True)
self._port_path = port_path
self._baud = baud
@ -249,6 +283,9 @@ class SweepReader(threading.Thread):
self._src: Optional[SerialLineSource] = None
self._fancy = bool(fancy)
self._max_width: int = 0
self._sweep_idx: int = 0
self._last_sweep_ts: Optional[float] = None
self._n_valid_hist = deque()
def _finalize_current(self, xs, ys):
if not xs:
@ -268,6 +305,10 @@ class SweepReader(threading.Thread):
for x, y in zip(xs, ys):
if 0 <= x < target_width:
sweep[x] = float(y)
# Метрики валидных точек до заполнения пропусков
finite_pre = np.isfinite(sweep)
n_valid_cur = int(np.count_nonzero(finite_pre))
# Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины
if self._fancy:
try:
@ -295,16 +336,51 @@ class SweepReader(threading.Thread):
sweep *= -1.0
except Exception:
pass
sweep -= float(np.nanmean(sweep))
# Метрики для статусной строки (вид словаря: переменная -> значение)
self._sweep_idx += 1
now = time.time()
if self._last_sweep_ts is None:
dt_ms = float("nan")
else:
dt_ms = (now - self._last_sweep_ts) * 1000.0
self._last_sweep_ts = now
self._n_valid_hist.append((now, n_valid_cur))
while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0:
self._n_valid_hist.popleft()
if self._n_valid_hist:
n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist))
else:
n_valid = float(n_valid_cur)
if n_valid_cur > 0:
vmin = float(np.nanmin(sweep))
vmax = float(np.nanmax(sweep))
mean = float(np.nanmean(sweep))
std = float(np.nanstd(sweep))
else:
vmin = vmax = mean = std = float("nan")
info: SweepInfo = {
"sweep": self._sweep_idx,
"n_valid": n_valid,
"min": vmin,
"max": vmax,
"mean": mean,
"std": std,
"dt_ms": dt_ms,
}
# Кладём готовый свип (если очередь полна — выбрасываем самый старый)
try:
self._q.put_nowait(sweep)
self._q.put_nowait((sweep, info))
except Full:
try:
_ = self._q.get_nowait()
except Exception:
pass
try:
self._q.put_nowait(sweep)
self._q.put_nowait((sweep, info))
except Exception:
pass
@ -386,7 +462,10 @@ def main():
"последний свип и водопад (реалтайм)."
)
)
parser.add_argument("port", help="Путь к порту, например /dev/ttyACM1")
parser.add_argument(
"port",
help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)",
)
parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)")
parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде")
parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с")
@ -421,7 +500,7 @@ def main():
args = parser.parse_args()
# Попробуем быстрый бэкенд (pyqtgraph) при auto/pg
if args.backend in ("auto", "pg"):
if args.backend in ("pg"):
try:
return run_pyqtgraph(args)
except Exception as e:
@ -440,7 +519,7 @@ def main():
sys.exit(1)
# Очередь завершённых свипов и поток чтения
q: Queue[np.ndarray] = Queue(maxsize=1000)
q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy))
reader.start()
@ -454,13 +533,14 @@ def main():
# Состояние для отображения
current_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None
x_shared: Optional[np.ndarray] = None
width: Optional[int] = None
max_sweeps = int(max(10, args.max_sweeps))
ring = None # type: Optional[np.ndarray]
ring_time = None # type: Optional[np.ndarray]
head = 0
y_min, y_max = None, None
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
# FFT состояние
fft_bins = FFT_LEN // 2 + 1
ring_fft = None # type: Optional[np.ndarray]
@ -473,6 +553,17 @@ def main():
ymax_slider = None
contrast_slider = None
# Статусная строка (внизу окна)
status_text = fig.text(
0.01,
0.01,
"",
ha="left",
va="bottom",
fontsize=8,
family="monospace",
)
# Линейный график последнего свипа
line_obj, = ax_line.plot([], [], lw=1)
ax_line.set_title("Сырые данные", pad=1)
@ -582,8 +673,40 @@ def main():
ax_spec.set_ylim(0, max(1, fft_bins - 1))
freq_shared = np.arange(fft_bins, dtype=np.int32)
def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по текущей видимой области imshow (без накопления по времени)."""
if data.size == 0:
return None
ny, nx = data.shape[0], data.shape[1]
try:
x0, x1 = axis.get_xlim()
y0, y1 = axis.get_ylim()
except Exception:
x0, x1 = 0.0, float(nx - 1)
y0, y1 = 0.0, float(ny - 1)
xmin, xmax = sorted((float(x0), float(x1)))
ymin, ymax = sorted((float(y0), float(y1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
if ix1 < ix0:
ix1 = ix0
if iy1 < iy0:
iy1 = iy0
sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1]
finite = np.isfinite(sub)
if not finite.any():
return None
vals = sub[finite]
vmin = float(np.min(vals))
vmax = float(np.max(vals))
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
return None
return (vmin, vmax)
def push_sweep(s: np.ndarray):
nonlocal ring, head, y_min, y_max, ring_fft, y_min_fft, y_max_fft, ring_time
nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time
if s is None or s.size == 0 or ring is None:
return
# Нормализуем длину до фиксированной ширины
@ -595,13 +718,6 @@ def main():
if ring_time is not None:
ring_time[head] = time.time()
head = (head + 1) % ring.shape[0]
# Обновляем мин/макс по данным (игнорим NaN)
sv_min = np.nanmin(row)
sv_max = np.nanmax(row)
if y_min is None or (not np.isnan(sv_min) and sv_min < y_min):
y_min = float(sv_min)
if y_max is None or (not np.isnan(sv_max) and sv_max > y_max):
y_max = float(sv_max)
# FFT строка (дБ)
if ring_fft is not None:
bins = ring_fft.shape[1]
@ -630,21 +746,23 @@ def main():
# Экстремумы для цветовой шкалы
fr_min = np.nanmin(fft_row)
fr_max = np.nanmax(fft_row)
fr_max = np.nanpercentile(fft_row, 90)
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
y_min_fft = float(fr_min)
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
y_max_fft = float(fr_max)
def drain_queue():
nonlocal current_sweep
nonlocal current_sweep, current_info
drained = 0
while True:
try:
s = q.get_nowait()
s, info = q.get_nowait()
except Empty:
break
drained += 1
current_sweep = s
current_info = info
ensure_buffer(s.size)
push_sweep(s)
return drained
@ -721,10 +839,10 @@ def main():
# Новые данные справа: без реверса
img_obj.set_data(disp)
# Подписи времени не обновляем динамически (оставляем авто-тики)
# Актуализируем цветовую шкалу только при расширении экстремумов
if y_min is not None and y_max is not None and np.isfinite(y_min) and np.isfinite(y_max):
if y_min != y_max:
img_obj.set_clim(vmin=y_min, vmax=y_max)
# Авто-уровни: по видимой области (не накапливаем за всё время)
levels = _visible_levels_matplotlib(disp, ax_img)
if levels is not None:
img_obj.set_clim(vmin=levels[0], vmax=levels[1])
# Обновление водопада спектров
if changed and ring_fft is not None:
@ -761,8 +879,11 @@ def main():
vmax_eff = vmin_v + c * (vmax_v - vmin_v)
img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff)
if changed and current_info:
status_text.set_text(_format_status_kv(current_info))
# Возвращаем обновлённые артисты
return (line_obj, img_obj, fft_line_obj, img_fft_obj)
return (line_obj, img_obj, fft_line_obj, img_fft_obj, status_text)
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
@ -788,7 +909,7 @@ def run_pyqtgraph(args):
) from e
# Очередь завершённых свипов и поток чтения
q: Queue[np.ndarray] = Queue(maxsize=1000)
q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy))
reader.start()
@ -844,13 +965,18 @@ def run_pyqtgraph(args):
img_fft = pg.ImageItem()
p_spec.addItem(img_fft)
# Статусная строка (внизу окна)
status = pg.LabelItem(justify="left")
win.addItem(status, row=2, col=0, colspan=2)
# Состояние
ring: Optional[np.ndarray] = None
head = 0
width: Optional[int] = None
x_shared: Optional[np.ndarray] = None
current_sweep: Optional[np.ndarray] = None
y_min, y_max = None, None
current_info: Optional[SweepInfo] = None
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
# Для спектров
fft_bins = FFT_LEN // 2 + 1
ring_fft: Optional[np.ndarray] = None
@ -888,8 +1014,39 @@ def run_pyqtgraph(args):
p_fft.setXRange(0, max(1, fft_bins - 1), padding=0)
freq_shared = np.arange(fft_bins, dtype=np.int32)
def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по текущей видимой области ImageItem (без накопления по времени)."""
if data.size == 0:
return None
ny, nx = data.shape[0], data.shape[1]
try:
(x0, x1), (y0, y1) = p_img.viewRange()
except Exception:
x0, x1 = 0.0, float(nx - 1)
y0, y1 = 0.0, float(ny - 1)
xmin, xmax = sorted((float(x0), float(x1)))
ymin, ymax = sorted((float(y0), float(y1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
if ix1 < ix0:
ix1 = ix0
if iy1 < iy0:
iy1 = iy0
sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1]
finite = np.isfinite(sub)
if not finite.any():
return None
vals = sub[finite]
vmin = float(np.min(vals))
vmax = float(np.max(vals))
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
return None
return (vmin, vmax)
def push_sweep(s: np.ndarray):
nonlocal ring, head, y_min, y_max, ring_fft, y_min_fft, y_max_fft
nonlocal ring, head, ring_fft, y_min_fft, y_max_fft
if s is None or s.size == 0 or ring is None:
return
w = ring.shape[1]
@ -898,13 +1055,6 @@ def run_pyqtgraph(args):
row[:take] = s[:take]
ring[head, :] = row
head = (head + 1) % ring.shape[0]
# Обновление экстремумов
sv_min = np.nanmin(row)
sv_max = np.nanmax(row)
if y_min is None or (not np.isnan(sv_min) and sv_min < y_min):
y_min = float(sv_min)
if y_max is None or (not np.isnan(sv_max) and sv_max > y_max):
y_max = float(sv_max)
# FFT строка (дБ)
if ring_fft is not None:
bins = ring_fft.shape[1]
@ -930,15 +1080,16 @@ def run_pyqtgraph(args):
y_max_fft = float(fr_max)
def drain_queue():
nonlocal current_sweep
nonlocal current_sweep, current_info
drained = 0
while True:
try:
s = q.get_nowait()
s, info = q.get_nowait()
except Empty:
break
drained += 1
current_sweep = s
current_info = info
ensure_buffer(s.size)
push_sweep(s)
return drained
@ -986,11 +1137,18 @@ def run_pyqtgraph(args):
if changed and ring is not None:
disp = ring if head == 0 else np.roll(ring, -head, axis=0)
disp = disp.T[:, ::-1] # (width, time with newest at left)
if y_min is not None and y_max is not None and y_min != y_max and np.isfinite(y_min) and np.isfinite(y_max):
img.setImage(disp, autoLevels=False, levels=(y_min, y_max))
levels = _visible_levels_pyqtgraph(disp)
if levels is not None:
img.setImage(disp, autoLevels=False, levels=levels)
else:
img.setImage(disp, autoLevels=False)
if changed and current_info:
try:
status.setText(_format_status_kv(current_info))
except Exception:
pass
if changed and ring_fft is not None:
disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
disp_fft = disp_fft.T[:, ::-1]