legacy parcer impl
This commit is contained in:
@ -4,6 +4,9 @@
|
||||
Reads raw binary from /tmp/ttyADC_data, parses 8-byte records with
|
||||
markers 0x000A (main) and 0x00A8 (reference), performs complex
|
||||
normalization and plots amplitude, phase, and FFT in real time.
|
||||
|
||||
Uses the project's existing LegacyBinaryParser + SweepAssembler for
|
||||
reliable sweep boundary detection.
|
||||
"""
|
||||
|
||||
import os
|
||||
@ -15,6 +18,8 @@ import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from pyqtgraph.Qt import QtCore, QtWidgets
|
||||
|
||||
from rfg_adc_plotter.io.sweep_parser_core import LegacyBinaryParser, SweepAssembler
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -27,25 +32,11 @@ C_M_S = 299_792_458.0
|
||||
FFT_LEN = 2048
|
||||
TIMER_MS = 50 # GUI refresh period
|
||||
READ_CHUNK = 65536
|
||||
|
||||
MARKER_MAIN = 0x000A
|
||||
MARKER_REF = 0x00A8
|
||||
MAX_SWEEP_POINTS = 4096 # safety limit — force finalize if dict grows past this
|
||||
MIN_SWEEP_POINTS = 400 # discard fragments shorter than this
|
||||
MIN_SWEEP_POINTS = 400
|
||||
|
||||
DATA_PATH = "/tmp/ttyADC_data"
|
||||
|
||||
|
||||
def _u16(buf, off):
|
||||
"""Little-endian uint16 from buffer."""
|
||||
return buf[off] | (buf[off + 1] << 8)
|
||||
|
||||
|
||||
def _i16(v):
|
||||
"""Unsigned uint16 → signed int16."""
|
||||
return v - 0x10000 if (v & 0x8000) else v
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DataReader
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -67,105 +58,42 @@ class DataReader:
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SweepAccumulator (inline parser)
|
||||
# Sweep extraction helper
|
||||
# ---------------------------------------------------------------------------
|
||||
class SweepAccumulator:
|
||||
"""Parse 8-byte binary records and assemble main/ref sweep pairs."""
|
||||
def extract_sweep(packet):
|
||||
"""Extract main/ref ch1/ch2 arrays from a SweepPacket.
|
||||
|
||||
def __init__(self):
|
||||
self._buf = bytearray()
|
||||
self._main = {} # step → (ch1_i16, ch2_i16)
|
||||
self._ref = {}
|
||||
Returns dict with main_ch1, main_ch2, ref_ch1, ref_ch2, num_points
|
||||
or None if data is incomplete.
|
||||
"""
|
||||
sweep_arr, info, aux = packet
|
||||
|
||||
def feed(self, data):
|
||||
"""Feed raw bytes. Return list of completed sweep dicts."""
|
||||
if data:
|
||||
self._buf += data
|
||||
sweeps = []
|
||||
buf = self._buf
|
||||
while len(buf) >= 8:
|
||||
w0 = _u16(buf, 0)
|
||||
w1 = _u16(buf, 2)
|
||||
w2 = _u16(buf, 4)
|
||||
w3 = _u16(buf, 6)
|
||||
# Main channel ch1/ch2 from aux
|
||||
if aux is None:
|
||||
return None
|
||||
main_ch1 = np.asarray(aux[0], dtype=np.float64)
|
||||
main_ch2 = np.asarray(aux[1], dtype=np.float64)
|
||||
|
||||
# Start marker: new sweep begins
|
||||
if w0 == MARKER_MAIN and w1 == 0xFFFF and w2 == 0xFFFF and w3 == 0xFFFF:
|
||||
sw = self._finalize()
|
||||
if sw is not None:
|
||||
sweeps.append(sw)
|
||||
del buf[:8]
|
||||
continue
|
||||
# Secondary (ref) channel from info payload
|
||||
sec = info.get("_secondary_payload")
|
||||
if sec is None:
|
||||
return None
|
||||
ref_ch1 = np.asarray(sec["ch1"], dtype=np.float64)
|
||||
ref_ch2 = np.asarray(sec["ch2"], dtype=np.float64)
|
||||
|
||||
# Main channel data point
|
||||
if w0 == MARKER_MAIN and w1 != 0xFFFF:
|
||||
self._main[w1] = (_i16(w2), _i16(w3))
|
||||
# Safety: force finalize if accumulator grew too large
|
||||
if len(self._main) >= MAX_SWEEP_POINTS:
|
||||
sw = self._finalize()
|
||||
if sw is not None:
|
||||
sweeps.append(sw)
|
||||
del buf[:8]
|
||||
continue
|
||||
# Keep only points present in both channels
|
||||
valid = np.isfinite(main_ch1) & np.isfinite(ref_ch1)
|
||||
nvalid = int(valid.sum())
|
||||
if nvalid < MIN_SWEEP_POINTS:
|
||||
return None
|
||||
|
||||
# Reference channel data point
|
||||
if w0 == MARKER_REF and w1 != 0xFFFF:
|
||||
self._ref[w1] = (_i16(w2), _i16(w3))
|
||||
del buf[:8]
|
||||
continue
|
||||
|
||||
# Unrecognized — advance 1 byte to resync
|
||||
del buf[:1]
|
||||
|
||||
return sweeps
|
||||
|
||||
def _finalize(self):
|
||||
"""Build numpy arrays from accumulated points and reset."""
|
||||
if not self._main or not self._ref:
|
||||
self._main.clear()
|
||||
self._ref.clear()
|
||||
self._last_main_step = -1
|
||||
return None
|
||||
|
||||
all_steps = set(self._main) | set(self._ref)
|
||||
n = max(all_steps) + 1
|
||||
if n < 2:
|
||||
self._main.clear()
|
||||
self._ref.clear()
|
||||
self._last_main_step = -1
|
||||
return None
|
||||
|
||||
main_ch1 = np.full(n, np.nan)
|
||||
main_ch2 = np.full(n, np.nan)
|
||||
ref_ch1 = np.full(n, np.nan)
|
||||
ref_ch2 = np.full(n, np.nan)
|
||||
|
||||
for s, (c1, c2) in self._main.items():
|
||||
if s < n:
|
||||
main_ch1[s] = c1
|
||||
main_ch2[s] = c2
|
||||
for s, (c1, c2) in self._ref.items():
|
||||
if s < n:
|
||||
ref_ch1[s] = c1
|
||||
ref_ch2[s] = c2
|
||||
|
||||
self._main.clear()
|
||||
self._ref.clear()
|
||||
self._last_main_step = -1
|
||||
|
||||
# Keep only points present in both channels
|
||||
valid = np.isfinite(main_ch1) & np.isfinite(ref_ch1)
|
||||
nvalid = int(valid.sum())
|
||||
if nvalid < MIN_SWEEP_POINTS:
|
||||
return None
|
||||
|
||||
return {
|
||||
"main_ch1": main_ch1[valid],
|
||||
"main_ch2": main_ch2[valid],
|
||||
"ref_ch1": ref_ch1[valid],
|
||||
"ref_ch2": ref_ch2[valid],
|
||||
"num_points": nvalid,
|
||||
}
|
||||
return {
|
||||
"main_ch1": main_ch1[valid],
|
||||
"main_ch2": main_ch2[valid],
|
||||
"ref_ch1": ref_ch1[valid],
|
||||
"ref_ch2": ref_ch2[valid],
|
||||
"num_points": nvalid,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -196,7 +124,7 @@ def process_reference(ref_ch1, ref_ch2, ref_phase_first, freqs_hz):
|
||||
def process_main(main_ch1, main_ch2, ref_amplitude, ref_phase_aligned):
|
||||
"""Normalize main channel by reference.
|
||||
|
||||
Returns (main_amp, ref_amp, amp_norm, phase_norm, fft_mag, fft_dist).
|
||||
Returns (main_amp, ref_amp, norm_ch1, norm_ch2, amp_norm, phase_norm, fft_mag, fft_dist).
|
||||
"""
|
||||
ch1_v = main_ch1 * TTY_SCALE
|
||||
ch2_v = main_ch2 * TTY_SCALE
|
||||
@ -284,17 +212,22 @@ def build_gui():
|
||||
# ---------------------------------------------------------------------------
|
||||
# Update loop
|
||||
# ---------------------------------------------------------------------------
|
||||
def make_update(reader, accumulator, curves):
|
||||
def make_update(reader, parser, assembler, curves):
|
||||
c_main_amp, c_ref_amp, c_norm_ch1, c_norm_ch2, c_norm_amp, c_ph, c_fft = curves
|
||||
state = {"ref_phase_first": None}
|
||||
queue = deque(maxlen=64)
|
||||
|
||||
def update():
|
||||
# Read and parse — push completed sweeps into queue
|
||||
# Read → parse → assemble → queue
|
||||
data = reader.read_available()
|
||||
if data:
|
||||
for sw in accumulator.feed(data):
|
||||
queue.append(sw)
|
||||
events = parser.feed(data)
|
||||
for ev in events:
|
||||
packet = assembler.consume(ev)
|
||||
if packet is not None:
|
||||
sw = extract_sweep(packet)
|
||||
if sw is not None:
|
||||
queue.append(sw)
|
||||
|
||||
# Draw exactly one sweep per tick
|
||||
if not queue:
|
||||
@ -337,10 +270,11 @@ def make_update(reader, accumulator, curves):
|
||||
def main():
|
||||
path = sys.argv[1] if len(sys.argv) > 1 else DATA_PATH
|
||||
reader = DataReader(path)
|
||||
accumulator = SweepAccumulator()
|
||||
parser = LegacyBinaryParser(batch_events=True)
|
||||
assembler = SweepAssembler(fancy=False, apply_inversion=False)
|
||||
app, win, curves = build_gui()
|
||||
|
||||
update = make_update(reader, accumulator, curves)
|
||||
update = make_update(reader, parser, assembler, curves)
|
||||
timer = QtCore.QTimer()
|
||||
timer.timeout.connect(update)
|
||||
timer.start(TIMER_MS)
|
||||
|
||||
Reference in New Issue
Block a user