From afd8538900d03491d1c333f8921b0fc70295911f Mon Sep 17 00:00:00 2001 From: awe Date: Thu, 9 Apr 2026 19:05:58 +0300 Subject: [PATCH] try new synchro method --- rfg_adc_plotter/cli.py | 5 + rfg_adc_plotter/gui/pyqtgraph_backend.py | 2 +- rfg_adc_plotter/io/sweep_reader.py | 109 ++++++++++++++-- tests/test_cli.py | 7 +- tests/test_sweep_reader.py | 152 +++++++++++++++++++++++ 5 files changed, 263 insertions(+), 12 deletions(-) create mode 100644 tests/test_sweep_reader.py diff --git a/rfg_adc_plotter/cli.py b/rfg_adc_plotter/cli.py index 85323e5..f2fdab1 100644 --- a/rfg_adc_plotter/cli.py +++ b/rfg_adc_plotter/cli.py @@ -55,6 +55,11 @@ def build_parser() -> argparse.ArgumentParser: default="pg", help="Совместимый флаг. Поддерживаются только auto и pg; mpl удален.", ) + parser.add_argument( + "--opengl", + action="store_true", + help="Включить OpenGL-ускорение для PyQtGraph. По умолчанию используется CPU-отрисовка.", + ) parser.add_argument( "--norm-type", choices=["projector", "simple"], diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 78a36c2..10b3aa9 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -310,7 +310,7 @@ def run_pyqtgraph(args) -> None: ) pg.setConfigOptions( - useOpenGL=not peak_calibrate_mode, + useOpenGL=bool(getattr(args, "opengl", False)), antialias=False, imageAxisOrder="row-major", ) diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py index 2cb6730..eebd63a 100644 --- a/rfg_adc_plotter/io/sweep_reader.py +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -17,7 +17,57 @@ from rfg_adc_plotter.io.sweep_parser_core import ( ParserTestStreamParser, SweepAssembler, ) -from rfg_adc_plotter.types import SweepPacket +from rfg_adc_plotter.types import ParserEvent, PointEvent, SweepPacket + +_PARSER_16_BIT_X2_PROBE_BYTES = 64 * 1024 +_LEGACY_STREAM_MIN_RECORDS = 32 +_LEGACY_STREAM_MIN_MATCH_RATIO = 0.95 + + +def _u16le_at(data: bytes, offset: int) -> int: + return int(data[offset]) | (int(data[offset + 1]) << 8) + + +def _looks_like_legacy_8byte_stream(data: bytes) -> bool: + """Heuristically detect the legacy 8-byte stream on an arbitrary byte offset.""" + buf = bytes(data) + for offset in range(8): + blocks = (len(buf) - offset) // 8 + if blocks < _LEGACY_STREAM_MIN_RECORDS: + continue + min_matches = max(_LEGACY_STREAM_MIN_RECORDS, int(blocks * _LEGACY_STREAM_MIN_MATCH_RATIO)) + matched_steps: list[int] = [] + for block_idx in range(blocks): + base = offset + (block_idx * 8) + if (_u16le_at(buf, base + 6) & 0x00FF) != 0x000A: + continue + matched_steps.append(_u16le_at(buf, base)) + if len(matched_steps) < min_matches: + continue + monotonic_or_reset = 0 + for prev_step, next_step in zip(matched_steps, matched_steps[1:]): + if next_step == (prev_step + 1) or next_step <= prev_step: + monotonic_or_reset += 1 + if monotonic_or_reset >= max(4, len(matched_steps) - 4): + return True + return False + + +def _is_valid_parser_16_bit_x2_probe(events: list[ParserEvent]) -> bool: + """Accept only plausible complex streams and ignore resync noise.""" + point_steps: list[int] = [] + for event in events: + if isinstance(event, PointEvent): + point_steps.append(int(event.x)) + + if len(point_steps) < 3: + return False + + monotonic_or_small_reset = 0 + for prev_step, next_step in zip(point_steps, point_steps[1:]): + if next_step == (prev_step + 1) or next_step <= 2: + monotonic_or_small_reset += 1 + return monotonic_or_small_reset >= max(2, len(point_steps) - 3) class SweepReader(threading.Thread): @@ -40,7 +90,7 @@ class SweepReader(threading.Thread): self._port_path = port_path self._baud = int(baud) self._queue = out_queue - self._stop = stop_event + self._stop_event = stop_event self._fancy = bool(fancy) self._bin_mode = bool(bin_mode) self._logscale = bool(logscale) @@ -62,6 +112,42 @@ class SweepReader(threading.Thread): return LegacyBinaryParser(), SweepAssembler(fancy=self._fancy, apply_inversion=True) return AsciiSweepParser(), SweepAssembler(fancy=self._fancy, apply_inversion=True) + @staticmethod + def _consume_events(assembler: SweepAssembler, events) -> list[SweepPacket]: + packets: list[SweepPacket] = [] + for event in events: + packet = assembler.consume(event) + if packet is not None: + packets.append(packet) + return packets + + def _probe_parser_16_bit_x2(self, chunk_reader: SerialChunkReader): + parser = LogScale16BitX2BinaryParser() + probe_buf = bytearray() + probe_events: list[ParserEvent] = [] + + while not self._stop_event.is_set() and len(probe_buf) < _PARSER_16_BIT_X2_PROBE_BYTES: + data = chunk_reader.read_available() + if not data: + time.sleep(0.0005) + continue + probe_buf += data + probe_events.extend(parser.feed(data)) + if _is_valid_parser_16_bit_x2_probe(probe_events): + assembler = SweepAssembler(fancy=self._fancy, apply_inversion=False) + probe_packets = self._consume_events(assembler, probe_events) + return parser, assembler, probe_packets + + if probe_buf and _looks_like_legacy_8byte_stream(bytes(probe_buf)): + sys.stderr.write("[info] parser_16_bit_x2: fallback -> legacy\n") + parser = LegacyBinaryParser() + assembler = SweepAssembler(fancy=self._fancy, apply_inversion=True) + probe_packets = self._consume_events(assembler, parser.feed(bytes(probe_buf))) + return parser, assembler, probe_packets + + assembler = SweepAssembler(fancy=self._fancy, apply_inversion=False) + return parser, assembler, [] + def _enqueue(self, packet: SweepPacket) -> None: try: self._queue.put_nowait(packet) @@ -83,19 +169,24 @@ class SweepReader(threading.Thread): sys.stderr.write(f"[error] {exc}\n") return - parser, assembler = self._build_parser() - try: chunk_reader = SerialChunkReader(self._src) - while not self._stop.is_set(): + if self._parser_16_bit_x2: + parser, assembler, pending_packets = self._probe_parser_16_bit_x2(chunk_reader) + else: + parser, assembler = self._build_parser() + pending_packets = [] + + for packet in pending_packets: + self._enqueue(packet) + + while not self._stop_event.is_set(): data = chunk_reader.read_available() if not data: time.sleep(0.0005) continue - for event in parser.feed(data): - packet = assembler.consume(event) - if packet is not None: - self._enqueue(packet) + for packet in self._consume_events(assembler, parser.feed(data)): + self._enqueue(packet) packet = assembler.finalize_current() if packet is not None: self._enqueue(packet) diff --git a/tests/test_cli.py b/tests/test_cli.py index 00aeb58..78b666d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -22,12 +22,14 @@ def _run(*args: str) -> subprocess.CompletedProcess[str]: class CliTests(unittest.TestCase): - def test_logscale_is_opt_in(self): + def test_logscale_and_opengl_are_opt_in(self): args = build_parser().parse_args(["/dev/null"]) self.assertFalse(args.logscale) + self.assertFalse(args.opengl) - args_log = build_parser().parse_args(["/dev/null", "--logscale"]) + args_log = build_parser().parse_args(["/dev/null", "--logscale", "--opengl"]) self.assertTrue(args_log.logscale) + self.assertTrue(args_log.opengl) def test_wrapper_help_works(self): proc = _run("RFG_ADC_dataplotter.py", "--help") @@ -41,6 +43,7 @@ class CliTests(unittest.TestCase): self.assertIn("usage:", proc.stdout) self.assertIn("--parser_16_bit_x2", proc.stdout) self.assertIn("--parser_complex_ascii", proc.stdout) + self.assertIn("--opengl", proc.stdout) def test_backend_mpl_reports_removal(self): proc = _run("-m", "rfg_adc_plotter.main", "/dev/null", "--backend", "mpl") diff --git a/tests/test_sweep_reader.py b/tests/test_sweep_reader.py new file mode 100644 index 0000000..bbe1dac --- /dev/null +++ b/tests/test_sweep_reader.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +import contextlib +import io +import threading +import time +import unittest +from queue import Queue +from unittest.mock import patch + +from rfg_adc_plotter.io import sweep_reader as sweep_reader_module +from rfg_adc_plotter.io.sweep_reader import SweepReader, _PARSER_16_BIT_X2_PROBE_BYTES + + +def _u16le(word: int) -> bytes: + value = int(word) & 0xFFFF + return bytes((value & 0xFF, (value >> 8) & 0xFF)) + + +def _pack_legacy_point(ch: int, step: int, value_i32: int) -> bytes: + value = int(value_i32) & 0xFFFF_FFFF + return b"".join( + [ + _u16le(step), + _u16le((value >> 16) & 0xFFFF), + _u16le(value & 0xFFFF), + bytes((0x0A, int(ch) & 0xFF)), + ] + ) + + +def _pack_log16_start(ch: int) -> bytes: + return b"\xff\xff" * 3 + bytes((0x0A, int(ch) & 0xFF)) + + +def _pack_log16_point(step: int, real: int, imag: int) -> bytes: + return b"".join( + [ + _u16le(step), + _u16le(real), + _u16le(imag), + _u16le(0xFFFF), + ] + ) + + +def _chunk_bytes(data: bytes, size: int = 4096) -> list[bytes]: + return [data[idx : idx + size] for idx in range(0, len(data), size)] + + +class _FakeSerialLineSource: + def __init__(self, path: str, baud: int, timeout: float = 1.0): + self.path = path + self.baud = baud + self.timeout = timeout + self._using = "fake" + + def close(self) -> None: + pass + + +class _FakeChunkReader: + payload_chunks: list[bytes] = [] + + def __init__(self, src): + self._src = src + self._chunks = list(type(self).payload_chunks) + + def read_available(self) -> bytes: + if self._chunks: + return self._chunks.pop(0) + return b"" + + +class SweepReaderTests(unittest.TestCase): + def _start_reader(self, payload: bytes, **reader_kwargs): + queue: Queue = Queue() + stop_event = threading.Event() + stderr = io.StringIO() + _FakeChunkReader.payload_chunks = _chunk_bytes(payload) + reader = SweepReader( + "/tmp/fake-tty", + 115200, + queue, + stop_event, + **reader_kwargs, + ) + stack = contextlib.ExitStack() + stack.enter_context(patch.object(sweep_reader_module, "SerialLineSource", _FakeSerialLineSource)) + stack.enter_context(patch.object(sweep_reader_module, "SerialChunkReader", _FakeChunkReader)) + stack.enter_context(contextlib.redirect_stderr(stderr)) + reader.start() + return stack, reader, queue, stop_event, stderr + + def test_parser_16_bit_x2_falls_back_to_legacy_stream(self): + payload = bytearray() + while len(payload) < (_PARSER_16_BIT_X2_PROBE_BYTES + 24): + payload += _pack_legacy_point(3, 1, -2) + payload += _pack_legacy_point(3, 2, -3) + payload += _pack_legacy_point(3, 1, -4) + + stack, reader, queue, stop_event, stderr = self._start_reader(bytes(payload), parser_16_bit_x2=True) + try: + sweep, info, aux = queue.get(timeout=2.0) + self.assertEqual(info["ch"], 3) + self.assertIsNone(aux) + self.assertGreaterEqual(sweep.shape[0], 3) + self.assertIn("fallback -> legacy", stderr.getvalue()) + finally: + stop_event.set() + reader.join(timeout=1.0) + stack.close() + + def test_parser_16_bit_x2_keeps_true_complex_stream(self): + payload = b"".join( + [ + _pack_log16_start(2), + _pack_log16_point(1, 3, 4), + _pack_log16_point(2, 5, 12), + _pack_log16_point(1, 8, 15), + ] + ) + + stack, reader, queue, stop_event, stderr = self._start_reader(payload, parser_16_bit_x2=True) + try: + sweep, info, aux = queue.get(timeout=1.0) + self.assertEqual(info["ch"], 2) + self.assertIsNotNone(aux) + self.assertAlmostEqual(float(sweep[1]), 5.0, places=6) + self.assertAlmostEqual(float(sweep[2]), 13.0, places=6) + self.assertNotIn("fallback -> legacy", stderr.getvalue()) + finally: + stop_event.set() + reader.join(timeout=1.0) + stack.close() + + def test_reader_join_does_not_raise_when_stopped(self): + stack, reader, _queue, stop_event, _stderr = self._start_reader(b"", parser_16_bit_x2=True) + try: + time.sleep(0.01) + stop_event.set() + reader.join(timeout=1.0) + self.assertFalse(reader.is_alive()) + finally: + stop_event.set() + if reader.is_alive(): + reader.join(timeout=1.0) + stack.close() + + +if __name__ == "__main__": + unittest.main()