calib fix

This commit is contained in:
awe
2026-03-12 16:48:26 +03:00
parent c2a892f397
commit 157447a237
6 changed files with 327 additions and 59 deletions

View File

@ -1,10 +1,13 @@
"""Pure sweep-processing helpers."""
from rfg_adc_plotter.processing.calibration import (
build_calib_envelope,
calibrate_freqs,
get_calibration_base,
get_calibration_coeffs,
load_calib_envelope,
recalculate_calibration_c,
save_calib_envelope,
set_calibration_base_value,
)
from rfg_adc_plotter.processing.fft import (
@ -20,6 +23,7 @@ from rfg_adc_plotter.processing.formatting import (
)
from rfg_adc_plotter.processing.normalization import (
build_calib_envelopes,
normalize_by_envelope,
normalize_by_calib,
)
from rfg_adc_plotter.processing.peaks import (
@ -30,6 +34,7 @@ from rfg_adc_plotter.processing.peaks import (
__all__ = [
"build_calib_envelopes",
"build_calib_envelope",
"calibrate_freqs",
"compute_auto_ylim",
"compute_distance_axis",
@ -41,9 +46,12 @@ __all__ = [
"format_status_kv",
"get_calibration_base",
"get_calibration_coeffs",
"load_calib_envelope",
"normalize_by_envelope",
"normalize_by_calib",
"parse_spec_clip",
"recalculate_calibration_c",
"rolling_median_ref",
"save_calib_envelope",
"set_calibration_base_value",
]

View File

@ -2,11 +2,13 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Mapping
import numpy as np
from rfg_adc_plotter.constants import SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ
from rfg_adc_plotter.processing.normalization import build_calib_envelopes
from rfg_adc_plotter.types import SweepData
@ -79,3 +81,44 @@ def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData:
"F": freqs_cal,
"I": values_cal,
}
def build_calib_envelope(sweep: np.ndarray) -> np.ndarray:
"""Build the active calibration envelope from a raw sweep."""
values = np.asarray(sweep, dtype=np.float32).reshape(-1)
if values.size == 0:
raise ValueError("Calibration sweep is empty")
_, upper = build_calib_envelopes(values)
return np.asarray(upper, dtype=np.float32)
def validate_calib_envelope(envelope: np.ndarray) -> np.ndarray:
"""Validate a saved calibration envelope payload."""
values = np.asarray(envelope, dtype=np.float32).reshape(-1)
if values.size == 0:
raise ValueError("Calibration envelope is empty")
if not np.issubdtype(values.dtype, np.number):
raise ValueError("Calibration envelope must be numeric")
return values
def _normalize_calib_path(path: str | Path) -> Path:
out = Path(path).expanduser()
if out.suffix.lower() != ".npy":
out = out.with_suffix(".npy")
return out
def save_calib_envelope(path: str | Path, envelope: np.ndarray) -> str:
"""Persist a calibration envelope as a .npy file and return the final path."""
normalized_path = _normalize_calib_path(path)
values = validate_calib_envelope(envelope)
np.save(normalized_path, values.astype(np.float32, copy=False))
return str(normalized_path)
def load_calib_envelope(path: str | Path) -> np.ndarray:
"""Load and validate a calibration envelope from a .npy file."""
normalized_path = _normalize_calib_path(path)
loaded = np.load(normalized_path, allow_pickle=False)
return validate_calib_envelope(loaded)

View File

@ -108,6 +108,52 @@ def normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
return out
def resample_envelope(envelope: np.ndarray, width: int) -> np.ndarray:
"""Resample an envelope to the target sweep width on the index axis."""
target_width = int(width)
if target_width <= 0:
return np.zeros((0,), dtype=np.float32)
values = np.asarray(envelope, dtype=np.float32).reshape(-1)
if values.size == 0:
return np.full((target_width,), np.nan, dtype=np.float32)
if values.size == target_width:
return values.astype(np.float32, copy=True)
x_src = np.arange(values.size, dtype=np.float32)
finite = np.isfinite(values)
if not np.any(finite):
return np.full((target_width,), np.nan, dtype=np.float32)
if int(np.count_nonzero(finite)) == 1:
fill = float(values[finite][0])
return np.full((target_width,), fill, dtype=np.float32)
x_dst = np.linspace(0.0, float(values.size - 1), target_width, dtype=np.float32)
return np.interp(x_dst, x_src[finite], values[finite]).astype(np.float32)
def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray:
"""Normalize a sweep by an envelope with safe resampling and zero protection."""
raw_arr = np.asarray(raw, dtype=np.float32).reshape(-1)
if raw_arr.size == 0:
return raw_arr.copy()
env = resample_envelope(envelope, raw_arr.size)
out = np.full_like(raw_arr, np.nan, dtype=np.float32)
finite_env = np.abs(env[np.isfinite(env)])
if finite_env.size > 0:
eps = max(float(np.median(finite_env)) * 1e-6, 1e-9)
else:
eps = 1e-9
valid = np.isfinite(raw_arr) & np.isfinite(env) & (np.abs(env) > eps)
if np.any(valid):
with np.errstate(divide="ignore", invalid="ignore"):
out[valid] = raw_arr[valid] / env[valid]
return np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray:
"""Apply the selected normalization method."""
norm = str(norm_type).strip().lower()