From 3cd29c60d6d3c906d0cb0a92a08a08edc4926bef Mon Sep 17 00:00:00 2001 From: kamil Date: Fri, 10 Apr 2026 18:01:43 +0300 Subject: [PATCH] fix st --- rfg_adc_plotter/gui/pyqtgraph_backend.py | 91 +++++++++++++++- rfg_adc_plotter/io/sweep_reader.py | 129 ++++++++++++++++++++++- tests/test_processing.py | 21 ++++ tests/test_sweep_reader.py | 35 ++++++ 4 files changed, 269 insertions(+), 7 deletions(-) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index f4fdf31..925c1bb 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -3,6 +3,7 @@ from __future__ import annotations import signal +import os import sys import threading import time @@ -46,6 +47,7 @@ UI_BACKLOG_TAIL_THRESHOLD_MULTIPLIER = 2 UI_BACKLOG_LATEST_ONLY_THRESHOLD_MULTIPLIER = 4 UI_HEAVY_REFRESH_BACKLOG_MULTIPLIER = 2 UI_HEAVY_REFRESH_MAX_STRIDE = 4 +UI_DATA_WAIT_NOTE_AFTER_S = 3.0 DEFAULT_MAIN_WINDOW_WIDTH = 1200 DEFAULT_MAIN_WINDOW_HEIGHT = 680 MIN_MAIN_WINDOW_WIDTH = 640 @@ -93,6 +95,60 @@ def sanitize_image_for_display(data: Optional[np.ndarray]) -> Optional[np.ndarra return arr +def set_image_rect_if_ready(image_item, x: float, y: float, width: float, height: float) -> bool: + """Set ImageItem geometry only when the image payload has valid dimensions.""" + try: + x_val = float(x) + y_val = float(y) + width_val = float(width) + height_val = float(height) + except Exception: + return False + + if not ( + np.isfinite(x_val) + and np.isfinite(y_val) + and np.isfinite(width_val) + and np.isfinite(height_val) + and width_val > 0.0 + and height_val > 0.0 + ): + return False + + has_payload = False + try: + payload = getattr(image_item, "image", None) + if payload is not None: + arr = np.asarray(payload) + has_payload = arr.ndim >= 2 and arr.shape[0] > 0 and arr.shape[1] > 0 + except Exception: + has_payload = False + + if not has_payload: + try: + img_w = image_item.width() + img_h = image_item.height() + has_payload = ( + img_w is not None + and img_h is not None + and np.isfinite(float(img_w)) + and np.isfinite(float(img_h)) + and float(img_w) > 0.0 + and float(img_h) > 0.0 + ) + except Exception: + has_payload = False + + if not has_payload: + return False + + try: + image_item.setRect(x_val, y_val, width_val, height_val) + except Exception: + return False + return True + + def resolve_axis_bounds( values: Optional[np.ndarray], *, @@ -478,6 +534,12 @@ def run_pyqtgraph(args) -> None: or getattr(args, "parser_16_bit_x2", False) or getattr(args, "parser_test", False) ) + if not sys.platform.startswith("win"): + display_name = os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY") + if not display_name: + sys.stderr.write( + "[warn] DISPLAY/WAYLAND_DISPLAY not set. GUI windows may not open over plain SSH without desktop/X forwarding.\n" + ) try: import pyqtgraph as pg from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore @@ -743,6 +805,7 @@ def run_pyqtgraph(args) -> None: fft_imag_enabled = True fft_mode = "symmetric" status_note = "" + waiting_data_note = "" status_note_expires_at: Optional[float] = None status_dirty = True range_change_in_progress = False @@ -750,6 +813,7 @@ def run_pyqtgraph(args) -> None: last_queue_backlog = 0 last_backlog_skipped = 0 last_heavy_refresh_stride = 1 + last_packet_processed_at: Optional[float] = None fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: try: @@ -772,13 +836,13 @@ def run_pyqtgraph(args) -> None: disp_raw = sanitize_image_for_display(runtime.ring.get_display_raw_decimated(RAW_WATERFALL_MAX_POINTS)) if disp_raw is not None: img.setImage(disp_raw, autoLevels=False) - img.setRect(0, f_min, max_sweeps, max(1e-9, f_max - f_min)) + set_image_rect_if_ready(img, 0.0, f_min, float(max_sweeps), max(1e-9, f_max - f_min)) p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0) p_line.setXRange(f_min, f_max, padding=0) disp_fft = sanitize_image_for_display(runtime.ring.get_display_fft_linear()) if disp_fft is not None: img_fft.setImage(disp_fft, autoLevels=False) - img_fft.setRect(0, 0.0, max_sweeps, 1.0) + set_image_rect_if_ready(img_fft, 0.0, 0.0, float(max_sweeps), 1.0) p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0.0, 1.0), padding=0) p_fft.setXRange(0.0, 1.0, padding=0) @@ -790,14 +854,14 @@ def run_pyqtgraph(args) -> None: ) if freq_bounds is not None: f_min, f_max = freq_bounds - img.setRect(0, f_min, max_sweeps, f_max - f_min) + set_image_rect_if_ready(img, 0.0, f_min, float(max_sweeps), f_max - f_min) p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0) p_line.setXRange(f_min, f_max, padding=0) distance_bounds = resolve_axis_bounds(runtime.ring.distance_axis) if distance_bounds is not None: d_min, d_max = distance_bounds - img_fft.setRect(0, d_min, max_sweeps, d_max - d_min) + set_image_rect_if_ready(img_fft, 0.0, d_min, float(max_sweeps), d_max - d_min) p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0) def resolve_curve_xs(size: int) -> np.ndarray: @@ -839,6 +903,20 @@ def run_pyqtgraph(args) -> None: status_note_expires_at = None status_dirty = True + def refresh_waiting_data_note() -> None: + nonlocal waiting_data_note, status_dirty + now = time.time() + idle_s = (now - ui_started_at) if last_packet_processed_at is None else (now - last_packet_processed_at) + new_note = "" + if idle_s >= UI_DATA_WAIT_NOTE_AFTER_S: + if processed_frames <= 0: + new_note = f"ожидание данных: нет свипов {idle_s:.1f} c" + else: + new_note = f"ожидание данных: нет новых свипов {idle_s:.1f} c" + if new_note != waiting_data_note: + waiting_data_note = new_note + status_dirty = True + def log_debug_event(name: str, message: str, *, every: int = DEBUG_FRAME_LOG_EVERY) -> None: count = int(debug_event_counts.get(name, 0)) + 1 debug_event_counts[name] = count @@ -1442,6 +1520,7 @@ def run_pyqtgraph(args) -> None: def drain_queue() -> int: nonlocal processed_frames, ui_frames_skipped, last_queue_backlog, last_backlog_skipped, last_heavy_refresh_stride + nonlocal last_packet_processed_at pending_packets: List[SweepPacket] = [] while True: try: @@ -1518,6 +1597,7 @@ def run_pyqtgraph(args) -> None: runtime.current_info = info refresh_current_window(push_to_ring=True) processed_frames += 1 + last_packet_processed_at = time.time() if processed_frames % DEBUG_FRAME_LOG_EVERY == 0: try: queue_size = queue.qsize() @@ -1559,6 +1639,7 @@ def run_pyqtgraph(args) -> None: return update_ticks += 1 clear_expired_status_note() + refresh_waiting_data_note() changed = drain_queue() > 0 redraw_needed = changed or runtime.plot_dirty @@ -1898,6 +1979,8 @@ def run_pyqtgraph(args) -> None: status_payload["peak_a"] = runtime.current_peak_amplitude base_status = format_status_kv(status_payload) if status_payload else "" status_parts = [] + if waiting_data_note: + status_parts.append(waiting_data_note) if status_note: status_parts.append(status_note) if last_backlog_skipped > 0: diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py index 888706e..a26171a 100644 --- a/rfg_adc_plotter/io/sweep_reader.py +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -17,13 +17,16 @@ from rfg_adc_plotter.io.sweep_parser_core import ( ParserTestStreamParser, SweepAssembler, ) -from rfg_adc_plotter.types import ParserEvent, PointEvent, SweepPacket +from rfg_adc_plotter.types import ParserEvent, PointEvent, StartEvent, SweepPacket _PARSER_16_BIT_X2_PROBE_BYTES = 64 * 1024 _LEGACY_STREAM_MIN_RECORDS = 32 _LEGACY_STREAM_MIN_MATCH_RATIO = 0.95 _TTY_STREAM_MIN_MATCH_RATIO = 0.60 _DEBUG_FRAME_LOG_EVERY = 10 +_NO_INPUT_WARN_INTERVAL_S = 5.0 +_NO_PACKET_WARN_INTERVAL_S = 5.0 +_NO_PACKET_HINT_AFTER_S = 10.0 def _u16le_at(data: bytes, offset: int) -> int: @@ -119,6 +122,19 @@ class SweepReader(threading.Thread): self._frames_dropped = 0 self._started_at = time.perf_counter() + def _resolve_parser_mode_label(self) -> str: + if self._parser_complex_ascii: + return "complex_ascii" + if self._parser_test: + return "parser_test_16x2" + if self._parser_16_bit_x2: + return "parser_16_bit_x2" + if self._logscale: + return "logscale_32" + if self._bin_mode: + return "legacy_8byte" + return "ascii" + def _build_parser(self): if self._parser_complex_ascii: return ComplexAsciiSweepParser(), SweepAssembler(fancy=self._fancy, apply_inversion=False) @@ -145,6 +161,7 @@ class SweepReader(threading.Thread): parser = LogScale16BitX2BinaryParser() probe_buf = bytearray() probe_events: list[ParserEvent] = [] + probe_started_at = time.perf_counter() while not self._stop_event.is_set() and len(probe_buf) < _PARSER_16_BIT_X2_PROBE_BYTES: data = chunk_reader.read_available() @@ -156,15 +173,55 @@ class SweepReader(threading.Thread): if _is_valid_parser_16_bit_x2_probe(probe_events): assembler = SweepAssembler(fancy=self._fancy, apply_inversion=False) probe_packets = self._consume_events(assembler, probe_events) + n_points = int(sum(1 for event in probe_events if isinstance(event, PointEvent))) + n_starts = int(sum(1 for event in probe_events if isinstance(event, StartEvent))) + probe_ms = (time.perf_counter() - probe_started_at) * 1000.0 + sys.stderr.write( + "[info] parser_16_bit_x2 probe: bytes:%d events:%d points:%d starts:%d parser:16x2 elapsed_ms:%.1f\n" + % ( + len(probe_buf), + len(probe_events), + n_points, + n_starts, + probe_ms, + ) + ) return parser, assembler, probe_packets - if probe_buf and _looks_like_legacy_8byte_stream(bytes(probe_buf)): + probe_looks_legacy = bool(probe_buf) and _looks_like_legacy_8byte_stream(bytes(probe_buf)) + n_points = int(sum(1 for event in probe_events if isinstance(event, PointEvent))) + n_starts = int(sum(1 for event in probe_events if isinstance(event, StartEvent))) + probe_ms = (time.perf_counter() - probe_started_at) * 1000.0 + if probe_looks_legacy: + sys.stderr.write( + "[info] parser_16_bit_x2 probe: bytes:%d events:%d points:%d starts:%d parser:legacy(fallback) elapsed_ms:%.1f\n" + % ( + len(probe_buf), + len(probe_events), + n_points, + n_starts, + probe_ms, + ) + ) sys.stderr.write("[info] parser_16_bit_x2: fallback -> legacy\n") parser = LegacyBinaryParser() assembler = SweepAssembler(fancy=self._fancy, apply_inversion=True) probe_packets = self._consume_events(assembler, parser.feed(bytes(probe_buf))) return parser, assembler, probe_packets + sys.stderr.write( + "[warn] parser_16_bit_x2 probe inconclusive: bytes:%d events:%d points:%d starts:%d parser:16x2 elapsed_ms:%.1f\n" + % ( + len(probe_buf), + len(probe_events), + n_points, + n_starts, + probe_ms, + ) + ) + sys.stderr.write( + "[hint] parser_16_bit_x2: if source is 8-byte tty CH1/CH2 stream (0x000A,step,ch1,ch2), try --bin\n" + ) assembler = SweepAssembler(fancy=self._fancy, apply_inversion=False) return parser, assembler, [] @@ -212,7 +269,17 @@ class SweepReader(threading.Thread): def run(self) -> None: try: self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) + queue_cap = int(getattr(self._queue, "maxsize", -1)) sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") + sys.stderr.write( + "[info] reader start: parser:%s fancy:%d queue_max:%d source:%s\n" + % ( + self._resolve_parser_mode_label(), + int(self._fancy), + queue_cap, + getattr(self._src, "_using", "unknown"), + ) + ) except Exception as exc: sys.stderr.write(f"[error] {exc}\n") return @@ -228,12 +295,68 @@ class SweepReader(threading.Thread): for packet in pending_packets: self._enqueue(packet) + loop_started_at = time.perf_counter() + last_input_at = loop_started_at + last_packet_at = loop_started_at if self._frames_read > 0 else loop_started_at + last_no_input_warn_at = loop_started_at + last_no_packet_warn_at = loop_started_at + parser_hint_emitted = False + while not self._stop_event.is_set(): data = chunk_reader.read_available() + now_s = time.perf_counter() if not data: + input_idle_s = now_s - last_input_at + if ( + input_idle_s >= _NO_INPUT_WARN_INTERVAL_S + and (now_s - last_no_input_warn_at) >= _NO_INPUT_WARN_INTERVAL_S + ): + sys.stderr.write( + "[warn] reader no input bytes for %.1fs on %s (parser:%s)\n" + % ( + input_idle_s, + self._port_path, + self._resolve_parser_mode_label(), + ) + ) + last_no_input_warn_at = now_s + + packets_idle_s = now_s - last_packet_at + if ( + packets_idle_s >= _NO_PACKET_WARN_INTERVAL_S + and (now_s - last_no_packet_warn_at) >= _NO_PACKET_WARN_INTERVAL_S + ): + try: + queue_size = self._queue.qsize() + except Exception: + queue_size = -1 + sys.stderr.write( + "[warn] reader no sweep packets for %.1fs (input_idle:%.1fs queue:%d parser:%s)\n" + % ( + packets_idle_s, + input_idle_s, + int(queue_size), + self._resolve_parser_mode_label(), + ) + ) + last_no_packet_warn_at = now_s + if ( + self._parser_16_bit_x2 + and (not parser_hint_emitted) + and (now_s - self._started_at) >= _NO_PACKET_HINT_AFTER_S + ): + sys.stderr.write( + "[hint] parser_16_bit_x2 still has no sweeps; if source is tty CH1/CH2, rerun with --bin\n" + ) + parser_hint_emitted = True time.sleep(0.0005) continue - for packet in self._consume_events(assembler, parser.feed(data)): + + last_input_at = now_s + packets = self._consume_events(assembler, parser.feed(data)) + if packets: + last_packet_at = now_s + for packet in packets: self._enqueue(packet) packet = assembler.finalize_current() if packet is not None: diff --git a/tests/test_processing.py b/tests/test_processing.py index 538eadd..bad67b1 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -18,6 +18,7 @@ from rfg_adc_plotter.gui.pyqtgraph_backend import ( resolve_initial_window_size, sanitize_curve_data_for_display, sanitize_image_for_display, + set_image_rect_if_ready, resolve_visible_fft_curves, resolve_visible_aux_curves, ) @@ -290,6 +291,26 @@ class ProcessingTests(unittest.TestCase): self.assertIsNone(data) + def test_set_image_rect_if_ready_skips_uninitialized_image(self): + class _DummyImageItem: + def __init__(self): + self.calls = 0 + + def width(self): + return None + + def height(self): + return None + + def setRect(self, *_args): + self.calls += 1 + + image_item = _DummyImageItem() + applied = set_image_rect_if_ready(image_item, 0.0, 0.0, 10.0, 1.0) + + self.assertFalse(applied) + self.assertEqual(image_item.calls, 0) + def test_resolve_axis_bounds_rejects_nonfinite_ranges(self): bounds = resolve_axis_bounds(np.asarray([np.nan, np.inf], dtype=np.float64)) diff --git a/tests/test_sweep_reader.py b/tests/test_sweep_reader.py index 5544a3d..81589be 100644 --- a/tests/test_sweep_reader.py +++ b/tests/test_sweep_reader.py @@ -178,6 +178,41 @@ class SweepReaderTests(unittest.TestCase): reader.join(timeout=1.0) stack.close() + def test_parser_16_bit_x2_probe_inconclusive_logs_hint(self): + payload = b"\x00" * (_PARSER_16_BIT_X2_PROBE_BYTES + 128) + + stack, reader, queue, stop_event, stderr = self._start_reader(payload, parser_16_bit_x2=True) + try: + deadline = time.time() + 1.5 + logs = "" + while time.time() < deadline: + logs = stderr.getvalue() + if "probe inconclusive" in logs: + break + time.sleep(0.02) + self.assertTrue(queue.empty()) + self.assertIn("probe inconclusive", logs) + self.assertIn("try --bin", logs) + finally: + stop_event.set() + reader.join(timeout=1.0) + stack.close() + + def test_reader_logs_no_input_warning_when_source_is_idle(self): + with patch.object(sweep_reader_module, "_NO_INPUT_WARN_INTERVAL_S", 0.02), patch.object( + sweep_reader_module, "_NO_PACKET_WARN_INTERVAL_S", 0.02 + ): + stack, reader, _queue, stop_event, stderr = self._start_reader(b"", parser_16_bit_x2=False) + try: + time.sleep(0.12) + logs = stderr.getvalue() + self.assertIn("no input bytes", logs) + self.assertIn("no sweep packets", logs) + finally: + stop_event.set() + reader.join(timeout=1.0) + stack.close() + def test_reader_join_does_not_raise_when_stopped(self): stack, reader, _queue, stop_event, _stderr = self._start_reader(b"", parser_16_bit_x2=True) try: