diff --git a/rfg_adc_plotter/processing/normalization.py b/rfg_adc_plotter/processing/normalization.py index 688fee2..6066541 100644 --- a/rfg_adc_plotter/processing/normalization.py +++ b/rfg_adc_plotter/processing/normalization.py @@ -20,7 +20,7 @@ def normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: - """Estimate lower and upper envelopes of a calibration curve.""" + """Estimate smooth lower/upper envelopes from local extrema.""" n = int(calib.size) if n <= 0: empty = np.zeros((0,), dtype=np.float32) @@ -40,32 +40,48 @@ def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: if n < 3: return values.copy(), values.copy() - dy = np.diff(values) - signs = np.sign(dy).astype(np.int8, copy=False) - - if np.any(signs == 0): - for i in range(1, signs.size): - if signs[i] == 0: - signs[i] = signs[i - 1] - for i in range(signs.size - 2, -1, -1): - if signs[i] == 0: - signs[i] = signs[i + 1] - signs[signs == 0] = 1 - - max_idx = np.where((signs[:-1] > 0) & (signs[1:] < 0))[0] + 1 - min_idx = np.where((signs[:-1] < 0) & (signs[1:] > 0))[0] + 1 - x = np.arange(n, dtype=np.float32) - def _interp_nodes(nodes: np.ndarray) -> np.ndarray: - if nodes.size == 0: - idx = np.array([0, n - 1], dtype=np.int64) - else: - idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64) - return np.interp(x, idx.astype(np.float32), values[idx]).astype(np.float32) + def _moving_average(series: np.ndarray, window: int) -> np.ndarray: + width = max(1, int(window)) + if width <= 1 or series.size <= 2: + return np.asarray(series, dtype=np.float32).copy() + if width % 2 == 0: + width += 1 + pad = width // 2 + padded = np.pad(np.asarray(series, dtype=np.float32), (pad, pad), mode="edge") + kernel = np.full((width,), 1.0 / float(width), dtype=np.float32) + return np.convolve(padded, kernel, mode="valid").astype(np.float32) - upper = _interp_nodes(max_idx) - lower = _interp_nodes(min_idx) + def _smooth_extrema_envelope(use_max: bool) -> np.ndarray: + step = max(3, n // 32) + node_idx_list = [] + for start in range(0, n, step): + stop = min(n, start + step) + segment = values[start:stop] + idx_rel = int(np.argmax(segment) if use_max else np.argmin(segment)) + node_idx_list.append(start + idx_rel) + + extrema_idx = np.unique(np.asarray(node_idx_list, dtype=np.int64)) + if extrema_idx.size == 0: + extrema_idx = np.asarray([int(np.argmax(values) if use_max else np.argmin(values))], dtype=np.int64) + + node_idx = np.unique(np.concatenate(([0], extrema_idx, [n - 1]))).astype(np.int64) + node_vals = values[node_idx].astype(np.float32, copy=True) + node_vals[0] = float(values[extrema_idx[0]]) + node_vals[-1] = float(values[extrema_idx[-1]]) + node_vals = _moving_average(node_vals, 3) + node_vals[0] = float(values[extrema_idx[0]]) + node_vals[-1] = float(values[extrema_idx[-1]]) + + envelope = np.interp(x, node_idx.astype(np.float32), node_vals).astype(np.float32) + smooth_window = max(1, n // 64) + if smooth_window > 1: + envelope = _moving_average(envelope, smooth_window) + return envelope + + upper = _smooth_extrema_envelope(use_max=True) + lower = _smooth_extrema_envelope(use_max=False) swap = lower > upper if np.any(swap): diff --git a/tests/test_processing.py b/tests/test_processing.py index d45f4f8..dab5d61 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -44,6 +44,11 @@ class ProcessingTests(unittest.TestCase): self.assertEqual(lower.shape, calib.shape) self.assertEqual(upper.shape, calib.shape) self.assertTrue(np.all(lower <= upper)) + self.assertTrue(np.all(np.isfinite(upper))) + self.assertLess( + float(np.mean(np.abs(np.diff(upper, n=2)))), + float(np.mean(np.abs(np.diff(calib, n=2)))), + ) simple = normalize_by_calib(raw, calib + 10.0, norm_type="simple") projector = normalize_by_calib(raw, calib, norm_type="projector")