"""Явный pipeline предобработки свипов перед помещением в RingBuffer.""" from __future__ import annotations from dataclasses import dataclass import os from typing import Optional, Tuple import numpy as np from rfg_adc_plotter.io.capture_reference_loader import ( CaptureParseSummary, aggregate_capture_reference, detect_reference_file_format, load_capture_sweeps, ) from rfg_adc_plotter.processing.normalizer import ( build_calib_envelopes, normalize_by_calib, normalize_by_envelope, ) DEFAULT_CALIB_ENVELOPE_PATH = "calib_envelope.npy" DEFAULT_BACKGROUND_PATH = "background.npy" def _normalize_path(path: str) -> str: return str(path).strip() def _normalize_save_npy_path(path: str) -> str: p = _normalize_path(path) if not p: return p _root, ext = os.path.splitext(p) if ext: return p return f"{p}.npy" def _summary_for_npy(path: str) -> CaptureParseSummary: return CaptureParseSummary( path=path, format="npy", sweeps_total=0, sweeps_valid=0, channels_seen=tuple(), dominant_width=None, dominant_n_valid=None, aggregation="median", warnings=tuple(), ) @dataclass(frozen=True) class SweepProcessingResult: """Результат предобработки одного свипа.""" processed_sweep: np.ndarray normalized_sweep: Optional[np.ndarray] calibration_applied: bool background_applied: bool calibration_source: str # off|live|npy|capture background_source: str # off|npy|capture(raw)|capture(raw->calib) is_calibration_reference: bool stage_trace: Tuple[str, ...] class SweepPreprocessor: """Управляет калибровкой/фоном и применяет их к входному свипу.""" def __init__( self, norm_type: str = "projector", calib_envelope_path: str = DEFAULT_CALIB_ENVELOPE_PATH, background_path: str = DEFAULT_BACKGROUND_PATH, auto_save_live_calib_envelope: bool = True, ): self.norm_type = str(norm_type).strip().lower() or "projector" self.calib_enabled = False self.calib_mode = "live" # live | file self.background_enabled = False self.auto_save_live_calib_envelope = bool(auto_save_live_calib_envelope) self.calib_envelope_path = _normalize_path(calib_envelope_path) self.background_path = _normalize_path(background_path) self.last_calib_sweep: Optional[np.ndarray] = None self.calib_file_envelope: Optional[np.ndarray] = None # background — в текущем домене вычитания (raw или normalized), UI использует для preview/state self.background: Optional[np.ndarray] = None # raw background loaded from capture file; преобразуется на лету при активной калибровке self.background_raw_capture: Optional[np.ndarray] = None # Источники и метаданные загрузки self.calib_external_source_type: str = "none" # none|npy|capture self.background_source_type: str = "none" # none|npy_processed|capture_raw self.calib_reference_summary: Optional[CaptureParseSummary] = None self.background_reference_summary: Optional[CaptureParseSummary] = None self.last_reference_error: str = "" # Параметры офлайн-парсинга capture (должны совпадать с live parser по настройке UI) self.capture_fancy: bool = False self.capture_logscale: bool = False self.reference_aggregation_method: str = "median" # ---- Конфигурация ---- def set_calib_mode(self, mode: str): m = str(mode).strip().lower() self.calib_mode = "file" if m == "file" else "live" def set_calib_enabled(self, enabled: bool): self.calib_enabled = bool(enabled) def set_background_enabled(self, enabled: bool): self.background_enabled = bool(enabled) def set_capture_parse_options(self, *, fancy: Optional[bool] = None, logscale: Optional[bool] = None): if fancy is not None: self.capture_fancy = bool(fancy) if logscale is not None: self.capture_logscale = bool(logscale) def set_calib_envelope_path(self, path: str): p = _normalize_path(path) if p: if p != self.calib_envelope_path: self.calib_file_envelope = None if self.calib_external_source_type in ("npy", "capture"): self.calib_external_source_type = "none" self.calib_reference_summary = None self.calib_envelope_path = p def set_background_path(self, path: str): p = _normalize_path(path) if p: if p != self.background_path: self.background = None self.background_raw_capture = None self.background_source_type = "none" self.background_reference_summary = None self.background_path = p def has_calib_envelope_file(self) -> bool: return bool(self.calib_envelope_path) and os.path.isfile(self.calib_envelope_path) def has_background_file(self) -> bool: return bool(self.background_path) and os.path.isfile(self.background_path) # ---- Загрузка/сохранение .npy ---- def _save_array(self, arr: np.ndarray, current_path: str, path: Optional[str]) -> str: target = _normalize_save_npy_path(path if path is not None else current_path) if not target: raise ValueError("Пустой путь сохранения") np.save(target, arr) return target def save_calib_envelope(self, path: Optional[str] = None) -> bool: """Сохранить огибающую из последнего live-калибровочного свипа (экспорт .npy).""" if self.last_calib_sweep is None: return False try: _lower, upper = build_calib_envelopes(self.last_calib_sweep) self.calib_envelope_path = self._save_array(upper, self.calib_envelope_path, path) self.last_reference_error = "" return True except Exception as exc: self.last_reference_error = f"save calib envelope failed: {exc}" return False def save_background(self, sweep_for_ring: Optional[np.ndarray], path: Optional[str] = None) -> bool: """Сохранить текущий свип (в текущем домене обработки) как .npy-фон.""" if sweep_for_ring is None: return False try: bg = np.asarray(sweep_for_ring, dtype=np.float32).copy() self.background_path = self._save_array(bg, self.background_path, path) self.background = bg self.background_raw_capture = None self.background_source_type = "npy_processed" self.background_reference_summary = _summary_for_npy(self.background_path) self.last_reference_error = "" return True except Exception as exc: self.last_reference_error = f"save background failed: {exc}" return False # ---- Загрузка эталонов (.npy или capture) ---- def _detect_source_kind(self, path: str, source_kind: str) -> Optional[str]: sk = str(source_kind).strip().lower() or "auto" if sk == "auto": return detect_reference_file_format(path) if sk in ("npy", "bin_capture", "capture"): return "bin_capture" if sk == "capture" else sk return None def _load_npy_vector(self, path: str) -> np.ndarray: arr = np.load(path) return np.asarray(arr, dtype=np.float32).reshape(-1) def load_calib_reference( self, path: Optional[str] = None, *, source_kind: str = "auto", method: str = "median", ) -> bool: """Загрузить калибровку из .npy (огибающая) или raw capture файла.""" if path is not None: self.set_calib_envelope_path(path) p = self.calib_envelope_path if not p or not os.path.isfile(p): self.last_reference_error = f"Файл калибровки не найден: {p}" return False fmt = self._detect_source_kind(p, source_kind) if fmt is None: self.last_reference_error = f"Неизвестный формат файла калибровки: {p}" return False try: if fmt == "npy": env = self._load_npy_vector(p) self.calib_file_envelope = env self.calib_external_source_type = "npy" self.calib_reference_summary = _summary_for_npy(p) self.last_reference_error = "" return True sweeps = load_capture_sweeps(p, fancy=self.capture_fancy, logscale=self.capture_logscale) vec, summary = aggregate_capture_reference( sweeps, channel=0, method=method or self.reference_aggregation_method, path=p, ) _lower, upper = build_calib_envelopes(vec) self.calib_file_envelope = np.asarray(upper, dtype=np.float32) self.calib_external_source_type = "capture" self.calib_reference_summary = summary self.last_reference_error = "" return True except Exception as exc: self.last_reference_error = f"Ошибка загрузки калибровки: {exc}" return False def load_background_reference( self, path: Optional[str] = None, *, source_kind: str = "auto", method: str = "median", ) -> bool: """Загрузить фон из .npy (готовый домен) или raw capture файла.""" if path is not None: self.set_background_path(path) p = self.background_path if not p or not os.path.isfile(p): self.last_reference_error = f"Файл фона не найден: {p}" return False fmt = self._detect_source_kind(p, source_kind) if fmt is None: self.last_reference_error = f"Неизвестный формат файла фона: {p}" return False try: if fmt == "npy": bg = self._load_npy_vector(p) self.background = bg self.background_raw_capture = None self.background_source_type = "npy_processed" self.background_reference_summary = _summary_for_npy(p) self.last_reference_error = "" return True sweeps = load_capture_sweeps(p, fancy=self.capture_fancy, logscale=self.capture_logscale) vec, summary = aggregate_capture_reference( sweeps, channel=0, method=method or self.reference_aggregation_method, path=p, ) self.background_raw_capture = np.asarray(vec, dtype=np.float32) # Для UI/preview текущий background отражает текущий домен (пока raw по умолчанию). self.background = self.background_raw_capture self.background_source_type = "capture_raw" self.background_reference_summary = summary self.last_reference_error = "" return True except Exception as exc: self.last_reference_error = f"Ошибка загрузки фона: {exc}" return False # Совместимые обертки для старого API (строго .npy) def load_calib_envelope(self, path: Optional[str] = None) -> bool: target = path if path is not None else self.calib_envelope_path return self.load_calib_reference(target, source_kind="npy") def load_background(self, path: Optional[str] = None) -> bool: target = path if path is not None else self.background_path return self.load_background_reference(target, source_kind="npy") # ---- Нормировка / фон ---- def _normalize_against_active_reference(self, raw: np.ndarray) -> Tuple[Optional[np.ndarray], bool, str]: if not self.calib_enabled: return None, False, "off" if self.calib_mode == "file": if self.calib_file_envelope is None: return None, False, "off" src = "capture" if self.calib_external_source_type == "capture" else "npy" return normalize_by_envelope(raw, self.calib_file_envelope), True, src if self.last_calib_sweep is None: return None, False, "off" return normalize_by_calib(raw, self.last_calib_sweep, self.norm_type), True, "live" def _transform_raw_background_for_current_domain(self, calib_applied: bool) -> Optional[np.ndarray]: if self.background_raw_capture is None: return None if not calib_applied: return self.background_raw_capture # Порядок pipeline фиксирован: raw -> calibration -> background -> IFFT. # Поэтому raw-фон из capture нужно привести в тот же домен, что и текущий sweep_for_ring. if self.calib_mode == "file" and self.calib_file_envelope is not None: return normalize_by_envelope(self.background_raw_capture, self.calib_file_envelope) if self.calib_mode == "live" and self.last_calib_sweep is not None: return normalize_by_calib(self.background_raw_capture, self.last_calib_sweep, self.norm_type) return None def _effective_background(self, calib_applied: bool) -> Tuple[Optional[np.ndarray], str]: if self.background_source_type == "capture_raw": bg = self._transform_raw_background_for_current_domain(calib_applied) if bg is None: return None, "capture(raw->calib:missing-calib)" self.background = np.asarray(bg, dtype=np.float32) return self.background, ("capture(raw->calib)" if calib_applied else "capture(raw)") if self.background_source_type == "npy_processed" and self.background is not None: return self.background, "npy" if self.background is not None: return self.background, "unknown" return None, "off" def _subtract_background(self, sweep: np.ndarray, calib_applied: bool) -> Tuple[np.ndarray, bool, str]: if not self.background_enabled: return sweep, False, "off" bg, bg_src = self._effective_background(calib_applied) if bg is None: return sweep, False, f"{bg_src}:missing" out = np.asarray(sweep, dtype=np.float32).copy() w = min(out.size, bg.size) if w > 0: out[:w] -= bg[:w] return out, True, bg_src def process(self, sweep: np.ndarray, channel: int, update_references: bool = True) -> SweepProcessingResult: """Применить к свипу калибровку/фон и вернуть явные этапы обработки.""" raw = np.asarray(sweep, dtype=np.float32) ch = int(channel) if ch == 0: if update_references: self.last_calib_sweep = raw if self.auto_save_live_calib_envelope: self.save_calib_envelope() # ch0 всегда остаётся live-калибровочной ссылкой (raw), но при file-калибровке # можем применять её и к ch0 для отображения/обработки независимо от канала. calib_applied = False calib_source = "off" normalized: Optional[np.ndarray] = None if self.calib_enabled and self.calib_mode == "file": normalized, calib_applied, calib_source = self._normalize_against_active_reference(raw) base = normalized if normalized is not None else raw processed, bg_applied, bg_source = self._subtract_background(base, calib_applied=calib_applied) stages = ["parsed_sweep", "raw_sweep", "ch0_live_calibration_reference"] stages.append(f"calibration_{calib_source}" if calib_applied else "calibration_off") stages.append(f"background_{bg_source}" if bg_applied else "background_off") stages.extend(["ring_buffer", "ifft_db"]) return SweepProcessingResult( processed_sweep=processed, normalized_sweep=normalized, calibration_applied=calib_applied, background_applied=bg_applied, calibration_source=calib_source if calib_applied else "off", background_source=bg_source if bg_applied else "off", is_calibration_reference=True, stage_trace=tuple(stages), ) normalized, calib_applied, calib_source = self._normalize_against_active_reference(raw) base = normalized if normalized is not None else raw processed, bg_applied, bg_source = self._subtract_background(base, calib_applied) stages = ["parsed_sweep", "raw_sweep"] stages.append(f"calibration_{calib_source}" if calib_applied else "calibration_off") stages.append(f"background_{bg_source}" if bg_applied else "background_off") stages.extend(["ring_buffer", "ifft_db"]) return SweepProcessingResult( processed_sweep=processed, normalized_sweep=normalized, calibration_applied=calib_applied, background_applied=bg_applied, calibration_source=calib_source if calib_applied else "off", background_source=bg_source if bg_applied else "off", is_calibration_reference=False, stage_trace=tuple(stages), )