diff --git a/calib_envelope.npy b/calib_envelope.npy new file mode 100644 index 0000000..9053d4f Binary files /dev/null and b/calib_envelope.npy differ diff --git a/rfg_adc_plotter/gui/matplotlib_backend.py b/rfg_adc_plotter/gui/matplotlib_backend.py index 304660d..e7f737f 100644 --- a/rfg_adc_plotter/gui/matplotlib_backend.py +++ b/rfg_adc_plotter/gui/matplotlib_backend.py @@ -12,7 +12,7 @@ from rfg_adc_plotter.constants import FFT_LEN, FREQ_SPAN_GHZ, IFFT_LEN _IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9) from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.processing.normalizer import build_calib_envelopes -from rfg_adc_plotter.state.app_state import AppState, format_status +from rfg_adc_plotter.state.app_state import CALIB_ENVELOPE_PATH, AppState, format_status from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepPacket @@ -165,10 +165,16 @@ def run_matplotlib(args): ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) + ax_cb_file = fig.add_axes([0.92, 0.36, 0.08, 0.08]) ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") calib_cb = CheckButtons(ax_cb, ["калибровка"], [False]) + calib_file_cb = CheckButtons(ax_cb_file, ["из файла"], [False]) + + import os as _os + if not _os.path.isfile(CALIB_ENVELOPE_PATH): + ax_cb_file.set_visible(False) def _on_ylim_change(_val): try: @@ -179,12 +185,30 @@ def run_matplotlib(args): except Exception: pass + def _on_calib_file_clicked(_v): + use_file = bool(calib_file_cb.get_status()[0]) + if use_file: + ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH) + if ok: + state.set_calib_mode("file") + else: + calib_file_cb.set_active(0) # снять галочку + else: + state.set_calib_mode("live") + state.set_calib_enabled(bool(calib_cb.get_status()[0])) + + def _on_calib_clicked(_v): + import os as _os2 + if _os2.path.isfile(CALIB_ENVELOPE_PATH): + ax_cb_file.set_visible(True) + state.set_calib_enabled(bool(calib_cb.get_status()[0])) + fig.canvas.draw_idle() + ymin_slider.on_changed(_on_ylim_change) ymax_slider.on_changed(_on_ylim_change) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) - calib_cb.on_clicked(lambda _v: state.set_calib_enabled( - bool(calib_cb.get_status()[0]) - )) + calib_cb.on_clicked(_on_calib_clicked) + calib_file_cb.on_clicked(_on_calib_file_clicked) except Exception: calib_cb = None diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index d2a9ac1..ed76229 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -10,7 +10,7 @@ import numpy as np from rfg_adc_plotter.constants import FREQ_SPAN_GHZ, IFFT_LEN from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.processing.normalizer import build_calib_envelopes -from rfg_adc_plotter.state.app_state import AppState, format_status +from rfg_adc_plotter.state.app_state import CALIB_ENVELOPE_PATH, AppState, format_status from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepPacket @@ -184,12 +184,46 @@ def run_pyqtgraph(args): img_fft = pg.ImageItem() p_spec.addItem(img_fft) - # Чекбокс калибровки + # Чекбоксы калибровки — в одном контейнере + calib_widget = QtWidgets.QWidget() + calib_layout = QtWidgets.QHBoxLayout(calib_widget) + calib_layout.setContentsMargins(2, 2, 2, 2) + calib_layout.setSpacing(8) + calib_cb = QtWidgets.QCheckBox("калибровка") - cb_proxy = QtWidgets.QGraphicsProxyWidget() - cb_proxy.setWidget(calib_cb) - win.addItem(cb_proxy, row=2, col=1) - calib_cb.stateChanged.connect(lambda _v: state.set_calib_enabled(calib_cb.isChecked())) + calib_file_cb = QtWidgets.QCheckBox("из файла") + calib_file_cb.setEnabled(False) # активируется только если файл существует + + calib_layout.addWidget(calib_cb) + calib_layout.addWidget(calib_file_cb) + + cb_container_proxy = QtWidgets.QGraphicsProxyWidget() + cb_container_proxy.setWidget(calib_widget) + win.addItem(cb_container_proxy, row=2, col=1) + + def _check_file_cb_available(): + import os + calib_file_cb.setEnabled(os.path.isfile(CALIB_ENVELOPE_PATH)) + + _check_file_cb_available() + + def _on_calib_file_toggled(checked): + if checked: + ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH) + if ok: + state.set_calib_mode("file") + else: + calib_file_cb.setChecked(False) + else: + state.set_calib_mode("live") + state.set_calib_enabled(calib_cb.isChecked()) + + def _on_calib_toggled(_v): + _check_file_cb_available() + state.set_calib_enabled(calib_cb.isChecked()) + + calib_cb.stateChanged.connect(_on_calib_toggled) + calib_file_cb.stateChanged.connect(lambda _v: _on_calib_file_toggled(calib_file_cb.isChecked())) # Статусная строка status = pg.LabelItem(justify="left") diff --git a/rfg_adc_plotter/processing/normalizer.py b/rfg_adc_plotter/processing/normalizer.py index 5d9c675..0b68760 100644 --- a/rfg_adc_plotter/processing/normalizer.py +++ b/rfg_adc_plotter/processing/normalizer.py @@ -109,3 +109,41 @@ def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np if nt == "simple": return normalize_simple(raw, calib) return normalize_projector(raw, calib) + + +def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray: + """Нормировка свипа через проекцию на огибающую из файла. + + Воспроизводит логику normalize_projector: проецирует raw в [-1000, +1000] + используя готовую верхнюю огибающую (upper = envelope, lower = -envelope). + """ + w = min(raw.size, envelope.size) + if w <= 0: + return raw + + out = np.full_like(raw, np.nan, dtype=np.float32) + raw_seg = np.asarray(raw[:w], dtype=np.float32) + upper = np.asarray(envelope[:w], dtype=np.float32) + lower = -upper + span = upper - lower # = 2 * upper + + finite_span = span[np.isfinite(span) & (span > 0)] + if finite_span.size > 0: + eps = max(float(np.median(finite_span)) * 1e-6, 1e-9) + else: + eps = 1e-9 + + valid = ( + np.isfinite(raw_seg) + & np.isfinite(lower) + & np.isfinite(upper) + & (span > eps) + ) + if np.any(valid): + proj = np.empty_like(raw_seg, dtype=np.float32) + proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0 + proj[valid] = np.clip(proj[valid], -1000.0, 1000.0) + proj[~valid] = np.nan + out[:w] = proj + + return out diff --git a/rfg_adc_plotter/state/app_state.py b/rfg_adc_plotter/state/app_state.py index 1ad5682..229b273 100644 --- a/rfg_adc_plotter/state/app_state.py +++ b/rfg_adc_plotter/state/app_state.py @@ -1,14 +1,21 @@ """Состояние приложения: текущие свипы и настройки калибровки/нормировки.""" +import os from queue import Empty, Queue from typing import Any, Dict, Mapping, Optional import numpy as np -from rfg_adc_plotter.processing.normalizer import normalize_by_calib +from rfg_adc_plotter.processing.normalizer import ( + build_calib_envelopes, + normalize_by_calib, + normalize_by_envelope, +) from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepInfo, SweepPacket +CALIB_ENVELOPE_PATH = "calib_envelope.npy" + def format_status(data: Mapping[str, Any]) -> str: """Преобразовать словарь метрик в одну строку 'k:v'.""" @@ -44,21 +51,65 @@ class AppState: self.current_info: Optional[SweepInfo] = None self.calib_enabled: bool = False self.norm_type: str = norm_type + # "live" — нормировка по текущему ch0-свипу; "file" — по огибающей из файла + self.calib_mode: str = "live" + self.calib_file_envelope: Optional[np.ndarray] = None def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + if self.calib_mode == "file" and self.calib_file_envelope is not None: + return normalize_by_envelope(raw, self.calib_file_envelope) return normalize_by_calib(raw, calib, self.norm_type) + def save_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool: + """Вычислить огибающую из last_calib_sweep и сохранить в файл. + + Возвращает True при успехе. + """ + if self.last_calib_sweep is None: + return False + try: + _lower, upper = build_calib_envelopes(self.last_calib_sweep) + np.save(path, upper) + return True + except Exception as exc: + import sys + sys.stderr.write(f"[warn] Не удалось сохранить огибающую: {exc}\n") + return False + + def load_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool: + """Загрузить огибающую из файла. + + Возвращает True при успехе. + """ + if not os.path.isfile(path): + return False + try: + env = np.load(path) + self.calib_file_envelope = np.asarray(env, dtype=np.float32) + return True + except Exception as exc: + import sys + sys.stderr.write(f"[warn] Не удалось загрузить огибающую: {exc}\n") + return False + + def set_calib_mode(self, mode: str): + """Переключить режим калибровки: 'live' или 'file'.""" + self.calib_mode = mode + def set_calib_enabled(self, enabled: bool): """Включить/выключить режим калибровки, пересчитать norm-свип.""" self.calib_enabled = enabled - if ( - self.calib_enabled - and self.current_sweep_raw is not None - and self.last_calib_sweep is not None - ): - self.current_sweep_norm = self._normalize( - self.current_sweep_raw, self.last_calib_sweep - ) + if self.calib_enabled and self.current_sweep_raw is not None: + if self.calib_mode == "file" and self.calib_file_envelope is not None: + self.current_sweep_norm = normalize_by_envelope( + self.current_sweep_raw, self.calib_file_envelope + ) + elif self.calib_mode == "live" and self.last_calib_sweep is not None: + self.current_sweep_norm = self._normalize( + self.current_sweep_raw, self.last_calib_sweep + ) + else: + self.current_sweep_norm = None else: self.current_sweep_norm = None @@ -86,11 +137,17 @@ class AppState: # Канал 0 — опорный (калибровочный) свип if ch == 0: self.last_calib_sweep = s + self.save_calib_envelope() self.current_sweep_norm = None sweep_for_ring = s else: - if self.calib_enabled and self.last_calib_sweep is not None: - self.current_sweep_norm = self._normalize(s, self.last_calib_sweep) + can_normalize = self.calib_enabled and ( + (self.calib_mode == "file" and self.calib_file_envelope is not None) + or (self.calib_mode == "live" and self.last_calib_sweep is not None) + ) + if can_normalize: + calib_ref = self.last_calib_sweep if self.last_calib_sweep is not None else s + self.current_sweep_norm = self._normalize(s, calib_ref) sweep_for_ring = self.current_sweep_norm else: self.current_sweep_norm = None