done
This commit is contained in:
43
rfg_adc_plotter/processing/fourier.py
Normal file
43
rfg_adc_plotter/processing/fourier.py
Normal file
@ -0,0 +1,43 @@
|
||||
"""Преобразование свипа в IFFT-временной профиль (дБ)."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
from rfg_adc_plotter.constants import FREQ_SPAN_GHZ, IFFT_LEN, SWEEP_LEN, ZEROS_LOW, ZEROS_MID
|
||||
|
||||
|
||||
def build_ifft_time_axis_ns() -> np.ndarray:
|
||||
"""Временная ось IFFT в наносекундах."""
|
||||
return (
|
||||
np.arange(IFFT_LEN, dtype=np.float64) / (FREQ_SPAN_GHZ * 1e9) * 1e9
|
||||
).astype(np.float32)
|
||||
|
||||
|
||||
def compute_ifft_db_profile(sweep: Optional[np.ndarray]) -> np.ndarray:
|
||||
"""Построить IFFT-профиль свипа в дБ.
|
||||
|
||||
Цепочка:
|
||||
raw/processed sweep -> двусторонний спектр (заполнение нулями) ->
|
||||
ifftshift -> ifft -> |x| -> 20log10.
|
||||
"""
|
||||
bins = IFFT_LEN
|
||||
if sweep is None:
|
||||
return np.full((bins,), np.nan, dtype=np.float32)
|
||||
|
||||
s = np.asarray(sweep)
|
||||
if s.size == 0:
|
||||
return np.full((bins,), np.nan, dtype=np.float32)
|
||||
|
||||
sig = np.zeros(SWEEP_LEN, dtype=np.float32)
|
||||
take = min(int(s.size), SWEEP_LEN)
|
||||
seg = np.nan_to_num(s[:take], nan=0.0).astype(np.float32, copy=False)
|
||||
sig[:take] = seg
|
||||
|
||||
data = np.zeros(IFFT_LEN, dtype=np.complex64)
|
||||
data[ZEROS_LOW + ZEROS_MID :] = sig
|
||||
|
||||
spec = np.fft.ifftshift(data)
|
||||
result = np.fft.ifft(spec)
|
||||
mag = np.abs(result).astype(np.float32)
|
||||
return (mag + 1e-9).astype(np.float32)
|
||||
415
rfg_adc_plotter/processing/pipeline.py
Normal file
415
rfg_adc_plotter/processing/pipeline.py
Normal file
@ -0,0 +1,415 @@
|
||||
"""Явный pipeline предобработки свипов перед помещением в RingBuffer."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import os
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
from rfg_adc_plotter.io.capture_reference_loader import (
|
||||
CaptureParseSummary,
|
||||
aggregate_capture_reference,
|
||||
detect_reference_file_format,
|
||||
load_capture_sweeps,
|
||||
)
|
||||
from rfg_adc_plotter.processing.normalizer import (
|
||||
build_calib_envelopes,
|
||||
normalize_by_calib,
|
||||
normalize_by_envelope,
|
||||
)
|
||||
|
||||
DEFAULT_CALIB_ENVELOPE_PATH = "calib_envelope.npy"
|
||||
DEFAULT_BACKGROUND_PATH = "background.npy"
|
||||
|
||||
|
||||
def _normalize_path(path: str) -> str:
|
||||
return str(path).strip()
|
||||
|
||||
|
||||
def _normalize_save_npy_path(path: str) -> str:
|
||||
p = _normalize_path(path)
|
||||
if not p:
|
||||
return p
|
||||
_root, ext = os.path.splitext(p)
|
||||
if ext:
|
||||
return p
|
||||
return f"{p}.npy"
|
||||
|
||||
|
||||
def _summary_for_npy(path: str) -> CaptureParseSummary:
|
||||
return CaptureParseSummary(
|
||||
path=path,
|
||||
format="npy",
|
||||
sweeps_total=0,
|
||||
sweeps_valid=0,
|
||||
channels_seen=tuple(),
|
||||
dominant_width=None,
|
||||
dominant_n_valid=None,
|
||||
aggregation="median",
|
||||
warnings=tuple(),
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SweepProcessingResult:
|
||||
"""Результат предобработки одного свипа."""
|
||||
|
||||
processed_sweep: np.ndarray
|
||||
normalized_sweep: Optional[np.ndarray]
|
||||
calibration_applied: bool
|
||||
background_applied: bool
|
||||
calibration_source: str # off|live|npy|capture
|
||||
background_source: str # off|npy|capture(raw)|capture(raw->calib)
|
||||
is_calibration_reference: bool
|
||||
stage_trace: Tuple[str, ...]
|
||||
|
||||
|
||||
class SweepPreprocessor:
|
||||
"""Управляет калибровкой/фоном и применяет их к входному свипу."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
norm_type: str = "projector",
|
||||
calib_envelope_path: str = DEFAULT_CALIB_ENVELOPE_PATH,
|
||||
background_path: str = DEFAULT_BACKGROUND_PATH,
|
||||
auto_save_live_calib_envelope: bool = True,
|
||||
):
|
||||
self.norm_type = str(norm_type).strip().lower() or "projector"
|
||||
self.calib_enabled = False
|
||||
self.calib_mode = "live" # live | file
|
||||
self.background_enabled = False
|
||||
self.auto_save_live_calib_envelope = bool(auto_save_live_calib_envelope)
|
||||
|
||||
self.calib_envelope_path = _normalize_path(calib_envelope_path)
|
||||
self.background_path = _normalize_path(background_path)
|
||||
|
||||
self.last_calib_sweep: Optional[np.ndarray] = None
|
||||
self.calib_file_envelope: Optional[np.ndarray] = None
|
||||
|
||||
# background — в текущем домене вычитания (raw или normalized), UI использует для preview/state
|
||||
self.background: Optional[np.ndarray] = None
|
||||
# raw background loaded from capture file; преобразуется на лету при активной калибровке
|
||||
self.background_raw_capture: Optional[np.ndarray] = None
|
||||
|
||||
# Источники и метаданные загрузки
|
||||
self.calib_external_source_type: str = "none" # none|npy|capture
|
||||
self.background_source_type: str = "none" # none|npy_processed|capture_raw
|
||||
self.calib_reference_summary: Optional[CaptureParseSummary] = None
|
||||
self.background_reference_summary: Optional[CaptureParseSummary] = None
|
||||
self.last_reference_error: str = ""
|
||||
|
||||
# Параметры офлайн-парсинга capture (должны совпадать с live parser по настройке UI)
|
||||
self.capture_fancy: bool = False
|
||||
self.capture_logscale: bool = False
|
||||
self.reference_aggregation_method: str = "median"
|
||||
|
||||
# ---- Конфигурация ----
|
||||
def set_calib_mode(self, mode: str):
|
||||
m = str(mode).strip().lower()
|
||||
self.calib_mode = "file" if m == "file" else "live"
|
||||
|
||||
def set_calib_enabled(self, enabled: bool):
|
||||
self.calib_enabled = bool(enabled)
|
||||
|
||||
def set_background_enabled(self, enabled: bool):
|
||||
self.background_enabled = bool(enabled)
|
||||
|
||||
def set_capture_parse_options(self, *, fancy: Optional[bool] = None, logscale: Optional[bool] = None):
|
||||
if fancy is not None:
|
||||
self.capture_fancy = bool(fancy)
|
||||
if logscale is not None:
|
||||
self.capture_logscale = bool(logscale)
|
||||
|
||||
def set_calib_envelope_path(self, path: str):
|
||||
p = _normalize_path(path)
|
||||
if p:
|
||||
if p != self.calib_envelope_path:
|
||||
self.calib_file_envelope = None
|
||||
if self.calib_external_source_type in ("npy", "capture"):
|
||||
self.calib_external_source_type = "none"
|
||||
self.calib_reference_summary = None
|
||||
self.calib_envelope_path = p
|
||||
|
||||
def set_background_path(self, path: str):
|
||||
p = _normalize_path(path)
|
||||
if p:
|
||||
if p != self.background_path:
|
||||
self.background = None
|
||||
self.background_raw_capture = None
|
||||
self.background_source_type = "none"
|
||||
self.background_reference_summary = None
|
||||
self.background_path = p
|
||||
|
||||
def has_calib_envelope_file(self) -> bool:
|
||||
return bool(self.calib_envelope_path) and os.path.isfile(self.calib_envelope_path)
|
||||
|
||||
def has_background_file(self) -> bool:
|
||||
return bool(self.background_path) and os.path.isfile(self.background_path)
|
||||
|
||||
# ---- Загрузка/сохранение .npy ----
|
||||
def _save_array(self, arr: np.ndarray, current_path: str, path: Optional[str]) -> str:
|
||||
target = _normalize_save_npy_path(path if path is not None else current_path)
|
||||
if not target:
|
||||
raise ValueError("Пустой путь сохранения")
|
||||
np.save(target, arr)
|
||||
return target
|
||||
|
||||
def save_calib_envelope(self, path: Optional[str] = None) -> bool:
|
||||
"""Сохранить огибающую из последнего live-калибровочного свипа (экспорт .npy)."""
|
||||
if self.last_calib_sweep is None:
|
||||
return False
|
||||
try:
|
||||
_lower, upper = build_calib_envelopes(self.last_calib_sweep)
|
||||
self.calib_envelope_path = self._save_array(upper, self.calib_envelope_path, path)
|
||||
self.last_reference_error = ""
|
||||
return True
|
||||
except Exception as exc:
|
||||
self.last_reference_error = f"save calib envelope failed: {exc}"
|
||||
return False
|
||||
|
||||
def save_background(self, sweep_for_ring: Optional[np.ndarray], path: Optional[str] = None) -> bool:
|
||||
"""Сохранить текущий свип (в текущем домене обработки) как .npy-фон."""
|
||||
if sweep_for_ring is None:
|
||||
return False
|
||||
try:
|
||||
bg = np.asarray(sweep_for_ring, dtype=np.float32).copy()
|
||||
self.background_path = self._save_array(bg, self.background_path, path)
|
||||
self.background = bg
|
||||
self.background_raw_capture = None
|
||||
self.background_source_type = "npy_processed"
|
||||
self.background_reference_summary = _summary_for_npy(self.background_path)
|
||||
self.last_reference_error = ""
|
||||
return True
|
||||
except Exception as exc:
|
||||
self.last_reference_error = f"save background failed: {exc}"
|
||||
return False
|
||||
|
||||
# ---- Загрузка эталонов (.npy или capture) ----
|
||||
def _detect_source_kind(self, path: str, source_kind: str) -> Optional[str]:
|
||||
sk = str(source_kind).strip().lower() or "auto"
|
||||
if sk == "auto":
|
||||
return detect_reference_file_format(path)
|
||||
if sk in ("npy", "bin_capture", "capture"):
|
||||
return "bin_capture" if sk == "capture" else sk
|
||||
return None
|
||||
|
||||
def _load_npy_vector(self, path: str) -> np.ndarray:
|
||||
arr = np.load(path)
|
||||
return np.asarray(arr, dtype=np.float32).reshape(-1)
|
||||
|
||||
def load_calib_reference(
|
||||
self,
|
||||
path: Optional[str] = None,
|
||||
*,
|
||||
source_kind: str = "auto",
|
||||
method: str = "median",
|
||||
) -> bool:
|
||||
"""Загрузить калибровку из .npy (огибающая) или raw capture файла."""
|
||||
if path is not None:
|
||||
self.set_calib_envelope_path(path)
|
||||
p = self.calib_envelope_path
|
||||
if not p or not os.path.isfile(p):
|
||||
self.last_reference_error = f"Файл калибровки не найден: {p}"
|
||||
return False
|
||||
|
||||
fmt = self._detect_source_kind(p, source_kind)
|
||||
if fmt is None:
|
||||
self.last_reference_error = f"Неизвестный формат файла калибровки: {p}"
|
||||
return False
|
||||
|
||||
try:
|
||||
if fmt == "npy":
|
||||
env = self._load_npy_vector(p)
|
||||
self.calib_file_envelope = env
|
||||
self.calib_external_source_type = "npy"
|
||||
self.calib_reference_summary = _summary_for_npy(p)
|
||||
self.last_reference_error = ""
|
||||
return True
|
||||
|
||||
sweeps = load_capture_sweeps(p, fancy=self.capture_fancy, logscale=self.capture_logscale)
|
||||
vec, summary = aggregate_capture_reference(
|
||||
sweeps,
|
||||
channel=0,
|
||||
method=method or self.reference_aggregation_method,
|
||||
path=p,
|
||||
)
|
||||
_lower, upper = build_calib_envelopes(vec)
|
||||
self.calib_file_envelope = np.asarray(upper, dtype=np.float32)
|
||||
self.calib_external_source_type = "capture"
|
||||
self.calib_reference_summary = summary
|
||||
self.last_reference_error = ""
|
||||
return True
|
||||
except Exception as exc:
|
||||
self.last_reference_error = f"Ошибка загрузки калибровки: {exc}"
|
||||
return False
|
||||
|
||||
def load_background_reference(
|
||||
self,
|
||||
path: Optional[str] = None,
|
||||
*,
|
||||
source_kind: str = "auto",
|
||||
method: str = "median",
|
||||
) -> bool:
|
||||
"""Загрузить фон из .npy (готовый домен) или raw capture файла."""
|
||||
if path is not None:
|
||||
self.set_background_path(path)
|
||||
p = self.background_path
|
||||
if not p or not os.path.isfile(p):
|
||||
self.last_reference_error = f"Файл фона не найден: {p}"
|
||||
return False
|
||||
|
||||
fmt = self._detect_source_kind(p, source_kind)
|
||||
if fmt is None:
|
||||
self.last_reference_error = f"Неизвестный формат файла фона: {p}"
|
||||
return False
|
||||
|
||||
try:
|
||||
if fmt == "npy":
|
||||
bg = self._load_npy_vector(p)
|
||||
self.background = bg
|
||||
self.background_raw_capture = None
|
||||
self.background_source_type = "npy_processed"
|
||||
self.background_reference_summary = _summary_for_npy(p)
|
||||
self.last_reference_error = ""
|
||||
return True
|
||||
|
||||
sweeps = load_capture_sweeps(p, fancy=self.capture_fancy, logscale=self.capture_logscale)
|
||||
vec, summary = aggregate_capture_reference(
|
||||
sweeps,
|
||||
channel=0,
|
||||
method=method or self.reference_aggregation_method,
|
||||
path=p,
|
||||
)
|
||||
self.background_raw_capture = np.asarray(vec, dtype=np.float32)
|
||||
# Для UI/preview текущий background отражает текущий домен (пока raw по умолчанию).
|
||||
self.background = self.background_raw_capture
|
||||
self.background_source_type = "capture_raw"
|
||||
self.background_reference_summary = summary
|
||||
self.last_reference_error = ""
|
||||
return True
|
||||
except Exception as exc:
|
||||
self.last_reference_error = f"Ошибка загрузки фона: {exc}"
|
||||
return False
|
||||
|
||||
# Совместимые обертки для старого API (строго .npy)
|
||||
def load_calib_envelope(self, path: Optional[str] = None) -> bool:
|
||||
target = path if path is not None else self.calib_envelope_path
|
||||
return self.load_calib_reference(target, source_kind="npy")
|
||||
|
||||
def load_background(self, path: Optional[str] = None) -> bool:
|
||||
target = path if path is not None else self.background_path
|
||||
return self.load_background_reference(target, source_kind="npy")
|
||||
|
||||
# ---- Нормировка / фон ----
|
||||
def _normalize_against_active_reference(self, raw: np.ndarray) -> Tuple[Optional[np.ndarray], bool, str]:
|
||||
if not self.calib_enabled:
|
||||
return None, False, "off"
|
||||
|
||||
if self.calib_mode == "file":
|
||||
if self.calib_file_envelope is None:
|
||||
return None, False, "off"
|
||||
src = "capture" if self.calib_external_source_type == "capture" else "npy"
|
||||
return normalize_by_envelope(raw, self.calib_file_envelope), True, src
|
||||
|
||||
if self.last_calib_sweep is None:
|
||||
return None, False, "off"
|
||||
return normalize_by_calib(raw, self.last_calib_sweep, self.norm_type), True, "live"
|
||||
|
||||
def _transform_raw_background_for_current_domain(self, calib_applied: bool) -> Optional[np.ndarray]:
|
||||
if self.background_raw_capture is None:
|
||||
return None
|
||||
if not calib_applied:
|
||||
return self.background_raw_capture
|
||||
|
||||
# Порядок pipeline фиксирован: raw -> calibration -> background -> IFFT.
|
||||
# Поэтому raw-фон из capture нужно привести в тот же домен, что и текущий sweep_for_ring.
|
||||
if self.calib_mode == "file" and self.calib_file_envelope is not None:
|
||||
return normalize_by_envelope(self.background_raw_capture, self.calib_file_envelope)
|
||||
if self.calib_mode == "live" and self.last_calib_sweep is not None:
|
||||
return normalize_by_calib(self.background_raw_capture, self.last_calib_sweep, self.norm_type)
|
||||
return None
|
||||
|
||||
def _effective_background(self, calib_applied: bool) -> Tuple[Optional[np.ndarray], str]:
|
||||
if self.background_source_type == "capture_raw":
|
||||
bg = self._transform_raw_background_for_current_domain(calib_applied)
|
||||
if bg is None:
|
||||
return None, "capture(raw->calib:missing-calib)"
|
||||
self.background = np.asarray(bg, dtype=np.float32)
|
||||
return self.background, ("capture(raw->calib)" if calib_applied else "capture(raw)")
|
||||
if self.background_source_type == "npy_processed" and self.background is not None:
|
||||
return self.background, "npy"
|
||||
if self.background is not None:
|
||||
return self.background, "unknown"
|
||||
return None, "off"
|
||||
|
||||
def _subtract_background(self, sweep: np.ndarray, calib_applied: bool) -> Tuple[np.ndarray, bool, str]:
|
||||
if not self.background_enabled:
|
||||
return sweep, False, "off"
|
||||
bg, bg_src = self._effective_background(calib_applied)
|
||||
if bg is None:
|
||||
return sweep, False, f"{bg_src}:missing"
|
||||
out = np.asarray(sweep, dtype=np.float32).copy()
|
||||
w = min(out.size, bg.size)
|
||||
if w > 0:
|
||||
out[:w] -= bg[:w]
|
||||
return out, True, bg_src
|
||||
|
||||
def process(self, sweep: np.ndarray, channel: int, update_references: bool = True) -> SweepProcessingResult:
|
||||
"""Применить к свипу калибровку/фон и вернуть явные этапы обработки."""
|
||||
raw = np.asarray(sweep, dtype=np.float32)
|
||||
ch = int(channel)
|
||||
|
||||
if ch == 0:
|
||||
if update_references:
|
||||
self.last_calib_sweep = raw
|
||||
if self.auto_save_live_calib_envelope:
|
||||
self.save_calib_envelope()
|
||||
|
||||
# ch0 всегда остаётся live-калибровочной ссылкой (raw), но при file-калибровке
|
||||
# можем применять её и к ch0 для отображения/обработки независимо от канала.
|
||||
calib_applied = False
|
||||
calib_source = "off"
|
||||
normalized: Optional[np.ndarray] = None
|
||||
if self.calib_enabled and self.calib_mode == "file":
|
||||
normalized, calib_applied, calib_source = self._normalize_against_active_reference(raw)
|
||||
|
||||
base = normalized if normalized is not None else raw
|
||||
processed, bg_applied, bg_source = self._subtract_background(base, calib_applied=calib_applied)
|
||||
|
||||
stages = ["parsed_sweep", "raw_sweep", "ch0_live_calibration_reference"]
|
||||
stages.append(f"calibration_{calib_source}" if calib_applied else "calibration_off")
|
||||
stages.append(f"background_{bg_source}" if bg_applied else "background_off")
|
||||
stages.extend(["ring_buffer", "ifft_db"])
|
||||
|
||||
return SweepProcessingResult(
|
||||
processed_sweep=processed,
|
||||
normalized_sweep=normalized,
|
||||
calibration_applied=calib_applied,
|
||||
background_applied=bg_applied,
|
||||
calibration_source=calib_source if calib_applied else "off",
|
||||
background_source=bg_source if bg_applied else "off",
|
||||
is_calibration_reference=True,
|
||||
stage_trace=tuple(stages),
|
||||
)
|
||||
|
||||
normalized, calib_applied, calib_source = self._normalize_against_active_reference(raw)
|
||||
base = normalized if normalized is not None else raw
|
||||
processed, bg_applied, bg_source = self._subtract_background(base, calib_applied)
|
||||
|
||||
stages = ["parsed_sweep", "raw_sweep"]
|
||||
stages.append(f"calibration_{calib_source}" if calib_applied else "calibration_off")
|
||||
stages.append(f"background_{bg_source}" if bg_applied else "background_off")
|
||||
stages.extend(["ring_buffer", "ifft_db"])
|
||||
|
||||
return SweepProcessingResult(
|
||||
processed_sweep=processed,
|
||||
normalized_sweep=normalized,
|
||||
calibration_applied=calib_applied,
|
||||
background_applied=bg_applied,
|
||||
calibration_source=calib_source if calib_applied else "off",
|
||||
background_source=bg_source if bg_applied else "off",
|
||||
is_calibration_reference=False,
|
||||
stage_trace=tuple(stages),
|
||||
)
|
||||
Reference in New Issue
Block a user