new
This commit is contained in:
209
rfg_adc_plotter/processing/peaks.py
Normal file
209
rfg_adc_plotter/processing/peaks.py
Normal file
@ -0,0 +1,209 @@
|
||||
"""Peak-search helpers for FFT visualizations."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def find_peak_width_markers(xs: np.ndarray, ys: np.ndarray) -> Optional[Dict[str, float]]:
|
||||
"""Find the dominant non-zero peak and its half-height width."""
|
||||
x_arr = np.asarray(xs, dtype=np.float64)
|
||||
y_arr = np.asarray(ys, dtype=np.float64)
|
||||
valid = np.isfinite(x_arr) & np.isfinite(y_arr) & (x_arr > 0.0)
|
||||
if int(np.count_nonzero(valid)) < 3:
|
||||
return None
|
||||
|
||||
x = x_arr[valid]
|
||||
y = y_arr[valid]
|
||||
x_min = float(x[0])
|
||||
x_max = float(x[-1])
|
||||
x_span = x_max - x_min
|
||||
central_mask = (x >= (x_min + 0.25 * x_span)) & (x <= (x_min + 0.75 * x_span))
|
||||
if int(np.count_nonzero(central_mask)) > 0:
|
||||
central_idx = np.flatnonzero(central_mask)
|
||||
peak_idx = int(central_idx[int(np.argmax(y[central_mask]))])
|
||||
else:
|
||||
peak_idx = int(np.argmax(y))
|
||||
peak_y = float(y[peak_idx])
|
||||
shoulder_gap = max(1, min(8, y.size // 64 if y.size > 0 else 1))
|
||||
shoulder_width = max(4, min(32, y.size // 16 if y.size > 0 else 4))
|
||||
left_lo = max(0, peak_idx - shoulder_gap - shoulder_width)
|
||||
left_hi = max(0, peak_idx - shoulder_gap)
|
||||
right_lo = min(y.size, peak_idx + shoulder_gap + 1)
|
||||
right_hi = min(y.size, right_lo + shoulder_width)
|
||||
background_parts = []
|
||||
if left_hi > left_lo:
|
||||
background_parts.append(float(np.nanmedian(y[left_lo:left_hi])))
|
||||
if right_hi > right_lo:
|
||||
background_parts.append(float(np.nanmedian(y[right_lo:right_hi])))
|
||||
if background_parts:
|
||||
background = float(np.mean(background_parts))
|
||||
else:
|
||||
background = float(np.nanpercentile(y, 10))
|
||||
if not np.isfinite(peak_y) or not np.isfinite(background) or peak_y <= background:
|
||||
return None
|
||||
|
||||
half_level = background + 0.5 * (peak_y - background)
|
||||
|
||||
def _interp_cross(x0: float, y0: float, x1: float, y1: float) -> float:
|
||||
if not (np.isfinite(x0) and np.isfinite(y0) and np.isfinite(x1) and np.isfinite(y1)):
|
||||
return x1
|
||||
dy = y1 - y0
|
||||
if dy == 0.0:
|
||||
return x1
|
||||
t = (half_level - y0) / dy
|
||||
t = min(1.0, max(0.0, t))
|
||||
return x0 + t * (x1 - x0)
|
||||
|
||||
left_x = float(x[0])
|
||||
for i in range(peak_idx, 0, -1):
|
||||
if y[i - 1] <= half_level <= y[i]:
|
||||
left_x = _interp_cross(float(x[i - 1]), float(y[i - 1]), float(x[i]), float(y[i]))
|
||||
break
|
||||
|
||||
right_x = float(x[-1])
|
||||
for i in range(peak_idx, x.size - 1):
|
||||
if y[i] >= half_level >= y[i + 1]:
|
||||
right_x = _interp_cross(float(x[i]), float(y[i]), float(x[i + 1]), float(y[i + 1]))
|
||||
break
|
||||
|
||||
width = right_x - left_x
|
||||
if not np.isfinite(width) or width <= 0.0:
|
||||
return None
|
||||
|
||||
return {
|
||||
"background": background,
|
||||
"left": left_x,
|
||||
"right": right_x,
|
||||
"width": width,
|
||||
"amplitude": peak_y,
|
||||
}
|
||||
|
||||
|
||||
def rolling_median_ref(xs: np.ndarray, ys: np.ndarray, window_ghz: float) -> np.ndarray:
|
||||
"""Compute a rolling median reference on a fixed-width X window."""
|
||||
x = np.asarray(xs, dtype=np.float64)
|
||||
y = np.asarray(ys, dtype=np.float64)
|
||||
out = np.full(y.shape, np.nan, dtype=np.float64)
|
||||
if x.size == 0 or y.size == 0 or x.size != y.size:
|
||||
return out
|
||||
width = float(window_ghz)
|
||||
if not np.isfinite(width) or width <= 0.0:
|
||||
return out
|
||||
half = 0.5 * width
|
||||
for i in range(x.size):
|
||||
xi = x[i]
|
||||
if not np.isfinite(xi):
|
||||
continue
|
||||
left = np.searchsorted(x, xi - half, side="left")
|
||||
right = np.searchsorted(x, xi + half, side="right")
|
||||
if right <= left:
|
||||
continue
|
||||
segment = y[left:right]
|
||||
finite = np.isfinite(segment)
|
||||
if not np.any(finite):
|
||||
continue
|
||||
out[i] = float(np.nanmedian(segment))
|
||||
return out
|
||||
|
||||
|
||||
def find_top_peaks_over_ref(
|
||||
xs: np.ndarray,
|
||||
ys: np.ndarray,
|
||||
ref: np.ndarray,
|
||||
top_n: int = 3,
|
||||
) -> List[Dict[str, float]]:
|
||||
"""Find the top-N non-overlapping peaks above a reference curve."""
|
||||
x = np.asarray(xs, dtype=np.float64)
|
||||
y = np.asarray(ys, dtype=np.float64)
|
||||
r = np.asarray(ref, dtype=np.float64)
|
||||
if x.size < 3 or y.size != x.size or r.size != x.size:
|
||||
return []
|
||||
|
||||
valid = np.isfinite(x) & np.isfinite(y) & np.isfinite(r)
|
||||
if not np.any(valid):
|
||||
return []
|
||||
delta = np.full_like(y, np.nan, dtype=np.float64)
|
||||
delta[valid] = y[valid] - r[valid]
|
||||
|
||||
candidates: List[int] = []
|
||||
for i in range(1, x.size - 1):
|
||||
if not (np.isfinite(delta[i - 1]) and np.isfinite(delta[i]) and np.isfinite(delta[i + 1])):
|
||||
continue
|
||||
if delta[i] <= 0.0:
|
||||
continue
|
||||
left_ok = delta[i] > delta[i - 1]
|
||||
right_ok = delta[i] >= delta[i + 1]
|
||||
alt_left_ok = delta[i] >= delta[i - 1]
|
||||
alt_right_ok = delta[i] > delta[i + 1]
|
||||
if (left_ok and right_ok) or (alt_left_ok and alt_right_ok):
|
||||
candidates.append(i)
|
||||
if not candidates:
|
||||
return []
|
||||
|
||||
candidates.sort(key=lambda i: float(delta[i]), reverse=True)
|
||||
|
||||
def _interp_cross(x0: float, y0: float, x1: float, y1: float, y_cross: float) -> float:
|
||||
dy = y1 - y0
|
||||
if not np.isfinite(dy) or dy == 0.0:
|
||||
return x1
|
||||
t = (y_cross - y0) / dy
|
||||
t = min(1.0, max(0.0, t))
|
||||
return x0 + t * (x1 - x0)
|
||||
|
||||
picked: List[Dict[str, float]] = []
|
||||
for idx in candidates:
|
||||
peak_y = float(y[idx])
|
||||
peak_ref = float(r[idx])
|
||||
peak_h = float(delta[idx])
|
||||
if not (np.isfinite(peak_y) and np.isfinite(peak_ref) and np.isfinite(peak_h)) or peak_h <= 0.0:
|
||||
continue
|
||||
|
||||
half_level = peak_ref + 0.5 * peak_h
|
||||
|
||||
left_x = float(x[0])
|
||||
for i in range(idx, 0, -1):
|
||||
y0 = float(y[i - 1])
|
||||
y1 = float(y[i])
|
||||
if np.isfinite(y0) and np.isfinite(y1) and (y0 <= half_level <= y1):
|
||||
left_x = _interp_cross(float(x[i - 1]), y0, float(x[i]), y1, half_level)
|
||||
break
|
||||
|
||||
right_x = float(x[-1])
|
||||
for i in range(idx, x.size - 1):
|
||||
y0 = float(y[i])
|
||||
y1 = float(y[i + 1])
|
||||
if np.isfinite(y0) and np.isfinite(y1) and (y0 >= half_level >= y1):
|
||||
right_x = _interp_cross(float(x[i]), y0, float(x[i + 1]), y1, half_level)
|
||||
break
|
||||
|
||||
width = float(right_x - left_x)
|
||||
if not np.isfinite(width) or width <= 0.0:
|
||||
continue
|
||||
|
||||
overlap = False
|
||||
for peak in picked:
|
||||
if not (right_x <= peak["left"] or left_x >= peak["right"]):
|
||||
overlap = True
|
||||
break
|
||||
if overlap:
|
||||
continue
|
||||
|
||||
picked.append(
|
||||
{
|
||||
"x": float(x[idx]),
|
||||
"peak_y": peak_y,
|
||||
"ref": peak_ref,
|
||||
"height": peak_h,
|
||||
"left": left_x,
|
||||
"right": right_x,
|
||||
"width": width,
|
||||
}
|
||||
)
|
||||
if len(picked) >= int(max(1, top_n)):
|
||||
break
|
||||
|
||||
picked.sort(key=lambda peak: peak["x"])
|
||||
return picked
|
||||
Reference in New Issue
Block a user