From 66a318fff8e232e70b5833e6cb7d701918f58429 Mon Sep 17 00:00:00 2001 From: awe Date: Fri, 13 Feb 2026 17:32:04 +0300 Subject: [PATCH] add calibration file --- calib_envelope.npy | Bin 0 -> 3164 bytes rfg_adc_plotter/gui/matplotlib_backend.py | 32 +++++++-- rfg_adc_plotter/gui/pyqtgraph_backend.py | 46 +++++++++++-- rfg_adc_plotter/processing/normalizer.py | 38 +++++++++++ rfg_adc_plotter/state/app_state.py | 79 +++++++++++++++++++--- 5 files changed, 174 insertions(+), 21 deletions(-) create mode 100644 calib_envelope.npy diff --git a/calib_envelope.npy b/calib_envelope.npy new file mode 100644 index 0000000000000000000000000000000000000000..9053d4f604ebba00a6d93003e6ac2ff91ef38a58 GIT binary patch literal 3164 zcmbW3e~4676vwYW(Zy=}y@=Id+HT951J7TH)SLIPRQj=mFH(YL$sI&YhusP>ilI_A zu^L^CouOce%w$cvHx{nud3AC5`QbB%e@HKi@E!!?nNYf}4N6Yphw$u;)l z*zZApldOW+fC3;DIKH@q7CqwpZy4~thfF63KT{~U6T7+aASB3IF`4Pz?C3DEwOBOS|T zcEv2KH#S(=6H6rCLf6N-cS8SubSJQ##%}_(H&R~&F5;)X?|~Hlo%pXpj{5V!rkcn0 ztUane)tQDq$bn+WwgnoY1#FlD(_qBL4_-yCLKz0gl_%#uycR z1CGI2m_Z#iM#Yrh0Vw)$3;Es1TJtg}_jc&1BgwoSq99o{$u|y5pb>`Hdy4+=usv^Y zSc~crC8&Z4eIw+aOKq*pqoF+0wHlwbB5}sHtgphgT0e$jM8Y_d6}Mz(GauuB z&>pNDVD0Pur~0Ozfc{<--Bgc!0(*Dm%z8?+k$VWevFgN ztmJIDoS$W!qea-FumIGbWnfe92H1g|gDmJw={u?4tDy;s$^kkLTC4BU&${FE>3if$ za1!(l^{s4J2l^g5)BP|2MJR#3c^b5zdfL$?iM7{$UjVe-3Q|Y`f6y`d>TxIOXx0#KAKPF?VHFOI04x(PG_#SZx-AJ zx5FKvx8hc~7N&sWOk!++>!FEtGg+@S@|_7cf&7(wDoh6DH25~)do%KtzS>VA|9iOi zU&H-se_B7zVC6FB_-FcjhJK&IPd!vGRQIUg3(xxrSv?vJ^|H%PeH^Ed|CavASG`hy zRNqCY`TC%*iXokRRFB@BYd|$#2YORgtL)0Def$;aWRs72r#$CDd;O6-I{(_=4{#dv zEo-4|7`cqBd-4f#5spIzbUu{lJJ5Qa3)Lu}9Q1(h;U3Vv)LW}}_b_q^4naS>4LZkr zu?K$j_gMEx_bIp^$+~ep^EkAF4N-`|5|B>z`rnuqbn&EzeCDsSKk|83P9YFXXrY>0z=lqVOy zuiJflHnRqueEutnlYm~1`84!xpk7BndE}>aq5o-gr$>+npa*P_kEz3Eo`T#}(QlA`W7M}Ed0VeU* z&EP)XLR{tlXSn~=(tw{~JWCIT$yWlci$fapwra0>Yg3?}B*Rhju{T(<^jg;rHbnmw z&znc?R{UD%zkai4BS*q@`c2ch(f?l2P+pF#_$xp;mOzyE=T5)wY-YOu+Zyz>MlA-r gzDpxWuA;k=p#EQcf5ot@8|mdNV&26X)nQ_P0u2rk9smFU literal 0 HcmV?d00001 diff --git a/rfg_adc_plotter/gui/matplotlib_backend.py b/rfg_adc_plotter/gui/matplotlib_backend.py index 304660d..e7f737f 100644 --- a/rfg_adc_plotter/gui/matplotlib_backend.py +++ b/rfg_adc_plotter/gui/matplotlib_backend.py @@ -12,7 +12,7 @@ from rfg_adc_plotter.constants import FFT_LEN, FREQ_SPAN_GHZ, IFFT_LEN _IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9) from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.processing.normalizer import build_calib_envelopes -from rfg_adc_plotter.state.app_state import AppState, format_status +from rfg_adc_plotter.state.app_state import CALIB_ENVELOPE_PATH, AppState, format_status from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepPacket @@ -165,10 +165,16 @@ def run_matplotlib(args): ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) + ax_cb_file = fig.add_axes([0.92, 0.36, 0.08, 0.08]) ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") calib_cb = CheckButtons(ax_cb, ["калибровка"], [False]) + calib_file_cb = CheckButtons(ax_cb_file, ["из файла"], [False]) + + import os as _os + if not _os.path.isfile(CALIB_ENVELOPE_PATH): + ax_cb_file.set_visible(False) def _on_ylim_change(_val): try: @@ -179,12 +185,30 @@ def run_matplotlib(args): except Exception: pass + def _on_calib_file_clicked(_v): + use_file = bool(calib_file_cb.get_status()[0]) + if use_file: + ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH) + if ok: + state.set_calib_mode("file") + else: + calib_file_cb.set_active(0) # снять галочку + else: + state.set_calib_mode("live") + state.set_calib_enabled(bool(calib_cb.get_status()[0])) + + def _on_calib_clicked(_v): + import os as _os2 + if _os2.path.isfile(CALIB_ENVELOPE_PATH): + ax_cb_file.set_visible(True) + state.set_calib_enabled(bool(calib_cb.get_status()[0])) + fig.canvas.draw_idle() + ymin_slider.on_changed(_on_ylim_change) ymax_slider.on_changed(_on_ylim_change) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) - calib_cb.on_clicked(lambda _v: state.set_calib_enabled( - bool(calib_cb.get_status()[0]) - )) + calib_cb.on_clicked(_on_calib_clicked) + calib_file_cb.on_clicked(_on_calib_file_clicked) except Exception: calib_cb = None diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index d2a9ac1..ed76229 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -10,7 +10,7 @@ import numpy as np from rfg_adc_plotter.constants import FREQ_SPAN_GHZ, IFFT_LEN from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.processing.normalizer import build_calib_envelopes -from rfg_adc_plotter.state.app_state import AppState, format_status +from rfg_adc_plotter.state.app_state import CALIB_ENVELOPE_PATH, AppState, format_status from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepPacket @@ -184,12 +184,46 @@ def run_pyqtgraph(args): img_fft = pg.ImageItem() p_spec.addItem(img_fft) - # Чекбокс калибровки + # Чекбоксы калибровки — в одном контейнере + calib_widget = QtWidgets.QWidget() + calib_layout = QtWidgets.QHBoxLayout(calib_widget) + calib_layout.setContentsMargins(2, 2, 2, 2) + calib_layout.setSpacing(8) + calib_cb = QtWidgets.QCheckBox("калибровка") - cb_proxy = QtWidgets.QGraphicsProxyWidget() - cb_proxy.setWidget(calib_cb) - win.addItem(cb_proxy, row=2, col=1) - calib_cb.stateChanged.connect(lambda _v: state.set_calib_enabled(calib_cb.isChecked())) + calib_file_cb = QtWidgets.QCheckBox("из файла") + calib_file_cb.setEnabled(False) # активируется только если файл существует + + calib_layout.addWidget(calib_cb) + calib_layout.addWidget(calib_file_cb) + + cb_container_proxy = QtWidgets.QGraphicsProxyWidget() + cb_container_proxy.setWidget(calib_widget) + win.addItem(cb_container_proxy, row=2, col=1) + + def _check_file_cb_available(): + import os + calib_file_cb.setEnabled(os.path.isfile(CALIB_ENVELOPE_PATH)) + + _check_file_cb_available() + + def _on_calib_file_toggled(checked): + if checked: + ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH) + if ok: + state.set_calib_mode("file") + else: + calib_file_cb.setChecked(False) + else: + state.set_calib_mode("live") + state.set_calib_enabled(calib_cb.isChecked()) + + def _on_calib_toggled(_v): + _check_file_cb_available() + state.set_calib_enabled(calib_cb.isChecked()) + + calib_cb.stateChanged.connect(_on_calib_toggled) + calib_file_cb.stateChanged.connect(lambda _v: _on_calib_file_toggled(calib_file_cb.isChecked())) # Статусная строка status = pg.LabelItem(justify="left") diff --git a/rfg_adc_plotter/processing/normalizer.py b/rfg_adc_plotter/processing/normalizer.py index 5d9c675..0b68760 100644 --- a/rfg_adc_plotter/processing/normalizer.py +++ b/rfg_adc_plotter/processing/normalizer.py @@ -109,3 +109,41 @@ def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np if nt == "simple": return normalize_simple(raw, calib) return normalize_projector(raw, calib) + + +def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray: + """Нормировка свипа через проекцию на огибающую из файла. + + Воспроизводит логику normalize_projector: проецирует raw в [-1000, +1000] + используя готовую верхнюю огибающую (upper = envelope, lower = -envelope). + """ + w = min(raw.size, envelope.size) + if w <= 0: + return raw + + out = np.full_like(raw, np.nan, dtype=np.float32) + raw_seg = np.asarray(raw[:w], dtype=np.float32) + upper = np.asarray(envelope[:w], dtype=np.float32) + lower = -upper + span = upper - lower # = 2 * upper + + finite_span = span[np.isfinite(span) & (span > 0)] + if finite_span.size > 0: + eps = max(float(np.median(finite_span)) * 1e-6, 1e-9) + else: + eps = 1e-9 + + valid = ( + np.isfinite(raw_seg) + & np.isfinite(lower) + & np.isfinite(upper) + & (span > eps) + ) + if np.any(valid): + proj = np.empty_like(raw_seg, dtype=np.float32) + proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0 + proj[valid] = np.clip(proj[valid], -1000.0, 1000.0) + proj[~valid] = np.nan + out[:w] = proj + + return out diff --git a/rfg_adc_plotter/state/app_state.py b/rfg_adc_plotter/state/app_state.py index 1ad5682..229b273 100644 --- a/rfg_adc_plotter/state/app_state.py +++ b/rfg_adc_plotter/state/app_state.py @@ -1,14 +1,21 @@ """Состояние приложения: текущие свипы и настройки калибровки/нормировки.""" +import os from queue import Empty, Queue from typing import Any, Dict, Mapping, Optional import numpy as np -from rfg_adc_plotter.processing.normalizer import normalize_by_calib +from rfg_adc_plotter.processing.normalizer import ( + build_calib_envelopes, + normalize_by_calib, + normalize_by_envelope, +) from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepInfo, SweepPacket +CALIB_ENVELOPE_PATH = "calib_envelope.npy" + def format_status(data: Mapping[str, Any]) -> str: """Преобразовать словарь метрик в одну строку 'k:v'.""" @@ -44,21 +51,65 @@ class AppState: self.current_info: Optional[SweepInfo] = None self.calib_enabled: bool = False self.norm_type: str = norm_type + # "live" — нормировка по текущему ch0-свипу; "file" — по огибающей из файла + self.calib_mode: str = "live" + self.calib_file_envelope: Optional[np.ndarray] = None def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + if self.calib_mode == "file" and self.calib_file_envelope is not None: + return normalize_by_envelope(raw, self.calib_file_envelope) return normalize_by_calib(raw, calib, self.norm_type) + def save_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool: + """Вычислить огибающую из last_calib_sweep и сохранить в файл. + + Возвращает True при успехе. + """ + if self.last_calib_sweep is None: + return False + try: + _lower, upper = build_calib_envelopes(self.last_calib_sweep) + np.save(path, upper) + return True + except Exception as exc: + import sys + sys.stderr.write(f"[warn] Не удалось сохранить огибающую: {exc}\n") + return False + + def load_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool: + """Загрузить огибающую из файла. + + Возвращает True при успехе. + """ + if not os.path.isfile(path): + return False + try: + env = np.load(path) + self.calib_file_envelope = np.asarray(env, dtype=np.float32) + return True + except Exception as exc: + import sys + sys.stderr.write(f"[warn] Не удалось загрузить огибающую: {exc}\n") + return False + + def set_calib_mode(self, mode: str): + """Переключить режим калибровки: 'live' или 'file'.""" + self.calib_mode = mode + def set_calib_enabled(self, enabled: bool): """Включить/выключить режим калибровки, пересчитать norm-свип.""" self.calib_enabled = enabled - if ( - self.calib_enabled - and self.current_sweep_raw is not None - and self.last_calib_sweep is not None - ): - self.current_sweep_norm = self._normalize( - self.current_sweep_raw, self.last_calib_sweep - ) + if self.calib_enabled and self.current_sweep_raw is not None: + if self.calib_mode == "file" and self.calib_file_envelope is not None: + self.current_sweep_norm = normalize_by_envelope( + self.current_sweep_raw, self.calib_file_envelope + ) + elif self.calib_mode == "live" and self.last_calib_sweep is not None: + self.current_sweep_norm = self._normalize( + self.current_sweep_raw, self.last_calib_sweep + ) + else: + self.current_sweep_norm = None else: self.current_sweep_norm = None @@ -86,11 +137,17 @@ class AppState: # Канал 0 — опорный (калибровочный) свип if ch == 0: self.last_calib_sweep = s + self.save_calib_envelope() self.current_sweep_norm = None sweep_for_ring = s else: - if self.calib_enabled and self.last_calib_sweep is not None: - self.current_sweep_norm = self._normalize(s, self.last_calib_sweep) + can_normalize = self.calib_enabled and ( + (self.calib_mode == "file" and self.calib_file_envelope is not None) + or (self.calib_mode == "live" and self.last_calib_sweep is not None) + ) + if can_normalize: + calib_ref = self.last_calib_sweep if self.last_calib_sweep is not None else s + self.current_sweep_norm = self._normalize(s, calib_ref) sweep_for_ring = self.current_sweep_norm else: self.current_sweep_norm = None