test new variant

This commit is contained in:
awe
2026-04-28 19:32:10 +03:00
parent ffb7dc3f25
commit 9ff97bf737
11 changed files with 813 additions and 33 deletions

View File

@ -46,6 +46,7 @@ class CliTests(unittest.TestCase):
self.assertIn("--parser_16_bit_x2", proc.stdout)
self.assertIn("--parser_complex_ascii", proc.stdout)
self.assertIn("--opengl", proc.stdout)
self.assertIn("0x00A3/0x00A4", proc.stdout)
def test_backend_mpl_reports_removal(self):
proc = _run("-m", "rfg_adc_plotter.main", "/dev/null", "--backend", "mpl")

View File

@ -15,6 +15,8 @@ from rfg_adc_plotter.gui.pyqtgraph_backend import (
coalesce_packets_for_ui,
compute_background_subtracted_bscan_levels,
compute_aux_phase_curve,
compute_do1_tagged_aggregate,
compute_do1_tagged_phase_curves,
convert_tty_i16_to_voltage,
decimate_bscan_rows_for_display,
decimate_curve_for_display,
@ -30,6 +32,7 @@ from rfg_adc_plotter.gui.pyqtgraph_backend import (
set_image_rect_if_ready,
resolve_visible_fft_curves,
resolve_visible_aux_curves,
resolve_visible_do1_tagged_aux_curves,
)
from rfg_adc_plotter.processing.calibration import (
build_calib_envelope,
@ -316,6 +319,53 @@ class ProcessingTests(unittest.TestCase):
self.assertEqual(phase.shape, expected.shape)
self.assertTrue(np.allclose(phase, expected, atol=1e-6))
def test_compute_do1_tagged_aggregate_nanmean_merges_low_and_high(self):
low = np.asarray([1.0, np.nan, 5.0, np.nan], dtype=np.float32)
high = np.asarray([3.0, 7.0, np.nan, np.nan], dtype=np.float32)
merged = compute_do1_tagged_aggregate(low, high)
self.assertIsNotNone(merged)
self.assertTrue(np.allclose(merged[:3], np.asarray([2.0, 7.0, 5.0], dtype=np.float32), equal_nan=True))
self.assertTrue(np.isnan(merged[3]))
def test_resolve_visible_do1_tagged_aux_curves_obeys_checkbox_state(self):
aux_low = (
np.asarray([1.0, 2.0], dtype=np.float32),
np.asarray([3.0, 4.0], dtype=np.float32),
)
aux_high = (
np.asarray([5.0, 6.0], dtype=np.float32),
np.asarray([7.0, 8.0], dtype=np.float32),
)
hidden_low, hidden_high = resolve_visible_do1_tagged_aux_curves(aux_low, aux_high, enabled=False)
self.assertIsNone(hidden_low)
self.assertIsNone(hidden_high)
visible_low, visible_high = resolve_visible_do1_tagged_aux_curves(aux_low, aux_high, enabled=True)
self.assertIsNotNone(visible_low)
self.assertIsNotNone(visible_high)
self.assertTrue(np.allclose(visible_low[0], aux_low[0]))
self.assertTrue(np.allclose(visible_high[1], aux_high[1]))
def test_compute_do1_tagged_phase_curves_returns_two_independent_series(self):
aux_low = (
np.asarray([1.0, 1.0], dtype=np.float32),
np.asarray([0.0, 1.0], dtype=np.float32),
)
aux_high = (
np.asarray([1.0, -1.0], dtype=np.float32),
np.asarray([1.0, 1.0], dtype=np.float32),
)
phase_low, phase_high = compute_do1_tagged_phase_curves(aux_low, aux_high)
self.assertIsNotNone(phase_low)
self.assertIsNotNone(phase_high)
self.assertTrue(np.allclose(phase_low, np.asarray([0.0, np.pi / 4.0], dtype=np.float32), atol=1e-6))
self.assertTrue(np.allclose(phase_high, np.asarray([np.pi / 4.0, 3.0 * np.pi / 4.0], dtype=np.float32), atol=1e-6))
def test_decimate_curve_for_display_preserves_small_series(self):
xs = np.linspace(3.3, 14.3, 64, dtype=np.float64)
ys = np.linspace(-1.0, 1.0, 64, dtype=np.float32)

View File

@ -87,6 +87,25 @@ def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes:
)
def _pack_tty_tagged_point(marker_word0: int, step: int, ch1: int, ch2: int) -> bytes:
return b"".join(
[
_u16le(marker_word0),
_u16le(step),
_u16le(ch1),
_u16le(ch2),
]
)
def _pack_tty_tagged_low_point(step: int, ch1: int, ch2: int) -> bytes:
return _pack_tty_tagged_point(0x00A3, step, ch1, ch2)
def _pack_tty_tagged_high_point(step: int, ch1: int, ch2: int) -> bytes:
return _pack_tty_tagged_point(0x00A4, step, ch1, ch2)
def _pack_logdet_point(step: int, value: int) -> bytes:
return b"".join(
[
@ -189,6 +208,78 @@ class SweepParserCoreTests(unittest.TestCase):
self.assertEqual(events[4].aux, (120.0, 80.0))
self.assertEqual(events[4].signal_kind, "bin_iq")
def test_legacy_binary_parser_accepts_tty_do1_tagged_stream(self):
parser = LegacyBinaryParser()
stream = b"".join(
[
_pack_tty_start(),
_pack_tty_tagged_low_point(1, 100, 90),
_pack_tty_tagged_high_point(1, 120, 95),
]
)
events = parser.feed(stream)
self.assertEqual(len(events), 3)
self.assertIsInstance(events[0], StartEvent)
self.assertEqual(events[0].signal_kind, "bin_iq")
self.assertIsInstance(events[1], PointEvent)
self.assertEqual(events[1].signal_kind, "bin_iq_do1_tagged")
self.assertEqual(events[1].do1_level, "low")
self.assertEqual(events[1].x, 1)
self.assertEqual(events[1].aux, (100.0, 90.0))
self.assertIsInstance(events[2], PointEvent)
self.assertEqual(events[2].signal_kind, "bin_iq_do1_tagged")
self.assertEqual(events[2].do1_level, "high")
self.assertEqual(events[2].x, 1)
self.assertEqual(events[2].aux, (120.0, 95.0))
def test_legacy_binary_parser_keeps_same_step_for_different_do1_levels_in_one_sweep(self):
parser = LegacyBinaryParser()
stream = b"".join(
[
_pack_tty_start(),
_pack_tty_tagged_low_point(1, 100, 90),
_pack_tty_tagged_high_point(1, 120, 95),
_pack_tty_tagged_low_point(2, 130, 80),
_pack_tty_tagged_high_point(2, 140, 75),
]
)
events = parser.feed(stream)
start_events = [event for event in events if isinstance(event, StartEvent)]
self.assertEqual(len(start_events), 1)
self.assertEqual(start_events[0].signal_kind, "bin_iq")
point_levels = [event.do1_level for event in events if isinstance(event, PointEvent)]
self.assertEqual(point_levels, ["low", "high", "low", "high"])
def test_legacy_binary_parser_resets_tagged_stream_only_on_same_level_step_reset(self):
parser = LegacyBinaryParser()
stream = b"".join(
[
_pack_tty_start(),
_pack_tty_tagged_low_point(1, 100, 90),
_pack_tty_tagged_high_point(1, 120, 95),
_pack_tty_tagged_low_point(2, 130, 80),
_pack_tty_tagged_high_point(2, 140, 75),
_pack_tty_tagged_low_point(1, 110, 85),
]
)
events = parser.feed(stream)
self.assertIsInstance(events[0], StartEvent)
self.assertIsInstance(events[1], PointEvent)
self.assertIsInstance(events[2], PointEvent)
self.assertIsInstance(events[3], PointEvent)
self.assertIsInstance(events[4], PointEvent)
self.assertIsInstance(events[5], StartEvent)
self.assertEqual(events[5].signal_kind, "bin_iq_do1_tagged")
self.assertIsInstance(events[6], PointEvent)
self.assertEqual(events[6].do1_level, "low")
self.assertEqual(events[6].x, 1)
def test_legacy_binary_parser_tty_mode_does_not_flip_to_legacy_on_ch2_low_byte_0x0a(self):
parser = LegacyBinaryParser()
stream = b"".join(
@ -377,6 +468,40 @@ class SweepParserCoreTests(unittest.TestCase):
self.assertEqual(aux[0][1], 100.0)
self.assertEqual(aux[1][2], 95.0)
def test_sweep_assembler_builds_tagged_payload_and_nanmean_aggregate(self):
assembler = SweepAssembler(fancy=False, apply_inversion=False)
self.assertIsNone(assembler.consume(StartEvent(ch=0, signal_kind="bin_iq_do1_tagged")))
assembler.consume(
PointEvent(
ch=0,
x=1,
y=10.0,
aux=(100.0, 90.0),
signal_kind="bin_iq_do1_tagged",
do1_level="low",
)
)
assembler.consume(
PointEvent(
ch=0,
x=1,
y=20.0,
aux=(120.0, 95.0),
signal_kind="bin_iq_do1_tagged",
do1_level="high",
)
)
sweep, info, aux = assembler.finalize_current()
self.assertIsNone(aux)
self.assertEqual(info["signal_kind"], "bin_iq_do1_tagged")
self.assertAlmostEqual(float(sweep[1]), 15.0, places=6)
payload = info.get("_do1_tagged_payload")
self.assertIsInstance(payload, dict)
self.assertIn("raw_low", payload)
self.assertIn("raw_high", payload)
self.assertIn("aux_low", payload)
self.assertIn("aux_high", payload)
def test_sweep_assembler_splits_packet_on_channel_switch(self):
assembler = SweepAssembler(fancy=False, apply_inversion=False)
self.assertIsNone(assembler.consume(PointEvent(ch=1, x=1, y=10.0)))

View File

@ -66,6 +66,25 @@ def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes:
)
def _pack_tty_tagged_point(marker_word0: int, step: int, ch1: int, ch2: int) -> bytes:
return b"".join(
[
_u16le(marker_word0),
_u16le(step),
_u16le(ch1),
_u16le(ch2),
]
)
def _pack_tty_tagged_low(step: int, ch1: int, ch2: int) -> bytes:
return _pack_tty_tagged_point(0x00A3, step, ch1, ch2)
def _pack_tty_tagged_high(step: int, ch1: int, ch2: int) -> bytes:
return _pack_tty_tagged_point(0x00A4, step, ch1, ch2)
def _pack_logdet_point(step: int, value: int) -> bytes:
return b"".join(
[
@ -166,6 +185,29 @@ class SweepReaderTests(unittest.TestCase):
reader.join(timeout=1.0)
stack.close()
def test_parser_16_bit_x2_falls_back_to_tty_do1_tagged_stream(self):
payload = bytearray()
while len(payload) < (_PARSER_16_BIT_X2_PROBE_BYTES + 40):
payload += _pack_tty_start()
payload += _pack_tty_tagged_low(1, 100, 90)
payload += _pack_tty_tagged_high(1, 120, 95)
payload += _pack_tty_tagged_low(2, 110, 80)
payload += _pack_tty_tagged_high(2, 130, 70)
payload += _pack_tty_tagged_low(1, 105, 85)
stack, reader, queue, stop_event, stderr = self._start_reader(bytes(payload), parser_16_bit_x2=True)
try:
sweep, info, aux = queue.get(timeout=2.0)
self.assertEqual(info["signal_kind"], "bin_iq_do1_tagged")
self.assertIsNone(aux)
self.assertIn("_do1_tagged_payload", info)
self.assertGreaterEqual(sweep.shape[0], 2)
self.assertIn("fallback -> legacy", stderr.getvalue())
finally:
stop_event.set()
reader.join(timeout=1.0)
stack.close()
def test_parser_16_bit_x2_keeps_true_complex_stream(self):
payload = b"".join(
[