Files
RFG_stm32_ADC_receiver_GUI/rfg_vna_viewer.py
2026-06-09 18:56:40 +03:00

362 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Standalone VNA-style viewer for dual-channel ADC data.
Reads raw binary from /tmp/ttyADC_data, parses 8-byte records with
markers 0x000A (main) and 0x00A8 (reference), performs complex
normalization and plots amplitude, phase, and FFT in real time.
Uses the project's existing LegacyBinaryParser + SweepAssembler for
reliable sweep boundary detection.
"""
import os
import signal
import sys
from collections import deque
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets
from rfg_adc_plotter.io.sweep_parser_core import LegacyBinaryParser, SweepAssembler
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Frequency sweep limits in Hz; the plots map sweep points linearly onto
# the interval [F_START_HZ, F_STOP_HZ].
F_START_HZ = 2.046e9
F_STOP_HZ = 5.612e9
# Swept bandwidth in Hz, used to derive the per-point frequency step for the FFT.
BW_HZ = F_STOP_HZ - F_START_HZ # 3.566 GHz
TTY_SCALE = 5.0 / 32767.0 # int16 code → volts (±5 V ADC range)
# Speed of light in m/s, used to convert FFT bins to distance.
C_M_S = 299_792_458.0
# Transform length passed to np.fft.fft (shorter sweeps are zero-padded to it).
FFT_LEN = 2048
TIMER_MS = 50 # GUI refresh period
# Maximum number of bytes requested per os.read() call.
READ_CHUNK = 65536
# Sweeps with fewer valid points than this are discarded by extract_sweep().
MIN_SWEEP_POINTS = 400
# Default input path, used when no path is given on the command line.
DATA_PATH = "/tmp/ttyADC_data"
# ---------------------------------------------------------------------------
# DataReader
# ---------------------------------------------------------------------------
class DataReader:
    """Non-blocking raw reader for a byte stream such as /tmp/ttyADC_data.

    The path is opened read-only without becoming the controlling terminal.
    If it refers to a TTY/PTY it is switched to raw mode so the line
    discipline does not alter the binary data; for any other kind of file
    that step is skipped. The descriptor is then made non-blocking.

    The reader can also be used as a context manager, which closes the
    descriptor on exit.
    """

    def __init__(self, path=DATA_PATH):
        """Open *path* and prepare it for non-blocking raw reads.

        Raises:
            OSError: if the path cannot be opened or configured.
        """
        # Imported here rather than at module level: termios/tty are
        # POSIX-only modules.
        import termios
        import tty

        self._fd = None
        fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
        try:
            try:
                tty.setraw(fd)  # disable line discipline processing
            except termios.error:
                pass  # not a TTY — fine, skip
            os.set_blocking(fd, False)
        except BaseException:
            # Do not leak the descriptor when setup fails half-way.
            os.close(fd)
            raise
        self._fd = fd

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def read_available(self):
        """Return every byte that can be read right now without blocking.

        Reads in chunks of READ_CHUNK bytes until the descriptor reports
        end-of-data (empty read) or would block. Returns ``b""`` when
        nothing is available.

        Raises:
            ValueError: if the reader has already been closed.
        """
        if self._fd is None:
            raise ValueError("read from a closed DataReader")
        chunks = []
        while True:
            try:
                data = os.read(self._fd, READ_CHUNK)
            except BlockingIOError:
                break  # nothing more to read for now
            if not data:
                break
            chunks.append(data)
        return b"".join(chunks)

    def close(self):
        """Close the descriptor; calling this more than once is harmless."""
        fd, self._fd = self._fd, None
        if fd is not None:
            # Guarded so a repeated close() can never hit a descriptor
            # number that the OS has since handed to someone else.
            os.close(fd)
# ---------------------------------------------------------------------------
# Sweep extraction helper
# ---------------------------------------------------------------------------
def extract_sweep(packet):
    """Extract main/ref ch1/ch2 arrays from a SweepPacket.

    Args:
        packet: a 3-item sequence ``(sweep_arr, info, aux)``. ``aux`` holds
            the main channel as ``(ch1, ch2)``; ``info`` is a mapping whose
            ``"_secondary_payload"`` entry holds the reference channel as a
            mapping with ``"ch1"`` and ``"ch2"``.

    Returns:
        A dict with ``main_ch1``, ``main_ch2``, ``ref_ch1``, ``ref_ch2``
        (float64 arrays restricted to the points that are finite in all four
        inputs) and ``num_points``; or ``None`` if the main or reference
        data is missing, the four arrays do not share one shape, or fewer
        than MIN_SWEEP_POINTS valid points remain.
    """
    _sweep_arr, info, aux = packet

    # Main channel ch1/ch2 come from aux.
    if aux is None:
        return None
    # Secondary (ref) channel comes from the info payload.
    sec = info.get("_secondary_payload")
    if sec is None:
        return None

    main_ch1 = np.asarray(aux[0], dtype=np.float64)
    main_ch2 = np.asarray(aux[1], dtype=np.float64)
    ref_ch1 = np.asarray(sec["ch1"], dtype=np.float64)
    ref_ch2 = np.asarray(sec["ch2"], dtype=np.float64)

    # The four arrays are combined point by point below; differing shapes
    # would either broadcast silently or raise, so treat them as incomplete.
    shape = main_ch1.shape
    if main_ch2.shape != shape or ref_ch1.shape != shape or ref_ch2.shape != shape:
        return None

    # Keep only points present (finite) in every channel, so that no NaN/inf
    # reaches the normalization and FFT stages.
    valid = (
        np.isfinite(main_ch1)
        & np.isfinite(main_ch2)
        & np.isfinite(ref_ch1)
        & np.isfinite(ref_ch2)
    )
    nvalid = int(valid.sum())
    if nvalid < MIN_SWEEP_POINTS:
        return None

    return {
        "main_ch1": main_ch1[valid],
        "main_ch2": main_ch2[valid],
        "ref_ch1": ref_ch1[valid],
        "ref_ch2": ref_ch2[valid],
        "num_points": nvalid,
    }
# ---------------------------------------------------------------------------
# Signal processing
# ---------------------------------------------------------------------------
def process_reference(ref_ch1, ref_ch2):
    """Compute amplitude and unwrapped phase of the reference channel.

    Both inputs are scaled by TTY_SCALE (ADC codes to volts) and treated as
    the two quadrature components of one signal.

    Returns:
        ``(amplitude, phase)``: the magnitude in volts and the unwrapped
        phase in radians, one value per input point.
    """
    volts_1 = ref_ch1 * TTY_SCALE
    volts_2 = ref_ch2 * TTY_SCALE

    power = np.square(volts_1) + np.square(volts_2)
    wrapped = np.arctan2(volts_2, volts_1)

    return np.sqrt(power), np.unwrap(wrapped)
def process_main(main_ch1, main_ch2, ref_amplitude, ref_phase_aligned):
    """Normalize the main channel by the reference and transform to distance.

    Args:
        main_ch1, main_ch2: main-channel components in ADC codes; they are
            scaled by TTY_SCALE and combined as ``ch1 + 1j * ch2``.
        ref_amplitude: reference amplitude per point; values not above
            1e-12 are replaced by 1e-12 before dividing.
        ref_phase_aligned: reference phase per point in radians, subtracted
            from the main-channel phase.

    Returns:
        ``(main_amp, ref_amp, norm_ch1, norm_ch2, amp_norm, phase_norm,
        fft_mag, fft_dist)`` where ``ref_amp`` is ``ref_amplitude`` passed
        through unchanged, ``norm_ch1``/``norm_ch2`` are the real/imaginary
        parts of the normalized signal, ``phase_norm`` is its unwrapped
        phase, and ``fft_mag``/``fft_dist`` are the first half of the
        spectrum magnitude and the matching distance axis in metres.
    """
    ch1_v = main_ch1 * TTY_SCALE
    ch2_v = main_ch2 * TTY_SCALE
    z_main = ch1_v + 1j * ch2_v
    main_amp = np.abs(z_main)

    # Normalize by ref amplitude and subtract ref phase. The floor avoids
    # division by zero where the reference is (numerically) absent.
    ref_amp_safe = np.where(ref_amplitude > 1e-12, ref_amplitude, 1e-12)
    z_norm = (ch1_v / ref_amp_safe + 1j * ch2_v / ref_amp_safe) * np.exp(-1j * ref_phase_aligned)
    norm_ch1 = np.real(z_norm)
    norm_ch2 = np.imag(z_norm)
    amp_norm = np.abs(z_norm)
    phase_norm = np.unwrap(np.angle(z_norm))

    # FFT → distance domain.
    n = len(z_norm)
    # np.fft.fft crops its input when the requested length is shorter than
    # the data, which would throw away the tail of the windowed sweep.
    # Never transform with fewer points than the sweep has; sweeps up to
    # FFT_LEN points are zero-padded to FFT_LEN.
    nfft = max(FFT_LEN, n)
    half = nfft // 2
    window = np.hanning(n)
    spectrum = np.fft.fft(z_norm * window, n=nfft)
    fft_mag = np.abs(spectrum[:half])

    # Frequency step between sweep points; bin k of the transform then
    # corresponds to distance k * c / (2 * nfft * df).
    df_hz = BW_HZ / max(1, n - 1)
    dist_step = C_M_S / (2.0 * nfft * df_hz)
    fft_dist = np.arange(half) * dist_step

    return main_amp, ref_amplitude, norm_ch1, norm_ch2, amp_norm, phase_norm, fft_mag, fft_dist
# ---------------------------------------------------------------------------
# GUI
# ---------------------------------------------------------------------------
def build_gui():
    """Create the Qt application, the plot window and every curve item.

    The window stacks five plots in one column: raw amplitudes, normalized
    CH1/CH2, normalized amplitude, normalized phase (with a second, right
    hand Y axis for the deviation curve) and the FFT distance profile. The
    four frequency-domain plots share their X axis with the first one.

    Returns:
        ``(app, win, curves, plots)`` where ``curves`` is the tuple
        ``(c_main_amp, c_ref_amp, c_norm_ch1, c_norm_ch2, c_norm_amp, c_ph,
        c_ph_line, c_ph_dev, c_fft)`` and ``plots`` is the list
        ``[p_raw, p_ch, p_norm, p_ph, p_fft]``.
    """
    # Reuse a running QApplication if there is one.
    app = QtWidgets.QApplication.instance() or QtWidgets.QApplication(sys.argv)
    win = pg.GraphicsLayoutWidget(show=True, title="RFG VNA Viewer")
    win.resize(1200, 800)

    blue = (80, 120, 255)
    red = (255, 80, 80)
    pink = (255, 100, 200)

    def thin_pen(color, **extra):
        """Build a 1-pixel pen of the given colour."""
        return pg.mkPen(color, width=1, **extra)

    def add_row(row, title):
        """Add a plot with a grid to the given layout row."""
        plot = win.addPlot(row=row, col=0, title=title)
        plot.showGrid(x=True, y=True, alpha=0.3)
        return plot

    # --- Row 0: raw amplitudes of both channels ---------------------------
    p_raw = add_row(0, "Амплитуды каналов")
    p_raw.setLabel("bottom", "Частота", units="ГГц")
    p_raw.setLabel("left", "Амплитуда", units="В")
    p_raw.addLegend(offset=(10, 10))
    c_main_amp = p_raw.plot(pen=thin_pen(blue), name="Main (0a00)")
    c_ref_amp = p_raw.plot(pen=thin_pen(red), name="Ref (a800)")

    # --- Row 1: normalized CH1 / CH2 (Re/Im of main/ref) ------------------
    p_ch = add_row(1, "Main нормированный: CH1 (Re), CH2 (Im)")
    p_ch.setLabel("bottom", "Частота", units="ГГц")
    p_ch.setLabel("left", "Значение")
    p_ch.setXLink(p_raw)
    p_ch.addLegend(offset=(10, 10))
    c_norm_ch1 = p_ch.plot(pen=thin_pen(blue), name="CH1 (Re)")
    c_norm_ch2 = p_ch.plot(pen=thin_pen(red), name="CH2 (Im)")

    # --- Row 2: normalized amplitude --------------------------------------
    p_norm = add_row(2, "Нормированная амплитуда |S|")
    p_norm.setLabel("bottom", "Частота", units="ГГц")
    p_norm.setLabel("left", "Амплитуда")
    p_norm.setXLink(p_raw)
    c_norm_amp = p_norm.plot(pen=thin_pen(blue))

    # --- Row 3: normalized phase, straight-line reference, deviation ------
    p_ph = add_row(3, "Нормированная фаза arg(S)")
    p_ph.setLabel("bottom", "Частота", units="ГГц")
    p_ph.setLabel("left", "Фаза", units="рад")
    p_ph.setXLink(p_raw)
    p_ph.addLegend(offset=(10, 10))
    c_ph = p_ph.plot(pen=thin_pen((230, 180, 40)), name="Фаза")
    c_ph_line = p_ph.plot(
        pen=thin_pen((180, 180, 180), style=QtCore.Qt.DashLine),
        name="Лин. прибл.",
    )

    # The deviation curve lives in its own ViewBox tied to the right-hand
    # axis, with a Y range fixed at -3..3 and auto-ranging turned off.
    vb_dev = pg.ViewBox()
    p_ph.showAxis("right")
    right_axis = p_ph.getAxis("right")
    p_ph.scene().addItem(vb_dev)
    right_axis.linkToView(vb_dev)
    vb_dev.setXLink(p_ph)
    p_ph.setLabel("right", "Отклонение", units="рад")
    right_axis.setPen(pg.mkPen(pink))
    c_ph_dev = pg.PlotCurveItem(pen=thin_pen(pink), name="Отклонение")
    vb_dev.addItem(c_ph_dev)
    vb_dev.setYRange(-3, 3)
    vb_dev.enableAutoRange(axis=pg.ViewBox.YAxis, enable=False)

    def sync_views():
        """Make the overlay ViewBox follow the phase plot's geometry."""
        vb_dev.setGeometry(p_ph.vb.sceneBoundingRect())
        vb_dev.linkedViewChanged(p_ph.vb, vb_dev.XAxis)

    p_ph.vb.sigResized.connect(sync_views)

    # --- Row 4: FFT distance profile --------------------------------------
    p_fft = add_row(4, "FFT — расстояние")
    p_fft.setLabel("bottom", "Расстояние", units="м")
    p_fft.setLabel("left", "Магнитуда", units="дБ")
    c_fft = p_fft.plot(pen=thin_pen((60, 200, 80)))

    curves = (
        c_main_amp,
        c_ref_amp,
        c_norm_ch1,
        c_norm_ch2,
        c_norm_amp,
        c_ph,
        c_ph_line,
        c_ph_dev,
        c_fft,
    )
    plots = [p_raw, p_ch, p_norm, p_ph, p_fft]
    return app, win, curves, plots
# ---------------------------------------------------------------------------
# Update loop
# ---------------------------------------------------------------------------
def _freeze_axes(plot, x_range, y_values):
    """Disable auto-ranging on *plot* and pin its axes to the given data.

    The X range is set exactly to ``x_range``; the Y range spans the
    min/max of ``y_values`` with 5 % padding.
    """
    plot.disableAutoRange()
    plot.setXRange(x_range[0], x_range[1], padding=0)
    plot.setYRange(float(y_values.min()), float(y_values.max()), padding=0.05)


def make_update(reader, parser, assembler, curves, plots):
    """Build the periodic GUI callback.

    Args:
        reader: object whose ``read_available()`` returns newly arrived bytes.
        parser: object whose ``feed(data)`` returns an iterable of events.
        assembler: object whose ``consume(event)`` returns a sweep packet or
            ``None``.
        curves: the nine curve items, in the order ``(c_main_amp, c_ref_amp,
            c_norm_ch1, c_norm_ch2, c_norm_amp, c_ph, c_ph_line, c_ph_dev,
            c_fft)``.
        plots: the five plots, in the order ``(p_raw, p_ch, p_norm, p_ph,
            p_fft)``.

    Returns:
        A zero-argument function. Each call ingests whatever data is
        available, queues every complete sweep, and draws at most one queued
        sweep. After the first drawn sweep the plot ranges are frozen.
    """
    (
        c_main_amp,
        c_ref_amp,
        c_norm_ch1,
        c_norm_ch2,
        c_norm_amp,
        c_ph,
        c_ph_line,
        c_ph_dev,
        c_fft,
    ) = curves
    state = {"axes_locked": False}
    # Bounded backlog: once 64 sweeps are pending, appending a new one
    # discards the oldest.
    queue = deque(maxlen=64)

    def update():
        # Read → parse → assemble → queue
        data = reader.read_available()
        if data:
            for ev in parser.feed(data):
                packet = assembler.consume(ev)
                if packet is None:
                    continue
                sw = extract_sweep(packet)
                if sw is not None:
                    queue.append(sw)

        # Draw exactly one sweep per tick
        if not queue:
            return
        sweep = queue.popleft()
        n = sweep["num_points"]
        print(f"[VNA] queue={len(queue)} points={n} "
              f"main=[{sweep['main_ch1'].min():.0f}..{sweep['main_ch1'].max():.0f}] "
              f"ref=[{sweep['ref_ch1'].min():.0f}..{sweep['ref_ch1'].max():.0f}]")
        if n < 2:
            return

        # Sweep points are assumed to be evenly spread over the band.
        freqs_ghz = np.linspace(F_START_HZ / 1e9, F_STOP_HZ / 1e9, n)

        ref_amp, ref_phase = process_reference(
            sweep["ref_ch1"], sweep["ref_ch2"]
        )
        main_amp, ref_amplitude, norm_ch1, norm_ch2, norm_amp, phase, fft_mag, fft_dist = process_main(
            sweep["main_ch1"], sweep["main_ch2"], ref_amp, ref_phase
        )

        c_main_amp.setData(freqs_ghz, main_amp)
        c_ref_amp.setData(freqs_ghz, ref_amplitude)
        c_norm_ch1.setData(freqs_ghz, norm_ch1)
        c_norm_ch2.setData(freqs_ghz, norm_ch2)
        c_norm_amp.setData(freqs_ghz, norm_amp)
        c_ph.setData(freqs_ghz, phase)

        # Straight line through the first and last phase point, plus the
        # deviation of the measured phase from that line.
        line = np.linspace(phase[0], phase[-1], len(phase))
        deviation = phase - line
        c_ph_line.setData(freqs_ghz, line)
        c_ph_dev.setData(freqs_ghz, deviation)

        # Small offset keeps log10 finite for empty bins.
        fft_db = 20.0 * np.log10(fft_mag + 1e-12)
        c_fft.setData(fft_dist, fft_db)

        # Lock axes after first sweep — compute ranges from data, then freeze
        if not state["axes_locked"]:
            state["axes_locked"] = True
            p_raw, p_ch, p_norm, p_ph, p_fft = plots
            freq_span = (freqs_ghz[0], freqs_ghz[-1])

            # Row 0: raw amplitudes
            _freeze_axes(p_raw, freq_span, np.concatenate([main_amp, ref_amplitude]))
            # Row 1: normalized ch1/ch2
            _freeze_axes(p_ch, freq_span, np.concatenate([norm_ch1, norm_ch2]))
            # Row 2: normalized amplitude
            _freeze_axes(p_norm, freq_span, norm_amp)
            # Row 3: phase + line (the deviation curve is not included in
            # this range; it is drawn against the separate right-hand axis)
            _freeze_axes(p_ph, freq_span, np.concatenate([phase, line]))
            # Row 4: FFT
            _freeze_axes(p_fft, (float(fft_dist[0]), float(fft_dist[-1])), fft_db)

    return update
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Run the viewer until the window is closed or Ctrl+C is pressed.

    The input path is taken from the first command-line argument, falling
    back to DATA_PATH. The process exits with the Qt event loop's return
    code; the reader is closed on every exit path, including failures while
    the GUI is being set up.
    """
    path = sys.argv[1] if len(sys.argv) > 1 else DATA_PATH
    reader = DataReader(path)
    try:
        parser = LegacyBinaryParser(batch_events=True)
        assembler = SweepAssembler(fancy=False, apply_inversion=False)

        # `win` is held in a local for the whole event loop.
        app, win, curves, plots = build_gui()
        update = make_update(reader, parser, assembler, curves, plots)

        timer = QtCore.QTimer()
        timer.timeout.connect(update)
        timer.start(TIMER_MS)

        # Allow Ctrl+C to work inside Qt event loop: the handler asks Qt to
        # quit, and a no-op timer firing every 200 ms regularly hands
        # control back to the Python interpreter so the handler can run.
        signal.signal(signal.SIGINT, lambda *_: app.quit())
        kick = QtCore.QTimer()
        kick.timeout.connect(lambda: None)
        kick.start(200)

        sys.exit(app.exec_())
    finally:
        reader.close()
if __name__ == "__main__":
main()