Files
RFG_stm32_ADC_receiver_GUI/tests/test_sweep_reader.py
2026-04-10 18:01:43 +03:00

232 lines
7.4 KiB
Python

from __future__ import annotations
import contextlib
import io
import threading
import time
import unittest
from queue import Queue
from unittest.mock import patch
from rfg_adc_plotter.io import sweep_reader as sweep_reader_module
from rfg_adc_plotter.io.sweep_reader import SweepReader, _PARSER_16_BIT_X2_PROBE_BYTES
def _u16le(word: int) -> bytes:
value = int(word) & 0xFFFF
return bytes((value & 0xFF, (value >> 8) & 0xFF))
def _pack_legacy_point(ch: int, step: int, value_i32: int) -> bytes:
value = int(value_i32) & 0xFFFF_FFFF
return b"".join(
[
_u16le(step),
_u16le((value >> 16) & 0xFFFF),
_u16le(value & 0xFFFF),
bytes((0x0A, int(ch) & 0xFF)),
]
)
def _pack_log16_start(ch: int) -> bytes:
return b"\xff\xff" * 3 + bytes((0x0A, int(ch) & 0xFF))
def _pack_log16_point(step: int, real: int, imag: int) -> bytes:
return b"".join(
[
_u16le(step),
_u16le(real),
_u16le(imag),
_u16le(0xFFFF),
]
)
def _pack_tty_start() -> bytes:
return b"".join(
[
_u16le(0x000A),
_u16le(0xFFFF),
_u16le(0xFFFF),
_u16le(0xFFFF),
]
)
def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes:
return b"".join(
[
_u16le(0x000A),
_u16le(step),
_u16le(ch1),
_u16le(ch2),
]
)
def _chunk_bytes(data: bytes, size: int = 4096) -> list[bytes]:
return [data[idx : idx + size] for idx in range(0, len(data), size)]
class _FakeSerialLineSource:
def __init__(self, path: str, baud: int, timeout: float = 1.0):
self.path = path
self.baud = baud
self.timeout = timeout
self._using = "fake"
def close(self) -> None:
pass
class _FakeChunkReader:
payload_chunks: list[bytes] = []
def __init__(self, src):
self._src = src
self._chunks = list(type(self).payload_chunks)
def read_available(self) -> bytes:
if self._chunks:
return self._chunks.pop(0)
return b""
class SweepReaderTests(unittest.TestCase):
def _start_reader(self, payload: bytes, **reader_kwargs):
queue: Queue = Queue()
stop_event = threading.Event()
stderr = io.StringIO()
_FakeChunkReader.payload_chunks = _chunk_bytes(payload)
reader = SweepReader(
"/tmp/fake-tty",
115200,
queue,
stop_event,
**reader_kwargs,
)
stack = contextlib.ExitStack()
stack.enter_context(patch.object(sweep_reader_module, "SerialLineSource", _FakeSerialLineSource))
stack.enter_context(patch.object(sweep_reader_module, "SerialChunkReader", _FakeChunkReader))
stack.enter_context(contextlib.redirect_stderr(stderr))
reader.start()
return stack, reader, queue, stop_event, stderr
def test_parser_16_bit_x2_falls_back_to_legacy_stream(self):
payload = bytearray()
while len(payload) < (_PARSER_16_BIT_X2_PROBE_BYTES + 24):
payload += _pack_legacy_point(3, 1, -2)
payload += _pack_legacy_point(3, 2, -3)
payload += _pack_legacy_point(3, 1, -4)
stack, reader, queue, stop_event, stderr = self._start_reader(bytes(payload), parser_16_bit_x2=True)
try:
sweep, info, aux = queue.get(timeout=2.0)
self.assertEqual(info["ch"], 3)
self.assertIsNone(aux)
self.assertGreaterEqual(sweep.shape[0], 3)
self.assertIn("fallback -> legacy", stderr.getvalue())
finally:
stop_event.set()
reader.join(timeout=1.0)
stack.close()
def test_parser_16_bit_x2_falls_back_to_tty_ch1_ch2_stream(self):
payload = bytearray()
while len(payload) < (_PARSER_16_BIT_X2_PROBE_BYTES + 24):
payload += _pack_tty_start()
payload += _pack_tty_point(1, 100, 90)
payload += _pack_tty_point(2, 120, 95)
payload += _pack_tty_point(1, 80, 70)
stack, reader, queue, stop_event, stderr = self._start_reader(bytes(payload), parser_16_bit_x2=True)
try:
sweep, info, aux = queue.get(timeout=2.0)
self.assertEqual(info["ch"], 0)
self.assertIsNotNone(aux)
self.assertGreaterEqual(sweep.shape[0], 3)
self.assertAlmostEqual(float(sweep[1]), 10.0, places=6)
self.assertAlmostEqual(float(sweep[2]), 25.0, places=6)
self.assertIn("fallback -> legacy", stderr.getvalue())
finally:
stop_event.set()
reader.join(timeout=1.0)
stack.close()
def test_parser_16_bit_x2_keeps_true_complex_stream(self):
payload = b"".join(
[
_pack_log16_start(2),
_pack_log16_point(1, 3, 4),
_pack_log16_point(2, 5, 12),
_pack_log16_point(1, 8, 15),
]
)
stack, reader, queue, stop_event, stderr = self._start_reader(payload, parser_16_bit_x2=True)
try:
sweep, info, aux = queue.get(timeout=1.0)
self.assertEqual(info["ch"], 2)
self.assertIsNotNone(aux)
self.assertAlmostEqual(float(sweep[1]), 5.0, places=6)
self.assertAlmostEqual(float(sweep[2]), 13.0, places=6)
self.assertNotIn("fallback -> legacy", stderr.getvalue())
finally:
stop_event.set()
reader.join(timeout=1.0)
stack.close()
def test_parser_16_bit_x2_probe_inconclusive_logs_hint(self):
payload = b"\x00" * (_PARSER_16_BIT_X2_PROBE_BYTES + 128)
stack, reader, queue, stop_event, stderr = self._start_reader(payload, parser_16_bit_x2=True)
try:
deadline = time.time() + 1.5
logs = ""
while time.time() < deadline:
logs = stderr.getvalue()
if "probe inconclusive" in logs:
break
time.sleep(0.02)
self.assertTrue(queue.empty())
self.assertIn("probe inconclusive", logs)
self.assertIn("try --bin", logs)
finally:
stop_event.set()
reader.join(timeout=1.0)
stack.close()
def test_reader_logs_no_input_warning_when_source_is_idle(self):
with patch.object(sweep_reader_module, "_NO_INPUT_WARN_INTERVAL_S", 0.02), patch.object(
sweep_reader_module, "_NO_PACKET_WARN_INTERVAL_S", 0.02
):
stack, reader, _queue, stop_event, stderr = self._start_reader(b"", parser_16_bit_x2=False)
try:
time.sleep(0.12)
logs = stderr.getvalue()
self.assertIn("no input bytes", logs)
self.assertIn("no sweep packets", logs)
finally:
stop_event.set()
reader.join(timeout=1.0)
stack.close()
def test_reader_join_does_not_raise_when_stopped(self):
stack, reader, _queue, stop_event, _stderr = self._start_reader(b"", parser_16_bit_x2=True)
try:
time.sleep(0.01)
stop_event.set()
reader.join(timeout=1.0)
self.assertFalse(reader.is_alive())
finally:
stop_event.set()
if reader.is_alive():
reader.join(timeout=1.0)
stack.close()
if __name__ == "__main__":
unittest.main()