fix update

This commit is contained in:
awe
2026-04-29 19:08:17 +03:00
parent ecd05568c6
commit 1d807d0afc
5 changed files with 401 additions and 40 deletions

View File

@ -11,6 +11,7 @@ import numpy as np
from rfg_adc_plotter.constants import DATA_INVERSION_THRESHOLD, LOG_BASE, LOG_EXP_LIMIT, LOG_POSTSCALER, LOG_SCALER
from rfg_adc_plotter.types import (
BatchPointEvent,
Do1Level,
ParserEvent,
PointEvent,
@ -157,12 +158,13 @@ class ComplexAsciiSweepParser:
class LegacyBinaryParser:
"""Byte-resynchronizing parser for supported 8-byte binary record formats."""
def __init__(self):
def __init__(self, *, batch_events: bool = False):
self._buf = bytearray()
self._last_step: Optional[int] = None
self._seen_points = False
self._mode: Optional[str] = None
self._current_signal_kind: Optional[SignalKind] = None
self._batch_events = bool(batch_events)
self._last_tagged_step_by_level: Dict[Do1Level, Optional[int]] = {
"low": None,
"high": None,
@ -296,6 +298,78 @@ class LegacyBinaryParser:
)
)
def _try_emit_tty_batch(self, events: List[ParserEvent], *, require_not_legacy: bool) -> bool:
if not self._batch_events or len(self._buf) < 8:
return False
block_count = len(self._buf) // 8
if block_count <= 0:
return False
raw = np.frombuffer(self._buf, dtype=np.uint8, count=block_count * 8).reshape(-1, 8)
words = np.frombuffer(self._buf, dtype="<u2", count=block_count * 4).reshape(-1, 4)
w0 = words[:, 0]
w1 = words[:, 1]
is_tty_point = (w0 == 0x000A) & (w1 != 0xFFFF)
is_legacy_point = (raw[:, 6] == 0x0A) & (w0 != 0xFFFF)
valid = is_tty_point
if require_not_legacy:
valid = valid & (~is_legacy_point)
if valid.size <= 0 or not bool(valid[0]):
return False
invalid_idx = np.nonzero(~valid)[0]
valid_count = int(invalid_idx[0]) if invalid_idx.size > 0 else int(valid.size)
if valid_count <= 0:
return False
steps = words[:valid_count, 1].astype(np.int64, copy=True)
if self._current_signal_kind != "bin_iq":
if self._seen_points:
events.append(StartEvent(ch=0, signal_kind="bin_iq"))
self._last_step = None
self._seen_points = False
self._current_signal_kind = "bin_iq"
self._reset_tagged_steps()
if self._seen_points and self._last_step is not None and steps[0] <= int(self._last_step):
events.append(StartEvent(ch=0, signal_kind="bin_iq"))
self._last_step = None
self._seen_points = False
self._reset_tagged_steps()
reset_idx = np.nonzero(np.diff(steps) <= 0)[0]
take_count = int(reset_idx[0] + 1) if reset_idx.size > 0 else int(steps.size)
if take_count <= 0:
return False
batch_words = words[:take_count].copy()
xs = batch_words[:, 1].astype(np.int64, copy=False)
ch_1 = batch_words[:, 2].astype(np.uint16, copy=False).view(np.int16)
ch_2 = batch_words[:, 3].astype(np.uint16, copy=False).view(np.int16)
del raw
del words
del w0
del w1
del self._buf[: take_count * 8]
ch_1_i64 = ch_1.astype(np.int64)
ch_2_i64 = ch_2.astype(np.int64)
ys = ((ch_1_i64 * ch_1_i64) + (ch_2_i64 * ch_2_i64)).astype(np.float32)
self._mode = "bin"
self._seen_points = True
self._last_step = int(xs[-1])
self._current_signal_kind = "bin_iq"
self._reset_tagged_steps()
events.append(
BatchPointEvent(
ch=0,
xs=xs,
ys=ys,
aux=(ch_1.astype(np.float32), ch_2.astype(np.float32)),
signal_kind="bin_iq",
)
)
return True
def feed(self, data: bytes) -> List[ParserEvent]:
if data:
self._buf += data
@ -330,6 +404,8 @@ class LegacyBinaryParser:
continue
if self._mode == "legacy":
if is_tty_point and (not is_legacy_point) and self._try_emit_tty_batch(events, require_not_legacy=True):
continue
if is_legacy_point:
self._emit_legacy_point(
events,
@ -368,6 +444,8 @@ class LegacyBinaryParser:
continue
if self._mode == "bin":
if is_tty_point and self._try_emit_tty_batch(events, require_not_legacy=False):
continue
if is_tty_point:
self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3))
del self._buf[:8]
@ -408,6 +486,8 @@ class LegacyBinaryParser:
# Mode is still unknown. Accept only unambiguous point shapes to avoid
# jumping between tty and legacy interpretations on coincidental bytes.
if is_tty_point and (not is_legacy_point):
if self._try_emit_tty_batch(events, require_not_legacy=True):
continue
self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3))
del self._buf[:8]
continue
@ -729,6 +809,67 @@ class SweepAssembler:
out[both_valid] = (first[both_valid] + second[both_valid]) * 0.5
return out
def _has_current_points(self) -> bool:
return bool(self._xs or self._tagged_low_xs or self._tagged_high_xs)
def _consume_batch(self, event: BatchPointEvent) -> Optional[SweepPacket]:
xs_arr = np.asarray(event.xs, dtype=np.int64).reshape(-1)
ys_arr = np.asarray(event.ys, dtype=np.float32).reshape(-1)
width = min(xs_arr.size, ys_arr.size)
if width <= 0:
return None
point_ch = int(event.ch)
point_signal_kind = event.signal_kind
packet: Optional[SweepPacket] = None
if self._cur_channel is None:
self._cur_channel = point_ch
elif point_ch != self._cur_channel:
if self._has_current_points():
packet = self.finalize_current()
self._reset_current()
self._cur_channel = point_ch
if self._cur_signal_kind != point_signal_kind:
if self._has_current_points():
packet = self.finalize_current()
self._reset_current()
self._cur_channel = point_ch
self._cur_signal_kind = point_signal_kind
self._cur_channels.add(point_ch)
xs = xs_arr[:width]
ys = ys_arr[:width]
self._xs.extend(xs.tolist())
self._ys.extend(ys.tolist())
if self._cur_signal_kind == "bin_iq_do1_tagged":
level = "high" if event.do1_level == "high" else "low"
if level == "low":
self._tagged_low_xs.extend(xs.tolist())
self._tagged_low_ys.extend(ys.tolist())
else:
self._tagged_high_xs.extend(xs.tolist())
self._tagged_high_ys.extend(ys.tolist())
if event.aux is not None:
try:
aux_1, aux_2 = event.aux
aux_1_arr = np.asarray(aux_1, dtype=np.float32).reshape(-1)
aux_2_arr = np.asarray(aux_2, dtype=np.float32).reshape(-1)
aux_width = min(width, aux_1_arr.size, aux_2_arr.size)
except Exception:
aux_width = 0
if aux_width > 0:
if self._cur_signal_kind == "bin_iq_do1_tagged":
if event.do1_level == "high":
self._tagged_high_aux_1.extend(aux_1_arr[:aux_width].tolist())
self._tagged_high_aux_2.extend(aux_2_arr[:aux_width].tolist())
else:
self._tagged_low_aux_1.extend(aux_1_arr[:aux_width].tolist())
self._tagged_low_aux_2.extend(aux_2_arr[:aux_width].tolist())
else:
self._aux_1.extend(aux_1_arr[:aux_width].tolist())
self._aux_2.extend(aux_2_arr[:aux_width].tolist())
return packet
def consume(self, event: ParserEvent) -> Optional[SweepPacket]:
if isinstance(event, StartEvent):
packet = self.finalize_current()
@ -737,6 +878,8 @@ class SweepAssembler:
self._cur_channel = int(event.ch)
self._cur_signal_kind = event.signal_kind
return packet
if isinstance(event, BatchPointEvent):
return self._consume_batch(event)
point_ch = int(event.ch)
point_signal_kind = event.signal_kind

View File

@ -157,7 +157,7 @@ class SweepReader(threading.Thread):
if self._logscale:
return LogScaleBinaryParser32(), SweepAssembler(fancy=self._fancy, apply_inversion=False)
if self._bin_mode:
return LegacyBinaryParser(), SweepAssembler(fancy=self._fancy, apply_inversion=True)
return LegacyBinaryParser(batch_events=True), SweepAssembler(fancy=self._fancy, apply_inversion=True)
return AsciiSweepParser(), SweepAssembler(fancy=self._fancy, apply_inversion=True)
@staticmethod
@ -216,7 +216,7 @@ class SweepReader(threading.Thread):
)
)
sys.stderr.write("[info] parser_16_bit_x2: fallback -> legacy\n")
parser = LegacyBinaryParser()
parser = LegacyBinaryParser(batch_events=True)
assembler = SweepAssembler(fancy=self._fancy, apply_inversion=True)
probe_packets = self._consume_events(assembler, parser.feed(bytes(probe_buf)))
return parser, assembler, probe_packets