From 1d807d0afc76ccfebc027b03e59d738bf112fc67 Mon Sep 17 00:00:00 2001 From: awe Date: Wed, 29 Apr 2026 19:08:17 +0300 Subject: [PATCH] fix update --- rfg_adc_plotter/gui/pyqtgraph_backend.py | 229 +++++++++++++++++++---- rfg_adc_plotter/io/sweep_parser_core.py | 145 +++++++++++++- rfg_adc_plotter/io/sweep_reader.py | 4 +- rfg_adc_plotter/types.py | 12 +- tests/test_sweep_parser_core.py | 51 +++++ 5 files changed, 401 insertions(+), 40 deletions(-) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 8c79789..24c6ce1 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -49,12 +49,14 @@ from rfg_adc_plotter.types import SweepAuxCurves, SweepInfo, SweepPacket RAW_PLOT_MAX_POINTS = 4096 RAW_WATERFALL_MAX_POINTS = 2048 BSCAN_MAX_POINTS = 512 +UI_QUEUE_MAXSIZE = 128 UI_MAX_PACKETS_PER_TICK = 8 DEBUG_FRAME_LOG_EVERY = 10 UI_BACKLOG_TAIL_THRESHOLD_MULTIPLIER = 1 UI_BACKLOG_LATEST_ONLY_THRESHOLD_MULTIPLIER = 2 UI_HEAVY_REFRESH_BACKLOG_MULTIPLIER = 1 UI_HEAVY_REFRESH_MAX_STRIDE = 4 +UI_AXIS_REFRESH_INTERVAL_S = 0.35 UI_DATA_WAIT_NOTE_AFTER_S = 3.0 FFT_LOW_CUT_SLIDER_SCALE = 10 FFT_LOW_CUT_MAX_PERCENT = 99.0 @@ -804,7 +806,7 @@ def run_pyqtgraph(args) -> None: "pyqtgraph и совместимый Qt backend не найдены. Установите: pip install pyqtgraph PyQt5" ) from exc - queue: Queue[SweepPacket] = Queue(maxsize=1000) + queue: Queue[SweepPacket] = Queue(maxsize=UI_QUEUE_MAXSIZE) stop_event = threading.Event() reader = SweepReader( args.port, @@ -1220,6 +1222,100 @@ def run_pyqtgraph(args) -> None: expected_sweep_width = 0 base_freqs_cache: Dict[int, np.ndarray] = {} last_packet_processed_at: Optional[float] = None + axis_range_cache: Dict[str, Tuple[float, ...]] = {} + image_rect_cache: Dict[str, Tuple[float, ...]] = {} + last_signal_mode_signature: Optional[Tuple[Optional[str], bool, bool]] = None + last_fft_low_cut_label_text: Optional[str] = None + + def finite_range_pair(lower: float, upper: float) -> Optional[Tuple[float, float]]: + try: + lower_val = float(lower) + upper_val = float(upper) + except Exception: + return None + if not (np.isfinite(lower_val) and np.isfinite(upper_val)): + return None + if upper_val <= lower_val: + upper_val = lower_val + 1e-9 + return (lower_val, upper_val) + + def cached_tuple_changed(cache: Dict[str, Tuple[float, ...]], key: str, values: Tuple[float, ...]) -> bool: + previous = cache.get(key) + if previous is None or len(previous) != len(values): + return True + return not bool(np.allclose(previous, values, rtol=1e-6, atol=1e-9)) + + def set_x_range_if_changed(key: str, plot, lower: float, upper: float, *, padding: float = 0.0) -> bool: + bounds = finite_range_pair(lower, upper) + if bounds is None: + return False + values = (bounds[0], bounds[1]) + if not cached_tuple_changed(axis_range_cache, key, values): + return False + try: + plot.setXRange(bounds[0], bounds[1], padding=padding) + except Exception: + return False + axis_range_cache[key] = values + return True + + def set_y_range_if_changed(key: str, plot, lower: float, upper: float, *, padding: float = 0.0) -> bool: + bounds = finite_range_pair(lower, upper) + if bounds is None: + return False + values = (bounds[0], bounds[1]) + if not cached_tuple_changed(axis_range_cache, key, values): + return False + try: + plot.setYRange(bounds[0], bounds[1], padding=padding) + except Exception: + return False + axis_range_cache[key] = values + return True + + def set_xy_range_if_changed( + key: str, + plot, + *, + x_bounds: Tuple[float, float], + y_bounds: Tuple[float, float], + padding: float = 0.0, + ) -> bool: + x_range = finite_range_pair(x_bounds[0], x_bounds[1]) + y_range = finite_range_pair(y_bounds[0], y_bounds[1]) + if x_range is None or y_range is None: + return False + values = (x_range[0], x_range[1], y_range[0], y_range[1]) + if not cached_tuple_changed(axis_range_cache, key, values): + return False + try: + plot.setRange(xRange=x_range, yRange=y_range, padding=padding) + except Exception: + return False + axis_range_cache[key] = values + return True + + def set_image_rect_if_changed(key: str, image_item, x: float, y: float, width: float, height: float) -> bool: + try: + values = (float(x), float(y), float(width), float(height)) + except Exception: + return False + if not ( + np.isfinite(values[0]) + and np.isfinite(values[1]) + and np.isfinite(values[2]) + and np.isfinite(values[3]) + and values[2] > 0.0 + and values[3] > 0.0 + ): + return False + if not cached_tuple_changed(image_rect_cache, key, values): + return False + if not set_image_rect_if_ready(image_item, values[0], values[1], values[2], values[3]): + return False + image_rect_cache[key] = values + return True + fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: try: @@ -1228,7 +1324,7 @@ def run_pyqtgraph(args) -> None: except Exception: fixed_ylim = None if fixed_ylim is not None: - p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) + set_y_range_if_changed("line_y", p_line, fixed_ylim[0], fixed_ylim[1], padding=0) def ensure_buffer(sweep_width: int) -> None: changed = runtime.ring.ensure_init(sweep_width) @@ -1242,15 +1338,27 @@ def run_pyqtgraph(args) -> None: disp_raw = sanitize_image_for_display(runtime.ring.get_display_raw_decimated(RAW_WATERFALL_MAX_POINTS)) if disp_raw is not None: img.setImage(disp_raw, autoLevels=False) - set_image_rect_if_ready(img, 0.0, f_min, float(max_sweeps), max(1e-9, f_max - f_min)) - p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0) - p_line.setXRange(f_min, f_max, padding=0) + set_image_rect_if_changed("raw_waterfall_rect", img, 0.0, f_min, float(max_sweeps), max(1e-9, f_max - f_min)) + set_xy_range_if_changed( + "raw_waterfall_range", + p_img, + x_bounds=(0, max_sweeps - 1), + y_bounds=(f_min, f_max), + padding=0, + ) + set_x_range_if_changed("line_x", p_line, f_min, f_max, padding=0) disp_fft = sanitize_image_for_display(runtime.ring.get_display_fft_linear()) if disp_fft is not None: img_fft.setImage(disp_fft, autoLevels=False) - set_image_rect_if_ready(img_fft, 0.0, 0.0, float(max_sweeps), 1.0) - p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0.0, 1.0), padding=0) - p_fft.setXRange(0.0, 1.0, padding=0) + set_image_rect_if_changed("fft_waterfall_rect", img_fft, 0.0, 0.0, float(max_sweeps), 1.0) + set_xy_range_if_changed( + "fft_waterfall_range", + p_spec, + x_bounds=(0, max_sweeps - 1), + y_bounds=(0.0, 1.0), + padding=0, + ) + set_x_range_if_changed("fft_x", p_fft, 0.0, 1.0, padding=0) def _active_distance_axis() -> Optional[np.ndarray]: if runtime.current_distances is not None and runtime.current_distances.size > 0: @@ -1261,12 +1369,16 @@ def run_pyqtgraph(args) -> None: return resolve_distance_cut_start(_active_distance_axis(), fft_low_cut_percent) def refresh_fft_low_cut_label() -> None: + nonlocal last_fft_low_cut_label_text text = f"{fft_low_cut_percent:.1f}%" cut_start = _active_distance_cut_start() if cut_start is not None and np.isfinite(cut_start): text = f"{text} (~{cut_start:.4g} м)" + if text == last_fft_low_cut_label_text: + return try: fft_low_cut_value_label.setText(text) + last_fft_low_cut_label_text = text except Exception: pass @@ -1278,9 +1390,15 @@ def run_pyqtgraph(args) -> None: ) if freq_bounds is not None: f_min, f_max = freq_bounds - set_image_rect_if_ready(img, 0.0, f_min, float(max_sweeps), f_max - f_min) - p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0) - p_line.setXRange(f_min, f_max, padding=0) + set_image_rect_if_changed("raw_waterfall_rect", img, 0.0, f_min, float(max_sweeps), f_max - f_min) + set_xy_range_if_changed( + "raw_waterfall_range", + p_img, + x_bounds=(0, max_sweeps - 1), + y_bounds=(f_min, f_max), + padding=0, + ) + set_x_range_if_changed("line_x", p_line, f_min, f_max, padding=0) distance_bounds = resolve_axis_bounds(runtime.ring.distance_axis) if distance_bounds is not None: @@ -1289,9 +1407,15 @@ def run_pyqtgraph(args) -> None: if d_cut is not None and np.isfinite(d_cut): d_min = max(float(d_min), float(d_cut)) span = max(1e-9, float(d_max - d_min)) - set_image_rect_if_ready(img_fft, 0.0, d_min, float(max_sweeps), span) - p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0) - p_fft.setXRange(d_min, d_max, padding=0) + set_image_rect_if_changed("fft_waterfall_rect", img_fft, 0.0, d_min, float(max_sweeps), span) + set_xy_range_if_changed( + "fft_waterfall_range", + p_spec, + x_bounds=(0, max_sweeps - 1), + y_bounds=(d_min, d_max), + padding=0, + ) + set_x_range_if_changed("fft_x", p_fft, d_min, d_max, padding=0) refresh_fft_low_cut_label() def resolve_curve_xs(size: int) -> np.ndarray: @@ -1390,12 +1514,16 @@ def run_pyqtgraph(args) -> None: signal_kind = get_signal_kind() return bool(complex_sweep_mode) and signal_kind not in {"bin_logdet", "bin_iq_do1_tagged"} - def refresh_signal_mode_labels() -> None: + def refresh_signal_mode_labels(*, force: bool = False) -> None: + nonlocal last_signal_mode_signature signal_kind = get_signal_kind() active_complex = current_packet_is_complex() is_logdet = signal_kind == "bin_logdet" is_bin_iq = signal_kind == "bin_iq" is_do1_tagged = signal_kind == "bin_iq_do1_tagged" + signature = (signal_kind, bool(active_complex), bool(is_do1_tagged)) + if (not force) and signature == last_signal_mode_signature: + return try: if is_logdet: @@ -1428,6 +1556,7 @@ def run_pyqtgraph(args) -> None: p_fft.setVisible(not is_do1_tagged) p_spec.setVisible(not is_do1_tagged) p_complex_calib.setVisible((not is_do1_tagged) and bool(active_complex)) + last_signal_mode_signature = signature except Exception: pass @@ -1544,6 +1673,8 @@ def run_pyqtgraph(args) -> None: def reset_ring_buffers() -> None: runtime.ring.reset() + axis_range_cache.clear() + image_rect_cache.clear() runtime.current_distances = None runtime.current_fft_complex = None runtime.current_fft_mag = None @@ -2135,7 +2266,7 @@ def run_pyqtgraph(args) -> None: set_fft_curve_visibility() set_fft_mode() set_fft_low_cut_percent() - refresh_signal_mode_labels() + refresh_signal_mode_labels(force=True) try: range_min_spin.valueChanged.connect(lambda _v: set_working_range()) @@ -2324,6 +2455,7 @@ def run_pyqtgraph(args) -> None: ui_frames_skipped = 0 ui_started_at = time.perf_counter() update_ticks = 0 + last_axis_range_refresh_at = 0.0 def refresh_current_fft_cache(sweep_for_fft: np.ndarray, bins: int) -> None: fft_complex = compute_fft_complex_row( @@ -2521,7 +2653,6 @@ def run_pyqtgraph(args) -> None: runtime.full_current_aux_curves = None runtime.full_current_aux_curves_codes = None runtime.full_current_sweep_codes = None - refresh_signal_mode_labels() refresh_current_window(push_to_ring=True) processed_frames += 1 last_packet_processed_at = time.time() @@ -2558,7 +2689,7 @@ def run_pyqtgraph(args) -> None: pass def update() -> None: - nonlocal peak_ref_window, status_dirty, update_ticks + nonlocal peak_ref_window, status_dirty, update_ticks, last_axis_range_refresh_at norm_display_scale = 500.0 if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits): return @@ -2568,8 +2699,14 @@ def run_pyqtgraph(args) -> None: clear_expired_status_note() refresh_waiting_data_note() + now_perf = time.perf_counter() changed = drain_queue() > 0 redraw_needed = changed or runtime.plot_dirty + refresh_auto_ranges = bool(runtime.plot_dirty) or ( + redraw_needed and (now_perf - last_axis_range_refresh_at) >= UI_AXIS_REFRESH_INTERVAL_S + ) + if refresh_auto_ranges: + last_axis_range_refresh_at = now_perf refresh_heavy_views = ( runtime.plot_dirty or last_heavy_refresh_stride <= 1 @@ -2743,8 +2880,8 @@ def run_pyqtgraph(args) -> None: (runtime.current_sweep_norm * norm_display_scale) if runtime.current_sweep_norm is not None else None, ] y_limits = compute_auto_ylim(*y_series) - if y_limits is not None: - p_line.setYRange(y_limits[0], y_limits[1], padding=0) + if refresh_auto_ranges and y_limits is not None: + set_y_range_if_changed("line_y", p_line, y_limits[0], y_limits[1], padding=0) if p_line_aux_vb is not None: aux_limits = compute_auto_ylim( displayed_aux[0] if displayed_aux is not None else None, @@ -2754,16 +2891,16 @@ def run_pyqtgraph(args) -> None: displayed_tagged_aux_high[0] if displayed_tagged_aux_high is not None else None, displayed_tagged_aux_high[1] if displayed_tagged_aux_high is not None else None, ) - if aux_limits is not None: - p_line_aux_vb.setYRange(aux_limits[0], aux_limits[1], padding=0) + if refresh_auto_ranges and aux_limits is not None: + set_y_range_if_changed("line_aux_y", p_line_aux_vb, aux_limits[0], aux_limits[1], padding=0) phase_limits = compute_auto_ylim(displayed_phase, displayed_phase_high) - if phase_limits is not None: - p_line_phase.setYRange(phase_limits[0], phase_limits[1], padding=0) + if refresh_auto_ranges and phase_limits is not None: + set_y_range_if_changed("line_phase_y", p_line_phase, phase_limits[0], phase_limits[1], padding=0) line_x_bounds = resolve_axis_bounds(xs) if line_x_bounds is not None: - p_line.setXRange(line_x_bounds[0], line_x_bounds[1], padding=0) - p_line_phase.setXRange(line_x_bounds[0], line_x_bounds[1], padding=0) + set_x_range_if_changed("line_x", p_line, line_x_bounds[0], line_x_bounds[1], padding=0) + set_x_range_if_changed("line_phase_x", p_line_phase, line_x_bounds[0], line_x_bounds[1], padding=0) complex_calib_plot_signal: Optional[np.ndarray] = None if ( @@ -2785,11 +2922,23 @@ def run_pyqtgraph(args) -> None: curve_complex_calib_real.setData(real_x, real_y, autoDownsample=False) curve_complex_calib_imag.setData(imag_x, imag_y, autoDownsample=False) complex_ylim = compute_auto_ylim(real_after, imag_after) - if complex_ylim is not None: - p_complex_calib.setYRange(complex_ylim[0], complex_ylim[1], padding=0) + if refresh_auto_ranges and complex_ylim is not None: + set_y_range_if_changed( + "complex_calib_y", + p_complex_calib, + complex_ylim[0], + complex_ylim[1], + padding=0, + ) complex_x_bounds = resolve_axis_bounds(xs_complex) if complex_x_bounds is not None: - p_complex_calib.setXRange(complex_x_bounds[0], complex_x_bounds[1], padding=0) + set_x_range_if_changed( + "complex_calib_x", + p_complex_calib, + complex_x_bounds[0], + complex_x_bounds[1], + padding=0, + ) else: curve_complex_calib_real.setData([], []) curve_complex_calib_imag.setData([], []) @@ -2844,7 +2993,7 @@ def run_pyqtgraph(args) -> None: fft_complex_plot = None fft_x_bounds = resolve_axis_bounds(xs_fft) if fft_x_bounds is not None: - p_fft.setXRange(fft_x_bounds[0], fft_x_bounds[1], padding=0) + set_x_range_if_changed("fft_x", p_fft, fft_x_bounds[0], fft_x_bounds[1], padding=0) fft_vals_db = fft_mag_to_db(fft_mag_plot) ref_curve_for_range = None @@ -2911,8 +3060,8 @@ def run_pyqtgraph(args) -> None: box.setVisible(False) y_limits = compute_auto_ylim(visible_abs, visible_real, visible_imag, ref_curve_for_range) - if y_limits is not None: - p_fft.setYRange(y_limits[0], y_limits[1], padding=0) + if refresh_auto_ranges and y_limits is not None: + set_y_range_if_changed("fft_y", p_fft, y_limits[0], y_limits[1], padding=0) if peak_calibrate_mode and visible_abs is not None: markers = find_peak_width_markers(xs_fft, fft_vals_db) @@ -2986,7 +3135,8 @@ def run_pyqtgraph(args) -> None: box.setVisible(False) if active_background is not None and fft_abs_enabled: - p_fft.setYRange(-10.0, 30.0, padding=0) + if refresh_auto_ranges: + set_y_range_if_changed("fft_y", p_fft, -10.0, 30.0, padding=0) else: finite_y = y_for_range[np.isfinite(y_for_range)] if finite_y.size > 0: @@ -2994,7 +3144,8 @@ def run_pyqtgraph(args) -> None: y1 = float(np.max(finite_y)) if y1 <= y0: y1 = y0 + 1e-3 - p_fft.setYRange(y0, y1, padding=0) + if refresh_auto_ranges: + set_y_range_if_changed("fft_y", p_fft, y0, y1, padding=0) if peak_calibrate_mode and fft_abs_enabled: markers = find_peak_width_markers(xs_fft, fft_vals_db) @@ -3202,8 +3353,14 @@ def run_pyqtgraph(args) -> None: distance_bounds = resolve_axis_bounds(disp_fft_axis) if distance_bounds is not None: d_min, d_max = distance_bounds - set_image_rect_if_ready(img_fft, 0.0, d_min, float(max_sweeps), max(1e-9, d_max - d_min)) - p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0) + set_image_rect_if_changed("fft_waterfall_rect", img_fft, 0.0, d_min, float(max_sweeps), max(1e-9, d_max - d_min)) + set_xy_range_if_changed( + "fft_waterfall_range", + p_spec, + x_bounds=(0, max_sweeps - 1), + y_bounds=(d_min, d_max), + padding=0, + ) if levels is not None: img_fft.setImage(disp_fft, autoLevels=False, levels=levels) else: diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py index 411c231..15364e9 100644 --- a/rfg_adc_plotter/io/sweep_parser_core.py +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -11,6 +11,7 @@ import numpy as np from rfg_adc_plotter.constants import DATA_INVERSION_THRESHOLD, LOG_BASE, LOG_EXP_LIMIT, LOG_POSTSCALER, LOG_SCALER from rfg_adc_plotter.types import ( + BatchPointEvent, Do1Level, ParserEvent, PointEvent, @@ -157,12 +158,13 @@ class ComplexAsciiSweepParser: class LegacyBinaryParser: """Byte-resynchronizing parser for supported 8-byte binary record formats.""" - def __init__(self): + def __init__(self, *, batch_events: bool = False): self._buf = bytearray() self._last_step: Optional[int] = None self._seen_points = False self._mode: Optional[str] = None self._current_signal_kind: Optional[SignalKind] = None + self._batch_events = bool(batch_events) self._last_tagged_step_by_level: Dict[Do1Level, Optional[int]] = { "low": None, "high": None, @@ -296,6 +298,78 @@ class LegacyBinaryParser: ) ) + def _try_emit_tty_batch(self, events: List[ParserEvent], *, require_not_legacy: bool) -> bool: + if not self._batch_events or len(self._buf) < 8: + return False + + block_count = len(self._buf) // 8 + if block_count <= 0: + return False + raw = np.frombuffer(self._buf, dtype=np.uint8, count=block_count * 8).reshape(-1, 8) + words = np.frombuffer(self._buf, dtype=" 0 else int(valid.size) + if valid_count <= 0: + return False + + steps = words[:valid_count, 1].astype(np.int64, copy=True) + if self._current_signal_kind != "bin_iq": + if self._seen_points: + events.append(StartEvent(ch=0, signal_kind="bin_iq")) + self._last_step = None + self._seen_points = False + self._current_signal_kind = "bin_iq" + self._reset_tagged_steps() + + if self._seen_points and self._last_step is not None and steps[0] <= int(self._last_step): + events.append(StartEvent(ch=0, signal_kind="bin_iq")) + self._last_step = None + self._seen_points = False + self._reset_tagged_steps() + + reset_idx = np.nonzero(np.diff(steps) <= 0)[0] + take_count = int(reset_idx[0] + 1) if reset_idx.size > 0 else int(steps.size) + if take_count <= 0: + return False + + batch_words = words[:take_count].copy() + xs = batch_words[:, 1].astype(np.int64, copy=False) + ch_1 = batch_words[:, 2].astype(np.uint16, copy=False).view(np.int16) + ch_2 = batch_words[:, 3].astype(np.uint16, copy=False).view(np.int16) + del raw + del words + del w0 + del w1 + del self._buf[: take_count * 8] + + ch_1_i64 = ch_1.astype(np.int64) + ch_2_i64 = ch_2.astype(np.int64) + ys = ((ch_1_i64 * ch_1_i64) + (ch_2_i64 * ch_2_i64)).astype(np.float32) + self._mode = "bin" + self._seen_points = True + self._last_step = int(xs[-1]) + self._current_signal_kind = "bin_iq" + self._reset_tagged_steps() + events.append( + BatchPointEvent( + ch=0, + xs=xs, + ys=ys, + aux=(ch_1.astype(np.float32), ch_2.astype(np.float32)), + signal_kind="bin_iq", + ) + ) + return True + def feed(self, data: bytes) -> List[ParserEvent]: if data: self._buf += data @@ -330,6 +404,8 @@ class LegacyBinaryParser: continue if self._mode == "legacy": + if is_tty_point and (not is_legacy_point) and self._try_emit_tty_batch(events, require_not_legacy=True): + continue if is_legacy_point: self._emit_legacy_point( events, @@ -368,6 +444,8 @@ class LegacyBinaryParser: continue if self._mode == "bin": + if is_tty_point and self._try_emit_tty_batch(events, require_not_legacy=False): + continue if is_tty_point: self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) del self._buf[:8] @@ -408,6 +486,8 @@ class LegacyBinaryParser: # Mode is still unknown. Accept only unambiguous point shapes to avoid # jumping between tty and legacy interpretations on coincidental bytes. if is_tty_point and (not is_legacy_point): + if self._try_emit_tty_batch(events, require_not_legacy=True): + continue self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) del self._buf[:8] continue @@ -729,6 +809,67 @@ class SweepAssembler: out[both_valid] = (first[both_valid] + second[both_valid]) * 0.5 return out + def _has_current_points(self) -> bool: + return bool(self._xs or self._tagged_low_xs or self._tagged_high_xs) + + def _consume_batch(self, event: BatchPointEvent) -> Optional[SweepPacket]: + xs_arr = np.asarray(event.xs, dtype=np.int64).reshape(-1) + ys_arr = np.asarray(event.ys, dtype=np.float32).reshape(-1) + width = min(xs_arr.size, ys_arr.size) + if width <= 0: + return None + + point_ch = int(event.ch) + point_signal_kind = event.signal_kind + packet: Optional[SweepPacket] = None + if self._cur_channel is None: + self._cur_channel = point_ch + elif point_ch != self._cur_channel: + if self._has_current_points(): + packet = self.finalize_current() + self._reset_current() + self._cur_channel = point_ch + if self._cur_signal_kind != point_signal_kind: + if self._has_current_points(): + packet = self.finalize_current() + self._reset_current() + self._cur_channel = point_ch + self._cur_signal_kind = point_signal_kind + + self._cur_channels.add(point_ch) + xs = xs_arr[:width] + ys = ys_arr[:width] + self._xs.extend(xs.tolist()) + self._ys.extend(ys.tolist()) + if self._cur_signal_kind == "bin_iq_do1_tagged": + level = "high" if event.do1_level == "high" else "low" + if level == "low": + self._tagged_low_xs.extend(xs.tolist()) + self._tagged_low_ys.extend(ys.tolist()) + else: + self._tagged_high_xs.extend(xs.tolist()) + self._tagged_high_ys.extend(ys.tolist()) + if event.aux is not None: + try: + aux_1, aux_2 = event.aux + aux_1_arr = np.asarray(aux_1, dtype=np.float32).reshape(-1) + aux_2_arr = np.asarray(aux_2, dtype=np.float32).reshape(-1) + aux_width = min(width, aux_1_arr.size, aux_2_arr.size) + except Exception: + aux_width = 0 + if aux_width > 0: + if self._cur_signal_kind == "bin_iq_do1_tagged": + if event.do1_level == "high": + self._tagged_high_aux_1.extend(aux_1_arr[:aux_width].tolist()) + self._tagged_high_aux_2.extend(aux_2_arr[:aux_width].tolist()) + else: + self._tagged_low_aux_1.extend(aux_1_arr[:aux_width].tolist()) + self._tagged_low_aux_2.extend(aux_2_arr[:aux_width].tolist()) + else: + self._aux_1.extend(aux_1_arr[:aux_width].tolist()) + self._aux_2.extend(aux_2_arr[:aux_width].tolist()) + return packet + def consume(self, event: ParserEvent) -> Optional[SweepPacket]: if isinstance(event, StartEvent): packet = self.finalize_current() @@ -737,6 +878,8 @@ class SweepAssembler: self._cur_channel = int(event.ch) self._cur_signal_kind = event.signal_kind return packet + if isinstance(event, BatchPointEvent): + return self._consume_batch(event) point_ch = int(event.ch) point_signal_kind = event.signal_kind diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py index cd5dde4..bedca8e 100644 --- a/rfg_adc_plotter/io/sweep_reader.py +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -157,7 +157,7 @@ class SweepReader(threading.Thread): if self._logscale: return LogScaleBinaryParser32(), SweepAssembler(fancy=self._fancy, apply_inversion=False) if self._bin_mode: - return LegacyBinaryParser(), SweepAssembler(fancy=self._fancy, apply_inversion=True) + return LegacyBinaryParser(batch_events=True), SweepAssembler(fancy=self._fancy, apply_inversion=True) return AsciiSweepParser(), SweepAssembler(fancy=self._fancy, apply_inversion=True) @staticmethod @@ -216,7 +216,7 @@ class SweepReader(threading.Thread): ) ) sys.stderr.write("[info] parser_16_bit_x2: fallback -> legacy\n") - parser = LegacyBinaryParser() + parser = LegacyBinaryParser(batch_events=True) assembler = SweepAssembler(fancy=self._fancy, apply_inversion=True) probe_packets = self._consume_events(assembler, parser.feed(bytes(probe_buf))) return parser, assembler, probe_packets diff --git a/rfg_adc_plotter/types.py b/rfg_adc_plotter/types.py index 5a8ef0a..8e239d5 100644 --- a/rfg_adc_plotter/types.py +++ b/rfg_adc_plotter/types.py @@ -33,4 +33,14 @@ class PointEvent: do1_level: Optional[Do1Level] = None -ParserEvent: TypeAlias = Union[StartEvent, PointEvent] +@dataclass(frozen=True) +class BatchPointEvent: + ch: int + xs: np.ndarray + ys: np.ndarray + aux: Optional[Tuple[np.ndarray, np.ndarray]] = None + signal_kind: Optional[SignalKind] = None + do1_level: Optional[Do1Level] = None + + +ParserEvent: TypeAlias = Union[StartEvent, PointEvent, BatchPointEvent] diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py index 576b3e7..6d9c806 100644 --- a/tests/test_sweep_parser_core.py +++ b/tests/test_sweep_parser_core.py @@ -3,8 +3,11 @@ from __future__ import annotations import math import unittest +import numpy as np + from rfg_adc_plotter.io.sweep_parser_core import ( AsciiSweepParser, + BatchPointEvent, ComplexAsciiSweepParser, LegacyBinaryParser, LogScale16BitX2BinaryParser, @@ -185,6 +188,54 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(events[2].aux, (120.0, 95.0)) self.assertEqual(events[2].signal_kind, "bin_iq") + def test_legacy_binary_parser_batch_mode_emits_tty_batch_event(self): + parser = LegacyBinaryParser(batch_events=True) + stream = b"".join( + [ + _pack_tty_start(), + _pack_tty_point(1, 100, 90), + _pack_tty_point(2, 120, 95), + ] + ) + + events = parser.feed(stream) + + self.assertEqual(len(events), 2) + self.assertIsInstance(events[0], StartEvent) + self.assertIsInstance(events[1], BatchPointEvent) + self.assertTrue(np.array_equal(events[1].xs, np.asarray([1, 2], dtype=np.int64))) + self.assertTrue(np.allclose(events[1].ys, np.asarray([18100.0, 23425.0], dtype=np.float32))) + self.assertIsNotNone(events[1].aux) + self.assertTrue(np.allclose(events[1].aux[0], np.asarray([100.0, 120.0], dtype=np.float32))) + self.assertTrue(np.allclose(events[1].aux[1], np.asarray([90.0, 95.0], dtype=np.float32))) + self.assertEqual(events[1].signal_kind, "bin_iq") + + def test_sweep_assembler_consumes_tty_batch_event(self): + assembler = SweepAssembler(fancy=False, apply_inversion=False) + packet = assembler.consume( + BatchPointEvent( + ch=0, + xs=np.asarray([1, 2], dtype=np.int64), + ys=np.asarray([18100.0, 23425.0], dtype=np.float32), + aux=( + np.asarray([100.0, 120.0], dtype=np.float32), + np.asarray([90.0, 95.0], dtype=np.float32), + ), + signal_kind="bin_iq", + ) + ) + self.assertIsNone(packet) + + sweep, info, aux = assembler.finalize_current() + + self.assertEqual(info["signal_kind"], "bin_iq") + self.assertEqual(sweep.shape[0], 3) + self.assertAlmostEqual(float(sweep[1]), 18100.0, places=6) + self.assertAlmostEqual(float(sweep[2]), 23425.0, places=6) + self.assertIsNotNone(aux) + self.assertAlmostEqual(float(aux[0][1]), 100.0, places=6) + self.assertAlmostEqual(float(aux[1][2]), 95.0, places=6) + def test_legacy_binary_parser_detects_new_tty_sweep_on_step_reset(self): parser = LegacyBinaryParser() stream = b"".join(