new e502 adc

This commit is contained in:
awe
2026-04-09 18:43:50 +03:00
parent 5152314f21
commit 339cb85dce
4 changed files with 89 additions and 1 deletions

View File

@ -73,7 +73,6 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument( parser.add_argument(
"--logscale", "--logscale",
action="store_true", action="store_true",
default=True,
help=( help=(
"Новый бинарный протокол: точка несет пару int32 (avg_1, avg_2), " "Новый бинарный протокол: точка несет пару int32 (avg_1, avg_2), "
"а свип считается как |10**(avg_1*0.001) - 10**(avg_2*0.001)|" "а свип считается как |10**(avg_1*0.001) - 10**(avg_2*0.001)|"

View File

@ -143,6 +143,8 @@ class LegacyBinaryParser:
def __init__(self): def __init__(self):
self._buf = bytearray() self._buf = bytearray()
self._last_step: Optional[int] = None
self._seen_points = False
@staticmethod @staticmethod
def _u16_at(buf: bytearray, offset: int) -> int: def _u16_at(buf: bytearray, offset: int) -> int:
@ -157,11 +159,17 @@ class LegacyBinaryParser:
w1 = self._u16_at(self._buf, 2) w1 = self._u16_at(self._buf, 2)
w2 = self._u16_at(self._buf, 4) w2 = self._u16_at(self._buf, 4)
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and self._buf[6] == 0x0A: if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and self._buf[6] == 0x0A:
self._last_step = None
self._seen_points = False
events.append(StartEvent(ch=int(self._buf[7]))) events.append(StartEvent(ch=int(self._buf[7])))
del self._buf[:8] del self._buf[:8]
continue continue
if self._buf[6] == 0x0A: if self._buf[6] == 0x0A:
ch = int(self._buf[7]) ch = int(self._buf[7])
if self._seen_points and self._last_step is not None and w0 <= self._last_step:
events.append(StartEvent(ch=ch))
self._seen_points = True
self._last_step = int(w0)
value = u32_to_i32((w1 << 16) | w2) value = u32_to_i32((w1 << 16) | w2)
events.append(PointEvent(ch=ch, x=int(w0), y=float(value))) events.append(PointEvent(ch=ch, x=int(w0), y=float(value)))
del self._buf[:8] del self._buf[:8]
@ -175,6 +183,8 @@ class LogScaleBinaryParser32:
def __init__(self): def __init__(self):
self._buf = bytearray() self._buf = bytearray()
self._last_step: Optional[int] = None
self._seen_points = False
@staticmethod @staticmethod
def _u16_at(buf: bytearray, offset: int) -> int: def _u16_at(buf: bytearray, offset: int) -> int:
@ -187,11 +197,17 @@ class LogScaleBinaryParser32:
while len(self._buf) >= 12: while len(self._buf) >= 12:
words = [self._u16_at(self._buf, idx * 2) for idx in range(6)] words = [self._u16_at(self._buf, idx * 2) for idx in range(6)]
if words[0:5] == [0xFFFF] * 5 and (words[5] & 0x00FF) == 0x000A: if words[0:5] == [0xFFFF] * 5 and (words[5] & 0x00FF) == 0x000A:
self._last_step = None
self._seen_points = False
events.append(StartEvent(ch=int((words[5] >> 8) & 0x00FF))) events.append(StartEvent(ch=int((words[5] >> 8) & 0x00FF)))
del self._buf[:12] del self._buf[:12]
continue continue
if (words[5] & 0x00FF) == 0x000A and words[0] != 0xFFFF: if (words[5] & 0x00FF) == 0x000A and words[0] != 0xFFFF:
ch = int((words[5] >> 8) & 0x00FF) ch = int((words[5] >> 8) & 0x00FF)
if self._seen_points and self._last_step is not None and words[0] <= self._last_step:
events.append(StartEvent(ch=ch))
self._seen_points = True
self._last_step = int(words[0])
avg_1 = u32_to_i32((words[1] << 16) | words[2]) avg_1 = u32_to_i32((words[1] << 16) | words[2])
avg_2 = u32_to_i32((words[3] << 16) | words[4]) avg_2 = u32_to_i32((words[3] << 16) | words[4])
events.append( events.append(
@ -214,6 +230,8 @@ class LogScale16BitX2BinaryParser:
def __init__(self): def __init__(self):
self._buf = bytearray() self._buf = bytearray()
self._current_channel = 0 self._current_channel = 0
self._last_step: Optional[int] = None
self._seen_points = False
@staticmethod @staticmethod
def _u16_at(buf: bytearray, offset: int) -> int: def _u16_at(buf: bytearray, offset: int) -> int:
@ -227,10 +245,16 @@ class LogScale16BitX2BinaryParser:
words = [self._u16_at(self._buf, idx * 2) for idx in range(4)] words = [self._u16_at(self._buf, idx * 2) for idx in range(4)]
if words[0:3] == [0xFFFF, 0xFFFF, 0xFFFF] and (words[3] & 0x00FF) == 0x000A: if words[0:3] == [0xFFFF, 0xFFFF, 0xFFFF] and (words[3] & 0x00FF) == 0x000A:
self._current_channel = int((words[3] >> 8) & 0x00FF) self._current_channel = int((words[3] >> 8) & 0x00FF)
self._last_step = None
self._seen_points = False
events.append(StartEvent(ch=self._current_channel)) events.append(StartEvent(ch=self._current_channel))
del self._buf[:8] del self._buf[:8]
continue continue
if words[3] == 0xFFFF and words[0] != 0xFFFF: if words[3] == 0xFFFF and words[0] != 0xFFFF:
if self._seen_points and self._last_step is not None and words[0] <= self._last_step:
events.append(StartEvent(ch=self._current_channel))
self._seen_points = True
self._last_step = int(words[0])
real = u16_to_i16(words[1]) real = u16_to_i16(words[1])
imag = u16_to_i16(words[2]) imag = u16_to_i16(words[2])
events.append( events.append(

View File

@ -5,6 +5,8 @@ import sys
import unittest import unittest
from pathlib import Path from pathlib import Path
from rfg_adc_plotter.cli import build_parser
ROOT = Path(__file__).resolve().parents[1] ROOT = Path(__file__).resolve().parents[1]
@ -20,6 +22,13 @@ def _run(*args: str) -> subprocess.CompletedProcess[str]:
class CliTests(unittest.TestCase): class CliTests(unittest.TestCase):
def test_logscale_is_opt_in(self):
args = build_parser().parse_args(["/dev/null"])
self.assertFalse(args.logscale)
args_log = build_parser().parse_args(["/dev/null", "--logscale"])
self.assertTrue(args_log.logscale)
def test_wrapper_help_works(self): def test_wrapper_help_works(self):
proc = _run("RFG_ADC_dataplotter.py", "--help") proc = _run("RFG_ADC_dataplotter.py", "--help")
self.assertEqual(proc.returncode, 0) self.assertEqual(proc.returncode, 0)

View File

@ -97,6 +97,24 @@ class SweepParserCoreTests(unittest.TestCase):
self.assertEqual(events[1].x, 1) self.assertEqual(events[1].x, 1)
self.assertEqual(events[1].y, -2.0) self.assertEqual(events[1].y, -2.0)
def test_legacy_binary_parser_detects_new_sweep_on_step_reset(self):
parser = LegacyBinaryParser()
stream = b"".join(
[
_pack_legacy_point(3, 1, -2),
_pack_legacy_point(3, 2, -3),
_pack_legacy_point(3, 1, -4),
]
)
events = parser.feed(stream)
self.assertIsInstance(events[0], PointEvent)
self.assertIsInstance(events[1], PointEvent)
self.assertIsInstance(events[2], StartEvent)
self.assertEqual(events[2].ch, 3)
self.assertIsInstance(events[3], PointEvent)
self.assertEqual(events[3].x, 1)
self.assertEqual(events[3].y, -4.0)
def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self): def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self):
parser = ComplexAsciiSweepParser() parser = ComplexAsciiSweepParser()
events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n") events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n")
@ -123,6 +141,24 @@ class SweepParserCoreTests(unittest.TestCase):
self.assertAlmostEqual(events[1].y, log_pair_to_sweep(1500, 700), places=6) self.assertAlmostEqual(events[1].y, log_pair_to_sweep(1500, 700), places=6)
self.assertEqual(events[1].aux, (1500.0, 700.0)) self.assertEqual(events[1].aux, (1500.0, 700.0))
def test_logscale_32_parser_detects_new_sweep_on_step_reset(self):
parser = LogScaleBinaryParser32()
stream = b"".join(
[
_pack_log_point(1, 1500, 700, ch=5),
_pack_log_point(2, 1400, 650, ch=5),
_pack_log_point(1, 1300, 600, ch=5),
]
)
events = parser.feed(stream)
self.assertIsInstance(events[0], PointEvent)
self.assertIsInstance(events[1], PointEvent)
self.assertIsInstance(events[2], StartEvent)
self.assertEqual(events[2].ch, 5)
self.assertIsInstance(events[3], PointEvent)
self.assertEqual(events[3].x, 1)
self.assertAlmostEqual(events[3].y, log_pair_to_sweep(1300, 600), places=6)
def test_log_pair_to_sweep_is_order_independent(self): def test_log_pair_to_sweep_is_order_independent(self):
self.assertAlmostEqual(log_pair_to_sweep(1500, 700), log_pair_to_sweep(700, 1500), places=6) self.assertAlmostEqual(log_pair_to_sweep(1500, 700), log_pair_to_sweep(700, 1500), places=6)
@ -137,6 +173,26 @@ class SweepParserCoreTests(unittest.TestCase):
self.assertAlmostEqual(events[1].y, math.hypot(100.0, 90.0), places=6) self.assertAlmostEqual(events[1].y, math.hypot(100.0, 90.0), places=6)
self.assertEqual(events[1].aux, (100.0, 90.0)) self.assertEqual(events[1].aux, (100.0, 90.0))
def test_logscale_16bit_parser_detects_new_sweep_on_step_reset(self):
parser = LogScale16BitX2BinaryParser()
stream = b"".join(
[
_pack_log16_start(2),
_pack_log16_point(1, 100, 90),
_pack_log16_point(2, 110, 95),
_pack_log16_point(1, 120, 80),
]
)
events = parser.feed(stream)
self.assertIsInstance(events[0], StartEvent)
self.assertIsInstance(events[1], PointEvent)
self.assertIsInstance(events[2], PointEvent)
self.assertIsInstance(events[3], StartEvent)
self.assertEqual(events[3].ch, 2)
self.assertIsInstance(events[4], PointEvent)
self.assertEqual(events[4].x, 1)
self.assertAlmostEqual(events[4].y, math.hypot(120.0, 80.0), places=6)
def test_parser_test_stream_parser_recovers_point_after_single_separator(self): def test_parser_test_stream_parser_recovers_point_after_single_separator(self):
parser = ParserTestStreamParser() parser = ParserTestStreamParser()
stream = b"".join( stream = b"".join(