From 157447a2375e0a80b417eb8e0c4237bbb817cd77 Mon Sep 17 00:00:00 2001 From: awe Date: Thu, 12 Mar 2026 16:48:26 +0300 Subject: [PATCH] calib fix --- rfg_adc_plotter/gui/pyqtgraph_backend.py | 244 +++++++++++++++----- rfg_adc_plotter/processing/__init__.py | 8 + rfg_adc_plotter/processing/calibration.py | 43 ++++ rfg_adc_plotter/processing/normalization.py | 46 ++++ rfg_adc_plotter/state/runtime_state.py | 2 + tests/test_processing.py | 43 +++- 6 files changed, 327 insertions(+), 59 deletions(-) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index c3f1027..9541343 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -13,14 +13,17 @@ import numpy as np from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.processing.calibration import ( + build_calib_envelope, calibrate_freqs, get_calibration_base, get_calibration_coeffs, + load_calib_envelope, + save_calib_envelope, set_calibration_base_value, ) from rfg_adc_plotter.processing.fft import compute_fft_row, fft_mag_to_db from rfg_adc_plotter.processing.formatting import compute_auto_ylim, format_status_kv, parse_spec_clip -from rfg_adc_plotter.processing.normalization import normalize_by_calib +from rfg_adc_plotter.processing.normalization import normalize_by_envelope, resample_envelope from rfg_adc_plotter.processing.peaks import ( find_peak_width_markers, find_top_peaks_over_ref, @@ -95,7 +98,6 @@ def run_pyqtgraph(args) -> None: fft_bins = FFT_LEN // 2 + 1 spec_clip = parse_spec_clip(getattr(args, "spec_clip", None)) spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) - norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() runtime = RuntimeState(ring=RingBuffer(max_sweeps)) pg.setConfigOptions( @@ -130,8 +132,8 @@ def run_pyqtgraph(args) -> None: settings_layout.setContentsMargins(6, 6, 6, 6) settings_layout.setSpacing(8) try: - settings_widget.setMinimumWidth(130) - settings_widget.setMaximumWidth(150) + settings_widget.setMinimumWidth(250) + settings_widget.setMaximumWidth(340) except Exception: pass main_layout.addWidget(settings_widget) @@ -201,16 +203,41 @@ def run_pyqtgraph(args) -> None: spec_left_line.setVisible(False) spec_right_line.setVisible(False) - calib_cb = QtWidgets.QCheckBox("нормировка") + calib_cb = QtWidgets.QCheckBox("калибровка по огибающей") bg_compute_cb = QtWidgets.QCheckBox("расчет фона") bg_subtract_cb = QtWidgets.QCheckBox("вычет фона") fft_bg_subtract_cb = QtWidgets.QCheckBox("FFT вычет фона") peak_search_cb = QtWidgets.QCheckBox("поиск пиков") + calib_group = QtWidgets.QGroupBox("Калибровка") + calib_group_layout = QtWidgets.QVBoxLayout(calib_group) + calib_group_layout.setContentsMargins(6, 6, 6, 6) + calib_group_layout.setSpacing(6) + calib_group_layout.addWidget(calib_cb) + calib_path_edit = QtWidgets.QLineEdit("calibration_envelope.npy") + try: + calib_path_edit.setPlaceholderText("calibration_envelope.npy") + except Exception: + pass + calib_path_row = QtWidgets.QHBoxLayout() + calib_path_row.setContentsMargins(0, 0, 0, 0) + calib_path_row.setSpacing(4) + calib_path_row.addWidget(calib_path_edit) + calib_pick_btn = QtWidgets.QPushButton("Файл...") + calib_path_row.addWidget(calib_pick_btn) + calib_group_layout.addLayout(calib_path_row) + calib_buttons_row = QtWidgets.QHBoxLayout() + calib_buttons_row.setContentsMargins(0, 0, 0, 0) + calib_buttons_row.setSpacing(4) + calib_save_btn = QtWidgets.QPushButton("Сохранить текущую") + calib_load_btn = QtWidgets.QPushButton("Загрузить") + calib_buttons_row.addWidget(calib_save_btn) + calib_buttons_row.addWidget(calib_load_btn) + calib_group_layout.addLayout(calib_buttons_row) try: settings_layout.addWidget(QtWidgets.QLabel("Настройки")) except Exception: pass - settings_layout.addWidget(calib_cb) + settings_layout.addWidget(calib_group) settings_layout.addWidget(bg_compute_cb) settings_layout.addWidget(bg_subtract_cb) settings_layout.addWidget(fft_bg_subtract_cb) @@ -223,6 +250,8 @@ def run_pyqtgraph(args) -> None: bg_compute_enabled = True bg_subtract_enabled = False fft_bg_subtract_enabled = False + status_note = "" + status_dirty = True fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: try: @@ -270,8 +299,49 @@ def run_pyqtgraph(args) -> None: img_fft.setRect(0, d_min, max_sweeps, d_max - d_min) p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0) - def normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: - return normalize_by_calib(raw, calib, norm_type=norm_type) + def resolve_curve_xs(size: int) -> np.ndarray: + if size <= 0: + return np.zeros((0,), dtype=np.float32) + if runtime.current_freqs is not None and runtime.current_freqs.size == size: + return runtime.current_freqs + if runtime.current_freqs is not None and runtime.current_freqs.size > 1: + return np.linspace( + float(runtime.current_freqs[0]), + float(runtime.current_freqs[-1]), + size, + dtype=np.float64, + ) + if runtime.ring.x_shared is not None and size <= runtime.ring.x_shared.size: + return runtime.ring.x_shared[:size] + return np.arange(size, dtype=np.float32) + + def set_status_note(note: str) -> None: + nonlocal status_note, status_dirty + status_note = str(note).strip() + status_dirty = True + + def get_calib_file_path() -> str: + try: + path = calib_path_edit.text().strip() + except Exception: + path = "" + return path or "calibration_envelope.npy" + + def recompute_current_processed_sweep(push_to_ring: bool = False) -> None: + if runtime.current_sweep_raw is not None and calib_enabled and runtime.calib_envelope is not None: + runtime.current_sweep_norm = normalize_by_envelope(runtime.current_sweep_raw, runtime.calib_envelope) + else: + runtime.current_sweep_norm = None + + runtime.current_fft_db = None + if not push_to_ring or runtime.current_sweep_raw is None: + return + + sweep_for_processing = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw + ensure_buffer(runtime.current_sweep_raw.size) + runtime.ring.push(sweep_for_processing, runtime.current_freqs) + runtime.current_distances = runtime.ring.distance_axis + runtime.current_fft_db = runtime.ring.last_fft_db def set_calib_enabled() -> None: nonlocal calib_enabled @@ -279,10 +349,74 @@ def run_pyqtgraph(args) -> None: calib_enabled = bool(calib_cb.isChecked()) except Exception: calib_enabled = False - if calib_enabled and runtime.current_sweep_raw is not None and runtime.last_calib_sweep is not None: - runtime.current_sweep_norm = normalize_sweep(runtime.current_sweep_raw, runtime.last_calib_sweep) - else: - runtime.current_sweep_norm = None + if calib_enabled and runtime.calib_envelope is None: + set_status_note("калибровка: огибающая не загружена") + recompute_current_processed_sweep(push_to_ring=False) + runtime.mark_dirty() + + def pick_calib_file() -> None: + start_path = get_calib_file_path() + try: + selected, _ = QtWidgets.QFileDialog.getSaveFileName( + main_window, + "Файл калибровки", + start_path, + "NumPy (*.npy);;All files (*)", + ) + except Exception as exc: + set_status_note(f"калибровка: выбор файла недоступен ({exc})") + runtime.mark_dirty() + return + if not selected: + return + try: + calib_path_edit.setText(selected) + except Exception: + pass + + def save_current_calibration() -> None: + if runtime.current_sweep_raw is None or runtime.current_sweep_raw.size == 0: + set_status_note("калибровка: нет текущего raw-свипа") + runtime.mark_dirty() + return + try: + envelope = build_calib_envelope(runtime.current_sweep_raw) + saved_path = save_calib_envelope(get_calib_file_path(), envelope) + except Exception as exc: + set_status_note(f"калибровка: не удалось сохранить ({exc})") + runtime.mark_dirty() + return + + runtime.calib_envelope = envelope + runtime.calib_file_path = saved_path + runtime.last_calib_sweep = None + try: + calib_path_edit.setText(saved_path) + except Exception: + pass + recompute_current_processed_sweep(push_to_ring=False) + set_status_note(f"калибровка сохранена: {saved_path}") + runtime.mark_dirty() + + def load_calibration_file() -> None: + path = get_calib_file_path() + try: + envelope = load_calib_envelope(path) + except Exception as exc: + set_status_note(f"калибровка: не удалось загрузить ({exc})") + runtime.mark_dirty() + return + + normalized_path = path if path.lower().endswith(".npy") else f"{path}.npy" + runtime.calib_envelope = envelope + runtime.calib_file_path = normalized_path + runtime.last_calib_sweep = None + try: + calib_path_edit.setText(normalized_path) + except Exception: + pass + recompute_current_processed_sweep(push_to_ring=False) + set_status_note(f"калибровка загружена: {normalized_path}") runtime.mark_dirty() def set_bg_compute_enabled() -> None: @@ -317,6 +451,9 @@ def run_pyqtgraph(args) -> None: try: calib_cb.stateChanged.connect(lambda _v: set_calib_enabled()) + calib_pick_btn.clicked.connect(lambda _checked=False: pick_calib_file()) + calib_save_btn.clicked.connect(lambda _checked=False: save_current_calibration()) + calib_load_btn.clicked.connect(lambda _checked=False: load_calibration_file()) bg_compute_cb.stateChanged.connect(lambda _v: set_bg_compute_enabled()) bg_subtract_cb.stateChanged.connect(lambda _v: set_bg_subtract_enabled()) fft_bg_subtract_cb.stateChanged.connect(lambda _v: set_fft_bg_subtract_enabled()) @@ -531,29 +668,7 @@ def run_pyqtgraph(args) -> None: runtime.current_sweep_raw = calibrated["I"] runtime.current_aux_curves = aux_curves runtime.current_info = info - - channel = 0 - try: - channel = int(info.get("ch", 0)) if isinstance(info, dict) else 0 - except Exception: - channel = 0 - - if channel == 0: - runtime.last_calib_sweep = runtime.current_sweep_raw - runtime.current_sweep_norm = None - sweep_for_processing = runtime.current_sweep_raw - else: - if calib_enabled and runtime.last_calib_sweep is not None: - runtime.current_sweep_norm = normalize_sweep(runtime.current_sweep_raw, runtime.last_calib_sweep) - sweep_for_processing = runtime.current_sweep_norm - else: - runtime.current_sweep_norm = None - sweep_for_processing = runtime.current_sweep_raw - - ensure_buffer(runtime.current_sweep_raw.size) - runtime.ring.push(sweep_for_processing, runtime.current_freqs) - runtime.current_distances = runtime.ring.distance_axis - runtime.current_fft_db = runtime.ring.last_fft_db + recompute_current_processed_sweep(push_to_ring=True) if drained > 0: update_physical_axes() return drained @@ -569,7 +684,7 @@ def run_pyqtgraph(args) -> None: pass def update() -> None: - nonlocal peak_ref_window + nonlocal peak_ref_window, status_dirty if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits): return if peak_search_enabled and peak_window_edit is not None and peak_window_edit.hasFocus(): @@ -584,26 +699,34 @@ def run_pyqtgraph(args) -> None: except Exception: bg_fft_for_line = None - if redraw_needed and runtime.current_sweep_raw is not None: - xs = None - if runtime.current_freqs is not None and runtime.current_freqs.size == runtime.current_sweep_raw.size: - xs = runtime.current_freqs - elif runtime.ring.x_shared is not None and runtime.current_sweep_raw.size <= runtime.ring.x_shared.size: - xs = runtime.ring.x_shared[: runtime.current_sweep_raw.size] - else: - xs = np.arange(runtime.current_sweep_raw.size) + if redraw_needed and (runtime.current_sweep_raw is not None or runtime.calib_envelope is not None): + xs = resolve_curve_xs( + runtime.current_sweep_raw.size if runtime.current_sweep_raw is not None else runtime.calib_envelope.size + ) + displayed_calib = None - curve.setData(xs, runtime.current_sweep_raw, autoDownsample=True) - if runtime.current_aux_curves is not None: - avg_1_curve, avg_2_curve = runtime.current_aux_curves - curve_avg1.setData(xs[: avg_1_curve.size], avg_1_curve, autoDownsample=True) - curve_avg2.setData(xs[: avg_2_curve.size], avg_2_curve, autoDownsample=True) + if runtime.current_sweep_raw is not None: + curve.setData(xs[: runtime.current_sweep_raw.size], runtime.current_sweep_raw, autoDownsample=True) + if runtime.current_aux_curves is not None: + avg_1_curve, avg_2_curve = runtime.current_aux_curves + curve_avg1.setData(xs[: avg_1_curve.size], avg_1_curve, autoDownsample=True) + curve_avg2.setData(xs[: avg_2_curve.size], avg_2_curve, autoDownsample=True) + else: + curve_avg1.setData([], []) + curve_avg2.setData([], []) else: + curve.setData([], []) curve_avg1.setData([], []) curve_avg2.setData([], []) - if runtime.last_calib_sweep is not None: - curve_calib.setData(xs[: runtime.last_calib_sweep.size], runtime.last_calib_sweep, autoDownsample=True) + if runtime.calib_envelope is not None: + if runtime.current_sweep_raw is not None: + displayed_calib = resample_envelope(runtime.calib_envelope, runtime.current_sweep_raw.size) + xs_calib = xs[: displayed_calib.size] + else: + displayed_calib = runtime.calib_envelope + xs_calib = resolve_curve_xs(displayed_calib.size) + curve_calib.setData(xs_calib, displayed_calib, autoDownsample=True) else: curve_calib.setData([], []) @@ -613,8 +736,8 @@ def run_pyqtgraph(args) -> None: curve_norm.setData([], []) if fixed_ylim is None: - y_series = [runtime.current_sweep_raw, runtime.last_calib_sweep, runtime.current_sweep_norm] - if runtime.current_aux_curves is not None: + y_series = [runtime.current_sweep_raw, displayed_calib, runtime.current_sweep_norm] + if runtime.current_aux_curves is not None and runtime.current_sweep_raw is not None: y_series.extend(runtime.current_aux_curves) y_limits = compute_auto_ylim(*y_series) if y_limits is not None: @@ -627,7 +750,7 @@ def run_pyqtgraph(args) -> None: sweep_for_fft = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw distance_axis = runtime.current_distances if runtime.current_distances is not None else runtime.ring.distance_axis - if sweep_for_fft.size > 0 and distance_axis is not None: + if sweep_for_fft is not None and sweep_for_fft.size > 0 and distance_axis is not None: if runtime.current_fft_db is None or runtime.current_fft_db.size != distance_axis.size or runtime.plot_dirty: runtime.current_fft_db = compute_fft_row(sweep_for_fft, runtime.current_freqs, distance_axis.size) fft_vals = runtime.current_fft_db @@ -735,16 +858,23 @@ def run_pyqtgraph(args) -> None: else: img.setImage(disp, autoLevels=False) - if changed and runtime.current_info: + if redraw_needed or status_dirty: try: - status_payload = dict(runtime.current_info) + status_payload = dict(runtime.current_info) if runtime.current_info else {} if peak_calibrate_mode and runtime.current_peak_width is not None: status_payload["peak_w"] = runtime.current_peak_width if peak_calibrate_mode and runtime.current_peak_amplitude is not None: status_payload["peak_a"] = runtime.current_peak_amplitude - status.setText(format_status_kv(status_payload)) + base_status = format_status_kv(status_payload) if status_payload else "" + if status_note: + status.setText(f"{base_status} | {status_note}" if base_status else status_note) + else: + status.setText(base_status) except Exception: pass + status_dirty = False + + if changed and runtime.current_info: try: chs = runtime.current_info.get("chs") if isinstance(runtime.current_info, dict) else None if chs is None: diff --git a/rfg_adc_plotter/processing/__init__.py b/rfg_adc_plotter/processing/__init__.py index 7d36236..2d3e98a 100644 --- a/rfg_adc_plotter/processing/__init__.py +++ b/rfg_adc_plotter/processing/__init__.py @@ -1,10 +1,13 @@ """Pure sweep-processing helpers.""" from rfg_adc_plotter.processing.calibration import ( + build_calib_envelope, calibrate_freqs, get_calibration_base, get_calibration_coeffs, + load_calib_envelope, recalculate_calibration_c, + save_calib_envelope, set_calibration_base_value, ) from rfg_adc_plotter.processing.fft import ( @@ -20,6 +23,7 @@ from rfg_adc_plotter.processing.formatting import ( ) from rfg_adc_plotter.processing.normalization import ( build_calib_envelopes, + normalize_by_envelope, normalize_by_calib, ) from rfg_adc_plotter.processing.peaks import ( @@ -30,6 +34,7 @@ from rfg_adc_plotter.processing.peaks import ( __all__ = [ "build_calib_envelopes", + "build_calib_envelope", "calibrate_freqs", "compute_auto_ylim", "compute_distance_axis", @@ -41,9 +46,12 @@ __all__ = [ "format_status_kv", "get_calibration_base", "get_calibration_coeffs", + "load_calib_envelope", + "normalize_by_envelope", "normalize_by_calib", "parse_spec_clip", "recalculate_calibration_c", "rolling_median_ref", + "save_calib_envelope", "set_calibration_base_value", ] diff --git a/rfg_adc_plotter/processing/calibration.py b/rfg_adc_plotter/processing/calibration.py index 236ee2e..0a23dc2 100644 --- a/rfg_adc_plotter/processing/calibration.py +++ b/rfg_adc_plotter/processing/calibration.py @@ -2,11 +2,13 @@ from __future__ import annotations +from pathlib import Path from typing import Any, Mapping import numpy as np from rfg_adc_plotter.constants import SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ +from rfg_adc_plotter.processing.normalization import build_calib_envelopes from rfg_adc_plotter.types import SweepData @@ -79,3 +81,44 @@ def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData: "F": freqs_cal, "I": values_cal, } + + +def build_calib_envelope(sweep: np.ndarray) -> np.ndarray: + """Build the active calibration envelope from a raw sweep.""" + values = np.asarray(sweep, dtype=np.float32).reshape(-1) + if values.size == 0: + raise ValueError("Calibration sweep is empty") + _, upper = build_calib_envelopes(values) + return np.asarray(upper, dtype=np.float32) + + +def validate_calib_envelope(envelope: np.ndarray) -> np.ndarray: + """Validate a saved calibration envelope payload.""" + values = np.asarray(envelope, dtype=np.float32).reshape(-1) + if values.size == 0: + raise ValueError("Calibration envelope is empty") + if not np.issubdtype(values.dtype, np.number): + raise ValueError("Calibration envelope must be numeric") + return values + + +def _normalize_calib_path(path: str | Path) -> Path: + out = Path(path).expanduser() + if out.suffix.lower() != ".npy": + out = out.with_suffix(".npy") + return out + + +def save_calib_envelope(path: str | Path, envelope: np.ndarray) -> str: + """Persist a calibration envelope as a .npy file and return the final path.""" + normalized_path = _normalize_calib_path(path) + values = validate_calib_envelope(envelope) + np.save(normalized_path, values.astype(np.float32, copy=False)) + return str(normalized_path) + + +def load_calib_envelope(path: str | Path) -> np.ndarray: + """Load and validate a calibration envelope from a .npy file.""" + normalized_path = _normalize_calib_path(path) + loaded = np.load(normalized_path, allow_pickle=False) + return validate_calib_envelope(loaded) diff --git a/rfg_adc_plotter/processing/normalization.py b/rfg_adc_plotter/processing/normalization.py index 4b03683..688fee2 100644 --- a/rfg_adc_plotter/processing/normalization.py +++ b/rfg_adc_plotter/processing/normalization.py @@ -108,6 +108,52 @@ def normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: return out +def resample_envelope(envelope: np.ndarray, width: int) -> np.ndarray: + """Resample an envelope to the target sweep width on the index axis.""" + target_width = int(width) + if target_width <= 0: + return np.zeros((0,), dtype=np.float32) + + values = np.asarray(envelope, dtype=np.float32).reshape(-1) + if values.size == 0: + return np.full((target_width,), np.nan, dtype=np.float32) + if values.size == target_width: + return values.astype(np.float32, copy=True) + + x_src = np.arange(values.size, dtype=np.float32) + finite = np.isfinite(values) + if not np.any(finite): + return np.full((target_width,), np.nan, dtype=np.float32) + if int(np.count_nonzero(finite)) == 1: + fill = float(values[finite][0]) + return np.full((target_width,), fill, dtype=np.float32) + + x_dst = np.linspace(0.0, float(values.size - 1), target_width, dtype=np.float32) + return np.interp(x_dst, x_src[finite], values[finite]).astype(np.float32) + + +def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray: + """Normalize a sweep by an envelope with safe resampling and zero protection.""" + raw_arr = np.asarray(raw, dtype=np.float32).reshape(-1) + if raw_arr.size == 0: + return raw_arr.copy() + + env = resample_envelope(envelope, raw_arr.size) + out = np.full_like(raw_arr, np.nan, dtype=np.float32) + + finite_env = np.abs(env[np.isfinite(env)]) + if finite_env.size > 0: + eps = max(float(np.median(finite_env)) * 1e-6, 1e-9) + else: + eps = 1e-9 + + valid = np.isfinite(raw_arr) & np.isfinite(env) & (np.abs(env) > eps) + if np.any(valid): + with np.errstate(divide="ignore", invalid="ignore"): + out[valid] = raw_arr[valid] / env[valid] + return np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) + + def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray: """Apply the selected normalization method.""" norm = str(norm_type).strip().lower() diff --git a/rfg_adc_plotter/state/runtime_state.py b/rfg_adc_plotter/state/runtime_state.py index e0633b0..67e37c9 100644 --- a/rfg_adc_plotter/state/runtime_state.py +++ b/rfg_adc_plotter/state/runtime_state.py @@ -21,6 +21,8 @@ class RuntimeState: current_sweep_norm: Optional[np.ndarray] = None current_fft_db: Optional[np.ndarray] = None last_calib_sweep: Optional[np.ndarray] = None + calib_envelope: Optional[np.ndarray] = None + calib_file_path: Optional[str] = None current_info: Optional[SweepInfo] = None bg_spec_cache: Optional[np.ndarray] = None current_peak_width: Optional[float] = None diff --git a/tests/test_processing.py b/tests/test_processing.py index 425263f..d45f4f8 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -1,11 +1,24 @@ from __future__ import annotations +import os +import tempfile import numpy as np import unittest -from rfg_adc_plotter.processing.calibration import calibrate_freqs, recalculate_calibration_c +from rfg_adc_plotter.processing.calibration import ( + build_calib_envelope, + calibrate_freqs, + load_calib_envelope, + recalculate_calibration_c, + save_calib_envelope, +) from rfg_adc_plotter.processing.fft import compute_distance_axis, compute_fft_mag_row, compute_fft_row -from rfg_adc_plotter.processing.normalization import build_calib_envelopes, normalize_by_calib +from rfg_adc_plotter.processing.normalization import ( + build_calib_envelopes, + normalize_by_calib, + normalize_by_envelope, + resample_envelope, +) from rfg_adc_plotter.processing.peaks import find_peak_width_markers, find_top_peaks_over_ref, rolling_median_ref @@ -39,6 +52,32 @@ class ProcessingTests(unittest.TestCase): self.assertTrue(np.any(np.isfinite(simple))) self.assertTrue(np.any(np.isfinite(projector))) + def test_file_calibration_envelope_roundtrip_and_division(self): + raw = (np.sin(np.linspace(0.0, 8.0 * np.pi, 128)) * 50.0 + 100.0).astype(np.float32) + envelope = build_calib_envelope(raw) + normalized = normalize_by_envelope(raw, envelope) + resampled = resample_envelope(envelope, 96) + + self.assertEqual(envelope.shape, raw.shape) + self.assertEqual(normalized.shape, raw.shape) + self.assertEqual(resampled.shape, (96,)) + self.assertTrue(np.any(np.isfinite(normalized))) + self.assertTrue(np.all(np.isfinite(envelope))) + + with tempfile.TemporaryDirectory() as tmp_dir: + path = os.path.join(tmp_dir, "calibration_envelope") + saved_path = save_calib_envelope(path, envelope) + loaded = load_calib_envelope(saved_path) + self.assertTrue(saved_path.endswith(".npy")) + self.assertTrue(np.allclose(loaded, envelope)) + + def test_load_calib_envelope_rejects_empty_payload(self): + with tempfile.TemporaryDirectory() as tmp_dir: + path = os.path.join(tmp_dir, "empty.npy") + np.save(path, np.zeros((0,), dtype=np.float32)) + with self.assertRaises(ValueError): + load_calib_envelope(path) + def test_fft_helpers_return_expected_shapes(self): sweep = np.sin(np.linspace(0.0, 4.0 * np.pi, 128)).astype(np.float32) freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64)