"""Tests for loading capture files as references and using them in SweepPreprocessor."""

from pathlib import Path

import numpy as np

from rfg_adc_plotter.io.capture_reference_loader import (
    aggregate_capture_reference,
    detect_reference_file_format,
    load_capture_sweeps,
)
from rfg_adc_plotter.processing.pipeline import SweepPreprocessor

# Sample captures live under <two levels above this file>/sample_data.
ROOT = Path(__file__).resolve().parents[1]
SAMPLE_BG = ROOT / "sample_data" / "empty"
SAMPLE_CALIB = ROOT / "sample_data" / "no_antennas_35dB_attenuators"
SAMPLE_NEW_FMT = ROOT / "sample_data" / "new_format" / "attenuators_50dB"


def _channels_in(sweeps):
    """Return the set of channel numbers found in the sweeps' info dicts.

    Each item of *sweeps* is a ``(sweep, info)`` pair. The ``"chs"`` entry of
    ``info`` is used when present; otherwise the single ``"ch"`` entry
    (defaulting to 0) is used.
    """
    return {
        int(ch)
        for _sweep, info in sweeps
        for ch in info.get("chs", [info.get("ch", 0)])
    }


def test_detect_reference_file_format_for_sample_capture():
    """All three sample capture paths are detected as ``"bin_capture"``."""
    assert detect_reference_file_format(str(SAMPLE_BG)) == "bin_capture"
    assert detect_reference_file_format(str(SAMPLE_CALIB)) == "bin_capture"
    assert detect_reference_file_format(str(SAMPLE_NEW_FMT)) == "bin_capture"


def test_load_capture_sweeps_parses_binary_capture():
    """The background sample yields more than 100 sweeps, all on channel 0."""
    parsed = load_capture_sweeps(str(SAMPLE_BG), fancy=False, logscale=False)
    assert len(parsed) > 100

    first_sweep, first_info = parsed[0]
    assert isinstance(first_sweep, np.ndarray)
    assert "ch" in first_info

    assert _channels_in(parsed) == {0}


def test_load_capture_sweeps_parses_new_format_logdetector_capture():
    """The new-format sample yields more than 900 sweeps of the usual width on channel 0."""
    parsed = load_capture_sweeps(str(SAMPLE_NEW_FMT), fancy=False, logscale=False)
    assert len(parsed) > 900

    widths = [int(sweep.size) for sweep, _info in parsed]
    distinct_widths = set(widths)
    dominant_width = max(distinct_widths, key=widths.count)
    # Must match the expected sweep width of the standard captures.
    assert dominant_width in (758, 759)

    assert _channels_in(parsed) == {0}


def test_aggregate_capture_reference_filters_incomplete_sweeps():
    """Median aggregation returns a float32 vector and drops some, but not all, sweeps."""
    parsed = load_capture_sweeps(str(SAMPLE_BG), fancy=False, logscale=False)
    vector, summary = aggregate_capture_reference(
        parsed,
        channel=0,
        method="median",
        path=str(SAMPLE_BG),
    )

    assert isinstance(vector, np.ndarray)
    assert vector.dtype == np.float32

    assert summary.sweeps_total == len(parsed)
    assert summary.sweeps_valid > 0
    assert summary.sweeps_valid < summary.sweeps_total
    # sample_data starts at x=1..758 => width=759
    assert summary.dominant_width in (759, 758)


def test_preprocessor_can_load_capture_calib_and_background_and_apply():
    """Calibration and background loaded from captures are both applied to a channel-1 sweep."""
    preprocessor = SweepPreprocessor(
        norm_type="projector",
        auto_save_live_calib_envelope=False,
    )
    preprocessor.set_capture_parse_options(fancy=False, logscale=False)

    # Calibration reference taken from a capture file.
    assert preprocessor.load_calib_reference(str(SAMPLE_CALIB))
    preprocessor.set_calib_mode("file")
    preprocessor.set_calib_enabled(True)
    assert preprocessor.calib_file_envelope is not None
    assert preprocessor.calib_external_source_type == "capture"

    # Background reference taken from a capture file.
    assert preprocessor.load_background_reference(str(SAMPLE_BG))
    preprocessor.set_background_enabled(True)
    assert preprocessor.background_source_type == "capture_raw"

    # Synthetic ramp no longer than the loaded calibration envelope.
    length = min(758, int(preprocessor.calib_file_envelope.size))
    ramp = np.linspace(-100.0, 100.0, length, dtype=np.float32)
    result = preprocessor.process(ramp, channel=1, update_references=False)

    assert result.calibration_applied is True
    assert result.background_applied is True
    assert result.calibration_source == "capture"
    assert "background_capture(raw->calib)" in result.stage_trace


def test_preprocessor_applies_background_for_ch0_reference_too():
    """A channel-0 sweep gets background applied while the raw sweep is kept as last calib."""
    preprocessor = SweepPreprocessor(
        norm_type="projector",
        auto_save_live_calib_envelope=False,
    )
    preprocessor.set_capture_parse_options(fancy=False, logscale=False)
    assert preprocessor.load_background_reference(str(SAMPLE_BG))
    preprocessor.set_background_enabled(True)

    # Cap the synthetic sweep at the background length when one is loaded.
    if preprocessor.background is not None:
        length = min(758, int(preprocessor.background.size))
    else:
        length = 758
    raw = np.linspace(-10.0, 10.0, length, dtype=np.float32)
    result = preprocessor.process(raw, channel=0, update_references=True)

    assert result.is_calibration_reference is True
    assert result.background_applied is True
    # The processed sweep must differ from the input somewhere.
    assert np.any(np.abs(result.processed_sweep - raw) > 0)
    assert preprocessor.last_calib_sweep is not None
    assert np.allclose(preprocessor.last_calib_sweep[:length], raw[:length], equal_nan=True)