diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index d2570b5..1ada5f9 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -46,9 +46,9 @@ from rfg_adc_plotter.processing.peaks import ( from rfg_adc_plotter.state import RingBuffer, RuntimeState from rfg_adc_plotter.types import SweepAuxCurves, SweepInfo, SweepPacket -RAW_PLOT_MAX_POINTS = 4096 +RAW_PLOT_MAX_POINTS = 2048 RAW_WATERFALL_MAX_POINTS = 2048 -BSCAN_MAX_POINTS = 512 +BSCAN_MAX_POINTS = 256 UI_QUEUE_MAXSIZE = 128 UI_MAX_PACKETS_PER_TICK = 8 DEBUG_FRAME_LOG_EVERY = 10 @@ -57,6 +57,10 @@ UI_BACKLOG_LATEST_ONLY_THRESHOLD_MULTIPLIER = 2 UI_HEAVY_REFRESH_BACKLOG_MULTIPLIER = 1 UI_HEAVY_REFRESH_MAX_STRIDE = 4 UI_AXIS_REFRESH_INTERVAL_S = 0.35 +UI_LINE_REFRESH_INTERVAL_S = 1.0 / 15.0 +UI_FFT_REFRESH_INTERVAL_S = 0.15 +UI_IMAGE_REFRESH_INTERVAL_S = 0.25 +UI_BSCAN_REFRESH_INTERVAL_S = 0.35 UI_DATA_WAIT_NOTE_AFTER_S = 3.0 FFT_LOW_CUT_SLIDER_SCALE = 10 FFT_LOW_CUT_MAX_PERCENT = 99.0 @@ -955,6 +959,7 @@ def run_pyqtgraph(args) -> None: p_line_phase.setLabel("left", "рад") try: p_line_phase.setXLink(p_line) + p_line_phase.setVisible(False) except Exception: pass @@ -1030,7 +1035,7 @@ def run_pyqtgraph(args) -> None: p_complex_calib.setLabel("left", "В" if bin_iq_power_mode else "Амплитуда") try: p_complex_calib.setXLink(p_line) - p_complex_calib.setVisible(bool(complex_sweep_mode)) + p_complex_calib.setVisible(False) except Exception: pass @@ -1174,7 +1179,7 @@ def run_pyqtgraph(args) -> None: if complex_sweep_mode: try: parsed_data_cb.setText("Сырые CH1/CH2 (В)" if bin_iq_power_mode else "Сырые Re/Im") - parsed_data_cb.setChecked(True) + parsed_data_cb.setChecked(False) except Exception: pass fft_curve_group = QtWidgets.QGroupBox("FFT кривые") @@ -1184,11 +1189,13 @@ def run_pyqtgraph(args) -> None: fft_abs_cb = QtWidgets.QCheckBox("Abs после FFT") fft_real_cb = QtWidgets.QCheckBox("Re после FFT") fft_imag_cb = QtWidgets.QCheckBox("Im после FFT") + try: + fft_abs_cb.setChecked(True) + fft_real_cb.setChecked(False) + fft_imag_cb.setChecked(False) + except Exception: + pass for checkbox in (fft_abs_cb, fft_real_cb, fft_imag_cb): - try: - checkbox.setChecked(True) - except Exception: - pass fft_curve_layout.addWidget(checkbox) if not complex_sweep_mode: try: @@ -1221,8 +1228,8 @@ def run_pyqtgraph(args) -> None: parsed_data_enabled = False background_enabled = False fft_abs_enabled = True - fft_real_enabled = True - fft_imag_enabled = True + fft_real_enabled = False + fft_imag_enabled = False fft_mode = "symmetric" fft_low_cut_percent = 0.0 status_note = "" @@ -1242,7 +1249,10 @@ def run_pyqtgraph(args) -> None: last_packet_processed_at: Optional[float] = None axis_range_cache: Dict[str, Tuple[float, ...]] = {} image_rect_cache: Dict[str, Tuple[float, ...]] = {} - last_signal_mode_signature: Optional[Tuple[Optional[str], bool, bool]] = None + curve_has_data_cache: Dict[str, bool] = {} + item_visibility_cache: Dict[str, bool] = {} + text_value_cache: Dict[str, str] = {} + last_signal_mode_signature: Optional[Tuple[Optional[str], bool, bool, bool, bool]] = None last_fft_low_cut_label_text: Optional[str] = None def finite_range_pair(lower: float, upper: float) -> Optional[Tuple[float, float]]: @@ -1334,6 +1344,70 @@ def run_pyqtgraph(args) -> None: image_rect_cache[key] = values return True + def set_item_visible_if_changed(key: str, item, visible: bool) -> bool: + visible_value = bool(visible) + if item_visibility_cache.get(key) == visible_value: + return False + try: + item.setVisible(visible_value) + except Exception: + return False + item_visibility_cache[key] = visible_value + return True + + def show_line_aux_axis_if_changed(visible: bool) -> None: + if p_line_aux_vb is None: + return + visible_value = bool(visible) + set_item_visible_if_changed("line_aux_view", p_line_aux_vb, visible_value) + if item_visibility_cache.get("line_aux_axis") == visible_value: + return + try: + p_line.showAxis("right", show=visible_value) + except Exception: + return + item_visibility_cache["line_aux_axis"] = visible_value + + def _payload_size(values: Any) -> int: + try: + return int(np.asarray(values).size) + except Exception: + try: + return int(len(values)) + except Exception: + return 0 + + def clear_curve_if_needed(key: str, curve_item) -> bool: + if curve_has_data_cache.get(key) is False: + return False + try: + curve_item.setData([], []) + except Exception: + return False + curve_has_data_cache[key] = False + return True + + def set_curve_data(key: str, curve_item, xs: Any, ys: Any, **kwargs: Any) -> bool: + if _payload_size(xs) <= 0 or _payload_size(ys) <= 0: + return clear_curve_if_needed(key, curve_item) + try: + curve_item.setData(xs, ys, **kwargs) + except Exception: + return False + curve_has_data_cache[key] = True + return True + + def set_text_if_changed(key: str, item, text: str) -> bool: + text_value = str(text) + if text_value_cache.get(key) == text_value: + return False + try: + item.setText(text_value) + except Exception: + return False + text_value_cache[key] = text_value + return True + fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: try: @@ -1539,7 +1613,13 @@ def run_pyqtgraph(args) -> None: is_logdet = signal_kind == "bin_logdet" is_bin_iq = signal_kind == "bin_iq" is_do1_tagged = signal_kind == "bin_iq_do1_tagged" - signature = (signal_kind, bool(active_complex), bool(is_do1_tagged)) + signature = ( + signal_kind, + bool(active_complex), + bool(is_do1_tagged), + bool(parsed_data_enabled), + bool(complex_calib_enabled), + ) if (not force) and signature == last_signal_mode_signature: return @@ -1570,10 +1650,15 @@ def run_pyqtgraph(args) -> None: p_fft.setTitle("FFT") parsed_data_cb.setText("данные после парсинга") p_fft.setLabel("left", "Амплитуда" if active_complex else "дБ") - p_img.setVisible(not is_do1_tagged) - p_fft.setVisible(not is_do1_tagged) - p_spec.setVisible(not is_do1_tagged) - p_complex_calib.setVisible((not is_do1_tagged) and bool(active_complex)) + set_item_visible_if_changed("raw_waterfall_plot", p_img, not is_do1_tagged) + set_item_visible_if_changed("fft_plot", p_fft, not is_do1_tagged) + set_item_visible_if_changed("fft_waterfall_plot", p_spec, not is_do1_tagged) + set_item_visible_if_changed( + "complex_calib_plot", + p_complex_calib, + (not is_do1_tagged) and bool(active_complex) and bool(complex_calib_enabled), + ) + show_line_aux_axis_if_changed(bool(bin_iq_power_mode and parsed_data_enabled and (not is_logdet))) last_signal_mode_signature = signature except Exception: pass @@ -2476,6 +2561,10 @@ def run_pyqtgraph(args) -> None: ui_started_at = time.perf_counter() update_ticks = 0 last_axis_range_refresh_at = 0.0 + last_line_refresh_at = 0.0 + last_fft_refresh_at = 0.0 + last_image_refresh_at = 0.0 + last_bscan_refresh_at = 0.0 def refresh_current_fft_cache(sweep_for_fft: np.ndarray, bins: int) -> None: fft_complex = compute_fft_complex_row( @@ -2710,6 +2799,7 @@ def run_pyqtgraph(args) -> None: def update() -> None: nonlocal peak_ref_window, status_dirty, update_ticks, last_axis_range_refresh_at + nonlocal last_line_refresh_at, last_fft_refresh_at, last_image_refresh_at, last_bscan_refresh_at norm_display_scale = 500.0 if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits): return @@ -2721,22 +2811,48 @@ def run_pyqtgraph(args) -> None: now_perf = time.perf_counter() changed = drain_queue() > 0 - redraw_needed = changed or runtime.plot_dirty - refresh_auto_ranges = bool(runtime.plot_dirty) or ( - redraw_needed and (now_perf - last_axis_range_refresh_at) >= UI_AXIS_REFRESH_INTERVAL_S + force_redraw = bool(runtime.plot_dirty) + redraw_needed = changed or force_redraw + refresh_line_views = force_redraw or ( + changed and (now_perf - last_line_refresh_at) >= UI_LINE_REFRESH_INTERVAL_S ) - if refresh_auto_ranges: - last_axis_range_refresh_at = now_perf - refresh_heavy_views = ( - runtime.plot_dirty + refresh_fft_views = force_redraw or ( + redraw_needed and (now_perf - last_fft_refresh_at) >= UI_FFT_REFRESH_INTERVAL_S + ) + heavy_stride_due = ( + force_redraw or last_heavy_refresh_stride <= 1 or (update_ticks % last_heavy_refresh_stride) == 0 ) - refresh_bscan_views = ( - runtime.plot_dirty + bscan_stride_due = ( + force_redraw or last_bscan_refresh_stride <= 1 or (update_ticks % last_bscan_refresh_stride) == 0 ) + refresh_heavy_views = force_redraw or ( + changed + and heavy_stride_due + and (now_perf - last_image_refresh_at) >= UI_IMAGE_REFRESH_INTERVAL_S + ) + refresh_bscan_views = force_redraw or ( + redraw_needed + and bscan_stride_due + and (now_perf - last_bscan_refresh_at) >= UI_BSCAN_REFRESH_INTERVAL_S + ) + refresh_auto_ranges = force_redraw or ( + (refresh_line_views or refresh_fft_views or refresh_bscan_views) + and (now_perf - last_axis_range_refresh_at) >= UI_AXIS_REFRESH_INTERVAL_S + ) + if refresh_auto_ranges: + last_axis_range_refresh_at = now_perf + if refresh_line_views: + last_line_refresh_at = now_perf + if refresh_fft_views: + last_fft_refresh_at = now_perf + if refresh_heavy_views: + last_image_refresh_at = now_perf + if refresh_bscan_views: + last_bscan_refresh_at = now_perf do1_tagged_now = current_packet_is_do1_tagged() if redraw_needed: @@ -2744,224 +2860,239 @@ def run_pyqtgraph(args) -> None: active_signal_kind = get_signal_kind() active_complex_mode = current_packet_is_complex() active_do1_tagged = active_signal_kind == "bin_iq_do1_tagged" - xs = resolve_curve_xs( - runtime.current_sweep_raw.size - if runtime.current_sweep_raw is not None - else (runtime.calib_envelope.size if runtime.calib_envelope is not None else 0) - ) - displayed_calib = None - displayed_aux = resolve_visible_aux_curves(runtime.current_aux_curves, parsed_data_enabled) - displayed_tagged_aux_low, displayed_tagged_aux_high = resolve_visible_do1_tagged_aux_curves( - runtime.current_do1_tagged_aux_low, - runtime.current_do1_tagged_aux_high, - parsed_data_enabled, - ) - if active_do1_tagged: - displayed_phase, displayed_phase_high = compute_do1_tagged_phase_curves( + if refresh_line_views: + xs = resolve_curve_xs( + runtime.current_sweep_raw.size + if runtime.current_sweep_raw is not None + else (runtime.calib_envelope.size if runtime.calib_envelope is not None else 0) + ) + displayed_calib = None + displayed_aux = resolve_visible_aux_curves(runtime.current_aux_curves, parsed_data_enabled) + displayed_tagged_aux_low, displayed_tagged_aux_high = resolve_visible_do1_tagged_aux_curves( runtime.current_do1_tagged_aux_low, runtime.current_do1_tagged_aux_high, + parsed_data_enabled, ) - else: - displayed_phase = compute_aux_phase_curve(runtime.current_aux_curves) - displayed_phase_high = None - try: - p_line_phase.setVisible( + if active_do1_tagged and parsed_data_enabled: + displayed_phase, displayed_phase_high = compute_do1_tagged_phase_curves( + runtime.current_do1_tagged_aux_low, + runtime.current_do1_tagged_aux_high, + ) + elif parsed_data_enabled: + displayed_phase = compute_aux_phase_curve(runtime.current_aux_curves) + displayed_phase_high = None + else: + displayed_phase = None + displayed_phase_high = None + set_item_visible_if_changed( + "line_phase_plot", + p_line_phase, (displayed_phase is not None and displayed_phase.size > 0) - or (displayed_phase_high is not None and displayed_phase_high.size > 0) + or (displayed_phase_high is not None and displayed_phase_high.size > 0), ) - except Exception: - pass - if active_do1_tagged: - curve.setData([], []) - if runtime.current_do1_tagged_raw_low is not None: - raw_low_x, raw_low_y = decimate_curve_for_display(xs, runtime.current_do1_tagged_raw_low) - raw_low_x, raw_low_y = sanitize_curve_data_for_display(raw_low_x, raw_low_y) - curve_raw_low.setData(raw_low_x, raw_low_y, autoDownsample=False) - else: - curve_raw_low.setData([], []) - if runtime.current_do1_tagged_raw_high is not None: - raw_high_x, raw_high_y = decimate_curve_for_display(xs, runtime.current_do1_tagged_raw_high) - raw_high_x, raw_high_y = sanitize_curve_data_for_display(raw_high_x, raw_high_y) - curve_raw_high.setData(raw_high_x, raw_high_y, autoDownsample=False) - else: - curve_raw_high.setData([], []) - elif runtime.current_sweep_raw is not None: - raw_x, raw_y = decimate_curve_for_display(xs, runtime.current_sweep_raw) - raw_x, raw_y = sanitize_curve_data_for_display(raw_x, raw_y) - curve.setData(raw_x, raw_y, autoDownsample=False) - curve_raw_low.setData([], []) - curve_raw_high.setData([], []) - else: - curve.setData([], []) - curve_raw_low.setData([], []) - curve_raw_high.setData([], []) - - if active_do1_tagged: - if displayed_tagged_aux_low is not None: - aux_low_1, aux_low_2 = displayed_tagged_aux_low - aux_low_width = min(xs.size, aux_low_1.size, aux_low_2.size) - low_x_1, low_y_1 = decimate_curve_for_display(xs[:aux_low_width], aux_low_1[:aux_low_width]) - low_x_2, low_y_2 = decimate_curve_for_display(xs[:aux_low_width], aux_low_2[:aux_low_width]) - low_x_1, low_y_1 = sanitize_curve_data_for_display(low_x_1, low_y_1) - low_x_2, low_y_2 = sanitize_curve_data_for_display(low_x_2, low_y_2) - curve_aux_1.setData(low_x_1, low_y_1, autoDownsample=False) - curve_aux_2.setData(low_x_2, low_y_2, autoDownsample=False) - else: - curve_aux_1.setData([], []) - curve_aux_2.setData([], []) - if displayed_tagged_aux_high is not None: - aux_high_1, aux_high_2 = displayed_tagged_aux_high - aux_high_width = min(xs.size, aux_high_1.size, aux_high_2.size) - high_x_1, high_y_1 = decimate_curve_for_display(xs[:aux_high_width], aux_high_1[:aux_high_width]) - high_x_2, high_y_2 = decimate_curve_for_display(xs[:aux_high_width], aux_high_2[:aux_high_width]) - high_x_1, high_y_1 = sanitize_curve_data_for_display(high_x_1, high_y_1) - high_x_2, high_y_2 = sanitize_curve_data_for_display(high_x_2, high_y_2) - curve_aux_3.setData(high_x_1, high_y_1, autoDownsample=False) - curve_aux_4.setData(high_x_2, high_y_2, autoDownsample=False) - else: - curve_aux_3.setData([], []) - curve_aux_4.setData([], []) - elif displayed_aux is not None: - aux_1, aux_2 = displayed_aux - aux_width = min(xs.size, aux_1.size, aux_2.size) - aux_x_1, aux_y_1 = decimate_curve_for_display(xs[:aux_width], aux_1[:aux_width]) - aux_x_2, aux_y_2 = decimate_curve_for_display(xs[:aux_width], aux_2[:aux_width]) - aux_x_1, aux_y_1 = sanitize_curve_data_for_display(aux_x_1, aux_y_1) - aux_x_2, aux_y_2 = sanitize_curve_data_for_display(aux_x_2, aux_y_2) - curve_aux_1.setData(aux_x_1, aux_y_1, autoDownsample=False) - curve_aux_2.setData(aux_x_2, aux_y_2, autoDownsample=False) - curve_aux_3.setData([], []) - curve_aux_4.setData([], []) - else: - curve_aux_1.setData([], []) - curve_aux_2.setData([], []) - curve_aux_3.setData([], []) - curve_aux_4.setData([], []) - - if displayed_phase is not None: - phase_width = min(xs.size, displayed_phase.size) - phase_x, phase_y = decimate_curve_for_display(xs[:phase_width], displayed_phase[:phase_width]) - phase_x, phase_y = sanitize_curve_data_for_display(phase_x, phase_y) - curve_phase.setData(phase_x, phase_y, autoDownsample=False) - else: - curve_phase.setData([], []) - if displayed_phase_high is not None: - phase_high_width = min(xs.size, displayed_phase_high.size) - phase_high_x, phase_high_y = decimate_curve_for_display( - xs[:phase_high_width], - displayed_phase_high[:phase_high_width], - ) - phase_high_x, phase_high_y = sanitize_curve_data_for_display(phase_high_x, phase_high_y) - curve_phase_high.setData(phase_high_x, phase_high_y, autoDownsample=False) - else: - curve_phase_high.setData([], []) - - if (not active_do1_tagged) and runtime.calib_envelope is not None: - if runtime.current_sweep_raw is not None: - displayed_calib = resample_envelope(runtime.calib_envelope, runtime.current_sweep_raw.size) - xs_calib = xs[: displayed_calib.size] - else: - displayed_calib = runtime.calib_envelope - xs_calib = resolve_curve_xs(displayed_calib.size) - calib_x, calib_y = decimate_curve_for_display(xs_calib, displayed_calib) - calib_x, calib_y = sanitize_curve_data_for_display(calib_x, calib_y) - curve_calib.setData(calib_x, calib_y, autoDownsample=False) - else: - curve_calib.setData([], []) - - if (not active_do1_tagged) and runtime.current_sweep_norm is not None: - norm_display = runtime.current_sweep_norm * norm_display_scale - norm_x, norm_y = decimate_curve_for_display(xs[: norm_display.size], norm_display) - norm_x, norm_y = sanitize_curve_data_for_display(norm_x, norm_y) - curve_norm.setData(norm_x, norm_y, autoDownsample=False) - else: - curve_norm.setData([], []) - - if fixed_ylim is None: if active_do1_tagged: - y_series = [ - runtime.current_do1_tagged_raw_low, - runtime.current_do1_tagged_raw_high, - runtime.current_sweep_raw, - ] - elif active_signal_kind == "bin_iq": - y_series = [ - runtime.current_sweep_raw, - displayed_calib, - (runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None, - ] + clear_curve_if_needed("raw", curve) + if runtime.current_do1_tagged_raw_low is not None: + raw_low_x, raw_low_y = decimate_curve_for_display(xs, runtime.current_do1_tagged_raw_low) + raw_low_x, raw_low_y = sanitize_curve_data_for_display(raw_low_x, raw_low_y) + set_curve_data("raw_low", curve_raw_low, raw_low_x, raw_low_y, autoDownsample=False) + else: + clear_curve_if_needed("raw_low", curve_raw_low) + if runtime.current_do1_tagged_raw_high is not None: + raw_high_x, raw_high_y = decimate_curve_for_display(xs, runtime.current_do1_tagged_raw_high) + raw_high_x, raw_high_y = sanitize_curve_data_for_display(raw_high_x, raw_high_y) + set_curve_data("raw_high", curve_raw_high, raw_high_x, raw_high_y, autoDownsample=False) + else: + clear_curve_if_needed("raw_high", curve_raw_high) + elif runtime.current_sweep_raw is not None: + raw_x, raw_y = decimate_curve_for_display(xs, runtime.current_sweep_raw) + raw_x, raw_y = sanitize_curve_data_for_display(raw_x, raw_y) + set_curve_data("raw", curve, raw_x, raw_y, autoDownsample=False) + clear_curve_if_needed("raw_low", curve_raw_low) + clear_curve_if_needed("raw_high", curve_raw_high) else: - y_series = [ - runtime.current_sweep_raw, - displayed_aux[0] if displayed_aux is not None else None, - displayed_aux[1] if displayed_aux is not None else None, - displayed_calib, - (runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None, - ] - y_limits = compute_auto_ylim(*y_series) - if refresh_auto_ranges and y_limits is not None: - set_y_range_if_changed("line_y", p_line, y_limits[0], y_limits[1], padding=0) - if p_line_aux_vb is not None: - aux_limits = compute_auto_ylim( - displayed_aux[0] if displayed_aux is not None else None, - displayed_aux[1] if displayed_aux is not None else None, - displayed_tagged_aux_low[0] if displayed_tagged_aux_low is not None else None, - displayed_tagged_aux_low[1] if displayed_tagged_aux_low is not None else None, - displayed_tagged_aux_high[0] if displayed_tagged_aux_high is not None else None, - displayed_tagged_aux_high[1] if displayed_tagged_aux_high is not None else None, - ) - if refresh_auto_ranges and aux_limits is not None: - set_y_range_if_changed("line_aux_y", p_line_aux_vb, aux_limits[0], aux_limits[1], padding=0) - phase_limits = compute_auto_ylim(displayed_phase, displayed_phase_high) - if refresh_auto_ranges and phase_limits is not None: - set_y_range_if_changed("line_phase_y", p_line_phase, phase_limits[0], phase_limits[1], padding=0) + clear_curve_if_needed("raw", curve) + clear_curve_if_needed("raw_low", curve_raw_low) + clear_curve_if_needed("raw_high", curve_raw_high) - line_x_bounds = resolve_axis_bounds(xs) - if line_x_bounds is not None: - set_x_range_if_changed("line_x", p_line, line_x_bounds[0], line_x_bounds[1], padding=0) - set_x_range_if_changed("line_phase_x", p_line_phase, line_x_bounds[0], line_x_bounds[1], padding=0) + if active_do1_tagged: + if displayed_tagged_aux_low is not None: + aux_low_1, aux_low_2 = displayed_tagged_aux_low + aux_low_width = min(xs.size, aux_low_1.size, aux_low_2.size) + low_x_1, low_y_1 = decimate_curve_for_display(xs[:aux_low_width], aux_low_1[:aux_low_width]) + low_x_2, low_y_2 = decimate_curve_for_display(xs[:aux_low_width], aux_low_2[:aux_low_width]) + low_x_1, low_y_1 = sanitize_curve_data_for_display(low_x_1, low_y_1) + low_x_2, low_y_2 = sanitize_curve_data_for_display(low_x_2, low_y_2) + set_curve_data("aux_1", curve_aux_1, low_x_1, low_y_1, autoDownsample=False) + set_curve_data("aux_2", curve_aux_2, low_x_2, low_y_2, autoDownsample=False) + else: + clear_curve_if_needed("aux_1", curve_aux_1) + clear_curve_if_needed("aux_2", curve_aux_2) + if displayed_tagged_aux_high is not None: + aux_high_1, aux_high_2 = displayed_tagged_aux_high + aux_high_width = min(xs.size, aux_high_1.size, aux_high_2.size) + high_x_1, high_y_1 = decimate_curve_for_display(xs[:aux_high_width], aux_high_1[:aux_high_width]) + high_x_2, high_y_2 = decimate_curve_for_display(xs[:aux_high_width], aux_high_2[:aux_high_width]) + high_x_1, high_y_1 = sanitize_curve_data_for_display(high_x_1, high_y_1) + high_x_2, high_y_2 = sanitize_curve_data_for_display(high_x_2, high_y_2) + set_curve_data("aux_3", curve_aux_3, high_x_1, high_y_1, autoDownsample=False) + set_curve_data("aux_4", curve_aux_4, high_x_2, high_y_2, autoDownsample=False) + else: + clear_curve_if_needed("aux_3", curve_aux_3) + clear_curve_if_needed("aux_4", curve_aux_4) + elif displayed_aux is not None: + aux_1, aux_2 = displayed_aux + aux_width = min(xs.size, aux_1.size, aux_2.size) + aux_x_1, aux_y_1 = decimate_curve_for_display(xs[:aux_width], aux_1[:aux_width]) + aux_x_2, aux_y_2 = decimate_curve_for_display(xs[:aux_width], aux_2[:aux_width]) + aux_x_1, aux_y_1 = sanitize_curve_data_for_display(aux_x_1, aux_y_1) + aux_x_2, aux_y_2 = sanitize_curve_data_for_display(aux_x_2, aux_y_2) + set_curve_data("aux_1", curve_aux_1, aux_x_1, aux_y_1, autoDownsample=False) + set_curve_data("aux_2", curve_aux_2, aux_x_2, aux_y_2, autoDownsample=False) + clear_curve_if_needed("aux_3", curve_aux_3) + clear_curve_if_needed("aux_4", curve_aux_4) + else: + clear_curve_if_needed("aux_1", curve_aux_1) + clear_curve_if_needed("aux_2", curve_aux_2) + clear_curve_if_needed("aux_3", curve_aux_3) + clear_curve_if_needed("aux_4", curve_aux_4) - complex_calib_plot_signal: Optional[np.ndarray] = None - if ( - active_complex_mode - and complex_calib_enabled - and runtime.current_fft_input is not None - and np.iscomplexobj(runtime.current_fft_input) - ): - complex_calib_plot_signal = np.asarray(runtime.current_fft_input, dtype=np.complex64).reshape(-1) + if displayed_phase is not None: + phase_width = min(xs.size, displayed_phase.size) + phase_x, phase_y = decimate_curve_for_display(xs[:phase_width], displayed_phase[:phase_width]) + phase_x, phase_y = sanitize_curve_data_for_display(phase_x, phase_y) + set_curve_data("phase", curve_phase, phase_x, phase_y, autoDownsample=False) + else: + clear_curve_if_needed("phase", curve_phase) + if displayed_phase_high is not None: + phase_high_width = min(xs.size, displayed_phase_high.size) + phase_high_x, phase_high_y = decimate_curve_for_display( + xs[:phase_high_width], + displayed_phase_high[:phase_high_width], + ) + phase_high_x, phase_high_y = sanitize_curve_data_for_display(phase_high_x, phase_high_y) + set_curve_data("phase_high", curve_phase_high, phase_high_x, phase_high_y, autoDownsample=False) + else: + clear_curve_if_needed("phase_high", curve_phase_high) - if complex_calib_plot_signal is not None and complex_calib_plot_signal.size > 0: - xs_complex = resolve_curve_xs(complex_calib_plot_signal.size) - real_after = complex_calib_plot_signal.real.astype(np.float32, copy=False) - imag_after = complex_calib_plot_signal.imag.astype(np.float32, copy=False) - real_x, real_y = decimate_curve_for_display(xs_complex[: real_after.size], real_after) - imag_x, imag_y = decimate_curve_for_display(xs_complex[: imag_after.size], imag_after) - real_x, real_y = sanitize_curve_data_for_display(real_x, real_y) - imag_x, imag_y = sanitize_curve_data_for_display(imag_x, imag_y) - curve_complex_calib_real.setData(real_x, real_y, autoDownsample=False) - curve_complex_calib_imag.setData(imag_x, imag_y, autoDownsample=False) - complex_ylim = compute_auto_ylim(real_after, imag_after) - if refresh_auto_ranges and complex_ylim is not None: - set_y_range_if_changed( - "complex_calib_y", - p_complex_calib, - complex_ylim[0], - complex_ylim[1], - padding=0, + if (not active_do1_tagged) and runtime.calib_envelope is not None: + if runtime.current_sweep_raw is not None: + displayed_calib = resample_envelope(runtime.calib_envelope, runtime.current_sweep_raw.size) + xs_calib = xs[: displayed_calib.size] + else: + displayed_calib = runtime.calib_envelope + xs_calib = resolve_curve_xs(displayed_calib.size) + calib_x, calib_y = decimate_curve_for_display(xs_calib, displayed_calib) + calib_x, calib_y = sanitize_curve_data_for_display(calib_x, calib_y) + set_curve_data("calib", curve_calib, calib_x, calib_y, autoDownsample=False) + else: + clear_curve_if_needed("calib", curve_calib) + + if (not active_do1_tagged) and runtime.current_sweep_norm is not None: + norm_display = runtime.current_sweep_norm * norm_display_scale + norm_x, norm_y = decimate_curve_for_display(xs[: norm_display.size], norm_display) + norm_x, norm_y = sanitize_curve_data_for_display(norm_x, norm_y) + set_curve_data("norm", curve_norm, norm_x, norm_y, autoDownsample=False) + else: + clear_curve_if_needed("norm", curve_norm) + + if fixed_ylim is None: + if active_do1_tagged: + y_series = [ + runtime.current_do1_tagged_raw_low, + runtime.current_do1_tagged_raw_high, + runtime.current_sweep_raw, + ] + elif active_signal_kind == "bin_iq": + y_series = [ + runtime.current_sweep_raw, + displayed_calib, + (runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None, + ] + else: + y_series = [ + runtime.current_sweep_raw, + displayed_aux[0] if displayed_aux is not None else None, + displayed_aux[1] if displayed_aux is not None else None, + displayed_calib, + (runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None, + ] + y_limits = compute_auto_ylim(*y_series) + if refresh_auto_ranges and y_limits is not None: + set_y_range_if_changed("line_y", p_line, y_limits[0], y_limits[1], padding=0) + if p_line_aux_vb is not None: + aux_limits = compute_auto_ylim( + displayed_aux[0] if displayed_aux is not None else None, + displayed_aux[1] if displayed_aux is not None else None, + displayed_tagged_aux_low[0] if displayed_tagged_aux_low is not None else None, + displayed_tagged_aux_low[1] if displayed_tagged_aux_low is not None else None, + displayed_tagged_aux_high[0] if displayed_tagged_aux_high is not None else None, + displayed_tagged_aux_high[1] if displayed_tagged_aux_high is not None else None, + ) + if refresh_auto_ranges and aux_limits is not None: + set_y_range_if_changed("line_aux_y", p_line_aux_vb, aux_limits[0], aux_limits[1], padding=0) + phase_limits = compute_auto_ylim(displayed_phase, displayed_phase_high) + if refresh_auto_ranges and phase_limits is not None: + set_y_range_if_changed("line_phase_y", p_line_phase, phase_limits[0], phase_limits[1], padding=0) + + line_x_bounds = resolve_axis_bounds(xs) + if line_x_bounds is not None: + set_x_range_if_changed("line_x", p_line, line_x_bounds[0], line_x_bounds[1], padding=0) + set_x_range_if_changed("line_phase_x", p_line_phase, line_x_bounds[0], line_x_bounds[1], padding=0) + + complex_calib_plot_signal: Optional[np.ndarray] = None + if ( + active_complex_mode + and complex_calib_enabled + and runtime.current_fft_input is not None + and np.iscomplexobj(runtime.current_fft_input) + ): + complex_calib_plot_signal = np.asarray(runtime.current_fft_input, dtype=np.complex64).reshape(-1) + + if complex_calib_plot_signal is not None and complex_calib_plot_signal.size > 0: + xs_complex = resolve_curve_xs(complex_calib_plot_signal.size) + real_after = complex_calib_plot_signal.real.astype(np.float32, copy=False) + imag_after = complex_calib_plot_signal.imag.astype(np.float32, copy=False) + real_x, real_y = decimate_curve_for_display(xs_complex[: real_after.size], real_after) + imag_x, imag_y = decimate_curve_for_display(xs_complex[: imag_after.size], imag_after) + real_x, real_y = sanitize_curve_data_for_display(real_x, real_y) + imag_x, imag_y = sanitize_curve_data_for_display(imag_x, imag_y) + set_curve_data( + "complex_calib_real", + curve_complex_calib_real, + real_x, + real_y, + autoDownsample=False, ) - complex_x_bounds = resolve_axis_bounds(xs_complex) - if complex_x_bounds is not None: - set_x_range_if_changed( - "complex_calib_x", - p_complex_calib, - complex_x_bounds[0], - complex_x_bounds[1], - padding=0, + set_curve_data( + "complex_calib_imag", + curve_complex_calib_imag, + imag_x, + imag_y, + autoDownsample=False, ) - else: - curve_complex_calib_real.setData([], []) - curve_complex_calib_imag.setData([], []) + complex_ylim = compute_auto_ylim(real_after, imag_after) + if refresh_auto_ranges and complex_ylim is not None: + set_y_range_if_changed( + "complex_calib_y", + p_complex_calib, + complex_ylim[0], + complex_ylim[1], + padding=0, + ) + complex_x_bounds = resolve_axis_bounds(xs_complex) + if complex_x_bounds is not None: + set_x_range_if_changed( + "complex_calib_x", + p_complex_calib, + complex_x_bounds[0], + complex_x_bounds[1], + padding=0, + ) + else: + clear_curve_if_needed("complex_calib_real", curve_complex_calib_real) + clear_curve_if_needed("complex_calib_imag", curve_complex_calib_imag) sweep_for_fft = None if active_do1_tagged else runtime.current_fft_input if (not active_do1_tagged) and sweep_for_fft is None: