diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py index ec7dce4..d6942ff 100644 --- a/rfg_adc_plotter/io/sweep_parser_core.py +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -150,11 +150,50 @@ class LegacyBinaryParser: self._buf = bytearray() self._last_step: Optional[int] = None self._seen_points = False + self._mode: Optional[str] = None @staticmethod def _u16_at(buf: bytearray, offset: int) -> int: return int(buf[offset]) | (int(buf[offset + 1]) << 8) + def _emit_legacy_start(self, events: List[ParserEvent], ch: int) -> None: + self._mode = "legacy" + self._last_step = None + self._seen_points = False + events.append(StartEvent(ch=int(ch))) + + def _emit_tty_start(self, events: List[ParserEvent]) -> None: + self._mode = "tty" + self._last_step = None + self._seen_points = False + events.append(StartEvent(ch=0)) + + def _emit_legacy_point(self, events: List[ParserEvent], step: int, value_word_hi: int, value_word_lo: int, ch: int) -> None: + self._mode = "legacy" + if self._seen_points and self._last_step is not None and step <= self._last_step: + events.append(StartEvent(ch=int(ch))) + self._seen_points = True + self._last_step = int(step) + value = u32_to_i32((int(value_word_hi) << 16) | int(value_word_lo)) + events.append(PointEvent(ch=int(ch), x=int(step), y=float(value))) + + def _emit_tty_point(self, events: List[ParserEvent], step: int, ch_1_word: int, ch_2_word: int) -> None: + self._mode = "tty" + if self._seen_points and self._last_step is not None and step <= self._last_step: + events.append(StartEvent(ch=0)) + self._seen_points = True + self._last_step = int(step) + ch_1 = u16_to_i16(int(ch_1_word)) + ch_2 = u16_to_i16(int(ch_2_word)) + events.append( + PointEvent( + ch=0, + x=int(step), + y=tty_ch_pair_to_sweep(ch_1, ch_2), + aux=(float(ch_1), float(ch_2)), + ) + ) + def feed(self, data: bytes) -> List[ParserEvent]: if data: self._buf += data @@ -164,45 +203,50 @@ class LegacyBinaryParser: w1 = self._u16_at(self._buf, 2) w2 = self._u16_at(self._buf, 4) w3 = self._u16_at(self._buf, 6) - if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and self._buf[6] == 0x0A: - self._last_step = None - self._seen_points = False - events.append(StartEvent(ch=int(self._buf[7]))) + + is_legacy_start = (w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and self._buf[6] == 0x0A) + is_tty_start = (w0 == 0x000A and w1 == 0xFFFF and w2 == 0xFFFF and w3 == 0xFFFF) + is_legacy_point = (self._buf[6] == 0x0A and w0 != 0xFFFF) + is_tty_point = (w0 == 0x000A and w1 != 0xFFFF) + + if is_legacy_start: + self._emit_legacy_start(events, ch=int(self._buf[7])) del self._buf[:8] continue - if self._buf[6] == 0x0A: - ch = int(self._buf[7]) - if self._seen_points and self._last_step is not None and w0 <= self._last_step: - events.append(StartEvent(ch=ch)) - self._seen_points = True - self._last_step = int(w0) - value = u32_to_i32((w1 << 16) | w2) - events.append(PointEvent(ch=ch, x=int(w0), y=float(value))) + + if is_tty_start: + self._emit_tty_start(events) del self._buf[:8] continue - if w0 == 0x000A and w1 == 0xFFFF and w2 == 0xFFFF and w3 == 0xFFFF: - self._last_step = None - self._seen_points = False - events.append(StartEvent(ch=0)) + + if self._mode == "legacy": + if is_legacy_point: + self._emit_legacy_point(events, step=int(w0), value_word_hi=int(w1), value_word_lo=int(w2), ch=int(self._buf[7])) + del self._buf[:8] + continue + del self._buf[:1] + continue + + if self._mode == "tty": + if is_tty_point: + self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) + del self._buf[:8] + continue + del self._buf[:1] + continue + + # Mode is still unknown. Accept only unambiguous point shapes to avoid + # jumping between tty and legacy interpretations on coincidental bytes. + if is_tty_point and (not is_legacy_point): + self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) del self._buf[:8] continue - if w0 == 0x000A and w1 != 0xFFFF: - if self._seen_points and self._last_step is not None and w1 <= self._last_step: - events.append(StartEvent(ch=0)) - self._seen_points = True - self._last_step = int(w1) - ch_1 = u16_to_i16(w2) - ch_2 = u16_to_i16(w3) - events.append( - PointEvent( - ch=0, - x=int(w1), - y=tty_ch_pair_to_sweep(ch_1, ch_2), - aux=(float(ch_1), float(ch_2)), - ) - ) + + if is_legacy_point and (not is_tty_point): + self._emit_legacy_point(events, step=int(w0), value_word_hi=int(w1), value_word_lo=int(w2), ch=int(self._buf[7])) del self._buf[:8] continue + del self._buf[:1] return events diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py index b9ef49d..ba9a951 100644 --- a/tests/test_sweep_parser_core.py +++ b/tests/test_sweep_parser_core.py @@ -175,6 +175,34 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(events[4].x, 1) self.assertEqual(events[4].aux, (120.0, 80.0)) + def test_legacy_binary_parser_tty_mode_does_not_flip_to_legacy_on_ch2_low_byte_0x0a(self): + parser = LegacyBinaryParser() + stream = b"".join( + [ + _pack_tty_start(), + _pack_tty_point(1, 100, 0x040A), # low byte is 0x0A: used to be misparsed as legacy + _pack_tty_point(2, 120, 0x0410), + ] + ) + + events = parser.feed(stream) + + self.assertEqual(len(events), 3) + self.assertIsInstance(events[0], StartEvent) + self.assertEqual(events[0].ch, 0) + + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].ch, 0) + self.assertEqual(events[1].x, 1) + self.assertEqual(events[1].aux, (100.0, 1034.0)) + self.assertEqual(events[1].y, 934.0) + + self.assertIsInstance(events[2], PointEvent) + self.assertEqual(events[2].ch, 0) + self.assertEqual(events[2].x, 2) + self.assertEqual(events[2].aux, (120.0, 1040.0)) + self.assertEqual(events[2].y, 920.0) + def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self): parser = ComplexAsciiSweepParser() events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n")