This commit is contained in:
awe
2026-03-26 20:01:56 +03:00
parent 64e66933e4
commit 5152314f21
10 changed files with 377 additions and 110 deletions

View File

@ -26,7 +26,7 @@ from rfg_adc_plotter.processing.calibration import (
save_calib_envelope,
set_calibration_base_value,
)
from rfg_adc_plotter.processing.fft import compute_fft_mag_row, fft_mag_to_db
from rfg_adc_plotter.processing.fft import compute_fft_complex_row, compute_fft_mag_row, fft_mag_to_db
from rfg_adc_plotter.processing.formatting import compute_auto_ylim, format_status_kv, parse_spec_clip
from rfg_adc_plotter.processing.normalization import normalize_by_envelope, resample_envelope
from rfg_adc_plotter.processing.peaks import (
@ -205,6 +205,40 @@ def resolve_visible_aux_curves(aux_curves: SweepAuxCurves, enabled: bool) -> Swe
return aux_1_arr, aux_2_arr
def resolve_visible_fft_curves(
fft_complex: Optional[np.ndarray],
fft_mag: Optional[np.ndarray],
*,
complex_mode: bool,
show_abs: bool,
show_real: bool,
show_imag: bool,
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray]]:
"""Resolve the visible FFT line series for the current mode."""
mag_arr = None if fft_mag is None else np.asarray(fft_mag, dtype=np.float32).reshape(-1)
if not complex_mode:
return (mag_arr if show_abs else None, None, None)
complex_arr = None if fft_complex is None else np.asarray(fft_complex, dtype=np.complex64).reshape(-1)
if complex_arr is None or complex_arr.size <= 0:
return (mag_arr if show_abs else None, None, None)
if mag_arr is None or mag_arr.size != complex_arr.size:
mag_arr = np.abs(complex_arr).astype(np.float32)
abs_curve = mag_arr if show_abs else None
real_curve = complex_arr.real.astype(np.float32) if show_real else None
imag_curve = complex_arr.imag.astype(np.float32) if show_imag else None
return abs_curve, real_curve, imag_curve
def _db_to_linear_amplitude(values: np.ndarray) -> np.ndarray:
"""Convert dB values back into linear amplitude for overlay display."""
vals = np.asarray(values, dtype=np.float32)
out = np.power(np.float32(10.0), vals / np.float32(20.0)).astype(np.float32) - np.float32(1e-9)
return np.maximum(out, 0.0).astype(np.float32, copy=False)
def compute_background_subtracted_bscan_levels(
disp_fft_lin: np.ndarray,
disp_fft: np.ndarray,
@ -233,7 +267,12 @@ def run_pyqtgraph(args) -> None:
"""Start the PyQtGraph GUI."""
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
peak_search_enabled = bool(getattr(args, "peak_search", False))
complex_sweep_mode = bool(getattr(args, "parser_complex_ascii", False))
complex_ascii_mode = bool(getattr(args, "parser_complex_ascii", False))
complex_sweep_mode = bool(
complex_ascii_mode
or getattr(args, "parser_16_bit_x2", False)
or getattr(args, "parser_test", False)
)
try:
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore
@ -254,7 +293,7 @@ def run_pyqtgraph(args) -> None:
logscale=bool(args.logscale),
parser_16_bit_x2=bool(args.parser_16_bit_x2),
parser_test=bool(args.parser_test),
parser_complex_ascii=complex_sweep_mode,
parser_complex_ascii=complex_ascii_mode,
)
reader.start()
@ -336,6 +375,8 @@ def run_pyqtgraph(args) -> None:
p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
curve_fft_real = p_fft.plot(pen=pg.mkPen((80, 120, 255), width=1))
curve_fft_imag = p_fft.plot(pen=pg.mkPen((120, 200, 120), width=1))
curve_fft_ref = p_fft.plot(pen=pg.mkPen((255, 0, 0), width=1))
peak_pen = pg.mkPen((255, 0, 0), width=1)
peak_box_pen = pg.mkPen((0, 170, 0), width=3)
@ -353,7 +394,13 @@ def run_pyqtgraph(args) -> None:
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
p_fft.setLabel("bottom", "Расстояние, м")
p_fft.setLabel("left", "дБ")
p_fft.setLabel("left", "Амплитуда" if complex_sweep_mode else "дБ")
if complex_sweep_mode:
try:
p_fft.setTitle("FFT: Re / Im / Abs")
p_line.setTitle("Сырые данные до FFT")
except Exception:
pass
p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)")
p_spec.invertY(False)
@ -452,8 +499,26 @@ def run_pyqtgraph(args) -> None:
parsed_data_cb = QtWidgets.QCheckBox("данные после парсинга")
if complex_sweep_mode:
try:
parsed_data_cb.setText("Re/Im после парсинга")
p_line.setTitle("Модуль комплексного сигнала")
parsed_data_cb.setText("Сырые Re/Im")
except Exception:
pass
fft_curve_group = QtWidgets.QGroupBox("FFT кривые")
fft_curve_layout = QtWidgets.QVBoxLayout(fft_curve_group)
fft_curve_layout.setContentsMargins(6, 6, 6, 6)
fft_curve_layout.setSpacing(4)
fft_abs_cb = QtWidgets.QCheckBox("Abs после FFT")
fft_real_cb = QtWidgets.QCheckBox("Re после FFT")
fft_imag_cb = QtWidgets.QCheckBox("Im после FFT")
for checkbox in (fft_abs_cb, fft_real_cb, fft_imag_cb):
try:
checkbox.setChecked(True)
except Exception:
pass
fft_curve_layout.addWidget(checkbox)
if not complex_sweep_mode:
try:
fft_real_cb.setEnabled(False)
fft_imag_cb.setEnabled(False)
except Exception:
pass
try:
@ -466,6 +531,7 @@ def run_pyqtgraph(args) -> None:
settings_layout.addWidget(background_group)
settings_layout.addWidget(fft_mode_label)
settings_layout.addWidget(fft_mode_combo)
settings_layout.addWidget(fft_curve_group)
settings_layout.addWidget(peak_search_cb)
status = pg.LabelItem(justify="left")
@ -474,6 +540,9 @@ def run_pyqtgraph(args) -> None:
calib_enabled = False
parsed_data_enabled = False
background_enabled = False
fft_abs_enabled = True
fft_real_enabled = True
fft_imag_enabled = True
fft_mode = "symmetric"
status_note = ""
status_dirty = True
@ -587,6 +656,7 @@ def run_pyqtgraph(args) -> None:
runtime.background_buffer.reset()
if clear_profile:
runtime.background_profile = None
runtime.current_fft_complex = None
runtime.current_fft_mag = None
runtime.current_fft_db = None
@ -603,6 +673,7 @@ def run_pyqtgraph(args) -> None:
def reset_ring_buffers() -> None:
runtime.ring.reset()
runtime.current_distances = None
runtime.current_fft_complex = None
runtime.current_fft_mag = None
runtime.current_fft_db = None
runtime.current_peak_width = None
@ -621,6 +692,7 @@ def run_pyqtgraph(args) -> None:
runtime.current_sweep_raw = None
runtime.current_fft_source = None
runtime.current_fft_input = None
runtime.current_fft_complex = None
runtime.current_aux_curves = None
runtime.current_sweep_norm = None
runtime.current_fft_mag = None
@ -658,6 +730,7 @@ def run_pyqtgraph(args) -> None:
runtime.current_sweep_raw = None
runtime.current_fft_source = None
runtime.current_fft_input = None
runtime.current_fft_complex = None
runtime.current_aux_curves = None
runtime.current_sweep_norm = None
runtime.current_fft_mag = None
@ -693,6 +766,7 @@ def run_pyqtgraph(args) -> None:
dtype=np.complex64 if np.iscomplexobj(fft_source) else np.float32,
).copy()
runtime.current_fft_complex = None
runtime.current_fft_mag = None
runtime.current_fft_db = None
if (
@ -707,6 +781,7 @@ def run_pyqtgraph(args) -> None:
ensure_buffer(runtime.current_sweep_raw.size)
runtime.ring.push(sweep_for_processing, runtime.current_freqs, fft_input=fft_input_for_processing)
runtime.current_distances = runtime.ring.distance_axis
runtime.current_fft_complex = None
runtime.current_fft_mag = runtime.ring.get_last_fft_linear()
runtime.current_fft_db = runtime.ring.last_fft_db
if runtime.current_fft_mag is not None:
@ -732,6 +807,22 @@ def run_pyqtgraph(args) -> None:
parsed_data_enabled = False
runtime.mark_dirty()
def set_fft_curve_visibility() -> None:
nonlocal fft_abs_enabled, fft_real_enabled, fft_imag_enabled
try:
fft_abs_enabled = bool(fft_abs_cb.isChecked())
except Exception:
fft_abs_enabled = True
try:
fft_real_enabled = bool(fft_real_cb.isChecked())
except Exception:
fft_real_enabled = True
try:
fft_imag_enabled = bool(fft_imag_cb.isChecked())
except Exception:
fft_imag_enabled = True
runtime.mark_dirty()
def restore_range_controls() -> None:
nonlocal range_change_in_progress
range_change_in_progress = True
@ -935,6 +1026,7 @@ def run_pyqtgraph(args) -> None:
restore_range_controls()
set_parsed_data_enabled()
set_background_enabled()
set_fft_curve_visibility()
set_fft_mode()
try:
@ -950,6 +1042,9 @@ def run_pyqtgraph(args) -> None:
background_save_btn.clicked.connect(lambda _checked=False: save_current_background())
background_load_btn.clicked.connect(lambda _checked=False: load_background_file())
fft_mode_combo.currentIndexChanged.connect(lambda _v: set_fft_mode())
fft_abs_cb.stateChanged.connect(lambda _v: set_fft_curve_visibility())
fft_real_cb.stateChanged.connect(lambda _v: set_fft_curve_visibility())
fft_imag_cb.stateChanged.connect(lambda _v: set_fft_curve_visibility())
except Exception:
pass
@ -1112,12 +1207,13 @@ def run_pyqtgraph(args) -> None:
pass
def refresh_current_fft_cache(sweep_for_fft: np.ndarray, bins: int) -> None:
runtime.current_fft_mag = compute_fft_mag_row(
runtime.current_fft_complex = compute_fft_complex_row(
sweep_for_fft,
runtime.current_freqs,
bins,
mode=fft_mode,
)
runtime.current_fft_mag = np.abs(runtime.current_fft_complex).astype(np.float32, copy=False)
runtime.current_fft_db = fft_mag_to_db(runtime.current_fft_mag)
def drain_queue() -> int:
@ -1258,9 +1354,21 @@ def run_pyqtgraph(args) -> None:
sweep_for_fft = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw
distance_axis = runtime.current_distances if runtime.current_distances is not None else runtime.ring.distance_axis
if sweep_for_fft is not None and sweep_for_fft.size > 0 and distance_axis is not None:
if runtime.current_fft_mag is None or runtime.current_fft_mag.size != distance_axis.size or runtime.plot_dirty:
if (
runtime.current_fft_mag is None
or runtime.current_fft_mag.size != distance_axis.size
or runtime.plot_dirty
or (
complex_sweep_mode
and (
runtime.current_fft_complex is None
or runtime.current_fft_complex.size != distance_axis.size
)
)
):
refresh_current_fft_cache(sweep_for_fft, distance_axis.size)
fft_mag = runtime.current_fft_mag
fft_complex = runtime.current_fft_complex
xs_fft = distance_axis[: fft_mag.size]
active_background = None
try:
@ -1275,68 +1383,97 @@ def run_pyqtgraph(args) -> None:
set_status_note(f"фон: не удалось применить ({exc})")
active_background = None
display_fft_mag = fft_mag
fft_vals = fft_mag_to_db(display_fft_mag)
curve_fft.setData(xs_fft, fft_vals)
finite_x = xs_fft[np.isfinite(xs_fft)]
if finite_x.size > 0:
p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals)
y_for_range = fft_vals[finite_fft]
if peak_search_enabled:
fft_ref = rolling_median_ref(xs_fft, fft_vals, peak_ref_window)
finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref)
if np.any(finite_ref):
curve_fft_ref.setData(xs_fft[finite_ref], fft_ref[finite_ref])
curve_fft_ref.setVisible(True)
y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref]))
fft_vals_db = fft_mag_to_db(display_fft_mag)
ref_curve_for_range = None
if complex_sweep_mode:
visible_abs, visible_real, visible_imag = resolve_visible_fft_curves(
fft_complex,
display_fft_mag,
complex_mode=True,
show_abs=fft_abs_enabled,
show_real=fft_real_enabled,
show_imag=fft_imag_enabled,
)
if visible_abs is not None:
curve_fft.setData(xs_fft[: visible_abs.size], visible_abs)
else:
curve_fft_ref.setVisible(False)
runtime.peak_candidates = find_top_peaks_over_ref(xs_fft, fft_vals, fft_ref, top_n=3)
refresh_peak_params_label(runtime.peak_candidates)
for idx, box in enumerate(fft_peak_boxes):
if idx < len(runtime.peak_candidates):
peak = runtime.peak_candidates[idx]
box.setData(
[peak["left"], peak["left"], peak["right"], peak["right"], peak["left"]],
[peak["ref"], peak["peak_y"], peak["peak_y"], peak["ref"], peak["ref"]],
)
box.setVisible(True)
curve_fft.setData([], [])
if visible_real is not None:
curve_fft_real.setData(xs_fft[: visible_real.size], visible_real)
else:
curve_fft_real.setData([], [])
if visible_imag is not None:
curve_fft_imag.setData(xs_fft[: visible_imag.size], visible_imag)
else:
curve_fft_imag.setData([], [])
if peak_search_enabled and visible_abs is not None:
fft_ref_db = rolling_median_ref(xs_fft, fft_vals_db, peak_ref_window)
finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref_db)
if np.any(finite_ref):
fft_ref_lin = _db_to_linear_amplitude(fft_ref_db[finite_ref])
curve_fft_ref.setData(xs_fft[finite_ref], fft_ref_lin)
curve_fft_ref.setVisible(True)
ref_curve_for_range = fft_ref_lin
else:
curve_fft_ref.setVisible(False)
runtime.peak_candidates = find_top_peaks_over_ref(xs_fft, fft_vals_db, fft_ref_db, top_n=3)
refresh_peak_params_label(runtime.peak_candidates)
for idx, box in enumerate(fft_peak_boxes):
if idx < len(runtime.peak_candidates):
peak = runtime.peak_candidates[idx]
y_box = _db_to_linear_amplitude(
np.asarray(
[peak["ref"], peak["peak_y"], peak["peak_y"], peak["ref"], peak["ref"]],
dtype=np.float32,
)
)
box.setData(
[peak["left"], peak["left"], peak["right"], peak["right"], peak["left"]],
y_box,
)
box.setVisible(True)
else:
box.setVisible(False)
else:
runtime.peak_candidates = []
refresh_peak_params_label([])
curve_fft_ref.setVisible(False)
for box in fft_peak_boxes:
box.setVisible(False)
else:
runtime.peak_candidates = []
refresh_peak_params_label([])
curve_fft_ref.setVisible(False)
for box in fft_peak_boxes:
box.setVisible(False)
if active_background is not None:
p_fft.setYRange(-10.0, 30.0, padding=0)
else:
finite_y = y_for_range[np.isfinite(y_for_range)]
if finite_y.size > 0:
y0 = float(np.min(finite_y))
y1 = float(np.max(finite_y))
if y1 <= y0:
y1 = y0 + 1e-3
p_fft.setYRange(y0, y1, padding=0)
y_limits = compute_auto_ylim(visible_abs, visible_real, visible_imag, ref_curve_for_range)
if y_limits is not None:
p_fft.setYRange(y_limits[0], y_limits[1], padding=0)
if peak_calibrate_mode:
markers = find_peak_width_markers(xs_fft, fft_vals)
if markers is not None:
fft_bg_line.setValue(markers["background"])
fft_left_line.setValue(markers["left"])
fft_right_line.setValue(markers["right"])
spec_left_line.setValue(markers["left"])
spec_right_line.setValue(markers["right"])
fft_bg_line.setVisible(True)
fft_left_line.setVisible(True)
fft_right_line.setVisible(True)
spec_left_line.setVisible(True)
spec_right_line.setVisible(True)
runtime.current_peak_width = markers["width"]
runtime.current_peak_amplitude = markers["amplitude"]
if peak_calibrate_mode and visible_abs is not None:
markers = find_peak_width_markers(xs_fft, fft_vals_db)
if markers is not None:
fft_bg_line.setValue(float(_db_to_linear_amplitude(np.asarray([markers["background"]]))[0]))
fft_left_line.setValue(markers["left"])
fft_right_line.setValue(markers["right"])
spec_left_line.setValue(markers["left"])
spec_right_line.setValue(markers["right"])
fft_bg_line.setVisible(True)
fft_left_line.setVisible(True)
fft_right_line.setVisible(True)
spec_left_line.setVisible(True)
spec_right_line.setVisible(True)
runtime.current_peak_width = markers["width"]
runtime.current_peak_amplitude = markers["amplitude"]
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
runtime.current_peak_width = None
runtime.current_peak_amplitude = None
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
@ -1346,16 +1483,90 @@ def run_pyqtgraph(args) -> None:
runtime.current_peak_width = None
runtime.current_peak_amplitude = None
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
runtime.current_peak_width = None
runtime.current_peak_amplitude = None
curve_fft_real.setData([], [])
curve_fft_imag.setData([], [])
if fft_abs_enabled:
curve_fft.setData(xs_fft, fft_vals_db)
else:
curve_fft.setData([], [])
finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals_db)
y_for_range = fft_vals_db[finite_fft] if fft_abs_enabled else np.zeros((0,), dtype=np.float32)
if peak_search_enabled and fft_abs_enabled:
fft_ref = rolling_median_ref(xs_fft, fft_vals_db, peak_ref_window)
finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref)
if np.any(finite_ref):
curve_fft_ref.setData(xs_fft[finite_ref], fft_ref[finite_ref])
curve_fft_ref.setVisible(True)
y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref]))
else:
curve_fft_ref.setVisible(False)
runtime.peak_candidates = find_top_peaks_over_ref(xs_fft, fft_vals_db, fft_ref, top_n=3)
refresh_peak_params_label(runtime.peak_candidates)
for idx, box in enumerate(fft_peak_boxes):
if idx < len(runtime.peak_candidates):
peak = runtime.peak_candidates[idx]
box.setData(
[peak["left"], peak["left"], peak["right"], peak["right"], peak["left"]],
[peak["ref"], peak["peak_y"], peak["peak_y"], peak["ref"], peak["ref"]],
)
box.setVisible(True)
else:
box.setVisible(False)
else:
runtime.peak_candidates = []
refresh_peak_params_label([])
curve_fft_ref.setVisible(False)
for box in fft_peak_boxes:
box.setVisible(False)
if active_background is not None and fft_abs_enabled:
p_fft.setYRange(-10.0, 30.0, padding=0)
else:
finite_y = y_for_range[np.isfinite(y_for_range)]
if finite_y.size > 0:
y0 = float(np.min(finite_y))
y1 = float(np.max(finite_y))
if y1 <= y0:
y1 = y0 + 1e-3
p_fft.setYRange(y0, y1, padding=0)
if peak_calibrate_mode and fft_abs_enabled:
markers = find_peak_width_markers(xs_fft, fft_vals_db)
if markers is not None:
fft_bg_line.setValue(markers["background"])
fft_left_line.setValue(markers["left"])
fft_right_line.setValue(markers["right"])
spec_left_line.setValue(markers["left"])
spec_right_line.setValue(markers["right"])
fft_bg_line.setVisible(True)
fft_left_line.setVisible(True)
fft_right_line.setVisible(True)
spec_left_line.setVisible(True)
spec_right_line.setVisible(True)
runtime.current_peak_width = markers["width"]
runtime.current_peak_amplitude = markers["amplitude"]
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
runtime.current_peak_width = None
runtime.current_peak_amplitude = None
else:
fft_bg_line.setVisible(False)
fft_left_line.setVisible(False)
fft_right_line.setVisible(False)
spec_left_line.setVisible(False)
spec_right_line.setVisible(False)
runtime.current_peak_width = None
runtime.current_peak_amplitude = None
else:
curve_fft_ref.setVisible(False)
curve_fft.setData([], [])
curve_fft_real.setData([], [])
curve_fft_imag.setData([], [])
for box in fft_peak_boxes:
box.setVisible(False)
fft_bg_line.setVisible(False)