try new synchro method
This commit is contained in:
@ -22,12 +22,14 @@ def _run(*args: str) -> subprocess.CompletedProcess[str]:
|
||||
|
||||
|
||||
class CliTests(unittest.TestCase):
|
||||
def test_logscale_is_opt_in(self):
|
||||
def test_logscale_and_opengl_are_opt_in(self):
|
||||
args = build_parser().parse_args(["/dev/null"])
|
||||
self.assertFalse(args.logscale)
|
||||
self.assertFalse(args.opengl)
|
||||
|
||||
args_log = build_parser().parse_args(["/dev/null", "--logscale"])
|
||||
args_log = build_parser().parse_args(["/dev/null", "--logscale", "--opengl"])
|
||||
self.assertTrue(args_log.logscale)
|
||||
self.assertTrue(args_log.opengl)
|
||||
|
||||
def test_wrapper_help_works(self):
|
||||
proc = _run("RFG_ADC_dataplotter.py", "--help")
|
||||
@ -41,6 +43,7 @@ class CliTests(unittest.TestCase):
|
||||
self.assertIn("usage:", proc.stdout)
|
||||
self.assertIn("--parser_16_bit_x2", proc.stdout)
|
||||
self.assertIn("--parser_complex_ascii", proc.stdout)
|
||||
self.assertIn("--opengl", proc.stdout)
|
||||
|
||||
def test_backend_mpl_reports_removal(self):
|
||||
proc = _run("-m", "rfg_adc_plotter.main", "/dev/null", "--backend", "mpl")
|
||||
|
||||
152
tests/test_sweep_reader.py
Normal file
152
tests/test_sweep_reader.py
Normal file
@ -0,0 +1,152 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import io
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
from queue import Queue
|
||||
from unittest.mock import patch
|
||||
|
||||
from rfg_adc_plotter.io import sweep_reader as sweep_reader_module
|
||||
from rfg_adc_plotter.io.sweep_reader import SweepReader, _PARSER_16_BIT_X2_PROBE_BYTES
|
||||
|
||||
|
||||
def _u16le(word: int) -> bytes:
|
||||
value = int(word) & 0xFFFF
|
||||
return bytes((value & 0xFF, (value >> 8) & 0xFF))
|
||||
|
||||
|
||||
def _pack_legacy_point(ch: int, step: int, value_i32: int) -> bytes:
|
||||
value = int(value_i32) & 0xFFFF_FFFF
|
||||
return b"".join(
|
||||
[
|
||||
_u16le(step),
|
||||
_u16le((value >> 16) & 0xFFFF),
|
||||
_u16le(value & 0xFFFF),
|
||||
bytes((0x0A, int(ch) & 0xFF)),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _pack_log16_start(ch: int) -> bytes:
|
||||
return b"\xff\xff" * 3 + bytes((0x0A, int(ch) & 0xFF))
|
||||
|
||||
|
||||
def _pack_log16_point(step: int, real: int, imag: int) -> bytes:
|
||||
return b"".join(
|
||||
[
|
||||
_u16le(step),
|
||||
_u16le(real),
|
||||
_u16le(imag),
|
||||
_u16le(0xFFFF),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _chunk_bytes(data: bytes, size: int = 4096) -> list[bytes]:
|
||||
return [data[idx : idx + size] for idx in range(0, len(data), size)]
|
||||
|
||||
|
||||
class _FakeSerialLineSource:
|
||||
def __init__(self, path: str, baud: int, timeout: float = 1.0):
|
||||
self.path = path
|
||||
self.baud = baud
|
||||
self.timeout = timeout
|
||||
self._using = "fake"
|
||||
|
||||
def close(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class _FakeChunkReader:
|
||||
payload_chunks: list[bytes] = []
|
||||
|
||||
def __init__(self, src):
|
||||
self._src = src
|
||||
self._chunks = list(type(self).payload_chunks)
|
||||
|
||||
def read_available(self) -> bytes:
|
||||
if self._chunks:
|
||||
return self._chunks.pop(0)
|
||||
return b""
|
||||
|
||||
|
||||
class SweepReaderTests(unittest.TestCase):
|
||||
def _start_reader(self, payload: bytes, **reader_kwargs):
|
||||
queue: Queue = Queue()
|
||||
stop_event = threading.Event()
|
||||
stderr = io.StringIO()
|
||||
_FakeChunkReader.payload_chunks = _chunk_bytes(payload)
|
||||
reader = SweepReader(
|
||||
"/tmp/fake-tty",
|
||||
115200,
|
||||
queue,
|
||||
stop_event,
|
||||
**reader_kwargs,
|
||||
)
|
||||
stack = contextlib.ExitStack()
|
||||
stack.enter_context(patch.object(sweep_reader_module, "SerialLineSource", _FakeSerialLineSource))
|
||||
stack.enter_context(patch.object(sweep_reader_module, "SerialChunkReader", _FakeChunkReader))
|
||||
stack.enter_context(contextlib.redirect_stderr(stderr))
|
||||
reader.start()
|
||||
return stack, reader, queue, stop_event, stderr
|
||||
|
||||
def test_parser_16_bit_x2_falls_back_to_legacy_stream(self):
|
||||
payload = bytearray()
|
||||
while len(payload) < (_PARSER_16_BIT_X2_PROBE_BYTES + 24):
|
||||
payload += _pack_legacy_point(3, 1, -2)
|
||||
payload += _pack_legacy_point(3, 2, -3)
|
||||
payload += _pack_legacy_point(3, 1, -4)
|
||||
|
||||
stack, reader, queue, stop_event, stderr = self._start_reader(bytes(payload), parser_16_bit_x2=True)
|
||||
try:
|
||||
sweep, info, aux = queue.get(timeout=2.0)
|
||||
self.assertEqual(info["ch"], 3)
|
||||
self.assertIsNone(aux)
|
||||
self.assertGreaterEqual(sweep.shape[0], 3)
|
||||
self.assertIn("fallback -> legacy", stderr.getvalue())
|
||||
finally:
|
||||
stop_event.set()
|
||||
reader.join(timeout=1.0)
|
||||
stack.close()
|
||||
|
||||
def test_parser_16_bit_x2_keeps_true_complex_stream(self):
|
||||
payload = b"".join(
|
||||
[
|
||||
_pack_log16_start(2),
|
||||
_pack_log16_point(1, 3, 4),
|
||||
_pack_log16_point(2, 5, 12),
|
||||
_pack_log16_point(1, 8, 15),
|
||||
]
|
||||
)
|
||||
|
||||
stack, reader, queue, stop_event, stderr = self._start_reader(payload, parser_16_bit_x2=True)
|
||||
try:
|
||||
sweep, info, aux = queue.get(timeout=1.0)
|
||||
self.assertEqual(info["ch"], 2)
|
||||
self.assertIsNotNone(aux)
|
||||
self.assertAlmostEqual(float(sweep[1]), 5.0, places=6)
|
||||
self.assertAlmostEqual(float(sweep[2]), 13.0, places=6)
|
||||
self.assertNotIn("fallback -> legacy", stderr.getvalue())
|
||||
finally:
|
||||
stop_event.set()
|
||||
reader.join(timeout=1.0)
|
||||
stack.close()
|
||||
|
||||
def test_reader_join_does_not_raise_when_stopped(self):
|
||||
stack, reader, _queue, stop_event, _stderr = self._start_reader(b"", parser_16_bit_x2=True)
|
||||
try:
|
||||
time.sleep(0.01)
|
||||
stop_event.set()
|
||||
reader.join(timeout=1.0)
|
||||
self.assertFalse(reader.is_alive())
|
||||
finally:
|
||||
stop_event.set()
|
||||
if reader.is_alive():
|
||||
reader.join(timeout=1.0)
|
||||
stack.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user