diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py index 4efe717..b3ab06d 100755 --- a/RFG_ADC_dataplotter.py +++ b/RFG_ADC_dataplotter.py @@ -38,7 +38,7 @@ FFT_LEN = 1024 # длина БПФ для спектра/водопада сп DATA_INVERSION_THRASHOLD = 10.0 Number = Union[int, float] -SweepInfo = Dict[str, Number] +SweepInfo = Dict[str, Any] SweepPacket = Tuple[np.ndarray, SweepInfo] @@ -338,7 +338,7 @@ class SweepReader(threading.Thread): sweep *= -1.0 except Exception: pass - sweep -= float(np.nanmean(sweep)) + #sweep -= float(np.nanmean(sweep)) # Метрики для статусной строки (вид словаря: переменная -> значение) self._sweep_idx += 1 @@ -549,7 +549,7 @@ def main(): import matplotlib import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation - from matplotlib.widgets import Slider + from matplotlib.widgets import Slider, CheckButtons except Exception as e: sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") sys.exit(1) @@ -568,7 +568,9 @@ def main(): fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) # Состояние для отображения - current_sweep: Optional[np.ndarray] = None + current_sweep_raw: Optional[np.ndarray] = None + current_sweep_norm: Optional[np.ndarray] = None + last_calib_sweep: Optional[np.ndarray] = None current_info: Optional[SweepInfo] = None x_shared: Optional[np.ndarray] = None width: Optional[int] = None @@ -589,6 +591,8 @@ def main(): ymin_slider = None ymax_slider = None contrast_slider = None + calib_enabled = False + cb = None # Статусная строка (внизу окна) status_text = fig.text( @@ -602,7 +606,9 @@ def main(): ) # Линейный график последнего свипа - line_obj, = ax_line.plot([], [], lw=1) + line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") + line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") + line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") ax_line.set_title("Сырые данные", pad=1) ax_line.set_xlabel("F") ax_line.set_ylabel("") @@ -668,14 +674,37 @@ def main(): ax_spec.tick_params(axis="x", labelbottom=False) except Exception: pass + def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + w = min(raw.size, calib.size) + if w <= 0: + return raw + out = np.full_like(raw, np.nan, dtype=np.float32) + with np.errstate(divide="ignore", invalid="ignore"): + out[:w] = raw[:w] / calib[:w] + out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) + return out + + def _set_calib_enabled(): + nonlocal calib_enabled, current_sweep_norm + try: + calib_enabled = bool(cb.get_status()[0]) if cb is not None else False + except Exception: + calib_enabled = False + if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: + current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) + else: + current_sweep_norm = None + # Слайдеры для управления осью Y B-scan (мин/макс) и контрастом try: ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) + ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") + cb = CheckButtons(ax_cb, ["калибровка"], [False]) def _on_ylim_change(_val): try: @@ -690,6 +719,7 @@ def main(): ymax_slider.on_changed(_on_ylim_change) # Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) + cb.on_clicked(lambda _v: _set_calib_enabled()) except Exception: pass @@ -698,6 +728,7 @@ def main(): interval_ms = int(1000.0 / max_fps) frames_since_ylim_update = 0 + def ensure_buffer(_w: int): nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time if ring is not None: @@ -800,7 +831,7 @@ def main(): y_max_fft = float(fr_max) def drain_queue(): - nonlocal current_sweep, current_info + nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep drained = 0 while True: try: @@ -808,10 +839,26 @@ def main(): except Empty: break drained += 1 - current_sweep = s + current_sweep_raw = s current_info = info + ch = 0 + try: + ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 + except Exception: + ch = 0 + if ch == 0: + last_calib_sweep = s + current_sweep_norm = None + sweep_for_proc = s + else: + if calib_enabled and last_calib_sweep is not None: + current_sweep_norm = _normalize_sweep(s, last_calib_sweep) + sweep_for_proc = current_sweep_norm + else: + current_sweep_norm = None + sweep_for_proc = s ensure_buffer(s.size) - push_sweep(s) + push_sweep(sweep_for_proc) return drained def make_display_ring(): @@ -856,18 +903,26 @@ def main(): changed = drain_queue() > 0 # Обновление линии последнего свипа - if current_sweep is not None: - if x_shared is not None and current_sweep.size <= x_shared.size: - xs = x_shared[: current_sweep.size] + if current_sweep_raw is not None: + if x_shared is not None and current_sweep_raw.size <= x_shared.size: + xs = x_shared[: current_sweep_raw.size] else: - xs = np.arange(current_sweep.size, dtype=np.int32) - line_obj.set_data(xs, current_sweep) + xs = np.arange(current_sweep_raw.size, dtype=np.int32) + line_obj.set_data(xs, current_sweep_raw) + if last_calib_sweep is not None: + line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep) + else: + line_calib_obj.set_data([], []) + if current_sweep_norm is not None: + line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm) + else: + line_norm_obj.set_data([], []) # Лимиты по X постоянные под текущую ширину - ax_line.set_xlim(0, max(1, current_sweep.size - 1)) + ax_line.set_xlim(0, max(1, current_sweep_raw.size - 1)) # Адаптивные Y-лимиты (если не задан --ylim) if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep)) - y1 = float(np.nanmax(current_sweep)) + y0 = float(np.nanmin(current_sweep_raw)) + y1 = float(np.nanmax(current_sweep_raw)) if np.isfinite(y0) and np.isfinite(y1): if y0 == y1: pad = max(1.0, abs(y0) * 0.05) @@ -880,10 +935,11 @@ def main(): ax_line.set_ylim(y0, y1) # Обновление спектра текущего свипа - take_fft = min(int(current_sweep.size), FFT_LEN) + sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw + take_fft = min(int(sweep_for_fft.size), FFT_LEN) if take_fft > 0 and freq_shared is not None: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False) + seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False) win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in) @@ -964,7 +1020,16 @@ def main(): channel_text.set_text(f"chs {chs}") # Возвращаем обновлённые артисты - return (line_obj, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text) + return ( + line_obj, + line_calib_obj, + line_norm_obj, + img_obj, + fft_line_obj, + img_fft_obj, + status_text, + channel_text, + ) ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) @@ -1010,6 +1075,8 @@ def run_pyqtgraph(args): p_line = win.addPlot(row=0, col=0, title="Сырые данные") p_line.showGrid(x=True, y=True, alpha=0.3) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) + curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) + curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) p_line.setLabel("bottom", "X") p_line.setLabel("left", "Y") ch_text = pg.TextItem("", anchor=(1, 1)) @@ -1049,9 +1116,15 @@ def run_pyqtgraph(args): img_fft = pg.ImageItem() p_spec.addItem(img_fft) + # Чекбокс калибровки + calib_cb = QtWidgets.QCheckBox("калибровка") + cb_proxy = QtWidgets.QGraphicsProxyWidget() + cb_proxy.setWidget(calib_cb) + win.addItem(cb_proxy, row=2, col=1) + # Статусная строка (внизу окна) status = pg.LabelItem(justify="left") - win.addItem(status, row=2, col=0, colspan=2) + win.addItem(status, row=3, col=0, colspan=2) # Состояние ring: Optional[np.ndarray] = None @@ -1059,7 +1132,9 @@ def run_pyqtgraph(args): head = 0 width: Optional[int] = None x_shared: Optional[np.ndarray] = None - current_sweep: Optional[np.ndarray] = None + current_sweep_raw: Optional[np.ndarray] = None + current_sweep_norm: Optional[np.ndarray] = None + last_calib_sweep: Optional[np.ndarray] = None current_info: Optional[SweepInfo] = None # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. # Для спектров @@ -1070,6 +1145,7 @@ def run_pyqtgraph(args): # Параметры контраста водопада спектров (процентильная обрезка) spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) + calib_enabled = False # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: @@ -1081,6 +1157,32 @@ def run_pyqtgraph(args): if fixed_ylim is not None: p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) + def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + w = min(raw.size, calib.size) + if w <= 0: + return raw + out = np.full_like(raw, np.nan, dtype=np.float32) + with np.errstate(divide="ignore", invalid="ignore"): + out[:w] = raw[:w] / calib[:w] + out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) + return out + + def _set_calib_enabled(): + nonlocal calib_enabled, current_sweep_norm + try: + calib_enabled = bool(calib_cb.isChecked()) + except Exception: + calib_enabled = False + if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None: + current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep) + else: + current_sweep_norm = None + + try: + calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled()) + except Exception: + pass + def ensure_buffer(_w: int): nonlocal ring, ring_time, head, width, x_shared, ring_fft, freq_shared if ring is not None: @@ -1169,7 +1271,7 @@ def run_pyqtgraph(args): y_max_fft = float(fr_max) def drain_queue(): - nonlocal current_sweep, current_info + nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep drained = 0 while True: try: @@ -1177,10 +1279,26 @@ def run_pyqtgraph(args): except Empty: break drained += 1 - current_sweep = s + current_sweep_raw = s current_info = info + ch = 0 + try: + ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 + except Exception: + ch = 0 + if ch == 0: + last_calib_sweep = s + current_sweep_norm = None + sweep_for_proc = s + else: + if calib_enabled and last_calib_sweep is not None: + current_sweep_norm = _normalize_sweep(s, last_calib_sweep) + sweep_for_proc = current_sweep_norm + else: + current_sweep_norm = None + sweep_for_proc = s ensure_buffer(s.size) - push_sweep(s) + push_sweep(sweep_for_proc) return drained # Попытка применить LUT из колормэпа (если доступен) @@ -1194,24 +1312,33 @@ def run_pyqtgraph(args): def update(): changed = drain_queue() > 0 - if current_sweep is not None and x_shared is not None: - if current_sweep.size <= x_shared.size: - xs = x_shared[: current_sweep.size] + if current_sweep_raw is not None and x_shared is not None: + if current_sweep_raw.size <= x_shared.size: + xs = x_shared[: current_sweep_raw.size] else: - xs = np.arange(current_sweep.size) - curve.setData(xs, current_sweep, autoDownsample=True) + xs = np.arange(current_sweep_raw.size) + curve.setData(xs, current_sweep_raw, autoDownsample=True) + if last_calib_sweep is not None: + curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True) + else: + curve_calib.setData([], []) + if current_sweep_norm is not None: + curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True) + else: + curve_norm.setData([], []) if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep)) - y1 = float(np.nanmax(current_sweep)) + y0 = float(np.nanmin(current_sweep_raw)) + y1 = float(np.nanmax(current_sweep_raw)) if np.isfinite(y0) and np.isfinite(y1): margin = 0.05 * max(1.0, (y1 - y0)) p_line.setYRange(y0 - margin, y1 + margin, padding=0) # Обновим спектр - take_fft = min(int(current_sweep.size), FFT_LEN) + sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw + take_fft = min(int(sweep_for_fft.size), FFT_LEN) if take_fft > 0 and freq_shared is not None: fft_in = np.zeros((FFT_LEN,), dtype=np.float32) - seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False) + seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False) win = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = seg * win spec = np.fft.rfft(fft_in)