From 9aac1623209456d4e2fbff82d57a88e99193d7f3 Mon Sep 17 00:00:00 2001 From: awe Date: Fri, 10 Apr 2026 14:46:58 +0300 Subject: [PATCH] fix --- README.md | 8 ++++ rfg_adc_plotter/cli.py | 5 +- rfg_adc_plotter/gui/pyqtgraph_backend.py | 38 +++++++++++---- rfg_adc_plotter/io/sweep_parser_core.py | 31 +++++++++++- rfg_adc_plotter/io/sweep_reader.py | 38 ++++++++++----- tests/test_processing.py | 25 ++++++++++ tests/test_sweep_parser_core.py | 60 ++++++++++++++++++++++++ tests/test_sweep_reader.py | 44 +++++++++++++++++ 8 files changed, 227 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index e76bca6..9b461a5 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,8 @@ Legacy binary: .venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --bin ``` +`--bin` также понимает `tty`-поток CH1/CH2 из `kamil_adc` (`tty:/tmp/ttyADC_data`) в 8-байтном формате `0x000A,step,ch1_i16,ch2_i16`. + Logscale binary с парой `int32`: ```bash @@ -170,6 +172,12 @@ ssh 192.148.0.148 'ls -l /dev/ttyACM0' Если на удаленной машине есть доступ к потоку, удобнее сохранять его в файл и уже этот файл гонять локально через `replay_pty.py`. +Для локального `tty`-потока из `kamil_adc` используйте: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /tmp/ttyADC_data --bin +``` + ## Проверка и тесты Синтаксическая проверка: diff --git a/rfg_adc_plotter/cli.py b/rfg_adc_plotter/cli.py index f2fdab1..162a5f9 100644 --- a/rfg_adc_plotter/cli.py +++ b/rfg_adc_plotter/cli.py @@ -71,8 +71,9 @@ def build_parser() -> argparse.ArgumentParser: dest="bin_mode", action="store_true", help=( - "Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; " - "точки step,uint32(hi16,lo16),0x000A" + "8-байтный бинарный протокол: либо legacy старт " + "0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A и точки step,uint32(hi16,lo16),0x000A, " + "либо tty CH1/CH2 поток из kamil_adc в формате 0x000A,step,ch1_i16,ch2_i16" ), ) parser.add_argument( diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 6bc435e..5154d9a 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -7,7 +7,7 @@ import sys import threading import time from queue import Empty, Queue -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Sequence, Tuple import numpy as np @@ -40,6 +40,7 @@ from rfg_adc_plotter.types import SweepAuxCurves, SweepInfo, SweepPacket RAW_PLOT_MAX_POINTS = 4096 RAW_WATERFALL_MAX_POINTS = 2048 +UI_MAX_PACKETS_PER_TICK = 8 DEBUG_FRAME_LOG_EVERY = 10 @@ -242,6 +243,20 @@ def decimate_curve_for_display( return x_arr[display_idx], y_arr[display_idx] +def coalesce_packets_for_ui( + packets: Sequence[SweepPacket], + *, + max_packets: int = UI_MAX_PACKETS_PER_TICK, +) -> Tuple[List[SweepPacket], int]: + """Keep only the newest packets so a burst cannot starve the Qt event loop.""" + packet_list = list(packets) + limit = max(1, int(max_packets)) + if len(packet_list) <= limit: + return packet_list, 0 + skipped = len(packet_list) - limit + return packet_list[-limit:], skipped + + def resolve_visible_fft_curves( fft_complex: Optional[np.ndarray], fft_mag: Optional[np.ndarray], @@ -1244,6 +1259,7 @@ def run_pyqtgraph(args) -> None: pass processed_frames = 0 + ui_frames_skipped = 0 ui_started_at = time.perf_counter() def refresh_current_fft_cache(sweep_for_fft: np.ndarray, bins: int) -> None: @@ -1257,14 +1273,20 @@ def run_pyqtgraph(args) -> None: runtime.current_fft_db = fft_mag_to_db(runtime.current_fft_mag) def drain_queue() -> int: - nonlocal processed_frames - drained = 0 + nonlocal processed_frames, ui_frames_skipped + pending_packets: List[SweepPacket] = [] while True: try: - sweep, info, aux_curves = queue.get_nowait() + pending_packets.append(queue.get_nowait()) except Empty: break - drained += 1 + drained = len(pending_packets) + if drained <= 0: + return 0 + + pending_packets, skipped_packets = coalesce_packets_for_ui(pending_packets) + ui_frames_skipped += skipped_packets + for sweep, info, aux_curves in pending_packets: base_freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, sweep.size, dtype=np.float64) runtime.full_current_aux_curves = None runtime.full_current_fft_source = None @@ -1317,7 +1339,7 @@ def run_pyqtgraph(args) -> None: elapsed_s = max(time.perf_counter() - ui_started_at, 1e-9) frames_per_sec = float(processed_frames) / elapsed_s sys.stderr.write( - "[debug] ui frames:%d rate:%.2f/s last_sweep:%s ch:%s width:%d queue:%d\n" + "[debug] ui frames:%d rate:%.2f/s last_sweep:%s ch:%s width:%d queue:%d dropped:%d\n" % ( processed_frames, frames_per_sec, @@ -1325,10 +1347,10 @@ def run_pyqtgraph(args) -> None: str(info.get("ch") if isinstance(info, dict) else None), int(getattr(sweep, "size", 0)), int(queue_size), + int(ui_frames_skipped), ) ) - if drained > 0: - update_physical_axes() + update_physical_axes() return drained try: diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py index a1f800c..ec7dce4 100644 --- a/rfg_adc_plotter/io/sweep_parser_core.py +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -32,6 +32,11 @@ def log_pair_to_sweep(avg_1: int, avg_2: int) -> float: return abs(value_1 - value_2) * LOG_POSTSCALER +def tty_ch_pair_to_sweep(ch_1: int, ch_2: int) -> float: + """Reduce a raw CH1/CH2 TTY point to a single sweep value.""" + return float(abs(int(ch_1) - int(ch_2))) + + class AsciiSweepParser: """Incremental parser for ASCII sweep streams.""" @@ -139,7 +144,7 @@ class ComplexAsciiSweepParser: class LegacyBinaryParser: - """Byte-resynchronizing parser for legacy 8-byte binary records.""" + """Byte-resynchronizing parser for supported 8-byte binary record formats.""" def __init__(self): self._buf = bytearray() @@ -158,6 +163,7 @@ class LegacyBinaryParser: w0 = self._u16_at(self._buf, 0) w1 = self._u16_at(self._buf, 2) w2 = self._u16_at(self._buf, 4) + w3 = self._u16_at(self._buf, 6) if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and self._buf[6] == 0x0A: self._last_step = None self._seen_points = False @@ -174,6 +180,29 @@ class LegacyBinaryParser: events.append(PointEvent(ch=ch, x=int(w0), y=float(value))) del self._buf[:8] continue + if w0 == 0x000A and w1 == 0xFFFF and w2 == 0xFFFF and w3 == 0xFFFF: + self._last_step = None + self._seen_points = False + events.append(StartEvent(ch=0)) + del self._buf[:8] + continue + if w0 == 0x000A and w1 != 0xFFFF: + if self._seen_points and self._last_step is not None and w1 <= self._last_step: + events.append(StartEvent(ch=0)) + self._seen_points = True + self._last_step = int(w1) + ch_1 = u16_to_i16(w2) + ch_2 = u16_to_i16(w3) + events.append( + PointEvent( + ch=0, + x=int(w1), + y=tty_ch_pair_to_sweep(ch_1, ch_2), + aux=(float(ch_1), float(ch_2)), + ) + ) + del self._buf[:8] + continue del self._buf[:1] return events diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py index 79d0010..888706e 100644 --- a/rfg_adc_plotter/io/sweep_reader.py +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -22,6 +22,7 @@ from rfg_adc_plotter.types import ParserEvent, PointEvent, SweepPacket _PARSER_16_BIT_X2_PROBE_BYTES = 64 * 1024 _LEGACY_STREAM_MIN_RECORDS = 32 _LEGACY_STREAM_MIN_MATCH_RATIO = 0.95 +_TTY_STREAM_MIN_MATCH_RATIO = 0.60 _DEBUG_FRAME_LOG_EVERY = 10 @@ -30,27 +31,42 @@ def _u16le_at(data: bytes, offset: int) -> int: def _looks_like_legacy_8byte_stream(data: bytes) -> bool: - """Heuristically detect the legacy 8-byte stream on an arbitrary byte offset.""" + """Heuristically detect supported 8-byte binary streams on an arbitrary byte offset.""" buf = bytes(data) for offset in range(8): blocks = (len(buf) - offset) // 8 if blocks < _LEGACY_STREAM_MIN_RECORDS: continue min_matches = max(_LEGACY_STREAM_MIN_RECORDS, int(blocks * _LEGACY_STREAM_MIN_MATCH_RATIO)) - matched_steps: list[int] = [] + matched_steps_legacy: list[int] = [] + matched_steps_tty: list[int] = [] for block_idx in range(blocks): base = offset + (block_idx * 8) if (_u16le_at(buf, base + 6) & 0x00FF) != 0x000A: + w0 = _u16le_at(buf, base) + w1 = _u16le_at(buf, base + 2) + if w0 == 0x000A and w1 != 0xFFFF: + matched_steps_tty.append(w1) continue - matched_steps.append(_u16le_at(buf, base)) - if len(matched_steps) < min_matches: - continue - monotonic_or_reset = 0 - for prev_step, next_step in zip(matched_steps, matched_steps[1:]): - if next_step == (prev_step + 1) or next_step <= prev_step: - monotonic_or_reset += 1 - if monotonic_or_reset >= max(4, len(matched_steps) - 4): - return True + matched_steps_legacy.append(_u16le_at(buf, base)) + + if len(matched_steps_legacy) >= min_matches: + monotonic_or_reset = 0 + for prev_step, next_step in zip(matched_steps_legacy, matched_steps_legacy[1:]): + if next_step == (prev_step + 1) or next_step <= prev_step: + monotonic_or_reset += 1 + if monotonic_or_reset >= max(4, len(matched_steps_legacy) - 4): + return True + + tty_min_matches = max(_LEGACY_STREAM_MIN_RECORDS, int(blocks * _TTY_STREAM_MIN_MATCH_RATIO)) + if len(matched_steps_tty) >= tty_min_matches: + monotonic_or_reset = 0 + for prev_step, next_step in zip(matched_steps_tty, matched_steps_tty[1:]): + if next_step == (prev_step + 1) or next_step <= 2: + monotonic_or_reset += 1 + if monotonic_or_reset >= max(4, len(matched_steps_tty) - 4): + return True + return False diff --git a/tests/test_processing.py b/tests/test_processing.py index 31fdc8d..31a2741 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -9,6 +9,7 @@ from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MI from rfg_adc_plotter.gui.pyqtgraph_backend import ( apply_working_range, apply_working_range_to_aux_curves, + coalesce_packets_for_ui, compute_background_subtracted_bscan_levels, decimate_curve_for_display, resolve_visible_fft_curves, @@ -228,6 +229,30 @@ class ProcessingTests(unittest.TestCase): self.assertAlmostEqual(float(decimated_y[0]), float(ys[0]), places=6) self.assertAlmostEqual(float(decimated_y[-1]), float(ys[-1]), places=6) + def test_coalesce_packets_for_ui_keeps_newest_packets(self): + packets = [ + (np.asarray([float(idx)], dtype=np.float32), {"sweep": idx}, None) + for idx in range(6) + ] + + kept, skipped = coalesce_packets_for_ui(packets, max_packets=2) + + self.assertEqual(skipped, 4) + self.assertEqual(len(kept), 2) + self.assertEqual(int(kept[0][1]["sweep"]), 4) + self.assertEqual(int(kept[1][1]["sweep"]), 5) + + def test_coalesce_packets_for_ui_never_returns_empty_for_non_empty_input(self): + packets = [ + (np.asarray([1.0], dtype=np.float32), {"sweep": 1}, None), + ] + + kept, skipped = coalesce_packets_for_ui(packets, max_packets=0) + + self.assertEqual(skipped, 0) + self.assertEqual(len(kept), 1) + self.assertEqual(int(kept[0][1]["sweep"]), 1) + def test_background_subtracted_bscan_levels_ignore_zero_floor(self): disp_fft_lin = np.zeros((4, 8), dtype=np.float32) disp_fft_lin[1, 2:6] = np.asarray([0.05, 0.1, 0.5, 2.0], dtype=np.float32) diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py index 2160c0b..b9ef49d 100644 --- a/tests/test_sweep_parser_core.py +++ b/tests/test_sweep_parser_core.py @@ -72,6 +72,21 @@ def _pack_log16_point(step: int, avg1: int, avg2: int) -> bytes: ) +def _pack_tty_start() -> bytes: + return b"".join([_u16le(0x000A), _u16le(0xFFFF), _u16le(0xFFFF), _u16le(0xFFFF)]) + + +def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes: + return b"".join( + [ + _u16le(0x000A), + _u16le(step), + _u16le(ch1), + _u16le(ch2), + ] + ) + + class SweepParserCoreTests(unittest.TestCase): def test_ascii_parser_emits_start_and_points(self): parser = AsciiSweepParser() @@ -115,6 +130,51 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(events[3].x, 1) self.assertEqual(events[3].y, -4.0) + def test_legacy_binary_parser_accepts_tty_ch1_ch2_stream(self): + parser = LegacyBinaryParser() + stream = b"".join( + [ + _pack_tty_start(), + _pack_tty_point(1, 100, 90), + _pack_tty_point(2, 120, 95), + ] + ) + + events = parser.feed(stream) + + self.assertIsInstance(events[0], StartEvent) + self.assertEqual(events[0].ch, 0) + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].x, 1) + self.assertEqual(events[1].y, 10.0) + self.assertEqual(events[1].aux, (100.0, 90.0)) + self.assertIsInstance(events[2], PointEvent) + self.assertEqual(events[2].x, 2) + self.assertEqual(events[2].y, 25.0) + self.assertEqual(events[2].aux, (120.0, 95.0)) + + def test_legacy_binary_parser_detects_new_tty_sweep_on_step_reset(self): + parser = LegacyBinaryParser() + stream = b"".join( + [ + _pack_tty_start(), + _pack_tty_point(1, 100, 90), + _pack_tty_point(2, 110, 95), + _pack_tty_point(1, 120, 80), + ] + ) + + events = parser.feed(stream) + + self.assertIsInstance(events[0], StartEvent) + self.assertIsInstance(events[1], PointEvent) + self.assertIsInstance(events[2], PointEvent) + self.assertIsInstance(events[3], StartEvent) + self.assertEqual(events[3].ch, 0) + self.assertIsInstance(events[4], PointEvent) + self.assertEqual(events[4].x, 1) + self.assertEqual(events[4].aux, (120.0, 80.0)) + def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self): parser = ComplexAsciiSweepParser() events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n") diff --git a/tests/test_sweep_reader.py b/tests/test_sweep_reader.py index bbe1dac..5544a3d 100644 --- a/tests/test_sweep_reader.py +++ b/tests/test_sweep_reader.py @@ -44,6 +44,28 @@ def _pack_log16_point(step: int, real: int, imag: int) -> bytes: ) +def _pack_tty_start() -> bytes: + return b"".join( + [ + _u16le(0x000A), + _u16le(0xFFFF), + _u16le(0xFFFF), + _u16le(0xFFFF), + ] + ) + + +def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes: + return b"".join( + [ + _u16le(0x000A), + _u16le(step), + _u16le(ch1), + _u16le(ch2), + ] + ) + + def _chunk_bytes(data: bytes, size: int = 4096) -> list[bytes]: return [data[idx : idx + size] for idx in range(0, len(data), size)] @@ -111,6 +133,28 @@ class SweepReaderTests(unittest.TestCase): reader.join(timeout=1.0) stack.close() + def test_parser_16_bit_x2_falls_back_to_tty_ch1_ch2_stream(self): + payload = bytearray() + while len(payload) < (_PARSER_16_BIT_X2_PROBE_BYTES + 24): + payload += _pack_tty_start() + payload += _pack_tty_point(1, 100, 90) + payload += _pack_tty_point(2, 120, 95) + payload += _pack_tty_point(1, 80, 70) + + stack, reader, queue, stop_event, stderr = self._start_reader(bytes(payload), parser_16_bit_x2=True) + try: + sweep, info, aux = queue.get(timeout=2.0) + self.assertEqual(info["ch"], 0) + self.assertIsNotNone(aux) + self.assertGreaterEqual(sweep.shape[0], 3) + self.assertAlmostEqual(float(sweep[1]), 10.0, places=6) + self.assertAlmostEqual(float(sweep[2]), 25.0, places=6) + self.assertIn("fallback -> legacy", stderr.getvalue()) + finally: + stop_event.set() + reader.join(timeout=1.0) + stack.close() + def test_parser_16_bit_x2_keeps_true_complex_stream(self): payload = b"".join( [