fix
This commit is contained in:
@ -9,6 +9,7 @@ from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MI
|
||||
from rfg_adc_plotter.gui.pyqtgraph_backend import (
|
||||
apply_working_range,
|
||||
apply_working_range_to_aux_curves,
|
||||
coalesce_packets_for_ui,
|
||||
compute_background_subtracted_bscan_levels,
|
||||
decimate_curve_for_display,
|
||||
resolve_visible_fft_curves,
|
||||
@ -228,6 +229,30 @@ class ProcessingTests(unittest.TestCase):
|
||||
self.assertAlmostEqual(float(decimated_y[0]), float(ys[0]), places=6)
|
||||
self.assertAlmostEqual(float(decimated_y[-1]), float(ys[-1]), places=6)
|
||||
|
||||
def test_coalesce_packets_for_ui_keeps_newest_packets(self):
|
||||
packets = [
|
||||
(np.asarray([float(idx)], dtype=np.float32), {"sweep": idx}, None)
|
||||
for idx in range(6)
|
||||
]
|
||||
|
||||
kept, skipped = coalesce_packets_for_ui(packets, max_packets=2)
|
||||
|
||||
self.assertEqual(skipped, 4)
|
||||
self.assertEqual(len(kept), 2)
|
||||
self.assertEqual(int(kept[0][1]["sweep"]), 4)
|
||||
self.assertEqual(int(kept[1][1]["sweep"]), 5)
|
||||
|
||||
def test_coalesce_packets_for_ui_never_returns_empty_for_non_empty_input(self):
|
||||
packets = [
|
||||
(np.asarray([1.0], dtype=np.float32), {"sweep": 1}, None),
|
||||
]
|
||||
|
||||
kept, skipped = coalesce_packets_for_ui(packets, max_packets=0)
|
||||
|
||||
self.assertEqual(skipped, 0)
|
||||
self.assertEqual(len(kept), 1)
|
||||
self.assertEqual(int(kept[0][1]["sweep"]), 1)
|
||||
|
||||
def test_background_subtracted_bscan_levels_ignore_zero_floor(self):
|
||||
disp_fft_lin = np.zeros((4, 8), dtype=np.float32)
|
||||
disp_fft_lin[1, 2:6] = np.asarray([0.05, 0.1, 0.5, 2.0], dtype=np.float32)
|
||||
|
||||
@ -72,6 +72,21 @@ def _pack_log16_point(step: int, avg1: int, avg2: int) -> bytes:
|
||||
)
|
||||
|
||||
|
||||
def _pack_tty_start() -> bytes:
|
||||
return b"".join([_u16le(0x000A), _u16le(0xFFFF), _u16le(0xFFFF), _u16le(0xFFFF)])
|
||||
|
||||
|
||||
def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes:
|
||||
return b"".join(
|
||||
[
|
||||
_u16le(0x000A),
|
||||
_u16le(step),
|
||||
_u16le(ch1),
|
||||
_u16le(ch2),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class SweepParserCoreTests(unittest.TestCase):
|
||||
def test_ascii_parser_emits_start_and_points(self):
|
||||
parser = AsciiSweepParser()
|
||||
@ -115,6 +130,51 @@ class SweepParserCoreTests(unittest.TestCase):
|
||||
self.assertEqual(events[3].x, 1)
|
||||
self.assertEqual(events[3].y, -4.0)
|
||||
|
||||
def test_legacy_binary_parser_accepts_tty_ch1_ch2_stream(self):
|
||||
parser = LegacyBinaryParser()
|
||||
stream = b"".join(
|
||||
[
|
||||
_pack_tty_start(),
|
||||
_pack_tty_point(1, 100, 90),
|
||||
_pack_tty_point(2, 120, 95),
|
||||
]
|
||||
)
|
||||
|
||||
events = parser.feed(stream)
|
||||
|
||||
self.assertIsInstance(events[0], StartEvent)
|
||||
self.assertEqual(events[0].ch, 0)
|
||||
self.assertIsInstance(events[1], PointEvent)
|
||||
self.assertEqual(events[1].x, 1)
|
||||
self.assertEqual(events[1].y, 10.0)
|
||||
self.assertEqual(events[1].aux, (100.0, 90.0))
|
||||
self.assertIsInstance(events[2], PointEvent)
|
||||
self.assertEqual(events[2].x, 2)
|
||||
self.assertEqual(events[2].y, 25.0)
|
||||
self.assertEqual(events[2].aux, (120.0, 95.0))
|
||||
|
||||
def test_legacy_binary_parser_detects_new_tty_sweep_on_step_reset(self):
|
||||
parser = LegacyBinaryParser()
|
||||
stream = b"".join(
|
||||
[
|
||||
_pack_tty_start(),
|
||||
_pack_tty_point(1, 100, 90),
|
||||
_pack_tty_point(2, 110, 95),
|
||||
_pack_tty_point(1, 120, 80),
|
||||
]
|
||||
)
|
||||
|
||||
events = parser.feed(stream)
|
||||
|
||||
self.assertIsInstance(events[0], StartEvent)
|
||||
self.assertIsInstance(events[1], PointEvent)
|
||||
self.assertIsInstance(events[2], PointEvent)
|
||||
self.assertIsInstance(events[3], StartEvent)
|
||||
self.assertEqual(events[3].ch, 0)
|
||||
self.assertIsInstance(events[4], PointEvent)
|
||||
self.assertEqual(events[4].x, 1)
|
||||
self.assertEqual(events[4].aux, (120.0, 80.0))
|
||||
|
||||
def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self):
|
||||
parser = ComplexAsciiSweepParser()
|
||||
events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n")
|
||||
|
||||
@ -44,6 +44,28 @@ def _pack_log16_point(step: int, real: int, imag: int) -> bytes:
|
||||
)
|
||||
|
||||
|
||||
def _pack_tty_start() -> bytes:
|
||||
return b"".join(
|
||||
[
|
||||
_u16le(0x000A),
|
||||
_u16le(0xFFFF),
|
||||
_u16le(0xFFFF),
|
||||
_u16le(0xFFFF),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes:
|
||||
return b"".join(
|
||||
[
|
||||
_u16le(0x000A),
|
||||
_u16le(step),
|
||||
_u16le(ch1),
|
||||
_u16le(ch2),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _chunk_bytes(data: bytes, size: int = 4096) -> list[bytes]:
|
||||
return [data[idx : idx + size] for idx in range(0, len(data), size)]
|
||||
|
||||
@ -111,6 +133,28 @@ class SweepReaderTests(unittest.TestCase):
|
||||
reader.join(timeout=1.0)
|
||||
stack.close()
|
||||
|
||||
def test_parser_16_bit_x2_falls_back_to_tty_ch1_ch2_stream(self):
|
||||
payload = bytearray()
|
||||
while len(payload) < (_PARSER_16_BIT_X2_PROBE_BYTES + 24):
|
||||
payload += _pack_tty_start()
|
||||
payload += _pack_tty_point(1, 100, 90)
|
||||
payload += _pack_tty_point(2, 120, 95)
|
||||
payload += _pack_tty_point(1, 80, 70)
|
||||
|
||||
stack, reader, queue, stop_event, stderr = self._start_reader(bytes(payload), parser_16_bit_x2=True)
|
||||
try:
|
||||
sweep, info, aux = queue.get(timeout=2.0)
|
||||
self.assertEqual(info["ch"], 0)
|
||||
self.assertIsNotNone(aux)
|
||||
self.assertGreaterEqual(sweep.shape[0], 3)
|
||||
self.assertAlmostEqual(float(sweep[1]), 10.0, places=6)
|
||||
self.assertAlmostEqual(float(sweep[2]), 25.0, places=6)
|
||||
self.assertIn("fallback -> legacy", stderr.getvalue())
|
||||
finally:
|
||||
stop_event.set()
|
||||
reader.join(timeout=1.0)
|
||||
stack.close()
|
||||
|
||||
def test_parser_16_bit_x2_keeps_true_complex_stream(self):
|
||||
payload = b"".join(
|
||||
[
|
||||
|
||||
Reference in New Issue
Block a user