new calib
This commit is contained in:
@ -20,7 +20,7 @@ def normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||||
|
||||
|
||||
def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
||||
"""Estimate lower and upper envelopes of a calibration curve."""
|
||||
"""Estimate smooth lower/upper envelopes from local extrema."""
|
||||
n = int(calib.size)
|
||||
if n <= 0:
|
||||
empty = np.zeros((0,), dtype=np.float32)
|
||||
@ -40,32 +40,48 @@ def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
||||
if n < 3:
|
||||
return values.copy(), values.copy()
|
||||
|
||||
dy = np.diff(values)
|
||||
signs = np.sign(dy).astype(np.int8, copy=False)
|
||||
|
||||
if np.any(signs == 0):
|
||||
for i in range(1, signs.size):
|
||||
if signs[i] == 0:
|
||||
signs[i] = signs[i - 1]
|
||||
for i in range(signs.size - 2, -1, -1):
|
||||
if signs[i] == 0:
|
||||
signs[i] = signs[i + 1]
|
||||
signs[signs == 0] = 1
|
||||
|
||||
max_idx = np.where((signs[:-1] > 0) & (signs[1:] < 0))[0] + 1
|
||||
min_idx = np.where((signs[:-1] < 0) & (signs[1:] > 0))[0] + 1
|
||||
|
||||
x = np.arange(n, dtype=np.float32)
|
||||
|
||||
def _interp_nodes(nodes: np.ndarray) -> np.ndarray:
|
||||
if nodes.size == 0:
|
||||
idx = np.array([0, n - 1], dtype=np.int64)
|
||||
else:
|
||||
idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64)
|
||||
return np.interp(x, idx.astype(np.float32), values[idx]).astype(np.float32)
|
||||
def _moving_average(series: np.ndarray, window: int) -> np.ndarray:
|
||||
width = max(1, int(window))
|
||||
if width <= 1 or series.size <= 2:
|
||||
return np.asarray(series, dtype=np.float32).copy()
|
||||
if width % 2 == 0:
|
||||
width += 1
|
||||
pad = width // 2
|
||||
padded = np.pad(np.asarray(series, dtype=np.float32), (pad, pad), mode="edge")
|
||||
kernel = np.full((width,), 1.0 / float(width), dtype=np.float32)
|
||||
return np.convolve(padded, kernel, mode="valid").astype(np.float32)
|
||||
|
||||
upper = _interp_nodes(max_idx)
|
||||
lower = _interp_nodes(min_idx)
|
||||
def _smooth_extrema_envelope(use_max: bool) -> np.ndarray:
|
||||
step = max(3, n // 32)
|
||||
node_idx_list = []
|
||||
for start in range(0, n, step):
|
||||
stop = min(n, start + step)
|
||||
segment = values[start:stop]
|
||||
idx_rel = int(np.argmax(segment) if use_max else np.argmin(segment))
|
||||
node_idx_list.append(start + idx_rel)
|
||||
|
||||
extrema_idx = np.unique(np.asarray(node_idx_list, dtype=np.int64))
|
||||
if extrema_idx.size == 0:
|
||||
extrema_idx = np.asarray([int(np.argmax(values) if use_max else np.argmin(values))], dtype=np.int64)
|
||||
|
||||
node_idx = np.unique(np.concatenate(([0], extrema_idx, [n - 1]))).astype(np.int64)
|
||||
node_vals = values[node_idx].astype(np.float32, copy=True)
|
||||
node_vals[0] = float(values[extrema_idx[0]])
|
||||
node_vals[-1] = float(values[extrema_idx[-1]])
|
||||
node_vals = _moving_average(node_vals, 3)
|
||||
node_vals[0] = float(values[extrema_idx[0]])
|
||||
node_vals[-1] = float(values[extrema_idx[-1]])
|
||||
|
||||
envelope = np.interp(x, node_idx.astype(np.float32), node_vals).astype(np.float32)
|
||||
smooth_window = max(1, n // 64)
|
||||
if smooth_window > 1:
|
||||
envelope = _moving_average(envelope, smooth_window)
|
||||
return envelope
|
||||
|
||||
upper = _smooth_extrema_envelope(use_max=True)
|
||||
lower = _smooth_extrema_envelope(use_max=False)
|
||||
|
||||
swap = lower > upper
|
||||
if np.any(swap):
|
||||
|
||||
Reference in New Issue
Block a user