implemented new normalisator mode: projector. It takes upper and lower evenlopes of ref signal and projects raw data from evenlopes scope to +-1000
This commit is contained in:
@ -85,6 +85,116 @@ def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
|
||||
return None
|
||||
|
||||
|
||||
def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||||
"""Простая нормировка: поэлементное деление raw/calib."""
|
||||
w = min(raw.size, calib.size)
|
||||
if w <= 0:
|
||||
return raw
|
||||
out = np.full_like(raw, np.nan, dtype=np.float32)
|
||||
with np.errstate(divide="ignore", invalid="ignore"):
|
||||
out[:w] = raw[:w] / calib[:w]
|
||||
out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
|
||||
return out
|
||||
|
||||
|
||||
def _build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
||||
"""Оценить нижнюю/верхнюю огибающие калибровочной кривой."""
|
||||
n = int(calib.size)
|
||||
if n <= 0:
|
||||
empty = np.zeros((0,), dtype=np.float32)
|
||||
return empty, empty
|
||||
|
||||
y = np.asarray(calib, dtype=np.float32)
|
||||
finite = np.isfinite(y)
|
||||
if not np.any(finite):
|
||||
zeros = np.zeros_like(y, dtype=np.float32)
|
||||
return zeros, zeros
|
||||
|
||||
if not np.all(finite):
|
||||
x = np.arange(n, dtype=np.float32)
|
||||
y = y.copy()
|
||||
y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32)
|
||||
|
||||
if n < 3:
|
||||
return y.copy(), y.copy()
|
||||
|
||||
dy = np.diff(y)
|
||||
s = np.sign(dy).astype(np.int8, copy=False)
|
||||
|
||||
if np.any(s == 0):
|
||||
for i in range(1, s.size):
|
||||
if s[i] == 0:
|
||||
s[i] = s[i - 1]
|
||||
for i in range(s.size - 2, -1, -1):
|
||||
if s[i] == 0:
|
||||
s[i] = s[i + 1]
|
||||
s[s == 0] = 1
|
||||
|
||||
max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1
|
||||
min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1
|
||||
|
||||
x = np.arange(n, dtype=np.float32)
|
||||
|
||||
def _interp_nodes(nodes: np.ndarray) -> np.ndarray:
|
||||
if nodes.size == 0:
|
||||
idx = np.array([0, n - 1], dtype=np.int64)
|
||||
else:
|
||||
idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64)
|
||||
return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32)
|
||||
|
||||
upper = _interp_nodes(max_idx)
|
||||
lower = _interp_nodes(min_idx)
|
||||
|
||||
swap = lower > upper
|
||||
if np.any(swap):
|
||||
tmp = upper[swap].copy()
|
||||
upper[swap] = lower[swap]
|
||||
lower[swap] = tmp
|
||||
|
||||
return lower, upper
|
||||
|
||||
|
||||
def _normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||||
"""Нормировка через проекцию между огибающими калибровки в диапазон [-1, +1]."""
|
||||
w = min(raw.size, calib.size)
|
||||
if w <= 0:
|
||||
return raw
|
||||
|
||||
out = np.full_like(raw, np.nan, dtype=np.float32)
|
||||
raw_seg = np.asarray(raw[:w], dtype=np.float32)
|
||||
lower, upper = _build_calib_envelopes(np.asarray(calib[:w], dtype=np.float32))
|
||||
span = upper - lower
|
||||
|
||||
finite_span = span[np.isfinite(span) & (span > 0)]
|
||||
if finite_span.size > 0:
|
||||
eps = max(float(np.median(finite_span)) * 1e-6, 1e-9)
|
||||
else:
|
||||
eps = 1e-9
|
||||
|
||||
valid = (
|
||||
np.isfinite(raw_seg)
|
||||
& np.isfinite(lower)
|
||||
& np.isfinite(upper)
|
||||
& (span > eps)
|
||||
)
|
||||
if np.any(valid):
|
||||
proj = np.empty_like(raw_seg, dtype=np.float32)
|
||||
proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0
|
||||
proj[valid] = np.clip(proj[valid], -1000.0, 1000.0)
|
||||
proj[~valid] = np.nan
|
||||
out[:w] = proj
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray:
|
||||
"""Нормировка свипа по выбранному алгоритму."""
|
||||
nt = str(norm_type).strip().lower()
|
||||
if nt == "simple":
|
||||
return _normalize_sweep_simple(raw, calib)
|
||||
return _normalize_sweep_projector(raw, calib)
|
||||
|
||||
|
||||
def try_open_pyserial(path: str, baud: int, timeout: float):
|
||||
try:
|
||||
import serial # type: ignore
|
||||
@ -532,6 +642,12 @@ def main():
|
||||
default="auto",
|
||||
help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--norm-type",
|
||||
choices=["projector", "simple"],
|
||||
default="projector",
|
||||
help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@ -592,6 +708,7 @@ def main():
|
||||
ymax_slider = None
|
||||
contrast_slider = None
|
||||
calib_enabled = False
|
||||
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
|
||||
cb = None
|
||||
|
||||
# Статусная строка (внизу окна)
|
||||
@ -674,15 +791,9 @@ def main():
|
||||
ax_spec.tick_params(axis="x", labelbottom=False)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||||
w = min(raw.size, calib.size)
|
||||
if w <= 0:
|
||||
return raw
|
||||
out = np.full_like(raw, np.nan, dtype=np.float32)
|
||||
with np.errstate(divide="ignore", invalid="ignore"):
|
||||
out[:w] = raw[:w] / calib[:w]
|
||||
out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
|
||||
return out
|
||||
return _normalize_by_calib(raw, calib, norm_type=norm_type)
|
||||
|
||||
def _set_calib_enabled():
|
||||
nonlocal calib_enabled, current_sweep_norm
|
||||
@ -1146,6 +1257,7 @@ def run_pyqtgraph(args):
|
||||
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
|
||||
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
|
||||
calib_enabled = False
|
||||
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
|
||||
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
|
||||
fixed_ylim: Optional[Tuple[float, float]] = None
|
||||
if args.ylim:
|
||||
@ -1158,14 +1270,7 @@ def run_pyqtgraph(args):
|
||||
p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0)
|
||||
|
||||
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||||
w = min(raw.size, calib.size)
|
||||
if w <= 0:
|
||||
return raw
|
||||
out = np.full_like(raw, np.nan, dtype=np.float32)
|
||||
with np.errstate(divide="ignore", invalid="ignore"):
|
||||
out[:w] = raw[:w] / calib[:w]
|
||||
out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
|
||||
return out
|
||||
return _normalize_by_calib(raw, calib, norm_type=norm_type)
|
||||
|
||||
def _set_calib_enabled():
|
||||
nonlocal calib_enabled, current_sweep_norm
|
||||
|
||||
Reference in New Issue
Block a user