add calibration file

This commit is contained in:
awe
2026-02-13 17:32:04 +03:00
parent d2d504f5b8
commit 66a318fff8
5 changed files with 174 additions and 21 deletions

BIN
calib_envelope.npy Normal file

Binary file not shown.

View File

@ -12,7 +12,7 @@ from rfg_adc_plotter.constants import FFT_LEN, FREQ_SPAN_GHZ, IFFT_LEN
_IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9) _IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9)
from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.io.sweep_reader import SweepReader
from rfg_adc_plotter.processing.normalizer import build_calib_envelopes from rfg_adc_plotter.processing.normalizer import build_calib_envelopes
from rfg_adc_plotter.state.app_state import AppState, format_status from rfg_adc_plotter.state.app_state import CALIB_ENVELOPE_PATH, AppState, format_status
from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.state.ring_buffer import RingBuffer
from rfg_adc_plotter.types import SweepPacket from rfg_adc_plotter.types import SweepPacket
@ -165,10 +165,16 @@ def run_matplotlib(args):
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08])
ax_cb_file = fig.add_axes([0.92, 0.36, 0.08, 0.08])
ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical")
ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical")
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
calib_cb = CheckButtons(ax_cb, ["калибровка"], [False]) calib_cb = CheckButtons(ax_cb, ["калибровка"], [False])
calib_file_cb = CheckButtons(ax_cb_file, ["из файла"], [False])
import os as _os
if not _os.path.isfile(CALIB_ENVELOPE_PATH):
ax_cb_file.set_visible(False)
def _on_ylim_change(_val): def _on_ylim_change(_val):
try: try:
@ -179,12 +185,30 @@ def run_matplotlib(args):
except Exception: except Exception:
pass pass
def _on_calib_file_clicked(_v):
use_file = bool(calib_file_cb.get_status()[0])
if use_file:
ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH)
if ok:
state.set_calib_mode("file")
else:
calib_file_cb.set_active(0) # снять галочку
else:
state.set_calib_mode("live")
state.set_calib_enabled(bool(calib_cb.get_status()[0]))
def _on_calib_clicked(_v):
import os as _os2
if _os2.path.isfile(CALIB_ENVELOPE_PATH):
ax_cb_file.set_visible(True)
state.set_calib_enabled(bool(calib_cb.get_status()[0]))
fig.canvas.draw_idle()
ymin_slider.on_changed(_on_ylim_change) ymin_slider.on_changed(_on_ylim_change)
ymax_slider.on_changed(_on_ylim_change) ymax_slider.on_changed(_on_ylim_change)
contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle())
calib_cb.on_clicked(lambda _v: state.set_calib_enabled( calib_cb.on_clicked(_on_calib_clicked)
bool(calib_cb.get_status()[0]) calib_file_cb.on_clicked(_on_calib_file_clicked)
))
except Exception: except Exception:
calib_cb = None calib_cb = None

View File

@ -10,7 +10,7 @@ import numpy as np
from rfg_adc_plotter.constants import FREQ_SPAN_GHZ, IFFT_LEN from rfg_adc_plotter.constants import FREQ_SPAN_GHZ, IFFT_LEN
from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.io.sweep_reader import SweepReader
from rfg_adc_plotter.processing.normalizer import build_calib_envelopes from rfg_adc_plotter.processing.normalizer import build_calib_envelopes
from rfg_adc_plotter.state.app_state import AppState, format_status from rfg_adc_plotter.state.app_state import CALIB_ENVELOPE_PATH, AppState, format_status
from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.state.ring_buffer import RingBuffer
from rfg_adc_plotter.types import SweepPacket from rfg_adc_plotter.types import SweepPacket
@ -184,12 +184,46 @@ def run_pyqtgraph(args):
img_fft = pg.ImageItem() img_fft = pg.ImageItem()
p_spec.addItem(img_fft) p_spec.addItem(img_fft)
# Чекбокс калибровки # Чекбоксы калибровки — в одном контейнере
calib_widget = QtWidgets.QWidget()
calib_layout = QtWidgets.QHBoxLayout(calib_widget)
calib_layout.setContentsMargins(2, 2, 2, 2)
calib_layout.setSpacing(8)
calib_cb = QtWidgets.QCheckBox("калибровка") calib_cb = QtWidgets.QCheckBox("калибровка")
cb_proxy = QtWidgets.QGraphicsProxyWidget() calib_file_cb = QtWidgets.QCheckBox("из файла")
cb_proxy.setWidget(calib_cb) calib_file_cb.setEnabled(False) # активируется только если файл существует
win.addItem(cb_proxy, row=2, col=1)
calib_cb.stateChanged.connect(lambda _v: state.set_calib_enabled(calib_cb.isChecked())) calib_layout.addWidget(calib_cb)
calib_layout.addWidget(calib_file_cb)
cb_container_proxy = QtWidgets.QGraphicsProxyWidget()
cb_container_proxy.setWidget(calib_widget)
win.addItem(cb_container_proxy, row=2, col=1)
def _check_file_cb_available():
import os
calib_file_cb.setEnabled(os.path.isfile(CALIB_ENVELOPE_PATH))
_check_file_cb_available()
def _on_calib_file_toggled(checked):
if checked:
ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH)
if ok:
state.set_calib_mode("file")
else:
calib_file_cb.setChecked(False)
else:
state.set_calib_mode("live")
state.set_calib_enabled(calib_cb.isChecked())
def _on_calib_toggled(_v):
_check_file_cb_available()
state.set_calib_enabled(calib_cb.isChecked())
calib_cb.stateChanged.connect(_on_calib_toggled)
calib_file_cb.stateChanged.connect(lambda _v: _on_calib_file_toggled(calib_file_cb.isChecked()))
# Статусная строка # Статусная строка
status = pg.LabelItem(justify="left") status = pg.LabelItem(justify="left")

View File

@ -109,3 +109,41 @@ def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np
if nt == "simple": if nt == "simple":
return normalize_simple(raw, calib) return normalize_simple(raw, calib)
return normalize_projector(raw, calib) return normalize_projector(raw, calib)
def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray:
"""Нормировка свипа через проекцию на огибающую из файла.
Воспроизводит логику normalize_projector: проецирует raw в [-1000, +1000]
используя готовую верхнюю огибающую (upper = envelope, lower = -envelope).
"""
w = min(raw.size, envelope.size)
if w <= 0:
return raw
out = np.full_like(raw, np.nan, dtype=np.float32)
raw_seg = np.asarray(raw[:w], dtype=np.float32)
upper = np.asarray(envelope[:w], dtype=np.float32)
lower = -upper
span = upper - lower # = 2 * upper
finite_span = span[np.isfinite(span) & (span > 0)]
if finite_span.size > 0:
eps = max(float(np.median(finite_span)) * 1e-6, 1e-9)
else:
eps = 1e-9
valid = (
np.isfinite(raw_seg)
& np.isfinite(lower)
& np.isfinite(upper)
& (span > eps)
)
if np.any(valid):
proj = np.empty_like(raw_seg, dtype=np.float32)
proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0
proj[valid] = np.clip(proj[valid], -1000.0, 1000.0)
proj[~valid] = np.nan
out[:w] = proj
return out

View File

@ -1,14 +1,21 @@
"""Состояние приложения: текущие свипы и настройки калибровки/нормировки.""" """Состояние приложения: текущие свипы и настройки калибровки/нормировки."""
import os
from queue import Empty, Queue from queue import Empty, Queue
from typing import Any, Dict, Mapping, Optional from typing import Any, Dict, Mapping, Optional
import numpy as np import numpy as np
from rfg_adc_plotter.processing.normalizer import normalize_by_calib from rfg_adc_plotter.processing.normalizer import (
build_calib_envelopes,
normalize_by_calib,
normalize_by_envelope,
)
from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.state.ring_buffer import RingBuffer
from rfg_adc_plotter.types import SweepInfo, SweepPacket from rfg_adc_plotter.types import SweepInfo, SweepPacket
CALIB_ENVELOPE_PATH = "calib_envelope.npy"
def format_status(data: Mapping[str, Any]) -> str: def format_status(data: Mapping[str, Any]) -> str:
"""Преобразовать словарь метрик в одну строку 'k:v'.""" """Преобразовать словарь метрик в одну строку 'k:v'."""
@ -44,21 +51,65 @@ class AppState:
self.current_info: Optional[SweepInfo] = None self.current_info: Optional[SweepInfo] = None
self.calib_enabled: bool = False self.calib_enabled: bool = False
self.norm_type: str = norm_type self.norm_type: str = norm_type
# "live" — нормировка по текущему ch0-свипу; "file" — по огибающей из файла
self.calib_mode: str = "live"
self.calib_file_envelope: Optional[np.ndarray] = None
def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
if self.calib_mode == "file" and self.calib_file_envelope is not None:
return normalize_by_envelope(raw, self.calib_file_envelope)
return normalize_by_calib(raw, calib, self.norm_type) return normalize_by_calib(raw, calib, self.norm_type)
def save_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool:
"""Вычислить огибающую из last_calib_sweep и сохранить в файл.
Возвращает True при успехе.
"""
if self.last_calib_sweep is None:
return False
try:
_lower, upper = build_calib_envelopes(self.last_calib_sweep)
np.save(path, upper)
return True
except Exception as exc:
import sys
sys.stderr.write(f"[warn] Не удалось сохранить огибающую: {exc}\n")
return False
def load_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool:
"""Загрузить огибающую из файла.
Возвращает True при успехе.
"""
if not os.path.isfile(path):
return False
try:
env = np.load(path)
self.calib_file_envelope = np.asarray(env, dtype=np.float32)
return True
except Exception as exc:
import sys
sys.stderr.write(f"[warn] Не удалось загрузить огибающую: {exc}\n")
return False
def set_calib_mode(self, mode: str):
"""Переключить режим калибровки: 'live' или 'file'."""
self.calib_mode = mode
def set_calib_enabled(self, enabled: bool): def set_calib_enabled(self, enabled: bool):
"""Включить/выключить режим калибровки, пересчитать norm-свип.""" """Включить/выключить режим калибровки, пересчитать norm-свип."""
self.calib_enabled = enabled self.calib_enabled = enabled
if ( if self.calib_enabled and self.current_sweep_raw is not None:
self.calib_enabled if self.calib_mode == "file" and self.calib_file_envelope is not None:
and self.current_sweep_raw is not None self.current_sweep_norm = normalize_by_envelope(
and self.last_calib_sweep is not None self.current_sweep_raw, self.calib_file_envelope
): )
self.current_sweep_norm = self._normalize( elif self.calib_mode == "live" and self.last_calib_sweep is not None:
self.current_sweep_raw, self.last_calib_sweep self.current_sweep_norm = self._normalize(
) self.current_sweep_raw, self.last_calib_sweep
)
else:
self.current_sweep_norm = None
else: else:
self.current_sweep_norm = None self.current_sweep_norm = None
@ -86,11 +137,17 @@ class AppState:
# Канал 0 — опорный (калибровочный) свип # Канал 0 — опорный (калибровочный) свип
if ch == 0: if ch == 0:
self.last_calib_sweep = s self.last_calib_sweep = s
self.save_calib_envelope()
self.current_sweep_norm = None self.current_sweep_norm = None
sweep_for_ring = s sweep_for_ring = s
else: else:
if self.calib_enabled and self.last_calib_sweep is not None: can_normalize = self.calib_enabled and (
self.current_sweep_norm = self._normalize(s, self.last_calib_sweep) (self.calib_mode == "file" and self.calib_file_envelope is not None)
or (self.calib_mode == "live" and self.last_calib_sweep is not None)
)
if can_normalize:
calib_ref = self.last_calib_sweep if self.last_calib_sweep is not None else s
self.current_sweep_norm = self._normalize(s, calib_ref)
sweep_for_ring = self.current_sweep_norm sweep_for_ring = self.current_sweep_norm
else: else:
self.current_sweep_norm = None self.current_sweep_norm = None