366 lines
13 KiB
Python
366 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""Standalone VNA-style viewer for dual-channel ADC data.
|
||
|
||
Reads raw binary from /tmp/ttyADC_data, parses 8-byte records with
|
||
markers 0x000A (main) and 0x00A8 (reference), performs complex
|
||
normalization and plots amplitude, phase, and FFT in real time.
|
||
|
||
Uses the project's existing LegacyBinaryParser + SweepAssembler for
|
||
reliable sweep boundary detection.
|
||
"""
|
||
|
||
import os
|
||
import signal
|
||
import sys
|
||
from collections import deque
|
||
|
||
import numpy as np
|
||
import pyqtgraph as pg
|
||
from pyqtgraph.Qt import QtCore, QtWidgets
|
||
|
||
from rfg_adc_plotter.io.sweep_parser_core import LegacyBinaryParser, SweepAssembler
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Constants
|
||
# ---------------------------------------------------------------------------
|
||
F_START_HZ = 2.046e9
|
||
F_STOP_HZ = 5.612e9
|
||
BW_HZ = F_STOP_HZ - F_START_HZ # 3.566 GHz
|
||
|
||
TTY_SCALE = 5.0 / 32767.0 # int16 code → volts (±5 V ADC range)
|
||
C_M_S = 299_792_458.0
|
||
FFT_LEN = 2048
|
||
TIMER_MS = 50 # GUI refresh period
|
||
READ_CHUNK = 65536
|
||
MIN_SWEEP_POINTS = 400
|
||
|
||
DATA_PATH = "/tmp/ttyADC_data"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# DataReader
|
||
# ---------------------------------------------------------------------------
|
||
class DataReader:
|
||
"""Non-blocking raw reader for /tmp/ttyADC_data (PTY-safe)."""
|
||
|
||
def __init__(self, path=DATA_PATH):
|
||
import termios, tty
|
||
self._fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
|
||
try:
|
||
tty.setraw(self._fd) # disable line discipline processing
|
||
except termios.error:
|
||
pass # not a TTY — fine, skip
|
||
os.set_blocking(self._fd, False)
|
||
|
||
def read_available(self):
|
||
chunks = []
|
||
while True:
|
||
try:
|
||
data = os.read(self._fd, READ_CHUNK)
|
||
if not data:
|
||
break
|
||
chunks.append(data)
|
||
except BlockingIOError:
|
||
break
|
||
return b"".join(chunks)
|
||
|
||
def close(self):
|
||
os.close(self._fd)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Sweep extraction helper
|
||
# ---------------------------------------------------------------------------
|
||
def extract_sweep(packet):
|
||
"""Extract main/ref ch1/ch2 arrays from a SweepPacket.
|
||
|
||
Returns dict with main_ch1, main_ch2, ref_ch1, ref_ch2, num_points
|
||
or None if data is incomplete.
|
||
"""
|
||
sweep_arr, info, aux = packet
|
||
|
||
# Main channel ch1/ch2 from aux
|
||
if aux is None:
|
||
return None
|
||
main_ch1 = np.asarray(aux[0], dtype=np.float64)
|
||
main_ch2 = np.asarray(aux[1], dtype=np.float64)
|
||
|
||
# Secondary (ref) channel from info payload
|
||
sec = info.get("_secondary_payload")
|
||
if sec is None:
|
||
return None
|
||
ref_ch1 = np.asarray(sec["ch1"], dtype=np.float64)
|
||
ref_ch2 = np.asarray(sec["ch2"], dtype=np.float64)
|
||
|
||
# Keep only points present in both channels
|
||
valid = np.isfinite(main_ch1) & np.isfinite(ref_ch1)
|
||
nvalid = int(valid.sum())
|
||
if nvalid < MIN_SWEEP_POINTS:
|
||
return None
|
||
|
||
return {
|
||
"main_ch1": main_ch1[valid],
|
||
"main_ch2": main_ch2[valid],
|
||
"ref_ch1": ref_ch1[valid],
|
||
"ref_ch2": ref_ch2[valid],
|
||
"num_points": nvalid,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Signal processing
|
||
# ---------------------------------------------------------------------------
|
||
def process_reference(ref_ch1, ref_ch2):
|
||
"""Process reference channel: amplitude and phase.
|
||
|
||
Returns (amplitude, phase).
|
||
"""
|
||
ch1_v = ref_ch1 * TTY_SCALE
|
||
ch2_v = ref_ch2 * TTY_SCALE
|
||
|
||
amplitude = np.sqrt(ch1_v ** 2 + ch2_v ** 2)
|
||
phase = np.unwrap(np.arctan2(ch2_v, ch1_v))
|
||
|
||
return amplitude, phase
|
||
|
||
|
||
def process_main(main_ch1, main_ch2, ref_amplitude, ref_phase_aligned):
|
||
"""Normalize main channel by reference.
|
||
|
||
Returns (main_amp, ref_amp, norm_ch1, norm_ch2, amp_norm, phase_norm, fft_mag, fft_dist).
|
||
"""
|
||
ch1_v = main_ch1 * TTY_SCALE
|
||
ch2_v = main_ch2 * TTY_SCALE
|
||
z_main = ch1_v + 1j * ch2_v
|
||
main_amp = np.abs(z_main)
|
||
|
||
# Normalize by ref amplitude and subtract ref phase of first point
|
||
ref_amp_safe = np.where(ref_amplitude > 1e-12, ref_amplitude, 1e-12)
|
||
z_norm = (ch1_v / ref_amp_safe + 1j * ch2_v / ref_amp_safe) * np.exp(-1j * ref_phase_aligned)
|
||
|
||
norm_ch1 = np.real(z_norm)
|
||
norm_ch2 = np.imag(z_norm)
|
||
|
||
amp_norm = np.abs(z_norm)
|
||
phase_norm = np.unwrap(np.angle(z_norm))
|
||
|
||
# FFT → distance domain
|
||
n = len(z_norm)
|
||
window = np.hanning(n)
|
||
spectrum = np.fft.fft(z_norm * window, n=FFT_LEN)
|
||
fft_mag = np.abs(spectrum[: FFT_LEN // 2])
|
||
|
||
df_hz = BW_HZ / max(1, n - 1)
|
||
dist_step = C_M_S / (2.0 * FFT_LEN * df_hz)
|
||
fft_dist = np.arange(FFT_LEN // 2) * dist_step
|
||
|
||
return main_amp, ref_amplitude, norm_ch1, norm_ch2, amp_norm, phase_norm, fft_mag, fft_dist
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GUI
|
||
# ---------------------------------------------------------------------------
|
||
def build_gui():
|
||
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication(sys.argv)
|
||
|
||
win = pg.GraphicsLayoutWidget(show=True, title="RFG VNA Viewer")
|
||
win.resize(1200, 800)
|
||
|
||
# Row 0: raw amplitudes of both channels
|
||
p_raw = win.addPlot(row=0, col=0, title="Амплитуды каналов")
|
||
p_raw.showGrid(x=True, y=True, alpha=0.3)
|
||
p_raw.setLabel("bottom", "Частота", units="ГГц")
|
||
p_raw.setLabel("left", "Амплитуда", units="В")
|
||
p_raw.addLegend(offset=(10, 10))
|
||
c_main_amp = p_raw.plot(pen=pg.mkPen((80, 120, 255), width=1), name="Main (0a00)")
|
||
c_ref_amp = p_raw.plot(pen=pg.mkPen((255, 80, 80), width=1), name="Ref (a800)")
|
||
|
||
# Row 1: normalized CH1 / CH2 (Re/Im of main/ref)
|
||
p_ch = win.addPlot(row=1, col=0, title="Main нормированный: CH1 (Re), CH2 (Im)")
|
||
p_ch.showGrid(x=True, y=True, alpha=0.3)
|
||
p_ch.setLabel("bottom", "Частота", units="ГГц")
|
||
p_ch.setLabel("left", "Значение")
|
||
p_ch.setXLink(p_raw)
|
||
p_ch.addLegend(offset=(10, 10))
|
||
c_norm_ch1 = p_ch.plot(pen=pg.mkPen((80, 120, 255), width=1), name="CH1 (Re)")
|
||
c_norm_ch2 = p_ch.plot(pen=pg.mkPen((255, 80, 80), width=1), name="CH2 (Im)")
|
||
|
||
# Row 2: normalized amplitude
|
||
p_norm = win.addPlot(row=2, col=0, title="Нормированная амплитуда |S|")
|
||
p_norm.showGrid(x=True, y=True, alpha=0.3)
|
||
p_norm.setLabel("bottom", "Частота", units="ГГц")
|
||
p_norm.setLabel("left", "Амплитуда")
|
||
p_norm.setXLink(p_raw)
|
||
c_norm_amp = p_norm.plot(pen=pg.mkPen((80, 120, 255), width=1))
|
||
|
||
# Row 3: normalized phase + linear fit + deviation (right axis)
|
||
p_ph = win.addPlot(row=3, col=0, title="Нормированная фаза arg(S)")
|
||
p_ph.showGrid(x=True, y=True, alpha=0.3)
|
||
p_ph.setLabel("bottom", "Частота", units="ГГц")
|
||
p_ph.setLabel("left", "Фаза", units="рад")
|
||
p_ph.setXLink(p_raw)
|
||
p_ph.addLegend(offset=(10, 10))
|
||
c_ph = p_ph.plot(pen=pg.mkPen((230, 180, 40), width=1), name="Фаза")
|
||
c_ph_line = p_ph.plot(pen=pg.mkPen((180, 180, 180), width=1, style=QtCore.Qt.DashLine), name="Лин. прибл.")
|
||
|
||
# Secondary Y axis (right) for deviation, fixed -1..1
|
||
vb_dev = pg.ViewBox()
|
||
p_ph.showAxis("right")
|
||
p_ph.scene().addItem(vb_dev)
|
||
p_ph.getAxis("right").linkToView(vb_dev)
|
||
vb_dev.setXLink(p_ph)
|
||
p_ph.setLabel("right", "Отклонение", units="рад")
|
||
p_ph.getAxis("right").setPen(pg.mkPen((255, 100, 200)))
|
||
c_ph_dev = pg.PlotCurveItem(pen=pg.mkPen((255, 100, 200), width=1), name="Отклонение")
|
||
vb_dev.addItem(c_ph_dev)
|
||
vb_dev.setYRange(-3, 3)
|
||
vb_dev.enableAutoRange(axis=pg.ViewBox.YAxis, enable=False)
|
||
|
||
# Keep secondary ViewBox geometry in sync
|
||
def sync_views():
|
||
vb_dev.setGeometry(p_ph.vb.sceneBoundingRect())
|
||
vb_dev.linkedViewChanged(p_ph.vb, vb_dev.XAxis)
|
||
p_ph.vb.sigResized.connect(sync_views)
|
||
|
||
# Row 4: FFT distance
|
||
p_fft = win.addPlot(row=4, col=0, title="FFT — расстояние")
|
||
p_fft.showGrid(x=True, y=True, alpha=0.3)
|
||
p_fft.setLabel("bottom", "Расстояние", units="м")
|
||
p_fft.setLabel("left", "Магнитуда", units="дБ")
|
||
c_fft = p_fft.plot(pen=pg.mkPen((60, 200, 80), width=1))
|
||
|
||
plots = [p_raw, p_ch, p_norm, p_ph, p_fft]
|
||
curves = (c_main_amp, c_ref_amp, c_norm_ch1, c_norm_ch2, c_norm_amp, c_ph, c_ph_line, c_ph_dev, c_fft)
|
||
return app, win, curves, plots
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Update loop
|
||
# ---------------------------------------------------------------------------
|
||
def make_update(reader, parser, assembler, curves, plots):
|
||
c_main_amp, c_ref_amp, c_norm_ch1, c_norm_ch2, c_norm_amp, c_ph, c_ph_line, c_ph_dev, c_fft = curves
|
||
state = {"ref_phase_first": None, "axes_locked": False}
|
||
queue = deque(maxlen=64)
|
||
|
||
def update():
|
||
# Read → parse → assemble → queue
|
||
data = reader.read_available()
|
||
if data:
|
||
events = parser.feed(data)
|
||
for ev in events:
|
||
packet = assembler.consume(ev)
|
||
if packet is not None:
|
||
sw = extract_sweep(packet)
|
||
if sw is not None:
|
||
queue.append(sw)
|
||
|
||
# Draw exactly one sweep per tick
|
||
if not queue:
|
||
return
|
||
sweep = queue.popleft()
|
||
n = sweep["num_points"]
|
||
print(f"[VNA] queue={len(queue)} points={n} "
|
||
f"main=[{sweep['main_ch1'].min():.0f}..{sweep['main_ch1'].max():.0f}] "
|
||
f"ref=[{sweep['ref_ch1'].min():.0f}..{sweep['ref_ch1'].max():.0f}]")
|
||
if n < 2:
|
||
return
|
||
|
||
freqs_ghz = np.linspace(F_START_HZ / 1e9, F_STOP_HZ / 1e9, n)
|
||
freqs_hz = freqs_ghz * 1e9
|
||
|
||
ref_amp, ref_phase = process_reference(
|
||
sweep["ref_ch1"], sweep["ref_ch2"]
|
||
)
|
||
|
||
# Fix reference phase from the first sweep
|
||
if state["ref_phase_first"] is None:
|
||
state["ref_phase_first"] = ref_phase[0]
|
||
|
||
main_amp, ref_amplitude, norm_ch1, norm_ch2, norm_amp, phase, fft_mag, fft_dist = process_main(
|
||
sweep["main_ch1"], sweep["main_ch2"], ref_amp, state["ref_phase_first"]
|
||
)
|
||
|
||
c_main_amp.setData(freqs_ghz, main_amp)
|
||
c_ref_amp.setData(freqs_ghz, ref_amplitude)
|
||
c_norm_ch1.setData(freqs_ghz, norm_ch1)
|
||
c_norm_ch2.setData(freqs_ghz, norm_ch2)
|
||
c_norm_amp.setData(freqs_ghz, norm_amp)
|
||
c_ph.setData(freqs_ghz, phase)
|
||
|
||
# Linear fit (line through first and last point) + deviation
|
||
line = np.linspace(phase[0], phase[-1], len(phase))
|
||
deviation = phase - line
|
||
c_ph_line.setData(freqs_ghz, line)
|
||
c_ph_dev.setData(freqs_ghz, deviation)
|
||
|
||
fft_db = 20.0 * np.log10(fft_mag + 1e-12)
|
||
c_fft.setData(fft_dist, fft_db)
|
||
|
||
# Lock axes after first sweep — compute ranges from data, then freeze
|
||
if not state["axes_locked"]:
|
||
state["axes_locked"] = True
|
||
p_raw, p_ch, p_norm, p_ph, p_fft = plots
|
||
|
||
fx0, fx1 = freqs_ghz[0], freqs_ghz[-1]
|
||
|
||
# Row 0: raw amplitudes
|
||
y_all = np.concatenate([main_amp, ref_amplitude])
|
||
p_raw.disableAutoRange()
|
||
p_raw.setXRange(fx0, fx1, padding=0)
|
||
p_raw.setYRange(float(y_all.min()), float(y_all.max()), padding=0.05)
|
||
|
||
# Row 1: normalized ch1/ch2
|
||
y_all = np.concatenate([norm_ch1, norm_ch2])
|
||
p_ch.disableAutoRange()
|
||
p_ch.setXRange(fx0, fx1, padding=0)
|
||
p_ch.setYRange(float(y_all.min()), float(y_all.max()), padding=0.05)
|
||
|
||
# Row 2: normalized amplitude
|
||
p_norm.disableAutoRange()
|
||
p_norm.setXRange(fx0, fx1, padding=0)
|
||
p_norm.setYRange(float(norm_amp.min()), float(norm_amp.max()), padding=0.05)
|
||
|
||
# Row 3: phase + line (deviation is on separate right axis, fixed -1..1)
|
||
y_all = np.concatenate([phase, line])
|
||
p_ph.disableAutoRange()
|
||
p_ph.setXRange(fx0, fx1, padding=0)
|
||
p_ph.setYRange(float(y_all.min()), float(y_all.max()), padding=0.05)
|
||
|
||
# Row 4: FFT
|
||
p_fft.disableAutoRange()
|
||
p_fft.setXRange(float(fft_dist[0]), float(fft_dist[-1]), padding=0)
|
||
p_fft.setYRange(float(fft_db.min()), float(fft_db.max()), padding=0.05)
|
||
|
||
return update
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
def main():
|
||
path = sys.argv[1] if len(sys.argv) > 1 else DATA_PATH
|
||
reader = DataReader(path)
|
||
parser = LegacyBinaryParser(batch_events=True)
|
||
assembler = SweepAssembler(fancy=False, apply_inversion=False)
|
||
app, win, curves, plots = build_gui()
|
||
|
||
update = make_update(reader, parser, assembler, curves, plots)
|
||
timer = QtCore.QTimer()
|
||
timer.timeout.connect(update)
|
||
timer.start(TIMER_MS)
|
||
|
||
# Allow Ctrl+C to work inside Qt event loop
|
||
signal.signal(signal.SIGINT, lambda *_: app.quit())
|
||
kick = QtCore.QTimer()
|
||
kick.start(200)
|
||
kick.timeout.connect(lambda: None)
|
||
|
||
try:
|
||
sys.exit(app.exec_())
|
||
finally:
|
||
reader.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|