This commit is contained in:
awe
2026-04-30 17:58:38 +03:00
parent f9bca446b2
commit d0809e6710

View File

@ -46,9 +46,9 @@ from rfg_adc_plotter.processing.peaks import (
from rfg_adc_plotter.state import RingBuffer, RuntimeState
from rfg_adc_plotter.types import SweepAuxCurves, SweepInfo, SweepPacket
RAW_PLOT_MAX_POINTS = 4096
RAW_PLOT_MAX_POINTS = 2048
RAW_WATERFALL_MAX_POINTS = 2048
BSCAN_MAX_POINTS = 512
BSCAN_MAX_POINTS = 256
UI_QUEUE_MAXSIZE = 128
UI_MAX_PACKETS_PER_TICK = 8
DEBUG_FRAME_LOG_EVERY = 10
@ -57,6 +57,10 @@ UI_BACKLOG_LATEST_ONLY_THRESHOLD_MULTIPLIER = 2
UI_HEAVY_REFRESH_BACKLOG_MULTIPLIER = 1
UI_HEAVY_REFRESH_MAX_STRIDE = 4
UI_AXIS_REFRESH_INTERVAL_S = 0.35
UI_LINE_REFRESH_INTERVAL_S = 1.0 / 15.0
UI_FFT_REFRESH_INTERVAL_S = 0.15
UI_IMAGE_REFRESH_INTERVAL_S = 0.25
UI_BSCAN_REFRESH_INTERVAL_S = 0.35
UI_DATA_WAIT_NOTE_AFTER_S = 3.0
FFT_LOW_CUT_SLIDER_SCALE = 10
FFT_LOW_CUT_MAX_PERCENT = 99.0
@ -955,6 +959,7 @@ def run_pyqtgraph(args) -> None:
p_line_phase.setLabel("left", "рад")
try:
p_line_phase.setXLink(p_line)
p_line_phase.setVisible(False)
except Exception:
pass
@ -1030,7 +1035,7 @@ def run_pyqtgraph(args) -> None:
p_complex_calib.setLabel("left", "В" if bin_iq_power_mode else "Амплитуда")
try:
p_complex_calib.setXLink(p_line)
p_complex_calib.setVisible(bool(complex_sweep_mode))
p_complex_calib.setVisible(False)
except Exception:
pass
@ -1174,7 +1179,7 @@ def run_pyqtgraph(args) -> None:
if complex_sweep_mode:
try:
parsed_data_cb.setText("Сырые CH1/CH2 (В)" if bin_iq_power_mode else "Сырые Re/Im")
parsed_data_cb.setChecked(True)
parsed_data_cb.setChecked(False)
except Exception:
pass
fft_curve_group = QtWidgets.QGroupBox("FFT кривые")
@ -1184,11 +1189,13 @@ def run_pyqtgraph(args) -> None:
fft_abs_cb = QtWidgets.QCheckBox("Abs после FFT")
fft_real_cb = QtWidgets.QCheckBox("Re после FFT")
fft_imag_cb = QtWidgets.QCheckBox("Im после FFT")
try:
fft_abs_cb.setChecked(True)
fft_real_cb.setChecked(False)
fft_imag_cb.setChecked(False)
except Exception:
pass
for checkbox in (fft_abs_cb, fft_real_cb, fft_imag_cb):
try:
checkbox.setChecked(True)
except Exception:
pass
fft_curve_layout.addWidget(checkbox)
if not complex_sweep_mode:
try:
@ -1221,8 +1228,8 @@ def run_pyqtgraph(args) -> None:
parsed_data_enabled = False
background_enabled = False
fft_abs_enabled = True
fft_real_enabled = True
fft_imag_enabled = True
fft_real_enabled = False
fft_imag_enabled = False
fft_mode = "symmetric"
fft_low_cut_percent = 0.0
status_note = ""
@ -1242,7 +1249,10 @@ def run_pyqtgraph(args) -> None:
last_packet_processed_at: Optional[float] = None
axis_range_cache: Dict[str, Tuple[float, ...]] = {}
image_rect_cache: Dict[str, Tuple[float, ...]] = {}
last_signal_mode_signature: Optional[Tuple[Optional[str], bool, bool]] = None
curve_has_data_cache: Dict[str, bool] = {}
item_visibility_cache: Dict[str, bool] = {}
text_value_cache: Dict[str, str] = {}
last_signal_mode_signature: Optional[Tuple[Optional[str], bool, bool, bool, bool]] = None
last_fft_low_cut_label_text: Optional[str] = None
def finite_range_pair(lower: float, upper: float) -> Optional[Tuple[float, float]]:
@ -1334,6 +1344,70 @@ def run_pyqtgraph(args) -> None:
image_rect_cache[key] = values
return True
def set_item_visible_if_changed(key: str, item, visible: bool) -> bool:
visible_value = bool(visible)
if item_visibility_cache.get(key) == visible_value:
return False
try:
item.setVisible(visible_value)
except Exception:
return False
item_visibility_cache[key] = visible_value
return True
def show_line_aux_axis_if_changed(visible: bool) -> None:
if p_line_aux_vb is None:
return
visible_value = bool(visible)
set_item_visible_if_changed("line_aux_view", p_line_aux_vb, visible_value)
if item_visibility_cache.get("line_aux_axis") == visible_value:
return
try:
p_line.showAxis("right", show=visible_value)
except Exception:
return
item_visibility_cache["line_aux_axis"] = visible_value
def _payload_size(values: Any) -> int:
try:
return int(np.asarray(values).size)
except Exception:
try:
return int(len(values))
except Exception:
return 0
def clear_curve_if_needed(key: str, curve_item) -> bool:
if curve_has_data_cache.get(key) is False:
return False
try:
curve_item.setData([], [])
except Exception:
return False
curve_has_data_cache[key] = False
return True
def set_curve_data(key: str, curve_item, xs: Any, ys: Any, **kwargs: Any) -> bool:
if _payload_size(xs) <= 0 or _payload_size(ys) <= 0:
return clear_curve_if_needed(key, curve_item)
try:
curve_item.setData(xs, ys, **kwargs)
except Exception:
return False
curve_has_data_cache[key] = True
return True
def set_text_if_changed(key: str, item, text: str) -> bool:
text_value = str(text)
if text_value_cache.get(key) == text_value:
return False
try:
item.setText(text_value)
except Exception:
return False
text_value_cache[key] = text_value
return True
fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim:
try:
@ -1539,7 +1613,13 @@ def run_pyqtgraph(args) -> None:
is_logdet = signal_kind == "bin_logdet"
is_bin_iq = signal_kind == "bin_iq"
is_do1_tagged = signal_kind == "bin_iq_do1_tagged"
signature = (signal_kind, bool(active_complex), bool(is_do1_tagged))
signature = (
signal_kind,
bool(active_complex),
bool(is_do1_tagged),
bool(parsed_data_enabled),
bool(complex_calib_enabled),
)
if (not force) and signature == last_signal_mode_signature:
return
@ -1570,10 +1650,15 @@ def run_pyqtgraph(args) -> None:
p_fft.setTitle("FFT")
parsed_data_cb.setText("данные после парсинга")
p_fft.setLabel("left", "Амплитуда" if active_complex else "дБ")
p_img.setVisible(not is_do1_tagged)
p_fft.setVisible(not is_do1_tagged)
p_spec.setVisible(not is_do1_tagged)
p_complex_calib.setVisible((not is_do1_tagged) and bool(active_complex))
set_item_visible_if_changed("raw_waterfall_plot", p_img, not is_do1_tagged)
set_item_visible_if_changed("fft_plot", p_fft, not is_do1_tagged)
set_item_visible_if_changed("fft_waterfall_plot", p_spec, not is_do1_tagged)
set_item_visible_if_changed(
"complex_calib_plot",
p_complex_calib,
(not is_do1_tagged) and bool(active_complex) and bool(complex_calib_enabled),
)
show_line_aux_axis_if_changed(bool(bin_iq_power_mode and parsed_data_enabled and (not is_logdet)))
last_signal_mode_signature = signature
except Exception:
pass
@ -2476,6 +2561,10 @@ def run_pyqtgraph(args) -> None:
ui_started_at = time.perf_counter()
update_ticks = 0
last_axis_range_refresh_at = 0.0
last_line_refresh_at = 0.0
last_fft_refresh_at = 0.0
last_image_refresh_at = 0.0
last_bscan_refresh_at = 0.0
def refresh_current_fft_cache(sweep_for_fft: np.ndarray, bins: int) -> None:
fft_complex = compute_fft_complex_row(
@ -2710,6 +2799,7 @@ def run_pyqtgraph(args) -> None:
def update() -> None:
nonlocal peak_ref_window, status_dirty, update_ticks, last_axis_range_refresh_at
nonlocal last_line_refresh_at, last_fft_refresh_at, last_image_refresh_at, last_bscan_refresh_at
norm_display_scale = 500.0
if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits):
return
@ -2721,22 +2811,48 @@ def run_pyqtgraph(args) -> None:
now_perf = time.perf_counter()
changed = drain_queue() > 0
redraw_needed = changed or runtime.plot_dirty
refresh_auto_ranges = bool(runtime.plot_dirty) or (
redraw_needed and (now_perf - last_axis_range_refresh_at) >= UI_AXIS_REFRESH_INTERVAL_S
force_redraw = bool(runtime.plot_dirty)
redraw_needed = changed or force_redraw
refresh_line_views = force_redraw or (
changed and (now_perf - last_line_refresh_at) >= UI_LINE_REFRESH_INTERVAL_S
)
if refresh_auto_ranges:
last_axis_range_refresh_at = now_perf
refresh_heavy_views = (
runtime.plot_dirty
refresh_fft_views = force_redraw or (
redraw_needed and (now_perf - last_fft_refresh_at) >= UI_FFT_REFRESH_INTERVAL_S
)
heavy_stride_due = (
force_redraw
or last_heavy_refresh_stride <= 1
or (update_ticks % last_heavy_refresh_stride) == 0
)
refresh_bscan_views = (
runtime.plot_dirty
bscan_stride_due = (
force_redraw
or last_bscan_refresh_stride <= 1
or (update_ticks % last_bscan_refresh_stride) == 0
)
refresh_heavy_views = force_redraw or (
changed
and heavy_stride_due
and (now_perf - last_image_refresh_at) >= UI_IMAGE_REFRESH_INTERVAL_S
)
refresh_bscan_views = force_redraw or (
redraw_needed
and bscan_stride_due
and (now_perf - last_bscan_refresh_at) >= UI_BSCAN_REFRESH_INTERVAL_S
)
refresh_auto_ranges = force_redraw or (
(refresh_line_views or refresh_fft_views or refresh_bscan_views)
and (now_perf - last_axis_range_refresh_at) >= UI_AXIS_REFRESH_INTERVAL_S
)
if refresh_auto_ranges:
last_axis_range_refresh_at = now_perf
if refresh_line_views:
last_line_refresh_at = now_perf
if refresh_fft_views:
last_fft_refresh_at = now_perf
if refresh_heavy_views:
last_image_refresh_at = now_perf
if refresh_bscan_views:
last_bscan_refresh_at = now_perf
do1_tagged_now = current_packet_is_do1_tagged()
if redraw_needed:
@ -2744,224 +2860,239 @@ def run_pyqtgraph(args) -> None:
active_signal_kind = get_signal_kind()
active_complex_mode = current_packet_is_complex()
active_do1_tagged = active_signal_kind == "bin_iq_do1_tagged"
xs = resolve_curve_xs(
runtime.current_sweep_raw.size
if runtime.current_sweep_raw is not None
else (runtime.calib_envelope.size if runtime.calib_envelope is not None else 0)
)
displayed_calib = None
displayed_aux = resolve_visible_aux_curves(runtime.current_aux_curves, parsed_data_enabled)
displayed_tagged_aux_low, displayed_tagged_aux_high = resolve_visible_do1_tagged_aux_curves(
runtime.current_do1_tagged_aux_low,
runtime.current_do1_tagged_aux_high,
parsed_data_enabled,
)
if active_do1_tagged:
displayed_phase, displayed_phase_high = compute_do1_tagged_phase_curves(
if refresh_line_views:
xs = resolve_curve_xs(
runtime.current_sweep_raw.size
if runtime.current_sweep_raw is not None
else (runtime.calib_envelope.size if runtime.calib_envelope is not None else 0)
)
displayed_calib = None
displayed_aux = resolve_visible_aux_curves(runtime.current_aux_curves, parsed_data_enabled)
displayed_tagged_aux_low, displayed_tagged_aux_high = resolve_visible_do1_tagged_aux_curves(
runtime.current_do1_tagged_aux_low,
runtime.current_do1_tagged_aux_high,
parsed_data_enabled,
)
else:
displayed_phase = compute_aux_phase_curve(runtime.current_aux_curves)
displayed_phase_high = None
try:
p_line_phase.setVisible(
if active_do1_tagged and parsed_data_enabled:
displayed_phase, displayed_phase_high = compute_do1_tagged_phase_curves(
runtime.current_do1_tagged_aux_low,
runtime.current_do1_tagged_aux_high,
)
elif parsed_data_enabled:
displayed_phase = compute_aux_phase_curve(runtime.current_aux_curves)
displayed_phase_high = None
else:
displayed_phase = None
displayed_phase_high = None
set_item_visible_if_changed(
"line_phase_plot",
p_line_phase,
(displayed_phase is not None and displayed_phase.size > 0)
or (displayed_phase_high is not None and displayed_phase_high.size > 0)
or (displayed_phase_high is not None and displayed_phase_high.size > 0),
)
except Exception:
pass
if active_do1_tagged:
curve.setData([], [])
if runtime.current_do1_tagged_raw_low is not None:
raw_low_x, raw_low_y = decimate_curve_for_display(xs, runtime.current_do1_tagged_raw_low)
raw_low_x, raw_low_y = sanitize_curve_data_for_display(raw_low_x, raw_low_y)
curve_raw_low.setData(raw_low_x, raw_low_y, autoDownsample=False)
else:
curve_raw_low.setData([], [])
if runtime.current_do1_tagged_raw_high is not None:
raw_high_x, raw_high_y = decimate_curve_for_display(xs, runtime.current_do1_tagged_raw_high)
raw_high_x, raw_high_y = sanitize_curve_data_for_display(raw_high_x, raw_high_y)
curve_raw_high.setData(raw_high_x, raw_high_y, autoDownsample=False)
else:
curve_raw_high.setData([], [])
elif runtime.current_sweep_raw is not None:
raw_x, raw_y = decimate_curve_for_display(xs, runtime.current_sweep_raw)
raw_x, raw_y = sanitize_curve_data_for_display(raw_x, raw_y)
curve.setData(raw_x, raw_y, autoDownsample=False)
curve_raw_low.setData([], [])
curve_raw_high.setData([], [])
else:
curve.setData([], [])
curve_raw_low.setData([], [])
curve_raw_high.setData([], [])
if active_do1_tagged:
if displayed_tagged_aux_low is not None:
aux_low_1, aux_low_2 = displayed_tagged_aux_low
aux_low_width = min(xs.size, aux_low_1.size, aux_low_2.size)
low_x_1, low_y_1 = decimate_curve_for_display(xs[:aux_low_width], aux_low_1[:aux_low_width])
low_x_2, low_y_2 = decimate_curve_for_display(xs[:aux_low_width], aux_low_2[:aux_low_width])
low_x_1, low_y_1 = sanitize_curve_data_for_display(low_x_1, low_y_1)
low_x_2, low_y_2 = sanitize_curve_data_for_display(low_x_2, low_y_2)
curve_aux_1.setData(low_x_1, low_y_1, autoDownsample=False)
curve_aux_2.setData(low_x_2, low_y_2, autoDownsample=False)
else:
curve_aux_1.setData([], [])
curve_aux_2.setData([], [])
if displayed_tagged_aux_high is not None:
aux_high_1, aux_high_2 = displayed_tagged_aux_high
aux_high_width = min(xs.size, aux_high_1.size, aux_high_2.size)
high_x_1, high_y_1 = decimate_curve_for_display(xs[:aux_high_width], aux_high_1[:aux_high_width])
high_x_2, high_y_2 = decimate_curve_for_display(xs[:aux_high_width], aux_high_2[:aux_high_width])
high_x_1, high_y_1 = sanitize_curve_data_for_display(high_x_1, high_y_1)
high_x_2, high_y_2 = sanitize_curve_data_for_display(high_x_2, high_y_2)
curve_aux_3.setData(high_x_1, high_y_1, autoDownsample=False)
curve_aux_4.setData(high_x_2, high_y_2, autoDownsample=False)
else:
curve_aux_3.setData([], [])
curve_aux_4.setData([], [])
elif displayed_aux is not None:
aux_1, aux_2 = displayed_aux
aux_width = min(xs.size, aux_1.size, aux_2.size)
aux_x_1, aux_y_1 = decimate_curve_for_display(xs[:aux_width], aux_1[:aux_width])
aux_x_2, aux_y_2 = decimate_curve_for_display(xs[:aux_width], aux_2[:aux_width])
aux_x_1, aux_y_1 = sanitize_curve_data_for_display(aux_x_1, aux_y_1)
aux_x_2, aux_y_2 = sanitize_curve_data_for_display(aux_x_2, aux_y_2)
curve_aux_1.setData(aux_x_1, aux_y_1, autoDownsample=False)
curve_aux_2.setData(aux_x_2, aux_y_2, autoDownsample=False)
curve_aux_3.setData([], [])
curve_aux_4.setData([], [])
else:
curve_aux_1.setData([], [])
curve_aux_2.setData([], [])
curve_aux_3.setData([], [])
curve_aux_4.setData([], [])
if displayed_phase is not None:
phase_width = min(xs.size, displayed_phase.size)
phase_x, phase_y = decimate_curve_for_display(xs[:phase_width], displayed_phase[:phase_width])
phase_x, phase_y = sanitize_curve_data_for_display(phase_x, phase_y)
curve_phase.setData(phase_x, phase_y, autoDownsample=False)
else:
curve_phase.setData([], [])
if displayed_phase_high is not None:
phase_high_width = min(xs.size, displayed_phase_high.size)
phase_high_x, phase_high_y = decimate_curve_for_display(
xs[:phase_high_width],
displayed_phase_high[:phase_high_width],
)
phase_high_x, phase_high_y = sanitize_curve_data_for_display(phase_high_x, phase_high_y)
curve_phase_high.setData(phase_high_x, phase_high_y, autoDownsample=False)
else:
curve_phase_high.setData([], [])
if (not active_do1_tagged) and runtime.calib_envelope is not None:
if runtime.current_sweep_raw is not None:
displayed_calib = resample_envelope(runtime.calib_envelope, runtime.current_sweep_raw.size)
xs_calib = xs[: displayed_calib.size]
else:
displayed_calib = runtime.calib_envelope
xs_calib = resolve_curve_xs(displayed_calib.size)
calib_x, calib_y = decimate_curve_for_display(xs_calib, displayed_calib)
calib_x, calib_y = sanitize_curve_data_for_display(calib_x, calib_y)
curve_calib.setData(calib_x, calib_y, autoDownsample=False)
else:
curve_calib.setData([], [])
if (not active_do1_tagged) and runtime.current_sweep_norm is not None:
norm_display = runtime.current_sweep_norm * norm_display_scale
norm_x, norm_y = decimate_curve_for_display(xs[: norm_display.size], norm_display)
norm_x, norm_y = sanitize_curve_data_for_display(norm_x, norm_y)
curve_norm.setData(norm_x, norm_y, autoDownsample=False)
else:
curve_norm.setData([], [])
if fixed_ylim is None:
if active_do1_tagged:
y_series = [
runtime.current_do1_tagged_raw_low,
runtime.current_do1_tagged_raw_high,
runtime.current_sweep_raw,
]
elif active_signal_kind == "bin_iq":
y_series = [
runtime.current_sweep_raw,
displayed_calib,
(runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None,
]
clear_curve_if_needed("raw", curve)
if runtime.current_do1_tagged_raw_low is not None:
raw_low_x, raw_low_y = decimate_curve_for_display(xs, runtime.current_do1_tagged_raw_low)
raw_low_x, raw_low_y = sanitize_curve_data_for_display(raw_low_x, raw_low_y)
set_curve_data("raw_low", curve_raw_low, raw_low_x, raw_low_y, autoDownsample=False)
else:
clear_curve_if_needed("raw_low", curve_raw_low)
if runtime.current_do1_tagged_raw_high is not None:
raw_high_x, raw_high_y = decimate_curve_for_display(xs, runtime.current_do1_tagged_raw_high)
raw_high_x, raw_high_y = sanitize_curve_data_for_display(raw_high_x, raw_high_y)
set_curve_data("raw_high", curve_raw_high, raw_high_x, raw_high_y, autoDownsample=False)
else:
clear_curve_if_needed("raw_high", curve_raw_high)
elif runtime.current_sweep_raw is not None:
raw_x, raw_y = decimate_curve_for_display(xs, runtime.current_sweep_raw)
raw_x, raw_y = sanitize_curve_data_for_display(raw_x, raw_y)
set_curve_data("raw", curve, raw_x, raw_y, autoDownsample=False)
clear_curve_if_needed("raw_low", curve_raw_low)
clear_curve_if_needed("raw_high", curve_raw_high)
else:
y_series = [
runtime.current_sweep_raw,
displayed_aux[0] if displayed_aux is not None else None,
displayed_aux[1] if displayed_aux is not None else None,
displayed_calib,
(runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None,
]
y_limits = compute_auto_ylim(*y_series)
if refresh_auto_ranges and y_limits is not None:
set_y_range_if_changed("line_y", p_line, y_limits[0], y_limits[1], padding=0)
if p_line_aux_vb is not None:
aux_limits = compute_auto_ylim(
displayed_aux[0] if displayed_aux is not None else None,
displayed_aux[1] if displayed_aux is not None else None,
displayed_tagged_aux_low[0] if displayed_tagged_aux_low is not None else None,
displayed_tagged_aux_low[1] if displayed_tagged_aux_low is not None else None,
displayed_tagged_aux_high[0] if displayed_tagged_aux_high is not None else None,
displayed_tagged_aux_high[1] if displayed_tagged_aux_high is not None else None,
)
if refresh_auto_ranges and aux_limits is not None:
set_y_range_if_changed("line_aux_y", p_line_aux_vb, aux_limits[0], aux_limits[1], padding=0)
phase_limits = compute_auto_ylim(displayed_phase, displayed_phase_high)
if refresh_auto_ranges and phase_limits is not None:
set_y_range_if_changed("line_phase_y", p_line_phase, phase_limits[0], phase_limits[1], padding=0)
clear_curve_if_needed("raw", curve)
clear_curve_if_needed("raw_low", curve_raw_low)
clear_curve_if_needed("raw_high", curve_raw_high)
line_x_bounds = resolve_axis_bounds(xs)
if line_x_bounds is not None:
set_x_range_if_changed("line_x", p_line, line_x_bounds[0], line_x_bounds[1], padding=0)
set_x_range_if_changed("line_phase_x", p_line_phase, line_x_bounds[0], line_x_bounds[1], padding=0)
if active_do1_tagged:
if displayed_tagged_aux_low is not None:
aux_low_1, aux_low_2 = displayed_tagged_aux_low
aux_low_width = min(xs.size, aux_low_1.size, aux_low_2.size)
low_x_1, low_y_1 = decimate_curve_for_display(xs[:aux_low_width], aux_low_1[:aux_low_width])
low_x_2, low_y_2 = decimate_curve_for_display(xs[:aux_low_width], aux_low_2[:aux_low_width])
low_x_1, low_y_1 = sanitize_curve_data_for_display(low_x_1, low_y_1)
low_x_2, low_y_2 = sanitize_curve_data_for_display(low_x_2, low_y_2)
set_curve_data("aux_1", curve_aux_1, low_x_1, low_y_1, autoDownsample=False)
set_curve_data("aux_2", curve_aux_2, low_x_2, low_y_2, autoDownsample=False)
else:
clear_curve_if_needed("aux_1", curve_aux_1)
clear_curve_if_needed("aux_2", curve_aux_2)
if displayed_tagged_aux_high is not None:
aux_high_1, aux_high_2 = displayed_tagged_aux_high
aux_high_width = min(xs.size, aux_high_1.size, aux_high_2.size)
high_x_1, high_y_1 = decimate_curve_for_display(xs[:aux_high_width], aux_high_1[:aux_high_width])
high_x_2, high_y_2 = decimate_curve_for_display(xs[:aux_high_width], aux_high_2[:aux_high_width])
high_x_1, high_y_1 = sanitize_curve_data_for_display(high_x_1, high_y_1)
high_x_2, high_y_2 = sanitize_curve_data_for_display(high_x_2, high_y_2)
set_curve_data("aux_3", curve_aux_3, high_x_1, high_y_1, autoDownsample=False)
set_curve_data("aux_4", curve_aux_4, high_x_2, high_y_2, autoDownsample=False)
else:
clear_curve_if_needed("aux_3", curve_aux_3)
clear_curve_if_needed("aux_4", curve_aux_4)
elif displayed_aux is not None:
aux_1, aux_2 = displayed_aux
aux_width = min(xs.size, aux_1.size, aux_2.size)
aux_x_1, aux_y_1 = decimate_curve_for_display(xs[:aux_width], aux_1[:aux_width])
aux_x_2, aux_y_2 = decimate_curve_for_display(xs[:aux_width], aux_2[:aux_width])
aux_x_1, aux_y_1 = sanitize_curve_data_for_display(aux_x_1, aux_y_1)
aux_x_2, aux_y_2 = sanitize_curve_data_for_display(aux_x_2, aux_y_2)
set_curve_data("aux_1", curve_aux_1, aux_x_1, aux_y_1, autoDownsample=False)
set_curve_data("aux_2", curve_aux_2, aux_x_2, aux_y_2, autoDownsample=False)
clear_curve_if_needed("aux_3", curve_aux_3)
clear_curve_if_needed("aux_4", curve_aux_4)
else:
clear_curve_if_needed("aux_1", curve_aux_1)
clear_curve_if_needed("aux_2", curve_aux_2)
clear_curve_if_needed("aux_3", curve_aux_3)
clear_curve_if_needed("aux_4", curve_aux_4)
complex_calib_plot_signal: Optional[np.ndarray] = None
if (
active_complex_mode
and complex_calib_enabled
and runtime.current_fft_input is not None
and np.iscomplexobj(runtime.current_fft_input)
):
complex_calib_plot_signal = np.asarray(runtime.current_fft_input, dtype=np.complex64).reshape(-1)
if displayed_phase is not None:
phase_width = min(xs.size, displayed_phase.size)
phase_x, phase_y = decimate_curve_for_display(xs[:phase_width], displayed_phase[:phase_width])
phase_x, phase_y = sanitize_curve_data_for_display(phase_x, phase_y)
set_curve_data("phase", curve_phase, phase_x, phase_y, autoDownsample=False)
else:
clear_curve_if_needed("phase", curve_phase)
if displayed_phase_high is not None:
phase_high_width = min(xs.size, displayed_phase_high.size)
phase_high_x, phase_high_y = decimate_curve_for_display(
xs[:phase_high_width],
displayed_phase_high[:phase_high_width],
)
phase_high_x, phase_high_y = sanitize_curve_data_for_display(phase_high_x, phase_high_y)
set_curve_data("phase_high", curve_phase_high, phase_high_x, phase_high_y, autoDownsample=False)
else:
clear_curve_if_needed("phase_high", curve_phase_high)
if complex_calib_plot_signal is not None and complex_calib_plot_signal.size > 0:
xs_complex = resolve_curve_xs(complex_calib_plot_signal.size)
real_after = complex_calib_plot_signal.real.astype(np.float32, copy=False)
imag_after = complex_calib_plot_signal.imag.astype(np.float32, copy=False)
real_x, real_y = decimate_curve_for_display(xs_complex[: real_after.size], real_after)
imag_x, imag_y = decimate_curve_for_display(xs_complex[: imag_after.size], imag_after)
real_x, real_y = sanitize_curve_data_for_display(real_x, real_y)
imag_x, imag_y = sanitize_curve_data_for_display(imag_x, imag_y)
curve_complex_calib_real.setData(real_x, real_y, autoDownsample=False)
curve_complex_calib_imag.setData(imag_x, imag_y, autoDownsample=False)
complex_ylim = compute_auto_ylim(real_after, imag_after)
if refresh_auto_ranges and complex_ylim is not None:
set_y_range_if_changed(
"complex_calib_y",
p_complex_calib,
complex_ylim[0],
complex_ylim[1],
padding=0,
if (not active_do1_tagged) and runtime.calib_envelope is not None:
if runtime.current_sweep_raw is not None:
displayed_calib = resample_envelope(runtime.calib_envelope, runtime.current_sweep_raw.size)
xs_calib = xs[: displayed_calib.size]
else:
displayed_calib = runtime.calib_envelope
xs_calib = resolve_curve_xs(displayed_calib.size)
calib_x, calib_y = decimate_curve_for_display(xs_calib, displayed_calib)
calib_x, calib_y = sanitize_curve_data_for_display(calib_x, calib_y)
set_curve_data("calib", curve_calib, calib_x, calib_y, autoDownsample=False)
else:
clear_curve_if_needed("calib", curve_calib)
if (not active_do1_tagged) and runtime.current_sweep_norm is not None:
norm_display = runtime.current_sweep_norm * norm_display_scale
norm_x, norm_y = decimate_curve_for_display(xs[: norm_display.size], norm_display)
norm_x, norm_y = sanitize_curve_data_for_display(norm_x, norm_y)
set_curve_data("norm", curve_norm, norm_x, norm_y, autoDownsample=False)
else:
clear_curve_if_needed("norm", curve_norm)
if fixed_ylim is None:
if active_do1_tagged:
y_series = [
runtime.current_do1_tagged_raw_low,
runtime.current_do1_tagged_raw_high,
runtime.current_sweep_raw,
]
elif active_signal_kind == "bin_iq":
y_series = [
runtime.current_sweep_raw,
displayed_calib,
(runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None,
]
else:
y_series = [
runtime.current_sweep_raw,
displayed_aux[0] if displayed_aux is not None else None,
displayed_aux[1] if displayed_aux is not None else None,
displayed_calib,
(runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None,
]
y_limits = compute_auto_ylim(*y_series)
if refresh_auto_ranges and y_limits is not None:
set_y_range_if_changed("line_y", p_line, y_limits[0], y_limits[1], padding=0)
if p_line_aux_vb is not None:
aux_limits = compute_auto_ylim(
displayed_aux[0] if displayed_aux is not None else None,
displayed_aux[1] if displayed_aux is not None else None,
displayed_tagged_aux_low[0] if displayed_tagged_aux_low is not None else None,
displayed_tagged_aux_low[1] if displayed_tagged_aux_low is not None else None,
displayed_tagged_aux_high[0] if displayed_tagged_aux_high is not None else None,
displayed_tagged_aux_high[1] if displayed_tagged_aux_high is not None else None,
)
if refresh_auto_ranges and aux_limits is not None:
set_y_range_if_changed("line_aux_y", p_line_aux_vb, aux_limits[0], aux_limits[1], padding=0)
phase_limits = compute_auto_ylim(displayed_phase, displayed_phase_high)
if refresh_auto_ranges and phase_limits is not None:
set_y_range_if_changed("line_phase_y", p_line_phase, phase_limits[0], phase_limits[1], padding=0)
line_x_bounds = resolve_axis_bounds(xs)
if line_x_bounds is not None:
set_x_range_if_changed("line_x", p_line, line_x_bounds[0], line_x_bounds[1], padding=0)
set_x_range_if_changed("line_phase_x", p_line_phase, line_x_bounds[0], line_x_bounds[1], padding=0)
complex_calib_plot_signal: Optional[np.ndarray] = None
if (
active_complex_mode
and complex_calib_enabled
and runtime.current_fft_input is not None
and np.iscomplexobj(runtime.current_fft_input)
):
complex_calib_plot_signal = np.asarray(runtime.current_fft_input, dtype=np.complex64).reshape(-1)
if complex_calib_plot_signal is not None and complex_calib_plot_signal.size > 0:
xs_complex = resolve_curve_xs(complex_calib_plot_signal.size)
real_after = complex_calib_plot_signal.real.astype(np.float32, copy=False)
imag_after = complex_calib_plot_signal.imag.astype(np.float32, copy=False)
real_x, real_y = decimate_curve_for_display(xs_complex[: real_after.size], real_after)
imag_x, imag_y = decimate_curve_for_display(xs_complex[: imag_after.size], imag_after)
real_x, real_y = sanitize_curve_data_for_display(real_x, real_y)
imag_x, imag_y = sanitize_curve_data_for_display(imag_x, imag_y)
set_curve_data(
"complex_calib_real",
curve_complex_calib_real,
real_x,
real_y,
autoDownsample=False,
)
complex_x_bounds = resolve_axis_bounds(xs_complex)
if complex_x_bounds is not None:
set_x_range_if_changed(
"complex_calib_x",
p_complex_calib,
complex_x_bounds[0],
complex_x_bounds[1],
padding=0,
set_curve_data(
"complex_calib_imag",
curve_complex_calib_imag,
imag_x,
imag_y,
autoDownsample=False,
)
else:
curve_complex_calib_real.setData([], [])
curve_complex_calib_imag.setData([], [])
complex_ylim = compute_auto_ylim(real_after, imag_after)
if refresh_auto_ranges and complex_ylim is not None:
set_y_range_if_changed(
"complex_calib_y",
p_complex_calib,
complex_ylim[0],
complex_ylim[1],
padding=0,
)
complex_x_bounds = resolve_axis_bounds(xs_complex)
if complex_x_bounds is not None:
set_x_range_if_changed(
"complex_calib_x",
p_complex_calib,
complex_x_bounds[0],
complex_x_bounds[1],
padding=0,
)
else:
clear_curve_if_needed("complex_calib_real", curve_complex_calib_real)
clear_curve_if_needed("complex_calib_imag", curve_complex_calib_imag)
sweep_for_fft = None if active_do1_tagged else runtime.current_fft_input
if (not active_do1_tagged) and sweep_for_fft is None: