diff --git a/README.md b/README.md index 0bdef69..e76bca6 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,7 @@ Logscale binary с парой `int32`: .venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --logscale ``` -Logscale binary `16-bit x2`: +Complex binary `16-bit x2`: ```bash .venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --parser_16_bit_x2 diff --git a/rfg_adc_plotter/cli.py b/rfg_adc_plotter/cli.py index 96f0761..e7c2bb0 100644 --- a/rfg_adc_plotter/cli.py +++ b/rfg_adc_plotter/cli.py @@ -83,15 +83,15 @@ def build_parser() -> argparse.ArgumentParser: "--parser_16_bit_x2", action="store_true", help=( - "Бинарный logscale-протокол c парой int16 (avg_1, avg_2): " - "старт 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; точка step,avg1_lo16,avg2_lo16,0xFFFF" + "Бинарный complex-протокол c парой int16 (Re, Im): " + "старт 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; точка step,re_lo16,im_lo16,0xFFFF" ), ) parser.add_argument( "--parser_test", action="store_true", help=( - "Тестовый парсер для формата 16-bit x2: " + "Тестовый парсер для complex-формата 16-bit x2: " "одиночный 0xFFFF завершает точку, серия 0xFFFF начинает новый свип" ), ) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 9b10486..78a36c2 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -26,7 +26,7 @@ from rfg_adc_plotter.processing.calibration import ( save_calib_envelope, set_calibration_base_value, ) -from rfg_adc_plotter.processing.fft import compute_fft_mag_row, fft_mag_to_db +from rfg_adc_plotter.processing.fft import compute_fft_complex_row, compute_fft_mag_row, fft_mag_to_db from rfg_adc_plotter.processing.formatting import compute_auto_ylim, format_status_kv, parse_spec_clip from rfg_adc_plotter.processing.normalization import normalize_by_envelope, resample_envelope from rfg_adc_plotter.processing.peaks import ( @@ -205,6 +205,40 @@ def resolve_visible_aux_curves(aux_curves: SweepAuxCurves, enabled: bool) -> Swe return aux_1_arr, aux_2_arr +def resolve_visible_fft_curves( + fft_complex: Optional[np.ndarray], + fft_mag: Optional[np.ndarray], + *, + complex_mode: bool, + show_abs: bool, + show_real: bool, + show_imag: bool, +) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray]]: + """Resolve the visible FFT line series for the current mode.""" + mag_arr = None if fft_mag is None else np.asarray(fft_mag, dtype=np.float32).reshape(-1) + if not complex_mode: + return (mag_arr if show_abs else None, None, None) + + complex_arr = None if fft_complex is None else np.asarray(fft_complex, dtype=np.complex64).reshape(-1) + if complex_arr is None or complex_arr.size <= 0: + return (mag_arr if show_abs else None, None, None) + + if mag_arr is None or mag_arr.size != complex_arr.size: + mag_arr = np.abs(complex_arr).astype(np.float32) + + abs_curve = mag_arr if show_abs else None + real_curve = complex_arr.real.astype(np.float32) if show_real else None + imag_curve = complex_arr.imag.astype(np.float32) if show_imag else None + return abs_curve, real_curve, imag_curve + + +def _db_to_linear_amplitude(values: np.ndarray) -> np.ndarray: + """Convert dB values back into linear amplitude for overlay display.""" + vals = np.asarray(values, dtype=np.float32) + out = np.power(np.float32(10.0), vals / np.float32(20.0)).astype(np.float32) - np.float32(1e-9) + return np.maximum(out, 0.0).astype(np.float32, copy=False) + + def compute_background_subtracted_bscan_levels( disp_fft_lin: np.ndarray, disp_fft: np.ndarray, @@ -233,7 +267,12 @@ def run_pyqtgraph(args) -> None: """Start the PyQtGraph GUI.""" peak_calibrate_mode = bool(getattr(args, "calibrate", False)) peak_search_enabled = bool(getattr(args, "peak_search", False)) - complex_sweep_mode = bool(getattr(args, "parser_complex_ascii", False)) + complex_ascii_mode = bool(getattr(args, "parser_complex_ascii", False)) + complex_sweep_mode = bool( + complex_ascii_mode + or getattr(args, "parser_16_bit_x2", False) + or getattr(args, "parser_test", False) + ) try: import pyqtgraph as pg from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore @@ -254,7 +293,7 @@ def run_pyqtgraph(args) -> None: logscale=bool(args.logscale), parser_16_bit_x2=bool(args.parser_16_bit_x2), parser_test=bool(args.parser_test), - parser_complex_ascii=complex_sweep_mode, + parser_complex_ascii=complex_ascii_mode, ) reader.start() @@ -336,6 +375,8 @@ def run_pyqtgraph(args) -> None: p_fft = win.addPlot(row=1, col=0, title="FFT") p_fft.showGrid(x=True, y=True, alpha=0.3) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) + curve_fft_real = p_fft.plot(pen=pg.mkPen((80, 120, 255), width=1)) + curve_fft_imag = p_fft.plot(pen=pg.mkPen((120, 200, 120), width=1)) curve_fft_ref = p_fft.plot(pen=pg.mkPen((255, 0, 0), width=1)) peak_pen = pg.mkPen((255, 0, 0), width=1) peak_box_pen = pg.mkPen((0, 170, 0), width=3) @@ -353,7 +394,13 @@ def run_pyqtgraph(args) -> None: fft_left_line.setVisible(False) fft_right_line.setVisible(False) p_fft.setLabel("bottom", "Расстояние, м") - p_fft.setLabel("left", "дБ") + p_fft.setLabel("left", "Амплитуда" if complex_sweep_mode else "дБ") + if complex_sweep_mode: + try: + p_fft.setTitle("FFT: Re / Im / Abs") + p_line.setTitle("Сырые данные до FFT") + except Exception: + pass p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") p_spec.invertY(False) @@ -452,8 +499,26 @@ def run_pyqtgraph(args) -> None: parsed_data_cb = QtWidgets.QCheckBox("данные после парсинга") if complex_sweep_mode: try: - parsed_data_cb.setText("Re/Im после парсинга") - p_line.setTitle("Модуль комплексного сигнала") + parsed_data_cb.setText("Сырые Re/Im") + except Exception: + pass + fft_curve_group = QtWidgets.QGroupBox("FFT кривые") + fft_curve_layout = QtWidgets.QVBoxLayout(fft_curve_group) + fft_curve_layout.setContentsMargins(6, 6, 6, 6) + fft_curve_layout.setSpacing(4) + fft_abs_cb = QtWidgets.QCheckBox("Abs после FFT") + fft_real_cb = QtWidgets.QCheckBox("Re после FFT") + fft_imag_cb = QtWidgets.QCheckBox("Im после FFT") + for checkbox in (fft_abs_cb, fft_real_cb, fft_imag_cb): + try: + checkbox.setChecked(True) + except Exception: + pass + fft_curve_layout.addWidget(checkbox) + if not complex_sweep_mode: + try: + fft_real_cb.setEnabled(False) + fft_imag_cb.setEnabled(False) except Exception: pass try: @@ -466,6 +531,7 @@ def run_pyqtgraph(args) -> None: settings_layout.addWidget(background_group) settings_layout.addWidget(fft_mode_label) settings_layout.addWidget(fft_mode_combo) + settings_layout.addWidget(fft_curve_group) settings_layout.addWidget(peak_search_cb) status = pg.LabelItem(justify="left") @@ -474,6 +540,9 @@ def run_pyqtgraph(args) -> None: calib_enabled = False parsed_data_enabled = False background_enabled = False + fft_abs_enabled = True + fft_real_enabled = True + fft_imag_enabled = True fft_mode = "symmetric" status_note = "" status_dirty = True @@ -587,6 +656,7 @@ def run_pyqtgraph(args) -> None: runtime.background_buffer.reset() if clear_profile: runtime.background_profile = None + runtime.current_fft_complex = None runtime.current_fft_mag = None runtime.current_fft_db = None @@ -603,6 +673,7 @@ def run_pyqtgraph(args) -> None: def reset_ring_buffers() -> None: runtime.ring.reset() runtime.current_distances = None + runtime.current_fft_complex = None runtime.current_fft_mag = None runtime.current_fft_db = None runtime.current_peak_width = None @@ -621,6 +692,7 @@ def run_pyqtgraph(args) -> None: runtime.current_sweep_raw = None runtime.current_fft_source = None runtime.current_fft_input = None + runtime.current_fft_complex = None runtime.current_aux_curves = None runtime.current_sweep_norm = None runtime.current_fft_mag = None @@ -658,6 +730,7 @@ def run_pyqtgraph(args) -> None: runtime.current_sweep_raw = None runtime.current_fft_source = None runtime.current_fft_input = None + runtime.current_fft_complex = None runtime.current_aux_curves = None runtime.current_sweep_norm = None runtime.current_fft_mag = None @@ -693,6 +766,7 @@ def run_pyqtgraph(args) -> None: dtype=np.complex64 if np.iscomplexobj(fft_source) else np.float32, ).copy() + runtime.current_fft_complex = None runtime.current_fft_mag = None runtime.current_fft_db = None if ( @@ -707,6 +781,7 @@ def run_pyqtgraph(args) -> None: ensure_buffer(runtime.current_sweep_raw.size) runtime.ring.push(sweep_for_processing, runtime.current_freqs, fft_input=fft_input_for_processing) runtime.current_distances = runtime.ring.distance_axis + runtime.current_fft_complex = None runtime.current_fft_mag = runtime.ring.get_last_fft_linear() runtime.current_fft_db = runtime.ring.last_fft_db if runtime.current_fft_mag is not None: @@ -732,6 +807,22 @@ def run_pyqtgraph(args) -> None: parsed_data_enabled = False runtime.mark_dirty() + def set_fft_curve_visibility() -> None: + nonlocal fft_abs_enabled, fft_real_enabled, fft_imag_enabled + try: + fft_abs_enabled = bool(fft_abs_cb.isChecked()) + except Exception: + fft_abs_enabled = True + try: + fft_real_enabled = bool(fft_real_cb.isChecked()) + except Exception: + fft_real_enabled = True + try: + fft_imag_enabled = bool(fft_imag_cb.isChecked()) + except Exception: + fft_imag_enabled = True + runtime.mark_dirty() + def restore_range_controls() -> None: nonlocal range_change_in_progress range_change_in_progress = True @@ -935,6 +1026,7 @@ def run_pyqtgraph(args) -> None: restore_range_controls() set_parsed_data_enabled() set_background_enabled() + set_fft_curve_visibility() set_fft_mode() try: @@ -950,6 +1042,9 @@ def run_pyqtgraph(args) -> None: background_save_btn.clicked.connect(lambda _checked=False: save_current_background()) background_load_btn.clicked.connect(lambda _checked=False: load_background_file()) fft_mode_combo.currentIndexChanged.connect(lambda _v: set_fft_mode()) + fft_abs_cb.stateChanged.connect(lambda _v: set_fft_curve_visibility()) + fft_real_cb.stateChanged.connect(lambda _v: set_fft_curve_visibility()) + fft_imag_cb.stateChanged.connect(lambda _v: set_fft_curve_visibility()) except Exception: pass @@ -1112,12 +1207,13 @@ def run_pyqtgraph(args) -> None: pass def refresh_current_fft_cache(sweep_for_fft: np.ndarray, bins: int) -> None: - runtime.current_fft_mag = compute_fft_mag_row( + runtime.current_fft_complex = compute_fft_complex_row( sweep_for_fft, runtime.current_freqs, bins, mode=fft_mode, ) + runtime.current_fft_mag = np.abs(runtime.current_fft_complex).astype(np.float32, copy=False) runtime.current_fft_db = fft_mag_to_db(runtime.current_fft_mag) def drain_queue() -> int: @@ -1258,9 +1354,21 @@ def run_pyqtgraph(args) -> None: sweep_for_fft = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw distance_axis = runtime.current_distances if runtime.current_distances is not None else runtime.ring.distance_axis if sweep_for_fft is not None and sweep_for_fft.size > 0 and distance_axis is not None: - if runtime.current_fft_mag is None or runtime.current_fft_mag.size != distance_axis.size or runtime.plot_dirty: + if ( + runtime.current_fft_mag is None + or runtime.current_fft_mag.size != distance_axis.size + or runtime.plot_dirty + or ( + complex_sweep_mode + and ( + runtime.current_fft_complex is None + or runtime.current_fft_complex.size != distance_axis.size + ) + ) + ): refresh_current_fft_cache(sweep_for_fft, distance_axis.size) fft_mag = runtime.current_fft_mag + fft_complex = runtime.current_fft_complex xs_fft = distance_axis[: fft_mag.size] active_background = None try: @@ -1275,68 +1383,97 @@ def run_pyqtgraph(args) -> None: set_status_note(f"фон: не удалось применить ({exc})") active_background = None display_fft_mag = fft_mag - fft_vals = fft_mag_to_db(display_fft_mag) - curve_fft.setData(xs_fft, fft_vals) finite_x = xs_fft[np.isfinite(xs_fft)] if finite_x.size > 0: p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0) - finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals) - y_for_range = fft_vals[finite_fft] - if peak_search_enabled: - fft_ref = rolling_median_ref(xs_fft, fft_vals, peak_ref_window) - finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref) - if np.any(finite_ref): - curve_fft_ref.setData(xs_fft[finite_ref], fft_ref[finite_ref]) - curve_fft_ref.setVisible(True) - y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref])) + fft_vals_db = fft_mag_to_db(display_fft_mag) + ref_curve_for_range = None + + if complex_sweep_mode: + visible_abs, visible_real, visible_imag = resolve_visible_fft_curves( + fft_complex, + display_fft_mag, + complex_mode=True, + show_abs=fft_abs_enabled, + show_real=fft_real_enabled, + show_imag=fft_imag_enabled, + ) + if visible_abs is not None: + curve_fft.setData(xs_fft[: visible_abs.size], visible_abs) else: - curve_fft_ref.setVisible(False) - runtime.peak_candidates = find_top_peaks_over_ref(xs_fft, fft_vals, fft_ref, top_n=3) - refresh_peak_params_label(runtime.peak_candidates) - for idx, box in enumerate(fft_peak_boxes): - if idx < len(runtime.peak_candidates): - peak = runtime.peak_candidates[idx] - box.setData( - [peak["left"], peak["left"], peak["right"], peak["right"], peak["left"]], - [peak["ref"], peak["peak_y"], peak["peak_y"], peak["ref"], peak["ref"]], - ) - box.setVisible(True) + curve_fft.setData([], []) + if visible_real is not None: + curve_fft_real.setData(xs_fft[: visible_real.size], visible_real) + else: + curve_fft_real.setData([], []) + if visible_imag is not None: + curve_fft_imag.setData(xs_fft[: visible_imag.size], visible_imag) + else: + curve_fft_imag.setData([], []) + + if peak_search_enabled and visible_abs is not None: + fft_ref_db = rolling_median_ref(xs_fft, fft_vals_db, peak_ref_window) + finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref_db) + if np.any(finite_ref): + fft_ref_lin = _db_to_linear_amplitude(fft_ref_db[finite_ref]) + curve_fft_ref.setData(xs_fft[finite_ref], fft_ref_lin) + curve_fft_ref.setVisible(True) + ref_curve_for_range = fft_ref_lin else: + curve_fft_ref.setVisible(False) + runtime.peak_candidates = find_top_peaks_over_ref(xs_fft, fft_vals_db, fft_ref_db, top_n=3) + refresh_peak_params_label(runtime.peak_candidates) + for idx, box in enumerate(fft_peak_boxes): + if idx < len(runtime.peak_candidates): + peak = runtime.peak_candidates[idx] + y_box = _db_to_linear_amplitude( + np.asarray( + [peak["ref"], peak["peak_y"], peak["peak_y"], peak["ref"], peak["ref"]], + dtype=np.float32, + ) + ) + box.setData( + [peak["left"], peak["left"], peak["right"], peak["right"], peak["left"]], + y_box, + ) + box.setVisible(True) + else: + box.setVisible(False) + else: + runtime.peak_candidates = [] + refresh_peak_params_label([]) + curve_fft_ref.setVisible(False) + for box in fft_peak_boxes: box.setVisible(False) - else: - runtime.peak_candidates = [] - refresh_peak_params_label([]) - curve_fft_ref.setVisible(False) - for box in fft_peak_boxes: - box.setVisible(False) - if active_background is not None: - p_fft.setYRange(-10.0, 30.0, padding=0) - else: - finite_y = y_for_range[np.isfinite(y_for_range)] - if finite_y.size > 0: - y0 = float(np.min(finite_y)) - y1 = float(np.max(finite_y)) - if y1 <= y0: - y1 = y0 + 1e-3 - p_fft.setYRange(y0, y1, padding=0) + y_limits = compute_auto_ylim(visible_abs, visible_real, visible_imag, ref_curve_for_range) + if y_limits is not None: + p_fft.setYRange(y_limits[0], y_limits[1], padding=0) - if peak_calibrate_mode: - markers = find_peak_width_markers(xs_fft, fft_vals) - if markers is not None: - fft_bg_line.setValue(markers["background"]) - fft_left_line.setValue(markers["left"]) - fft_right_line.setValue(markers["right"]) - spec_left_line.setValue(markers["left"]) - spec_right_line.setValue(markers["right"]) - fft_bg_line.setVisible(True) - fft_left_line.setVisible(True) - fft_right_line.setVisible(True) - spec_left_line.setVisible(True) - spec_right_line.setVisible(True) - runtime.current_peak_width = markers["width"] - runtime.current_peak_amplitude = markers["amplitude"] + if peak_calibrate_mode and visible_abs is not None: + markers = find_peak_width_markers(xs_fft, fft_vals_db) + if markers is not None: + fft_bg_line.setValue(float(_db_to_linear_amplitude(np.asarray([markers["background"]]))[0])) + fft_left_line.setValue(markers["left"]) + fft_right_line.setValue(markers["right"]) + spec_left_line.setValue(markers["left"]) + spec_right_line.setValue(markers["right"]) + fft_bg_line.setVisible(True) + fft_left_line.setVisible(True) + fft_right_line.setVisible(True) + spec_left_line.setVisible(True) + spec_right_line.setVisible(True) + runtime.current_peak_width = markers["width"] + runtime.current_peak_amplitude = markers["amplitude"] + else: + fft_bg_line.setVisible(False) + fft_left_line.setVisible(False) + fft_right_line.setVisible(False) + spec_left_line.setVisible(False) + spec_right_line.setVisible(False) + runtime.current_peak_width = None + runtime.current_peak_amplitude = None else: fft_bg_line.setVisible(False) fft_left_line.setVisible(False) @@ -1346,16 +1483,90 @@ def run_pyqtgraph(args) -> None: runtime.current_peak_width = None runtime.current_peak_amplitude = None else: - fft_bg_line.setVisible(False) - fft_left_line.setVisible(False) - fft_right_line.setVisible(False) - spec_left_line.setVisible(False) - spec_right_line.setVisible(False) - runtime.current_peak_width = None - runtime.current_peak_amplitude = None + curve_fft_real.setData([], []) + curve_fft_imag.setData([], []) + if fft_abs_enabled: + curve_fft.setData(xs_fft, fft_vals_db) + else: + curve_fft.setData([], []) + + finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals_db) + y_for_range = fft_vals_db[finite_fft] if fft_abs_enabled else np.zeros((0,), dtype=np.float32) + if peak_search_enabled and fft_abs_enabled: + fft_ref = rolling_median_ref(xs_fft, fft_vals_db, peak_ref_window) + finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref) + if np.any(finite_ref): + curve_fft_ref.setData(xs_fft[finite_ref], fft_ref[finite_ref]) + curve_fft_ref.setVisible(True) + y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref])) + else: + curve_fft_ref.setVisible(False) + runtime.peak_candidates = find_top_peaks_over_ref(xs_fft, fft_vals_db, fft_ref, top_n=3) + refresh_peak_params_label(runtime.peak_candidates) + for idx, box in enumerate(fft_peak_boxes): + if idx < len(runtime.peak_candidates): + peak = runtime.peak_candidates[idx] + box.setData( + [peak["left"], peak["left"], peak["right"], peak["right"], peak["left"]], + [peak["ref"], peak["peak_y"], peak["peak_y"], peak["ref"], peak["ref"]], + ) + box.setVisible(True) + else: + box.setVisible(False) + else: + runtime.peak_candidates = [] + refresh_peak_params_label([]) + curve_fft_ref.setVisible(False) + for box in fft_peak_boxes: + box.setVisible(False) + + if active_background is not None and fft_abs_enabled: + p_fft.setYRange(-10.0, 30.0, padding=0) + else: + finite_y = y_for_range[np.isfinite(y_for_range)] + if finite_y.size > 0: + y0 = float(np.min(finite_y)) + y1 = float(np.max(finite_y)) + if y1 <= y0: + y1 = y0 + 1e-3 + p_fft.setYRange(y0, y1, padding=0) + + if peak_calibrate_mode and fft_abs_enabled: + markers = find_peak_width_markers(xs_fft, fft_vals_db) + if markers is not None: + fft_bg_line.setValue(markers["background"]) + fft_left_line.setValue(markers["left"]) + fft_right_line.setValue(markers["right"]) + spec_left_line.setValue(markers["left"]) + spec_right_line.setValue(markers["right"]) + fft_bg_line.setVisible(True) + fft_left_line.setVisible(True) + fft_right_line.setVisible(True) + spec_left_line.setVisible(True) + spec_right_line.setVisible(True) + runtime.current_peak_width = markers["width"] + runtime.current_peak_amplitude = markers["amplitude"] + else: + fft_bg_line.setVisible(False) + fft_left_line.setVisible(False) + fft_right_line.setVisible(False) + spec_left_line.setVisible(False) + spec_right_line.setVisible(False) + runtime.current_peak_width = None + runtime.current_peak_amplitude = None + else: + fft_bg_line.setVisible(False) + fft_left_line.setVisible(False) + fft_right_line.setVisible(False) + spec_left_line.setVisible(False) + spec_right_line.setVisible(False) + runtime.current_peak_width = None + runtime.current_peak_amplitude = None else: curve_fft_ref.setVisible(False) curve_fft.setData([], []) + curve_fft_real.setData([], []) + curve_fft_imag.setData([], []) for box in fft_peak_boxes: box.setVisible(False) fft_bg_line.setVisible(False) diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py index e5fa2c3..f42a1b4 100644 --- a/rfg_adc_plotter/io/sweep_parser_core.py +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -231,14 +231,14 @@ class LogScale16BitX2BinaryParser: del self._buf[:8] continue if words[3] == 0xFFFF and words[0] != 0xFFFF: - avg_1 = u16_to_i16(words[1]) - avg_2 = u16_to_i16(words[2]) + real = u16_to_i16(words[1]) + imag = u16_to_i16(words[2]) events.append( PointEvent( ch=self._current_channel, x=int(words[0]), - y=log_pair_to_sweep(avg_1, avg_2), - aux=(float(avg_1), float(avg_2)), + y=float(abs(complex(real, imag))), + aux=(float(real), float(imag)), ) ) del self._buf[:8] @@ -268,14 +268,14 @@ class ParserTestStreamParser: return None if self._expected_step is not None and step < self._expected_step: return None - avg_1 = u16_to_i16(int(self._point_buf[1])) - avg_2 = u16_to_i16(int(self._point_buf[2])) + real = u16_to_i16(int(self._point_buf[1])) + imag = u16_to_i16(int(self._point_buf[2])) self._expected_step = step + 1 return PointEvent( ch=self._current_channel, x=step, - y=log_pair_to_sweep(avg_1, avg_2), - aux=(float(avg_1), float(avg_2)), + y=float(abs(complex(real, imag))), + aux=(float(real), float(imag)), ) def feed(self, data: bytes) -> List[ParserEvent]: diff --git a/rfg_adc_plotter/processing/__init__.py b/rfg_adc_plotter/processing/__init__.py index ca489a0..e104de5 100644 --- a/rfg_adc_plotter/processing/__init__.py +++ b/rfg_adc_plotter/processing/__init__.py @@ -18,6 +18,7 @@ from rfg_adc_plotter.processing.calibration import ( ) from rfg_adc_plotter.processing.fft import ( compute_distance_axis, + compute_fft_complex_row, compute_fft_mag_row, compute_fft_row, fft_mag_to_db, @@ -44,6 +45,7 @@ __all__ = [ "calibrate_freqs", "compute_auto_ylim", "compute_distance_axis", + "compute_fft_complex_row", "compute_fft_mag_row", "compute_fft_row", "fft_mag_to_db", diff --git a/rfg_adc_plotter/processing/fft.py b/rfg_adc_plotter/processing/fft.py index 8688337..86cb06e 100644 --- a/rfg_adc_plotter/processing/fft.py +++ b/rfg_adc_plotter/processing/fft.py @@ -179,25 +179,23 @@ def fft_mag_to_db(mag: np.ndarray) -> np.ndarray: return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False) -def _compute_fft_mag_row_direct( +def _compute_fft_complex_row_direct( sweep: np.ndarray, freqs: Optional[np.ndarray], bins: int, ) -> np.ndarray: prepared = prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN) if prepared is None: - return np.full((bins,), np.nan, dtype=np.float32) + return np.full((bins,), np.nan + 0j, dtype=np.complex64) fft_seg, take_fft = prepared - fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 - fft_in = np.zeros((FFT_LEN,), dtype=fft_dtype) + fft_in = np.zeros((FFT_LEN,), dtype=np.complex64) window = np.hanning(take_fft).astype(np.float32) - fft_in[:take_fft] = fft_seg * window - spec = np.fft.ifft(fft_in) - mag = np.abs(spec).astype(np.float32) - if mag.shape[0] != bins: - mag = mag[:bins] - return mag + fft_in[:take_fft] = np.asarray(fft_seg, dtype=np.complex64) * window + spec = np.fft.ifft(fft_in).astype(np.complex64, copy=False) + if spec.shape[0] != bins: + spec = spec[:bins] + return spec def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str: @@ -213,6 +211,36 @@ def _normalize_fft_mode(mode: str | None, symmetric: Optional[bool]) -> str: raise ValueError(f"Unsupported FFT mode: {mode!r}") +def compute_fft_complex_row( + sweep: np.ndarray, + freqs: Optional[np.ndarray], + bins: int, + *, + mode: str = "symmetric", + symmetric: Optional[bool] = None, +) -> np.ndarray: + """Compute a complex FFT/IFFT row on the distance axis.""" + if bins <= 0: + return np.zeros((0,), dtype=np.complex64) + + fft_mode = _normalize_fft_mode(mode, symmetric) + if fft_mode == "direct": + return _compute_fft_complex_row_direct(sweep, freqs, bins) + + if fft_mode == "positive_only": + spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + else: + spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + if spectrum_centered is None: + return np.full((bins,), np.nan + 0j, dtype=np.complex64) + + spec = np.fft.ifft(np.fft.ifftshift(np.asarray(spectrum_centered, dtype=np.complex64))) + spec = np.asarray(spec, dtype=np.complex64) + if spec.shape[0] != bins: + spec = spec[:bins] + return spec + + def compute_fft_mag_row( sweep: np.ndarray, freqs: Optional[np.ndarray], @@ -222,25 +250,8 @@ def compute_fft_mag_row( symmetric: Optional[bool] = None, ) -> np.ndarray: """Compute a linear FFT magnitude row.""" - if bins <= 0: - return np.zeros((0,), dtype=np.float32) - - fft_mode = _normalize_fft_mode(mode, symmetric) - if fft_mode == "direct": - return _compute_fft_mag_row_direct(sweep, freqs, bins) - - if fft_mode == "positive_only": - spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) - else: - spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) - if spectrum_centered is None: - return np.full((bins,), np.nan, dtype=np.float32) - - spec = np.fft.ifft(np.fft.ifftshift(spectrum_centered)) - mag = np.abs(spec).astype(np.float32) - if mag.shape[0] != bins: - mag = mag[:bins] - return mag + complex_row = compute_fft_complex_row(sweep, freqs, bins, mode=mode, symmetric=symmetric) + return np.abs(complex_row).astype(np.float32, copy=False) def compute_fft_row( diff --git a/rfg_adc_plotter/state/runtime_state.py b/rfg_adc_plotter/state/runtime_state.py index d08efa5..c8fdebf 100644 --- a/rfg_adc_plotter/state/runtime_state.py +++ b/rfg_adc_plotter/state/runtime_state.py @@ -27,6 +27,7 @@ class RuntimeState: current_sweep_raw: Optional[np.ndarray] = None current_fft_source: Optional[np.ndarray] = None current_fft_input: Optional[np.ndarray] = None + current_fft_complex: Optional[np.ndarray] = None current_aux_curves: SweepAuxCurves = None current_sweep_norm: Optional[np.ndarray] = None current_fft_mag: Optional[np.ndarray] = None diff --git a/tests/test_processing.py b/tests/test_processing.py index 69503ce..b4bd978 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -10,6 +10,7 @@ from rfg_adc_plotter.gui.pyqtgraph_backend import ( apply_working_range, apply_working_range_to_aux_curves, compute_background_subtracted_bscan_levels, + resolve_visible_fft_curves, resolve_visible_aux_curves, ) from rfg_adc_plotter.processing.calibration import ( @@ -28,6 +29,7 @@ from rfg_adc_plotter.processing.fft import ( build_positive_only_centered_ifft_spectrum, build_symmetric_ifft_spectrum, compute_distance_axis, + compute_fft_complex_row, compute_fft_mag_row, compute_fft_row, fft_mag_to_db, @@ -287,14 +289,51 @@ class ProcessingTests(unittest.TestCase): def test_compute_fft_helpers_accept_complex_input(self): sweep = np.exp(1j * np.linspace(0.0, 2.0 * np.pi, 128)).astype(np.complex64) freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64) + complex_row = compute_fft_complex_row(sweep, freqs, 513, mode="positive_only") mag = compute_fft_mag_row(sweep, freqs, 513, mode="positive_only") row = compute_fft_row(sweep, freqs, 513, mode="positive_only") + self.assertEqual(complex_row.shape, (513,)) + self.assertTrue(np.iscomplexobj(complex_row)) self.assertEqual(mag.shape, (513,)) self.assertEqual(row.shape, (513,)) + self.assertTrue(np.allclose(mag, np.abs(complex_row), equal_nan=True)) self.assertTrue(np.any(np.isfinite(mag))) self.assertTrue(np.any(np.isfinite(row))) + def test_resolve_visible_fft_curves_handles_complex_mode(self): + complex_row = np.asarray([1.0 + 2.0j, -3.0 + 4.0j], dtype=np.complex64) + mag = np.abs(complex_row).astype(np.float32) + + abs_curve, real_curve, imag_curve = resolve_visible_fft_curves( + complex_row, + mag, + complex_mode=True, + show_abs=True, + show_real=False, + show_imag=True, + ) + + self.assertTrue(np.allclose(abs_curve, mag)) + self.assertIsNone(real_curve) + self.assertTrue(np.allclose(imag_curve, np.asarray([2.0, 4.0], dtype=np.float32))) + + def test_resolve_visible_fft_curves_preserves_legacy_abs_mode(self): + mag = np.asarray([1.0, 2.0, 3.0], dtype=np.float32) + + abs_curve, real_curve, imag_curve = resolve_visible_fft_curves( + None, + mag, + complex_mode=False, + show_abs=True, + show_real=True, + show_imag=True, + ) + + self.assertTrue(np.allclose(abs_curve, mag)) + self.assertIsNone(real_curve) + self.assertIsNone(imag_curve) + def test_symmetric_distance_axis_uses_windowed_frequency_bounds(self): freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64) axis = compute_distance_axis(freqs, 513, mode="symmetric") diff --git a/tests/test_ring_buffer.py b/tests/test_ring_buffer.py index 26fd692..0a14397 100644 --- a/tests/test_ring_buffer.py +++ b/tests/test_ring_buffer.py @@ -84,6 +84,7 @@ class RingBufferTests(unittest.TestCase): expected = compute_fft_mag_row(complex_input, freqs, ring.fft_bins, mode="direct") self.assertTrue(np.allclose(ring.get_last_fft_linear(), expected)) + self.assertFalse(np.iscomplexobj(ring.get_display_fft_linear())) self.assertTrue(np.allclose(ring.get_display_raw()[: display_sweep.size, -1], display_sweep)) def test_ring_buffer_reset_clears_cached_history(self): diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py index aa8ccc8..836f6f2 100644 --- a/tests/test_sweep_parser_core.py +++ b/tests/test_sweep_parser_core.py @@ -134,6 +134,7 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(events[0].ch, 2) self.assertIsInstance(events[1], PointEvent) self.assertEqual(events[1].ch, 2) + self.assertAlmostEqual(events[1].y, math.hypot(100.0, 90.0), places=6) self.assertEqual(events[1].aux, (100.0, 90.0)) def test_parser_test_stream_parser_recovers_point_after_single_separator(self): @@ -155,7 +156,8 @@ class SweepParserCoreTests(unittest.TestCase): self.assertIsInstance(events[1], PointEvent) self.assertEqual(events[1].ch, 4) self.assertEqual(events[1].x, 1) - self.assertTrue(math.isfinite(events[1].y)) + self.assertAlmostEqual(events[1].y, math.hypot(100.0, 90.0), places=6) + self.assertEqual(events[1].aux, (100.0, 90.0)) def test_sweep_assembler_builds_aux_curves_without_inversion(self): assembler = SweepAssembler(fancy=False, apply_inversion=False)