210 lines
7.0 KiB
Python
210 lines
7.0 KiB
Python
"""Peak-search helpers for FFT visualizations."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Dict, List, Optional
|
|
|
|
import numpy as np
|
|
|
|
|
|
def find_peak_width_markers(xs: np.ndarray, ys: np.ndarray) -> Optional[Dict[str, float]]:
|
|
"""Find the dominant non-zero peak and its half-height width."""
|
|
x_arr = np.asarray(xs, dtype=np.float64)
|
|
y_arr = np.asarray(ys, dtype=np.float64)
|
|
valid = np.isfinite(x_arr) & np.isfinite(y_arr) & (x_arr > 0.0)
|
|
if int(np.count_nonzero(valid)) < 3:
|
|
return None
|
|
|
|
x = x_arr[valid]
|
|
y = y_arr[valid]
|
|
x_min = float(x[0])
|
|
x_max = float(x[-1])
|
|
x_span = x_max - x_min
|
|
central_mask = (x >= (x_min + 0.25 * x_span)) & (x <= (x_min + 0.75 * x_span))
|
|
if int(np.count_nonzero(central_mask)) > 0:
|
|
central_idx = np.flatnonzero(central_mask)
|
|
peak_idx = int(central_idx[int(np.argmax(y[central_mask]))])
|
|
else:
|
|
peak_idx = int(np.argmax(y))
|
|
peak_y = float(y[peak_idx])
|
|
shoulder_gap = max(1, min(8, y.size // 64 if y.size > 0 else 1))
|
|
shoulder_width = max(4, min(32, y.size // 16 if y.size > 0 else 4))
|
|
left_lo = max(0, peak_idx - shoulder_gap - shoulder_width)
|
|
left_hi = max(0, peak_idx - shoulder_gap)
|
|
right_lo = min(y.size, peak_idx + shoulder_gap + 1)
|
|
right_hi = min(y.size, right_lo + shoulder_width)
|
|
background_parts = []
|
|
if left_hi > left_lo:
|
|
background_parts.append(float(np.nanmedian(y[left_lo:left_hi])))
|
|
if right_hi > right_lo:
|
|
background_parts.append(float(np.nanmedian(y[right_lo:right_hi])))
|
|
if background_parts:
|
|
background = float(np.mean(background_parts))
|
|
else:
|
|
background = float(np.nanpercentile(y, 10))
|
|
if not np.isfinite(peak_y) or not np.isfinite(background) or peak_y <= background:
|
|
return None
|
|
|
|
half_level = background + 0.5 * (peak_y - background)
|
|
|
|
def _interp_cross(x0: float, y0: float, x1: float, y1: float) -> float:
|
|
if not (np.isfinite(x0) and np.isfinite(y0) and np.isfinite(x1) and np.isfinite(y1)):
|
|
return x1
|
|
dy = y1 - y0
|
|
if dy == 0.0:
|
|
return x1
|
|
t = (half_level - y0) / dy
|
|
t = min(1.0, max(0.0, t))
|
|
return x0 + t * (x1 - x0)
|
|
|
|
left_x = float(x[0])
|
|
for i in range(peak_idx, 0, -1):
|
|
if y[i - 1] <= half_level <= y[i]:
|
|
left_x = _interp_cross(float(x[i - 1]), float(y[i - 1]), float(x[i]), float(y[i]))
|
|
break
|
|
|
|
right_x = float(x[-1])
|
|
for i in range(peak_idx, x.size - 1):
|
|
if y[i] >= half_level >= y[i + 1]:
|
|
right_x = _interp_cross(float(x[i]), float(y[i]), float(x[i + 1]), float(y[i + 1]))
|
|
break
|
|
|
|
width = right_x - left_x
|
|
if not np.isfinite(width) or width <= 0.0:
|
|
return None
|
|
|
|
return {
|
|
"background": background,
|
|
"left": left_x,
|
|
"right": right_x,
|
|
"width": width,
|
|
"amplitude": peak_y,
|
|
}
|
|
|
|
|
|
def rolling_median_ref(xs: np.ndarray, ys: np.ndarray, window_ghz: float) -> np.ndarray:
|
|
"""Compute a rolling median reference on a fixed-width X window."""
|
|
x = np.asarray(xs, dtype=np.float64)
|
|
y = np.asarray(ys, dtype=np.float64)
|
|
out = np.full(y.shape, np.nan, dtype=np.float64)
|
|
if x.size == 0 or y.size == 0 or x.size != y.size:
|
|
return out
|
|
width = float(window_ghz)
|
|
if not np.isfinite(width) or width <= 0.0:
|
|
return out
|
|
half = 0.5 * width
|
|
for i in range(x.size):
|
|
xi = x[i]
|
|
if not np.isfinite(xi):
|
|
continue
|
|
left = np.searchsorted(x, xi - half, side="left")
|
|
right = np.searchsorted(x, xi + half, side="right")
|
|
if right <= left:
|
|
continue
|
|
segment = y[left:right]
|
|
finite = np.isfinite(segment)
|
|
if not np.any(finite):
|
|
continue
|
|
out[i] = float(np.nanmedian(segment))
|
|
return out
|
|
|
|
|
|
def find_top_peaks_over_ref(
|
|
xs: np.ndarray,
|
|
ys: np.ndarray,
|
|
ref: np.ndarray,
|
|
top_n: int = 3,
|
|
) -> List[Dict[str, float]]:
|
|
"""Find the top-N non-overlapping peaks above a reference curve."""
|
|
x = np.asarray(xs, dtype=np.float64)
|
|
y = np.asarray(ys, dtype=np.float64)
|
|
r = np.asarray(ref, dtype=np.float64)
|
|
if x.size < 3 or y.size != x.size or r.size != x.size:
|
|
return []
|
|
|
|
valid = np.isfinite(x) & np.isfinite(y) & np.isfinite(r)
|
|
if not np.any(valid):
|
|
return []
|
|
delta = np.full_like(y, np.nan, dtype=np.float64)
|
|
delta[valid] = y[valid] - r[valid]
|
|
|
|
candidates: List[int] = []
|
|
for i in range(1, x.size - 1):
|
|
if not (np.isfinite(delta[i - 1]) and np.isfinite(delta[i]) and np.isfinite(delta[i + 1])):
|
|
continue
|
|
if delta[i] <= 0.0:
|
|
continue
|
|
left_ok = delta[i] > delta[i - 1]
|
|
right_ok = delta[i] >= delta[i + 1]
|
|
alt_left_ok = delta[i] >= delta[i - 1]
|
|
alt_right_ok = delta[i] > delta[i + 1]
|
|
if (left_ok and right_ok) or (alt_left_ok and alt_right_ok):
|
|
candidates.append(i)
|
|
if not candidates:
|
|
return []
|
|
|
|
candidates.sort(key=lambda i: float(delta[i]), reverse=True)
|
|
|
|
def _interp_cross(x0: float, y0: float, x1: float, y1: float, y_cross: float) -> float:
|
|
dy = y1 - y0
|
|
if not np.isfinite(dy) or dy == 0.0:
|
|
return x1
|
|
t = (y_cross - y0) / dy
|
|
t = min(1.0, max(0.0, t))
|
|
return x0 + t * (x1 - x0)
|
|
|
|
picked: List[Dict[str, float]] = []
|
|
for idx in candidates:
|
|
peak_y = float(y[idx])
|
|
peak_ref = float(r[idx])
|
|
peak_h = float(delta[idx])
|
|
if not (np.isfinite(peak_y) and np.isfinite(peak_ref) and np.isfinite(peak_h)) or peak_h <= 0.0:
|
|
continue
|
|
|
|
half_level = peak_ref + 0.5 * peak_h
|
|
|
|
left_x = float(x[0])
|
|
for i in range(idx, 0, -1):
|
|
y0 = float(y[i - 1])
|
|
y1 = float(y[i])
|
|
if np.isfinite(y0) and np.isfinite(y1) and (y0 <= half_level <= y1):
|
|
left_x = _interp_cross(float(x[i - 1]), y0, float(x[i]), y1, half_level)
|
|
break
|
|
|
|
right_x = float(x[-1])
|
|
for i in range(idx, x.size - 1):
|
|
y0 = float(y[i])
|
|
y1 = float(y[i + 1])
|
|
if np.isfinite(y0) and np.isfinite(y1) and (y0 >= half_level >= y1):
|
|
right_x = _interp_cross(float(x[i]), y0, float(x[i + 1]), y1, half_level)
|
|
break
|
|
|
|
width = float(right_x - left_x)
|
|
if not np.isfinite(width) or width <= 0.0:
|
|
continue
|
|
|
|
overlap = False
|
|
for peak in picked:
|
|
if not (right_x <= peak["left"] or left_x >= peak["right"]):
|
|
overlap = True
|
|
break
|
|
if overlap:
|
|
continue
|
|
|
|
picked.append(
|
|
{
|
|
"x": float(x[idx]),
|
|
"peak_y": peak_y,
|
|
"ref": peak_ref,
|
|
"height": peak_h,
|
|
"left": left_x,
|
|
"right": right_x,
|
|
"width": width,
|
|
}
|
|
)
|
|
if len(picked) >= int(max(1, top_n)):
|
|
break
|
|
|
|
picked.sort(key=lambda peak: peak["x"])
|
|
return picked
|