diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 4a0a60b..cf7cb82 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -21,16 +21,23 @@ from rfg_adc_plotter.processing.background import ( ) from rfg_adc_plotter.processing.calibration import ( build_calib_envelope, + build_complex_calibration_curve, calibrate_freqs, get_calibration_base, get_calibration_coeffs, load_calib_envelope, + load_complex_calibration, save_calib_envelope, + save_complex_calibration, set_calibration_base_value, ) from rfg_adc_plotter.processing.fft import compute_fft_complex_row, compute_fft_mag_row, fft_mag_to_db from rfg_adc_plotter.processing.formatting import compute_auto_ylim, format_status_kv, parse_spec_clip -from rfg_adc_plotter.processing.normalization import normalize_by_envelope, resample_envelope +from rfg_adc_plotter.processing.normalization import ( + normalize_by_complex_calibration, + normalize_by_envelope, + resample_envelope, +) from rfg_adc_plotter.processing.peaks import ( find_peak_width_markers, find_top_peaks_over_ref, @@ -809,6 +816,7 @@ def run_pyqtgraph(args) -> None: spec_right_line.setVisible(False) calib_cb = QtWidgets.QCheckBox("калибровка по огибающей") + complex_calib_cb = QtWidgets.QCheckBox("комплексная калибровка") range_group = QtWidgets.QGroupBox("Рабочий диапазон") range_group_layout = QtWidgets.QFormLayout(range_group) range_group_layout.setContentsMargins(6, 6, 6, 6) @@ -869,6 +877,36 @@ def run_pyqtgraph(args) -> None: calib_buttons_row.addWidget(calib_save_btn) calib_buttons_row.addWidget(calib_load_btn) calib_group_layout.addLayout(calib_buttons_row) + complex_calib_group = QtWidgets.QGroupBox("Комплексная калибровка") + complex_calib_group_layout = QtWidgets.QVBoxLayout(complex_calib_group) + complex_calib_group_layout.setContentsMargins(6, 6, 6, 6) + complex_calib_group_layout.setSpacing(6) + complex_calib_group_layout.addWidget(complex_calib_cb) + complex_calib_path_edit = QtWidgets.QLineEdit("complex_calibration.npy") + try: + complex_calib_path_edit.setPlaceholderText("complex_calibration.npy") + except Exception: + pass + complex_calib_path_row = QtWidgets.QHBoxLayout() + complex_calib_path_row.setContentsMargins(0, 0, 0, 0) + complex_calib_path_row.setSpacing(4) + complex_calib_path_row.addWidget(complex_calib_path_edit) + complex_calib_pick_btn = QtWidgets.QPushButton("Файл...") + complex_calib_path_row.addWidget(complex_calib_pick_btn) + complex_calib_group_layout.addLayout(complex_calib_path_row) + complex_calib_buttons_row = QtWidgets.QHBoxLayout() + complex_calib_buttons_row.setContentsMargins(0, 0, 0, 0) + complex_calib_buttons_row.setSpacing(4) + complex_calib_save_btn = QtWidgets.QPushButton("Сохранить текущую") + complex_calib_load_btn = QtWidgets.QPushButton("Загрузить") + complex_calib_buttons_row.addWidget(complex_calib_save_btn) + complex_calib_buttons_row.addWidget(complex_calib_load_btn) + complex_calib_group_layout.addLayout(complex_calib_buttons_row) + if not complex_sweep_mode: + try: + complex_calib_group.setEnabled(False) + except Exception: + pass background_group = QtWidgets.QGroupBox("Фон") background_group_layout = QtWidgets.QVBoxLayout(background_group) background_group_layout.setContentsMargins(6, 6, 6, 6) @@ -927,6 +965,7 @@ def run_pyqtgraph(args) -> None: pass settings_layout.addWidget(range_group) settings_layout.addWidget(calib_group) + settings_layout.addWidget(complex_calib_group) settings_layout.addWidget(parsed_data_cb) settings_layout.addWidget(background_group) settings_layout.addWidget(fft_mode_label) @@ -940,6 +979,7 @@ def run_pyqtgraph(args) -> None: win.addItem(status, row=3, col=0, colspan=2) calib_enabled = False + complex_calib_enabled = False parsed_data_enabled = False background_enabled = False fft_abs_enabled = True @@ -951,6 +991,7 @@ def run_pyqtgraph(args) -> None: waiting_data_note = "" status_note_expires_at: Optional[float] = None status_dirty = True + calibration_toggle_in_progress = False range_change_in_progress = False debug_event_counts: Dict[str, int] = {} last_queue_backlog = 0 @@ -1097,6 +1138,13 @@ def run_pyqtgraph(args) -> None: path = "" return path or "calibration_envelope.npy" + def get_complex_calib_file_path() -> str: + try: + path = complex_calib_path_edit.text().strip() + except Exception: + path = "" + return path or "complex_calibration.npy" + def get_background_file_path() -> str: try: path = background_path_edit.text().strip() @@ -1198,25 +1246,69 @@ def run_pyqtgraph(args) -> None: if fft_source is None and runtime.current_sweep_raw is not None: fft_source = np.asarray(runtime.current_sweep_raw, dtype=np.float32) - if ( - runtime.current_sweep_raw is not None - and runtime.current_sweep_raw.size > 0 - and calib_enabled - and runtime.calib_envelope is not None - ): - runtime.current_sweep_norm = normalize_by_envelope(runtime.current_sweep_raw, runtime.calib_envelope) - else: - runtime.current_sweep_norm = None + runtime.current_sweep_norm = None + runtime.current_fft_input = None + complex_calib_applied = False + if complex_calib_enabled and runtime.complex_calib_curve is not None: + complex_source: Optional[np.ndarray] = None + if runtime.current_aux_curves is not None: + try: + aux_1, aux_2 = runtime.current_aux_curves + aux_1_arr = np.asarray(aux_1, dtype=np.float32).reshape(-1) + aux_2_arr = np.asarray(aux_2, dtype=np.float32).reshape(-1) + aux_width = min(aux_1_arr.size, aux_2_arr.size) + if aux_width > 0: + complex_source = ( + aux_1_arr[:aux_width].astype(np.complex64) + + (1j * aux_2_arr[:aux_width].astype(np.complex64)) + ) + except Exception: + complex_source = None + if complex_source is None and fft_source is not None: + fft_arr = np.asarray(fft_source).reshape(-1) + if fft_arr.size > 0 and np.iscomplexobj(fft_arr): + complex_source = np.asarray(fft_arr, dtype=np.complex64) - if fft_source is None or np.asarray(fft_source).size == 0: - runtime.current_fft_input = None - elif calib_enabled and runtime.calib_envelope is not None: - runtime.current_fft_input = normalize_by_envelope(fft_source, runtime.calib_envelope) - else: - runtime.current_fft_input = np.asarray( - fft_source, - dtype=np.complex64 if np.iscomplexobj(fft_source) else np.float32, - ).copy() + if complex_source is not None and complex_source.size > 0: + complex_norm = normalize_by_complex_calibration( + complex_source, + runtime.complex_calib_curve, + ) + runtime.current_fft_input = np.asarray(complex_norm, dtype=np.complex64).reshape(-1) + norm_real = runtime.current_fft_input.real.astype(np.float32, copy=False) + norm_imag = runtime.current_fft_input.imag.astype(np.float32, copy=False) + runtime.current_aux_curves = (norm_real, norm_imag) + if bin_iq_power_mode: + norm_real_f64 = norm_real.astype(np.float64, copy=False) + norm_imag_f64 = norm_imag.astype(np.float64, copy=False) + runtime.current_sweep_norm = np.asarray( + (norm_real_f64 * norm_real_f64) + (norm_imag_f64 * norm_imag_f64), + dtype=np.float32, + ) + else: + runtime.current_sweep_norm = np.abs(runtime.current_fft_input).astype(np.float32, copy=False) + complex_calib_applied = True + + if not complex_calib_applied: + if ( + runtime.current_sweep_raw is not None + and runtime.current_sweep_raw.size > 0 + and calib_enabled + and runtime.calib_envelope is not None + ): + runtime.current_sweep_norm = normalize_by_envelope(runtime.current_sweep_raw, runtime.calib_envelope) + else: + runtime.current_sweep_norm = None + + if fft_source is None or np.asarray(fft_source).size == 0: + runtime.current_fft_input = None + elif calib_enabled and runtime.calib_envelope is not None: + runtime.current_fft_input = normalize_by_envelope(fft_source, runtime.calib_envelope) + else: + runtime.current_fft_input = np.asarray( + fft_source, + dtype=np.complex64 if np.iscomplexobj(fft_source) else np.float32, + ).copy() runtime.current_fft_complex = None runtime.current_fft_mag = None @@ -1246,17 +1338,67 @@ def run_pyqtgraph(args) -> None: runtime.background_buffer.push(runtime.current_fft_mag) def set_calib_enabled() -> None: - nonlocal calib_enabled + nonlocal calib_enabled, complex_calib_enabled, calibration_toggle_in_progress try: - calib_enabled = bool(calib_cb.isChecked()) + requested_state = bool(calib_cb.isChecked()) except Exception: - calib_enabled = False + requested_state = False + + if calibration_toggle_in_progress: + calib_enabled = requested_state + return + + if requested_state and complex_calib_enabled: + calibration_toggle_in_progress = True + try: + complex_calib_cb.setChecked(False) + except Exception: + pass + finally: + calibration_toggle_in_progress = False + complex_calib_enabled = False + set_status_note("калибровка: комплексная выключена (взаимоисключение)") + + calib_enabled = requested_state if calib_enabled and runtime.calib_envelope is None: set_status_note("калибровка: огибающая не загружена") reset_background_state(clear_profile=True) recompute_current_processed_sweep(push_to_ring=False) runtime.mark_dirty() + def set_complex_calib_enabled() -> None: + nonlocal calib_enabled, complex_calib_enabled, calibration_toggle_in_progress + if not complex_sweep_mode: + complex_calib_enabled = False + return + + try: + requested_state = bool(complex_calib_cb.isChecked()) + except Exception: + requested_state = False + + if calibration_toggle_in_progress: + complex_calib_enabled = requested_state + return + + if requested_state and calib_enabled: + calibration_toggle_in_progress = True + try: + calib_cb.setChecked(False) + except Exception: + pass + finally: + calibration_toggle_in_progress = False + calib_enabled = False + set_status_note("калибровка: огибающая выключена (взаимоисключение)") + + complex_calib_enabled = requested_state + if complex_calib_enabled and runtime.complex_calib_curve is None: + set_status_note("калибровка: комплексная кривая не загружена") + reset_background_state(clear_profile=True) + recompute_current_processed_sweep(push_to_ring=False) + runtime.mark_dirty() + def set_parsed_data_enabled() -> None: nonlocal parsed_data_enabled try: @@ -1332,6 +1474,26 @@ def run_pyqtgraph(args) -> None: except Exception: pass + def pick_complex_calib_file() -> None: + start_path = get_complex_calib_file_path() + try: + selected, _ = QtWidgets.QFileDialog.getSaveFileName( + main_window, + "Файл комплексной калибровки", + start_path, + "NumPy (*.npy);;All files (*)", + ) + except Exception as exc: + set_status_note(f"калибровка: выбор файла комплексной кривой недоступен ({exc})") + runtime.mark_dirty() + return + if not selected: + return + try: + complex_calib_path_edit.setText(selected) + except Exception: + pass + def pick_background_file() -> None: start_path = get_background_file_path() try: @@ -1399,6 +1561,56 @@ def run_pyqtgraph(args) -> None: set_status_note(f"калибровка загружена: {normalized_path}") runtime.mark_dirty() + def save_current_complex_calibration() -> None: + if not complex_sweep_mode: + set_status_note("калибровка: комплексный режим не активен") + runtime.mark_dirty() + return + if runtime.full_current_aux_curves is None: + set_status_note("калибровка: нет CH1/CH2 для сохранения комплексной кривой") + runtime.mark_dirty() + return + try: + aux_1, aux_2 = runtime.full_current_aux_curves + curve = build_complex_calibration_curve(aux_1, aux_2) + saved_path = save_complex_calibration(get_complex_calib_file_path(), curve) + except Exception as exc: + set_status_note(f"калибровка: не удалось сохранить комплексную кривую ({exc})") + runtime.mark_dirty() + return + + runtime.complex_calib_curve = curve + runtime.complex_calib_file_path = saved_path + try: + complex_calib_path_edit.setText(saved_path) + except Exception: + pass + reset_background_state(clear_profile=True) + recompute_current_processed_sweep(push_to_ring=False) + set_status_note(f"комплексная калибровка сохранена: {saved_path}") + runtime.mark_dirty() + + def load_complex_calibration_file() -> None: + path = get_complex_calib_file_path() + try: + curve = load_complex_calibration(path) + except Exception as exc: + set_status_note(f"калибровка: не удалось загрузить комплексную кривую ({exc})") + runtime.mark_dirty() + return + + normalized_path = path if path.lower().endswith(".npy") else f"{path}.npy" + runtime.complex_calib_curve = curve + runtime.complex_calib_file_path = normalized_path + try: + complex_calib_path_edit.setText(normalized_path) + except Exception: + pass + reset_background_state(clear_profile=True) + recompute_current_processed_sweep(push_to_ring=False) + set_status_note(f"комплексная калибровка загружена: {normalized_path}") + runtime.mark_dirty() + def save_current_background() -> None: background = runtime.background_buffer.median() if background is None or background.size == 0: @@ -1494,6 +1706,8 @@ def run_pyqtgraph(args) -> None: except Exception: pass restore_range_controls() + set_calib_enabled() + set_complex_calib_enabled() set_parsed_data_enabled() set_background_enabled() set_fft_curve_visibility() @@ -1504,10 +1718,14 @@ def run_pyqtgraph(args) -> None: range_min_spin.valueChanged.connect(lambda _v: set_working_range()) range_max_spin.valueChanged.connect(lambda _v: set_working_range()) calib_cb.stateChanged.connect(lambda _v: set_calib_enabled()) + complex_calib_cb.stateChanged.connect(lambda _v: set_complex_calib_enabled()) parsed_data_cb.stateChanged.connect(lambda _v: set_parsed_data_enabled()) calib_pick_btn.clicked.connect(lambda _checked=False: pick_calib_file()) calib_save_btn.clicked.connect(lambda _checked=False: save_current_calibration()) calib_load_btn.clicked.connect(lambda _checked=False: load_calibration_file()) + complex_calib_pick_btn.clicked.connect(lambda _checked=False: pick_complex_calib_file()) + complex_calib_save_btn.clicked.connect(lambda _checked=False: save_current_complex_calibration()) + complex_calib_load_btn.clicked.connect(lambda _checked=False: load_complex_calibration_file()) background_cb.stateChanged.connect(lambda _v: set_background_enabled()) background_pick_btn.clicked.connect(lambda _checked=False: pick_background_file()) background_save_btn.clicked.connect(lambda _checked=False: save_current_background()) diff --git a/rfg_adc_plotter/processing/__init__.py b/rfg_adc_plotter/processing/__init__.py index e104de5..caa0521 100644 --- a/rfg_adc_plotter/processing/__init__.py +++ b/rfg_adc_plotter/processing/__init__.py @@ -8,12 +8,15 @@ from rfg_adc_plotter.processing.background import ( ) from rfg_adc_plotter.processing.calibration import ( build_calib_envelope, + build_complex_calibration_curve, calibrate_freqs, get_calibration_base, get_calibration_coeffs, load_calib_envelope, + load_complex_calibration, recalculate_calibration_c, save_calib_envelope, + save_complex_calibration, set_calibration_base_value, ) from rfg_adc_plotter.processing.fft import ( @@ -30,6 +33,8 @@ from rfg_adc_plotter.processing.formatting import ( ) from rfg_adc_plotter.processing.normalization import ( build_calib_envelopes, + fit_complex_calibration_to_width, + normalize_by_complex_calibration, normalize_by_envelope, normalize_by_calib, ) @@ -42,6 +47,7 @@ from rfg_adc_plotter.processing.peaks import ( __all__ = [ "build_calib_envelopes", "build_calib_envelope", + "build_complex_calibration_curve", "calibrate_freqs", "compute_auto_ylim", "compute_distance_axis", @@ -55,13 +61,17 @@ __all__ = [ "get_calibration_base", "get_calibration_coeffs", "load_calib_envelope", + "load_complex_calibration", "load_fft_background", + "fit_complex_calibration_to_width", + "normalize_by_complex_calibration", "normalize_by_envelope", "normalize_by_calib", "parse_spec_clip", "recalculate_calibration_c", "rolling_median_ref", "save_calib_envelope", + "save_complex_calibration", "save_fft_background", "set_calibration_base_value", "subtract_fft_background", diff --git a/rfg_adc_plotter/processing/calibration.py b/rfg_adc_plotter/processing/calibration.py index ff41fa1..07f268d 100644 --- a/rfg_adc_plotter/processing/calibration.py +++ b/rfg_adc_plotter/processing/calibration.py @@ -101,6 +101,17 @@ def build_calib_envelope(sweep: np.ndarray) -> np.ndarray: return np.asarray(upper, dtype=np.float32) +def build_complex_calibration_curve(ch1: np.ndarray, ch2: np.ndarray) -> np.ndarray: + """Build a complex calibration curve as ``ch1 + 1j*ch2``.""" + ch1_arr = np.asarray(ch1, dtype=np.float32).reshape(-1) + ch2_arr = np.asarray(ch2, dtype=np.float32).reshape(-1) + width = min(ch1_arr.size, ch2_arr.size) + if width <= 0: + raise ValueError("Complex calibration source is empty") + curve = ch1_arr[:width].astype(np.complex64) + (1j * ch2_arr[:width].astype(np.complex64)) + return validate_complex_calibration_curve(curve) + + def validate_calib_envelope(envelope: np.ndarray) -> np.ndarray: """Validate a saved calibration envelope payload.""" values = np.asarray(envelope, dtype=np.float32).reshape(-1) @@ -111,6 +122,16 @@ def validate_calib_envelope(envelope: np.ndarray) -> np.ndarray: return values +def validate_complex_calibration_curve(curve: np.ndarray) -> np.ndarray: + """Validate a saved complex calibration payload.""" + values = np.asarray(curve).reshape(-1) + if values.size == 0: + raise ValueError("Complex calibration curve is empty") + if not np.issubdtype(values.dtype, np.number): + raise ValueError("Complex calibration curve must be numeric") + return np.asarray(values, dtype=np.complex64) + + def _normalize_calib_path(path: str | Path) -> Path: out = Path(path).expanduser() if out.suffix.lower() != ".npy": @@ -131,3 +152,18 @@ def load_calib_envelope(path: str | Path) -> np.ndarray: normalized_path = _normalize_calib_path(path) loaded = np.load(normalized_path, allow_pickle=False) return validate_calib_envelope(loaded) + + +def save_complex_calibration(path: str | Path, curve: np.ndarray) -> str: + """Persist a complex calibration curve as a .npy file and return the final path.""" + normalized_path = _normalize_calib_path(path) + values = validate_complex_calibration_curve(curve) + np.save(normalized_path, values.astype(np.complex64, copy=False)) + return str(normalized_path) + + +def load_complex_calibration(path: str | Path) -> np.ndarray: + """Load and validate a complex calibration curve from a .npy file.""" + normalized_path = _normalize_calib_path(path) + loaded = np.load(normalized_path, allow_pickle=False) + return validate_complex_calibration_curve(loaded) diff --git a/rfg_adc_plotter/processing/normalization.py b/rfg_adc_plotter/processing/normalization.py index 0a6e41c..b855f33 100644 --- a/rfg_adc_plotter/processing/normalization.py +++ b/rfg_adc_plotter/processing/normalization.py @@ -148,6 +148,57 @@ def resample_envelope(envelope: np.ndarray, width: int) -> np.ndarray: return np.interp(x_dst, x_src[finite], values[finite]).astype(np.float32) +def fit_complex_calibration_to_width(calib: np.ndarray, width: int) -> np.ndarray: + """Fit a complex calibration curve to the signal width via trim/pad with ones.""" + target_width = int(width) + if target_width <= 0: + return np.zeros((0,), dtype=np.complex64) + + values = np.asarray(calib, dtype=np.complex64).reshape(-1) + if values.size <= 0: + return np.ones((target_width,), dtype=np.complex64) + if values.size == target_width: + return values.astype(np.complex64, copy=True) + if values.size > target_width: + return np.asarray(values[:target_width], dtype=np.complex64) + + out = np.ones((target_width,), dtype=np.complex64) + out[: values.size] = values + return out + + +def normalize_by_complex_calibration( + signal: np.ndarray, + calib: np.ndarray, + eps: float = 1e-9, +) -> np.ndarray: + """Normalize complex signal by a complex calibration curve with zero protection.""" + sig_arr = np.asarray(signal, dtype=np.complex64).reshape(-1) + if sig_arr.size <= 0: + return sig_arr.copy() + + calib_fit = fit_complex_calibration_to_width(calib, sig_arr.size) + eps_abs = max(abs(float(eps)), 1e-12) + denom = np.asarray(calib_fit, dtype=np.complex64).copy() + safe_denom = ( + np.isfinite(denom.real) + & np.isfinite(denom.imag) + & (np.abs(denom) >= eps_abs) + ) + if np.any(~safe_denom): + denom[~safe_denom] = np.complex64(1.0 + 0.0j) + + out = np.full(sig_arr.shape, np.nan + 0j, dtype=np.complex64) + valid_sig = np.isfinite(sig_arr.real) & np.isfinite(sig_arr.imag) + if np.any(valid_sig): + with np.errstate(divide="ignore", invalid="ignore"): + out[valid_sig] = sig_arr[valid_sig] / denom[valid_sig] + + out_real = np.nan_to_num(out.real, nan=np.nan, posinf=np.nan, neginf=np.nan) + out_imag = np.nan_to_num(out.imag, nan=np.nan, posinf=np.nan, neginf=np.nan) + return (out_real + (1j * out_imag)).astype(np.complex64, copy=False) + + def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray: """Normalize a sweep by an envelope with safe resampling and zero protection.""" raw_in = np.asarray(raw).reshape(-1) diff --git a/rfg_adc_plotter/state/runtime_state.py b/rfg_adc_plotter/state/runtime_state.py index c8fdebf..3960161 100644 --- a/rfg_adc_plotter/state/runtime_state.py +++ b/rfg_adc_plotter/state/runtime_state.py @@ -35,6 +35,8 @@ class RuntimeState: last_calib_sweep: Optional[np.ndarray] = None calib_envelope: Optional[np.ndarray] = None calib_file_path: Optional[str] = None + complex_calib_curve: Optional[np.ndarray] = None + complex_calib_file_path: Optional[str] = None background_buffer: BackgroundMedianBuffer = field( default_factory=lambda: BackgroundMedianBuffer(BACKGROUND_MEDIAN_SWEEPS) ) diff --git a/tests/test_processing.py b/tests/test_processing.py index 4d04216..b19955b 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -27,10 +27,13 @@ from rfg_adc_plotter.gui.pyqtgraph_backend import ( ) from rfg_adc_plotter.processing.calibration import ( build_calib_envelope, + build_complex_calibration_curve, calibrate_freqs, load_calib_envelope, + load_complex_calibration, recalculate_calibration_c, save_calib_envelope, + save_complex_calibration, ) from rfg_adc_plotter.processing.background import ( load_fft_background, @@ -49,7 +52,9 @@ from rfg_adc_plotter.processing.fft import ( ) from rfg_adc_plotter.processing.normalization import ( build_calib_envelopes, + fit_complex_calibration_to_width, normalize_by_calib, + normalize_by_complex_calibration, normalize_by_envelope, resample_envelope, ) @@ -148,6 +153,46 @@ class ProcessingTests(unittest.TestCase): with self.assertRaises(ValueError): load_calib_envelope(path) + def test_complex_calibration_curve_roundtrip(self): + ch1 = np.asarray([1.0, 2.0, 3.0], dtype=np.float32) + ch2 = np.asarray([0.5, -1.0, 4.0], dtype=np.float32) + curve = build_complex_calibration_curve(ch1, ch2) + expected = np.asarray([1.0 + 0.5j, 2.0 - 1.0j, 3.0 + 4.0j], dtype=np.complex64) + + self.assertTrue(np.iscomplexobj(curve)) + self.assertTrue(np.allclose(curve, expected)) + + with tempfile.TemporaryDirectory() as tmp_dir: + path = os.path.join(tmp_dir, "complex_calibration") + saved_path = save_complex_calibration(path, curve) + loaded = load_complex_calibration(saved_path) + self.assertTrue(saved_path.endswith(".npy")) + self.assertEqual(loaded.dtype, np.complex64) + self.assertTrue(np.allclose(loaded, expected)) + + def test_fit_complex_calibration_to_width_pads_or_trims(self): + calib = np.asarray([1.0 + 1.0j, 2.0 + 2.0j], dtype=np.complex64) + padded = fit_complex_calibration_to_width(calib, 4) + trimmed = fit_complex_calibration_to_width( + np.asarray([1.0 + 1.0j, 2.0 + 2.0j, 3.0 + 3.0j], dtype=np.complex64), + 2, + ) + + self.assertEqual(padded.shape, (4,)) + self.assertTrue(np.allclose(padded, np.asarray([1.0 + 1.0j, 2.0 + 2.0j, 1.0 + 0.0j, 1.0 + 0.0j], dtype=np.complex64))) + self.assertEqual(trimmed.shape, (2,)) + self.assertTrue(np.allclose(trimmed, np.asarray([1.0 + 1.0j, 2.0 + 2.0j], dtype=np.complex64))) + + def test_normalize_by_complex_calibration_handles_zero_and_length_mismatch(self): + signal = np.asarray([2.0 + 2.0j, 4.0 + 0.0j, 3.0 + 3.0j], dtype=np.complex64) + calib = np.asarray([1.0 + 1.0j, 0.0 + 0.0j], dtype=np.complex64) + normalized = normalize_by_complex_calibration(signal, calib) + expected = np.asarray([2.0 + 0.0j, 4.0 + 0.0j, 3.0 + 3.0j], dtype=np.complex64) + + self.assertTrue(np.iscomplexobj(normalized)) + self.assertTrue(np.all(np.isfinite(normalized))) + self.assertTrue(np.allclose(normalized, expected)) + def test_fft_background_roundtrip_and_rejects_non_1d_payload(self): background = np.asarray([0.5, 1.5, 2.5], dtype=np.float32) with tempfile.TemporaryDirectory() as tmp_dir: