From 339cb85dce5710fe5f2398d1a03b4163f7bdef3d Mon Sep 17 00:00:00 2001 From: awe Date: Thu, 9 Apr 2026 18:43:50 +0300 Subject: [PATCH] new e502 adc --- rfg_adc_plotter/cli.py | 1 - rfg_adc_plotter/io/sweep_parser_core.py | 24 +++++++++++ tests/test_cli.py | 9 ++++ tests/test_sweep_parser_core.py | 56 +++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 1 deletion(-) diff --git a/rfg_adc_plotter/cli.py b/rfg_adc_plotter/cli.py index e7c2bb0..85323e5 100644 --- a/rfg_adc_plotter/cli.py +++ b/rfg_adc_plotter/cli.py @@ -73,7 +73,6 @@ def build_parser() -> argparse.ArgumentParser: parser.add_argument( "--logscale", action="store_true", - default=True, help=( "Новый бинарный протокол: точка несет пару int32 (avg_1, avg_2), " "а свип считается как |10**(avg_1*0.001) - 10**(avg_2*0.001)|" diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py index f42a1b4..a1f800c 100644 --- a/rfg_adc_plotter/io/sweep_parser_core.py +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -143,6 +143,8 @@ class LegacyBinaryParser: def __init__(self): self._buf = bytearray() + self._last_step: Optional[int] = None + self._seen_points = False @staticmethod def _u16_at(buf: bytearray, offset: int) -> int: @@ -157,11 +159,17 @@ class LegacyBinaryParser: w1 = self._u16_at(self._buf, 2) w2 = self._u16_at(self._buf, 4) if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and self._buf[6] == 0x0A: + self._last_step = None + self._seen_points = False events.append(StartEvent(ch=int(self._buf[7]))) del self._buf[:8] continue if self._buf[6] == 0x0A: ch = int(self._buf[7]) + if self._seen_points and self._last_step is not None and w0 <= self._last_step: + events.append(StartEvent(ch=ch)) + self._seen_points = True + self._last_step = int(w0) value = u32_to_i32((w1 << 16) | w2) events.append(PointEvent(ch=ch, x=int(w0), y=float(value))) del self._buf[:8] @@ -175,6 +183,8 @@ class LogScaleBinaryParser32: def __init__(self): self._buf = bytearray() + self._last_step: Optional[int] = None + self._seen_points = False @staticmethod def _u16_at(buf: bytearray, offset: int) -> int: @@ -187,11 +197,17 @@ class LogScaleBinaryParser32: while len(self._buf) >= 12: words = [self._u16_at(self._buf, idx * 2) for idx in range(6)] if words[0:5] == [0xFFFF] * 5 and (words[5] & 0x00FF) == 0x000A: + self._last_step = None + self._seen_points = False events.append(StartEvent(ch=int((words[5] >> 8) & 0x00FF))) del self._buf[:12] continue if (words[5] & 0x00FF) == 0x000A and words[0] != 0xFFFF: ch = int((words[5] >> 8) & 0x00FF) + if self._seen_points and self._last_step is not None and words[0] <= self._last_step: + events.append(StartEvent(ch=ch)) + self._seen_points = True + self._last_step = int(words[0]) avg_1 = u32_to_i32((words[1] << 16) | words[2]) avg_2 = u32_to_i32((words[3] << 16) | words[4]) events.append( @@ -214,6 +230,8 @@ class LogScale16BitX2BinaryParser: def __init__(self): self._buf = bytearray() self._current_channel = 0 + self._last_step: Optional[int] = None + self._seen_points = False @staticmethod def _u16_at(buf: bytearray, offset: int) -> int: @@ -227,10 +245,16 @@ class LogScale16BitX2BinaryParser: words = [self._u16_at(self._buf, idx * 2) for idx in range(4)] if words[0:3] == [0xFFFF, 0xFFFF, 0xFFFF] and (words[3] & 0x00FF) == 0x000A: self._current_channel = int((words[3] >> 8) & 0x00FF) + self._last_step = None + self._seen_points = False events.append(StartEvent(ch=self._current_channel)) del self._buf[:8] continue if words[3] == 0xFFFF and words[0] != 0xFFFF: + if self._seen_points and self._last_step is not None and words[0] <= self._last_step: + events.append(StartEvent(ch=self._current_channel)) + self._seen_points = True + self._last_step = int(words[0]) real = u16_to_i16(words[1]) imag = u16_to_i16(words[2]) events.append( diff --git a/tests/test_cli.py b/tests/test_cli.py index 41e4f83..00aeb58 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -5,6 +5,8 @@ import sys import unittest from pathlib import Path +from rfg_adc_plotter.cli import build_parser + ROOT = Path(__file__).resolve().parents[1] @@ -20,6 +22,13 @@ def _run(*args: str) -> subprocess.CompletedProcess[str]: class CliTests(unittest.TestCase): + def test_logscale_is_opt_in(self): + args = build_parser().parse_args(["/dev/null"]) + self.assertFalse(args.logscale) + + args_log = build_parser().parse_args(["/dev/null", "--logscale"]) + self.assertTrue(args_log.logscale) + def test_wrapper_help_works(self): proc = _run("RFG_ADC_dataplotter.py", "--help") self.assertEqual(proc.returncode, 0) diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py index 836f6f2..2160c0b 100644 --- a/tests/test_sweep_parser_core.py +++ b/tests/test_sweep_parser_core.py @@ -97,6 +97,24 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(events[1].x, 1) self.assertEqual(events[1].y, -2.0) + def test_legacy_binary_parser_detects_new_sweep_on_step_reset(self): + parser = LegacyBinaryParser() + stream = b"".join( + [ + _pack_legacy_point(3, 1, -2), + _pack_legacy_point(3, 2, -3), + _pack_legacy_point(3, 1, -4), + ] + ) + events = parser.feed(stream) + self.assertIsInstance(events[0], PointEvent) + self.assertIsInstance(events[1], PointEvent) + self.assertIsInstance(events[2], StartEvent) + self.assertEqual(events[2].ch, 3) + self.assertIsInstance(events[3], PointEvent) + self.assertEqual(events[3].x, 1) + self.assertEqual(events[3].y, -4.0) + def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self): parser = ComplexAsciiSweepParser() events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n") @@ -123,6 +141,24 @@ class SweepParserCoreTests(unittest.TestCase): self.assertAlmostEqual(events[1].y, log_pair_to_sweep(1500, 700), places=6) self.assertEqual(events[1].aux, (1500.0, 700.0)) + def test_logscale_32_parser_detects_new_sweep_on_step_reset(self): + parser = LogScaleBinaryParser32() + stream = b"".join( + [ + _pack_log_point(1, 1500, 700, ch=5), + _pack_log_point(2, 1400, 650, ch=5), + _pack_log_point(1, 1300, 600, ch=5), + ] + ) + events = parser.feed(stream) + self.assertIsInstance(events[0], PointEvent) + self.assertIsInstance(events[1], PointEvent) + self.assertIsInstance(events[2], StartEvent) + self.assertEqual(events[2].ch, 5) + self.assertIsInstance(events[3], PointEvent) + self.assertEqual(events[3].x, 1) + self.assertAlmostEqual(events[3].y, log_pair_to_sweep(1300, 600), places=6) + def test_log_pair_to_sweep_is_order_independent(self): self.assertAlmostEqual(log_pair_to_sweep(1500, 700), log_pair_to_sweep(700, 1500), places=6) @@ -137,6 +173,26 @@ class SweepParserCoreTests(unittest.TestCase): self.assertAlmostEqual(events[1].y, math.hypot(100.0, 90.0), places=6) self.assertEqual(events[1].aux, (100.0, 90.0)) + def test_logscale_16bit_parser_detects_new_sweep_on_step_reset(self): + parser = LogScale16BitX2BinaryParser() + stream = b"".join( + [ + _pack_log16_start(2), + _pack_log16_point(1, 100, 90), + _pack_log16_point(2, 110, 95), + _pack_log16_point(1, 120, 80), + ] + ) + events = parser.feed(stream) + self.assertIsInstance(events[0], StartEvent) + self.assertIsInstance(events[1], PointEvent) + self.assertIsInstance(events[2], PointEvent) + self.assertIsInstance(events[3], StartEvent) + self.assertEqual(events[3].ch, 2) + self.assertIsInstance(events[4], PointEvent) + self.assertEqual(events[4].x, 1) + self.assertAlmostEqual(events[4].y, math.hypot(120.0, 80.0), places=6) + def test_parser_test_stream_parser_recovers_point_after_single_separator(self): parser = ParserTestStreamParser() stream = b"".join(