ch1 ch2 new

This commit is contained in:
awe
2026-05-29 17:15:32 +03:00
parent 5591e80c53
commit 08dc6b3a1f
5 changed files with 328 additions and 27 deletions

View File

@ -588,5 +588,114 @@ class SweepParserCoreTests(unittest.TestCase):
self.assertAlmostEqual(float(sweep_2[1]), 20.0, places=6)
def test_legacy_binary_parser_accepts_secondary_0xa8_stream(self):
parser = LegacyBinaryParser()
stream = b"".join(
[
_pack_tty_start(),
_pack_tty_point(1, 5, 0),
_pack_tty_tagged_point(0x00A8, 1, 0xFFCD, 0xFFDC),
_pack_tty_point(2, 0xFFF8, 1),
_pack_tty_tagged_point(0x00A8, 2, 0xFFCE, 0xFFE9),
]
)
events = parser.feed(stream)
start_events = [e for e in events if isinstance(e, StartEvent)]
self.assertEqual(len(start_events), 1)
point_events = [e for e in events if isinstance(e, PointEvent)]
primary = [e for e in point_events if not e.is_secondary]
secondary = [e for e in point_events if e.is_secondary]
self.assertEqual(len(primary), 2)
self.assertEqual(len(secondary), 2)
self.assertEqual(secondary[0].x, 1)
self.assertEqual(secondary[0].aux, (-51.0, -36.0))
self.assertTrue(secondary[0].is_secondary)
self.assertEqual(secondary[1].x, 2)
self.assertEqual(secondary[1].aux, (-50.0, -23.0))
def test_secondary_0xa8_does_not_trigger_sweep_reset(self):
parser = LegacyBinaryParser()
stream = b"".join(
[
_pack_tty_start(),
_pack_tty_point(1, 5, 0),
_pack_tty_tagged_point(0x00A8, 1, 100, 200),
_pack_tty_point(2, 6, 0),
_pack_tty_tagged_point(0x00A8, 2, 110, 210),
]
)
events = parser.feed(stream)
start_events = [e for e in events if isinstance(e, StartEvent)]
self.assertEqual(len(start_events), 1)
def test_legacy_binary_parser_batch_handles_interleaved_secondary(self):
parser = LegacyBinaryParser(batch_events=True)
stream = b"".join(
[
_pack_tty_start(),
_pack_tty_point(1, 5, 0),
_pack_tty_tagged_point(0x00A8, 1, 100, 200),
_pack_tty_point(2, 6, 0),
_pack_tty_tagged_point(0x00A8, 2, 110, 210),
]
)
events = parser.feed(stream)
batch_events = [e for e in events if isinstance(e, BatchPointEvent)]
primary_batches = [e for e in batch_events if not e.is_secondary]
secondary_batches = [e for e in batch_events if e.is_secondary]
self.assertTrue(len(primary_batches) >= 1)
self.assertTrue(len(secondary_batches) >= 1)
pb = primary_batches[0]
self.assertTrue(np.array_equal(pb.xs, np.array([1, 2], dtype=np.int64)))
self.assertFalse(pb.is_secondary)
sb = secondary_batches[0]
self.assertTrue(np.array_equal(sb.xs, np.array([1, 2], dtype=np.int64)))
self.assertTrue(sb.is_secondary)
def test_sweep_assembler_packages_secondary_payload(self):
assembler = SweepAssembler(fancy=False, apply_inversion=False)
assembler.consume(StartEvent(ch=0, signal_kind="bin_iq"))
assembler.consume(PointEvent(ch=0, x=1, y=25.0, aux=(5.0, 0.0), signal_kind="bin_iq"))
assembler.consume(
PointEvent(ch=0, x=1, y=0.0, aux=(-51.0, -36.0), signal_kind="bin_iq", is_secondary=True)
)
assembler.consume(PointEvent(ch=0, x=2, y=65.0, aux=(-8.0, 1.0), signal_kind="bin_iq"))
assembler.consume(
PointEvent(ch=0, x=2, y=0.0, aux=(-50.0, -23.0), signal_kind="bin_iq", is_secondary=True)
)
sweep, info, aux = assembler.finalize_current()
self.assertEqual(info["signal_kind"], "bin_iq")
self.assertAlmostEqual(float(sweep[1]), 25.0, places=6)
self.assertAlmostEqual(float(sweep[2]), 65.0, places=6)
payload = info.get("_secondary_payload")
self.assertIsNotNone(payload)
self.assertIn("ch1", payload)
self.assertIn("ch2", payload)
self.assertAlmostEqual(float(payload["ch1"][1]), -51.0, places=6)
self.assertAlmostEqual(float(payload["ch2"][1]), -36.0, places=6)
self.assertAlmostEqual(float(payload["ch1"][2]), -50.0, places=6)
self.assertAlmostEqual(float(payload["ch2"][2]), -23.0, places=6)
def test_sweep_assembler_secondary_absent_when_no_0xa8_data(self):
assembler = SweepAssembler(fancy=False, apply_inversion=False)
assembler.consume(StartEvent(ch=0, signal_kind="bin_iq"))
assembler.consume(PointEvent(ch=0, x=1, y=25.0, aux=(5.0, 0.0), signal_kind="bin_iq"))
assembler.consume(PointEvent(ch=0, x=2, y=65.0, aux=(-8.0, 1.0), signal_kind="bin_iq"))
sweep, info, aux = assembler.finalize_current()
self.assertNotIn("_secondary_payload", info)
if __name__ == "__main__":
unittest.main()