170 lines
6.3 KiB
Python
170 lines
6.3 KiB
Python
"""Frequency-axis calibration helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any, Mapping
|
|
|
|
import numpy as np
|
|
|
|
from rfg_adc_plotter.constants import SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ
|
|
from rfg_adc_plotter.processing.normalization import build_calib_envelopes
|
|
from rfg_adc_plotter.types import SweepData
|
|
|
|
|
|
def recalculate_calibration_c(
|
|
base_coeffs: np.ndarray,
|
|
f_min: float = SWEEP_FREQ_MIN_GHZ,
|
|
f_max: float = SWEEP_FREQ_MAX_GHZ,
|
|
) -> np.ndarray:
|
|
"""Recalculate coefficients while preserving sweep edges."""
|
|
coeffs = np.asarray(base_coeffs, dtype=np.float64).reshape(-1)
|
|
if coeffs.size < 3:
|
|
out = np.zeros((3,), dtype=np.float64)
|
|
out[: coeffs.size] = coeffs
|
|
coeffs = out
|
|
c0, c1, c2 = float(coeffs[0]), float(coeffs[1]), float(coeffs[2])
|
|
x0 = float(f_min)
|
|
x1 = float(f_max)
|
|
y0 = c0 + c1 * x0 + c2 * (x0 ** 2)
|
|
y1 = c0 + c1 * x1 + c2 * (x1 ** 2)
|
|
if not (np.isfinite(y0) and np.isfinite(y1)) or y1 == y0:
|
|
return np.asarray([c0, c1, c2], dtype=np.float64)
|
|
scale = (x1 - x0) / (y1 - y0)
|
|
shift = x0 - scale * y0
|
|
return np.asarray(
|
|
[
|
|
shift + scale * c0,
|
|
scale * c1,
|
|
scale * c2,
|
|
],
|
|
dtype=np.float64,
|
|
)
|
|
|
|
|
|
CALIBRATION_C_BASE = np.asarray([0.0, 1.0, 0.025], dtype=np.float64)
|
|
CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE)
|
|
|
|
|
|
def get_calibration_base() -> np.ndarray:
|
|
return np.asarray(CALIBRATION_C_BASE, dtype=np.float64).copy()
|
|
|
|
|
|
def get_calibration_coeffs() -> np.ndarray:
|
|
return np.asarray(CALIBRATION_C, dtype=np.float64).copy()
|
|
|
|
|
|
def set_calibration_base_value(index: int, value: float) -> np.ndarray:
|
|
"""Update one base coefficient and recalculate the working coefficients."""
|
|
global CALIBRATION_C
|
|
CALIBRATION_C_BASE[int(index)] = float(value)
|
|
CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE)
|
|
return get_calibration_coeffs()
|
|
|
|
|
|
def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData:
|
|
"""Return a sweep copy with calibrated and resampled frequency axis."""
|
|
freqs = np.asarray(sweep["F"], dtype=np.float64).copy()
|
|
values_in = np.asarray(sweep["I"]).reshape(-1)
|
|
values = np.asarray(
|
|
values_in,
|
|
dtype=np.complex128 if np.iscomplexobj(values_in) else np.float64,
|
|
).copy()
|
|
coeffs = np.asarray(CALIBRATION_C, dtype=np.float64)
|
|
if freqs.size > 0:
|
|
freqs = coeffs[0] + coeffs[1] * freqs + coeffs[2] * (freqs * freqs)
|
|
|
|
if freqs.size >= 2:
|
|
freqs_cal = np.linspace(float(freqs[0]), float(freqs[-1]), freqs.size, dtype=np.float64)
|
|
if np.iscomplexobj(values):
|
|
values_real = np.interp(freqs_cal, freqs, values.real.astype(np.float64, copy=False))
|
|
values_imag = np.interp(freqs_cal, freqs, values.imag.astype(np.float64, copy=False))
|
|
values_cal = (values_real + (1j * values_imag)).astype(np.complex64)
|
|
else:
|
|
values_cal = np.interp(freqs_cal, freqs, values).astype(np.float64)
|
|
else:
|
|
freqs_cal = freqs.copy()
|
|
values_cal = values.copy()
|
|
|
|
return {
|
|
"F": freqs_cal,
|
|
"I": values_cal,
|
|
}
|
|
|
|
|
|
def build_calib_envelope(sweep: np.ndarray) -> np.ndarray:
|
|
"""Build the active calibration envelope from a raw sweep."""
|
|
values = np.asarray(sweep, dtype=np.float32).reshape(-1)
|
|
if values.size == 0:
|
|
raise ValueError("Calibration sweep is empty")
|
|
_, upper = build_calib_envelopes(values)
|
|
return np.asarray(upper, dtype=np.float32)
|
|
|
|
|
|
def build_complex_calibration_curve(ch1: np.ndarray, ch2: np.ndarray) -> np.ndarray:
|
|
"""Build a complex calibration curve as ``ch1 + 1j*ch2``."""
|
|
ch1_arr = np.asarray(ch1, dtype=np.float32).reshape(-1)
|
|
ch2_arr = np.asarray(ch2, dtype=np.float32).reshape(-1)
|
|
width = min(ch1_arr.size, ch2_arr.size)
|
|
if width <= 0:
|
|
raise ValueError("Complex calibration source is empty")
|
|
curve = ch1_arr[:width].astype(np.complex64) + (1j * ch2_arr[:width].astype(np.complex64))
|
|
return validate_complex_calibration_curve(curve)
|
|
|
|
|
|
def validate_calib_envelope(envelope: np.ndarray) -> np.ndarray:
|
|
"""Validate a saved calibration envelope payload."""
|
|
values = np.asarray(envelope, dtype=np.float32).reshape(-1)
|
|
if values.size == 0:
|
|
raise ValueError("Calibration envelope is empty")
|
|
if not np.issubdtype(values.dtype, np.number):
|
|
raise ValueError("Calibration envelope must be numeric")
|
|
return values
|
|
|
|
|
|
def validate_complex_calibration_curve(curve: np.ndarray) -> np.ndarray:
|
|
"""Validate a saved complex calibration payload."""
|
|
values = np.asarray(curve).reshape(-1)
|
|
if values.size == 0:
|
|
raise ValueError("Complex calibration curve is empty")
|
|
if not np.issubdtype(values.dtype, np.number):
|
|
raise ValueError("Complex calibration curve must be numeric")
|
|
return np.asarray(values, dtype=np.complex64)
|
|
|
|
|
|
def _normalize_calib_path(path: str | Path) -> Path:
|
|
out = Path(path).expanduser()
|
|
if out.suffix.lower() != ".npy":
|
|
out = out.with_suffix(".npy")
|
|
return out
|
|
|
|
|
|
def save_calib_envelope(path: str | Path, envelope: np.ndarray) -> str:
|
|
"""Persist a calibration envelope as a .npy file and return the final path."""
|
|
normalized_path = _normalize_calib_path(path)
|
|
values = validate_calib_envelope(envelope)
|
|
np.save(normalized_path, values.astype(np.float32, copy=False))
|
|
return str(normalized_path)
|
|
|
|
|
|
def load_calib_envelope(path: str | Path) -> np.ndarray:
|
|
"""Load and validate a calibration envelope from a .npy file."""
|
|
normalized_path = _normalize_calib_path(path)
|
|
loaded = np.load(normalized_path, allow_pickle=False)
|
|
return validate_calib_envelope(loaded)
|
|
|
|
|
|
def save_complex_calibration(path: str | Path, curve: np.ndarray) -> str:
|
|
"""Persist a complex calibration curve as a .npy file and return the final path."""
|
|
normalized_path = _normalize_calib_path(path)
|
|
values = validate_complex_calibration_curve(curve)
|
|
np.save(normalized_path, values.astype(np.complex64, copy=False))
|
|
return str(normalized_path)
|
|
|
|
|
|
def load_complex_calibration(path: str | Path) -> np.ndarray:
|
|
"""Load and validate a complex calibration curve from a .npy file."""
|
|
normalized_path = _normalize_calib_path(path)
|
|
loaded = np.load(normalized_path, allow_pickle=False)
|
|
return validate_complex_calibration_curve(loaded)
|