From c0b33ace678889108c6959dcf55e1f1beef782f7 Mon Sep 17 00:00:00 2001 From: Theodor Chikin Date: Wed, 28 Jan 2026 17:08:32 +0300 Subject: [PATCH] Now some debug variables displayed at the bottom of window --- RFG_ADC_dataplotter.py | 222 ++++++++++++++++++++++++++++++++++------- 1 file changed, 187 insertions(+), 35 deletions(-) diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py index 1db9313..aad32c9 100755 --- a/RFG_ADC_dataplotter.py +++ b/RFG_ADC_dataplotter.py @@ -25,8 +25,9 @@ import os import sys import threading import time +from collections import deque from queue import Queue, Empty, Full -from typing import Optional, Tuple +from typing import Any, Dict, Mapping, Optional, Tuple, Union import numpy as np @@ -36,6 +37,31 @@ FFT_LEN = 1024 # длина БПФ для спектра/водопада сп # считаем, что сигнал «меньше нуля» и домножаем свип на -1 DATA_INVERSION_THRASHOLD = 10.0 +Number = Union[int, float] +SweepInfo = Dict[str, Number] +SweepPacket = Tuple[np.ndarray, SweepInfo] + + +def _format_status_kv(data: Mapping[str, Any]) -> str: + """Преобразовать словарь метрик в одну строку 'k:v'.""" + + def _fmt(v: Any) -> str: + if v is None: + return "NA" + try: + fv = float(v) + except Exception: + return str(v) + if not np.isfinite(fv): + return "nan" + # Достаточно компактно для статус-строки. + if abs(fv) >= 1000 or (0 < abs(fv) < 0.01): + return f"{fv:.3g}" + return f"{fv:.3f}".rstrip("0").rstrip(".") + + parts = [f"{k}:{_fmt(v)}" for k, v in data.items()] + return " ".join(parts) + def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: """Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров. @@ -241,7 +267,14 @@ class SerialChunkReader: class SweepReader(threading.Thread): """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" - def __init__(self, port_path: str, baud: int, out_queue: Queue, stop_event: threading.Event, fancy: bool = False): + def __init__( + self, + port_path: str, + baud: int, + out_queue: Queue[SweepPacket], + stop_event: threading.Event, + fancy: bool = False, + ): super().__init__(daemon=True) self._port_path = port_path self._baud = baud @@ -250,6 +283,9 @@ class SweepReader(threading.Thread): self._src: Optional[SerialLineSource] = None self._fancy = bool(fancy) self._max_width: int = 0 + self._sweep_idx: int = 0 + self._last_sweep_ts: Optional[float] = None + self._n_valid_hist = deque() def _finalize_current(self, xs, ys): if not xs: @@ -296,16 +332,53 @@ class SweepReader(threading.Thread): sweep *= -1.0 except Exception: pass + sweep -= float(np.nanmean(sweep)) + + # Метрики для статусной строки (вид словаря: переменная -> значение) + self._sweep_idx += 1 + now = time.time() + if self._last_sweep_ts is None: + dt_ms = float("nan") + else: + dt_ms = (now - self._last_sweep_ts) * 1000.0 + self._last_sweep_ts = now + finite = np.isfinite(sweep) + n_valid_cur = int(np.count_nonzero(finite)) + self._n_valid_hist.append((now, n_valid_cur)) + while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 10.0: + self._n_valid_hist.popleft() + if self._n_valid_hist: + n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist)) + else: + n_valid = float(n_valid_cur) + + if n_valid_cur > 0: + vmin = float(np.nanmin(sweep)) + vmax = float(np.nanmax(sweep)) + mean = float(np.nanmean(sweep)) + std = float(np.nanstd(sweep)) + else: + vmin = vmax = mean = std = float("nan") + info: SweepInfo = { + "sweep": self._sweep_idx, + "n_valid": n_valid, + "min": vmin, + "max": vmax, + "mean": mean, + "std": std, + "dt_ms": dt_ms, + } + # Кладём готовый свип (если очередь полна — выбрасываем самый старый) try: - self._q.put_nowait(sweep) + self._q.put_nowait((sweep, info)) except Full: try: _ = self._q.get_nowait() except Exception: pass try: - self._q.put_nowait(sweep) + self._q.put_nowait((sweep, info)) except Exception: pass @@ -444,7 +517,7 @@ def main(): sys.exit(1) # Очередь завершённых свипов и поток чтения - q: Queue[np.ndarray] = Queue(maxsize=1000) + q: Queue[SweepPacket] = Queue(maxsize=1000) stop_event = threading.Event() reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) reader.start() @@ -458,13 +531,14 @@ def main(): # Состояние для отображения current_sweep: Optional[np.ndarray] = None + current_info: Optional[SweepInfo] = None x_shared: Optional[np.ndarray] = None width: Optional[int] = None max_sweeps = int(max(10, args.max_sweeps)) ring = None # type: Optional[np.ndarray] ring_time = None # type: Optional[np.ndarray] head = 0 - y_min, y_max = None, None + # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. # FFT состояние fft_bins = FFT_LEN // 2 + 1 ring_fft = None # type: Optional[np.ndarray] @@ -477,6 +551,17 @@ def main(): ymax_slider = None contrast_slider = None + # Статусная строка (внизу окна) + status_text = fig.text( + 0.01, + 0.01, + "", + ha="left", + va="bottom", + fontsize=8, + family="monospace", + ) + # Линейный график последнего свипа line_obj, = ax_line.plot([], [], lw=1) ax_line.set_title("Сырые данные", pad=1) @@ -586,8 +671,40 @@ def main(): ax_spec.set_ylim(0, max(1, fft_bins - 1)) freq_shared = np.arange(fft_bins, dtype=np.int32) + def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: + """(vmin, vmax) по текущей видимой области imshow (без накопления по времени).""" + if data.size == 0: + return None + ny, nx = data.shape[0], data.shape[1] + try: + x0, x1 = axis.get_xlim() + y0, y1 = axis.get_ylim() + except Exception: + x0, x1 = 0.0, float(nx - 1) + y0, y1 = 0.0, float(ny - 1) + xmin, xmax = sorted((float(x0), float(x1))) + ymin, ymax = sorted((float(y0), float(y1))) + ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) + ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) + iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) + iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) + if ix1 < ix0: + ix1 = ix0 + if iy1 < iy0: + iy1 = iy0 + sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] + finite = np.isfinite(sub) + if not finite.any(): + return None + vals = sub[finite] + vmin = float(np.min(vals)) + vmax = float(np.max(vals)) + if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: + return None + return (vmin, vmax) + def push_sweep(s: np.ndarray): - nonlocal ring, head, y_min, y_max, ring_fft, y_min_fft, y_max_fft, ring_time + nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time if s is None or s.size == 0 or ring is None: return # Нормализуем длину до фиксированной ширины @@ -599,13 +716,6 @@ def main(): if ring_time is not None: ring_time[head] = time.time() head = (head + 1) % ring.shape[0] - # Обновляем мин/макс по данным (игнорим NaN) - sv_min = np.nanmin(row) - sv_max = np.nanmax(row) - if y_min is None or (not np.isnan(sv_min) and sv_min < y_min): - y_min = float(sv_min) - if y_max is None or (not np.isnan(sv_max) and sv_max > y_max): - y_max = float(sv_max) # FFT строка (дБ) if ring_fft is not None: bins = ring_fft.shape[1] @@ -634,21 +744,23 @@ def main(): # Экстремумы для цветовой шкалы fr_min = np.nanmin(fft_row) fr_max = np.nanmax(fft_row) + fr_max = np.nanpercentile(fft_row, 90) if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft): y_min_fft = float(fr_min) if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft): y_max_fft = float(fr_max) def drain_queue(): - nonlocal current_sweep + nonlocal current_sweep, current_info drained = 0 while True: try: - s = q.get_nowait() + s, info = q.get_nowait() except Empty: break drained += 1 current_sweep = s + current_info = info ensure_buffer(s.size) push_sweep(s) return drained @@ -725,10 +837,10 @@ def main(): # Новые данные справа: без реверса img_obj.set_data(disp) # Подписи времени не обновляем динамически (оставляем авто-тики) - # Актуализируем цветовую шкалу только при расширении экстремумов - if y_min is not None and y_max is not None and np.isfinite(y_min) and np.isfinite(y_max): - if y_min != y_max: - img_obj.set_clim(vmin=y_min, vmax=y_max) + # Авто-уровни: по видимой области (не накапливаем за всё время) + levels = _visible_levels_matplotlib(disp, ax_img) + if levels is not None: + img_obj.set_clim(vmin=levels[0], vmax=levels[1]) # Обновление водопада спектров if changed and ring_fft is not None: @@ -765,8 +877,11 @@ def main(): vmax_eff = vmin_v + c * (vmax_v - vmin_v) img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff) + if changed and current_info: + status_text.set_text(_format_status_kv(current_info)) + # Возвращаем обновлённые артисты - return (line_obj, img_obj, fft_line_obj, img_fft_obj) + return (line_obj, img_obj, fft_line_obj, img_fft_obj, status_text) ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) @@ -792,7 +907,7 @@ def run_pyqtgraph(args): ) from e # Очередь завершённых свипов и поток чтения - q: Queue[np.ndarray] = Queue(maxsize=1000) + q: Queue[SweepPacket] = Queue(maxsize=1000) stop_event = threading.Event() reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) reader.start() @@ -848,13 +963,18 @@ def run_pyqtgraph(args): img_fft = pg.ImageItem() p_spec.addItem(img_fft) + # Статусная строка (внизу окна) + status = pg.LabelItem(justify="left") + win.addItem(status, row=2, col=0, colspan=2) + # Состояние ring: Optional[np.ndarray] = None head = 0 width: Optional[int] = None x_shared: Optional[np.ndarray] = None current_sweep: Optional[np.ndarray] = None - y_min, y_max = None, None + current_info: Optional[SweepInfo] = None + # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. # Для спектров fft_bins = FFT_LEN // 2 + 1 ring_fft: Optional[np.ndarray] = None @@ -892,8 +1012,39 @@ def run_pyqtgraph(args): p_fft.setXRange(0, max(1, fft_bins - 1), padding=0) freq_shared = np.arange(fft_bins, dtype=np.int32) + def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]: + """(vmin, vmax) по текущей видимой области ImageItem (без накопления по времени).""" + if data.size == 0: + return None + ny, nx = data.shape[0], data.shape[1] + try: + (x0, x1), (y0, y1) = p_img.viewRange() + except Exception: + x0, x1 = 0.0, float(nx - 1) + y0, y1 = 0.0, float(ny - 1) + xmin, xmax = sorted((float(x0), float(x1))) + ymin, ymax = sorted((float(y0), float(y1))) + ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) + ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) + iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) + iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) + if ix1 < ix0: + ix1 = ix0 + if iy1 < iy0: + iy1 = iy0 + sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] + finite = np.isfinite(sub) + if not finite.any(): + return None + vals = sub[finite] + vmin = float(np.min(vals)) + vmax = float(np.max(vals)) + if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: + return None + return (vmin, vmax) + def push_sweep(s: np.ndarray): - nonlocal ring, head, y_min, y_max, ring_fft, y_min_fft, y_max_fft + nonlocal ring, head, ring_fft, y_min_fft, y_max_fft if s is None or s.size == 0 or ring is None: return w = ring.shape[1] @@ -902,13 +1053,6 @@ def run_pyqtgraph(args): row[:take] = s[:take] ring[head, :] = row head = (head + 1) % ring.shape[0] - # Обновление экстремумов - sv_min = np.nanmin(row) - sv_max = np.nanmax(row) - if y_min is None or (not np.isnan(sv_min) and sv_min < y_min): - y_min = float(sv_min) - if y_max is None or (not np.isnan(sv_max) and sv_max > y_max): - y_max = float(sv_max) # FFT строка (дБ) if ring_fft is not None: bins = ring_fft.shape[1] @@ -934,15 +1078,16 @@ def run_pyqtgraph(args): y_max_fft = float(fr_max) def drain_queue(): - nonlocal current_sweep + nonlocal current_sweep, current_info drained = 0 while True: try: - s = q.get_nowait() + s, info = q.get_nowait() except Empty: break drained += 1 current_sweep = s + current_info = info ensure_buffer(s.size) push_sweep(s) return drained @@ -990,11 +1135,18 @@ def run_pyqtgraph(args): if changed and ring is not None: disp = ring if head == 0 else np.roll(ring, -head, axis=0) disp = disp.T[:, ::-1] # (width, time with newest at left) - if y_min is not None and y_max is not None and y_min != y_max and np.isfinite(y_min) and np.isfinite(y_max): - img.setImage(disp, autoLevels=False, levels=(y_min, y_max)) + levels = _visible_levels_pyqtgraph(disp) + if levels is not None: + img.setImage(disp, autoLevels=False, levels=levels) else: img.setImage(disp, autoLevels=False) + if changed and current_info: + try: + status.setText(_format_status_kv(current_info)) + except Exception: + pass + if changed and ring_fft is not None: disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) disp_fft = disp_fft.T[:, ::-1]