Files
RFG_stm32_ADC_receiver_GUI/rfg_vna_viewer.py
2026-06-09 17:53:04 +03:00

362 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Standalone VNA-style viewer for dual-channel ADC data.
Reads raw binary from /tmp/ttyADC_data, parses 8-byte records with
markers 0x000A (main) and 0x00A8 (reference), performs complex
normalization and plots amplitude, phase, and FFT in real time.
"""
import os
import signal
import sys
from collections import deque
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
F_START_HZ = 2.046e9
F_STOP_HZ = 5.612e9
BW_HZ = F_STOP_HZ - F_START_HZ # 3.566 GHz
TTY_SCALE = 5.0 / 32767.0 # int16 code → volts (±5 V ADC range)
C_M_S = 299_792_458.0
FFT_LEN = 2048
TIMER_MS = 50 # GUI refresh period
READ_CHUNK = 65536
MARKER_MAIN = 0x000A
MARKER_REF = 0x00A8
MAX_SWEEP_POINTS = 4096 # safety limit — force finalize if dict grows past this
MIN_SWEEP_POINTS = 400 # discard fragments shorter than this
DATA_PATH = "/tmp/ttyADC_data"
def _u16(buf, off):
"""Little-endian uint16 from buffer."""
return buf[off] | (buf[off + 1] << 8)
def _i16(v):
"""Unsigned uint16 → signed int16."""
return v - 0x10000 if (v & 0x8000) else v
# ---------------------------------------------------------------------------
# DataReader
# ---------------------------------------------------------------------------
class DataReader:
"""Non-blocking raw reader for /tmp/ttyADC_data."""
def __init__(self, path=DATA_PATH):
self._fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
os.set_blocking(self._fd, False)
def read_available(self):
try:
return os.read(self._fd, READ_CHUNK)
except BlockingIOError:
return b""
def close(self):
os.close(self._fd)
# ---------------------------------------------------------------------------
# SweepAccumulator (inline parser)
# ---------------------------------------------------------------------------
class SweepAccumulator:
"""Parse 8-byte binary records and assemble main/ref sweep pairs."""
def __init__(self):
self._buf = bytearray()
self._main = {} # step → (ch1_i16, ch2_i16)
self._ref = {}
def feed(self, data):
"""Feed raw bytes. Return list of completed sweep dicts."""
if data:
self._buf += data
sweeps = []
buf = self._buf
while len(buf) >= 8:
w0 = _u16(buf, 0)
w1 = _u16(buf, 2)
w2 = _u16(buf, 4)
w3 = _u16(buf, 6)
# Start marker: new sweep begins
if w0 == MARKER_MAIN and w1 == 0xFFFF and w2 == 0xFFFF and w3 == 0xFFFF:
sw = self._finalize()
if sw is not None:
sweeps.append(sw)
del buf[:8]
continue
# Main channel data point
if w0 == MARKER_MAIN and w1 != 0xFFFF:
self._main[w1] = (_i16(w2), _i16(w3))
# Safety: force finalize if accumulator grew too large
if len(self._main) >= MAX_SWEEP_POINTS:
sw = self._finalize()
if sw is not None:
sweeps.append(sw)
del buf[:8]
continue
# Reference channel data point
if w0 == MARKER_REF and w1 != 0xFFFF:
self._ref[w1] = (_i16(w2), _i16(w3))
del buf[:8]
continue
# Unrecognized — advance 1 byte to resync
del buf[:1]
return sweeps
def _finalize(self):
"""Build numpy arrays from accumulated points and reset."""
if not self._main or not self._ref:
self._main.clear()
self._ref.clear()
self._last_main_step = -1
return None
all_steps = set(self._main) | set(self._ref)
n = max(all_steps) + 1
if n < 2:
self._main.clear()
self._ref.clear()
self._last_main_step = -1
return None
main_ch1 = np.full(n, np.nan)
main_ch2 = np.full(n, np.nan)
ref_ch1 = np.full(n, np.nan)
ref_ch2 = np.full(n, np.nan)
for s, (c1, c2) in self._main.items():
if s < n:
main_ch1[s] = c1
main_ch2[s] = c2
for s, (c1, c2) in self._ref.items():
if s < n:
ref_ch1[s] = c1
ref_ch2[s] = c2
self._main.clear()
self._ref.clear()
self._last_main_step = -1
# Keep only points present in both channels
valid = np.isfinite(main_ch1) & np.isfinite(ref_ch1)
nvalid = int(valid.sum())
if nvalid < MIN_SWEEP_POINTS:
return None
return {
"main_ch1": main_ch1[valid],
"main_ch2": main_ch2[valid],
"ref_ch1": ref_ch1[valid],
"ref_ch2": ref_ch2[valid],
"num_points": nvalid,
}
# ---------------------------------------------------------------------------
# Signal processing
# ---------------------------------------------------------------------------
def process_reference(ref_ch1, ref_ch2, ref_phase_first, freqs_hz):
"""Process reference channel: amplitude, phase with linear alignment.
Returns (amplitude, aligned_phase, ref_phase_first).
"""
ch1_v = ref_ch1 * TTY_SCALE
ch2_v = ref_ch2 * TTY_SCALE
amplitude = np.sqrt(ch1_v ** 2 + ch2_v ** 2)
phase = np.unwrap(np.arctan2(ch2_v, ch1_v))
if ref_phase_first is None:
return amplitude, phase, phase.copy()
# Linear correction: Δφ at first point, scaled by f/f₁
delta_phi = phase[0] - ref_phase_first[0]
correction = delta_phi * (freqs_hz / freqs_hz[0])
aligned = phase - correction
return amplitude, aligned, ref_phase_first
def process_main(main_ch1, main_ch2, ref_amplitude, ref_phase_aligned):
"""Normalize main channel by reference.
Returns (main_amp, ref_amp, amp_norm, phase_norm, fft_mag, fft_dist).
"""
ch1_v = main_ch1 * TTY_SCALE
ch2_v = main_ch2 * TTY_SCALE
z_main = ch1_v + 1j * ch2_v
main_amp = np.abs(z_main)
# Normalize ch1/ch2 individually by ref amplitude
ref_amp_safe = np.where(ref_amplitude > 1e-12, ref_amplitude, 1e-12)
norm_ch1 = ch1_v / ref_amp_safe
norm_ch2 = ch2_v / ref_amp_safe
# Build normalized complex signal and subtract ref phase
z_norm = (norm_ch1 + 1j * norm_ch2) * np.exp(-1j * ref_phase_aligned)
amp_norm = np.abs(z_norm)
phase_norm = np.unwrap(np.angle(z_norm))
# FFT → distance domain
n = len(z_norm)
window = np.hanning(n)
spectrum = np.fft.fft(z_norm * window, n=FFT_LEN)
fft_mag = np.abs(spectrum[: FFT_LEN // 2])
df_hz = BW_HZ / max(1, n - 1)
dist_step = C_M_S / (2.0 * FFT_LEN * df_hz)
fft_dist = np.arange(FFT_LEN // 2) * dist_step
return main_amp, ref_amplitude, norm_ch1, norm_ch2, amp_norm, phase_norm, fft_mag, fft_dist
# ---------------------------------------------------------------------------
# GUI
# ---------------------------------------------------------------------------
def build_gui():
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication(sys.argv)
win = pg.GraphicsLayoutWidget(show=True, title="RFG VNA Viewer")
win.resize(1200, 800)
# Row 0: raw amplitudes of both channels
p_raw = win.addPlot(row=0, col=0, title="Амплитуды каналов")
p_raw.showGrid(x=True, y=True, alpha=0.3)
p_raw.setLabel("bottom", "Частота", units="ГГц")
p_raw.setLabel("left", "Амплитуда", units="В")
p_raw.addLegend(offset=(10, 10))
c_main_amp = p_raw.plot(pen=pg.mkPen((80, 120, 255), width=1), name="Main (0a00)")
c_ref_amp = p_raw.plot(pen=pg.mkPen((255, 80, 80), width=1), name="Ref (a800)")
# Row 1: normalized CH1 / CH2 (Re/Im of main/ref)
p_ch = win.addPlot(row=1, col=0, title="Main нормированный: CH1 (Re), CH2 (Im)")
p_ch.showGrid(x=True, y=True, alpha=0.3)
p_ch.setLabel("bottom", "Частота", units="ГГц")
p_ch.setLabel("left", "Значение")
p_ch.setXLink(p_raw)
p_ch.addLegend(offset=(10, 10))
c_norm_ch1 = p_ch.plot(pen=pg.mkPen((80, 120, 255), width=1), name="CH1 (Re)")
c_norm_ch2 = p_ch.plot(pen=pg.mkPen((255, 80, 80), width=1), name="CH2 (Im)")
# Row 2: normalized amplitude
p_norm = win.addPlot(row=2, col=0, title="Нормированная амплитуда |S|")
p_norm.showGrid(x=True, y=True, alpha=0.3)
p_norm.setLabel("bottom", "Частота", units="ГГц")
p_norm.setLabel("left", "Амплитуда")
p_norm.setXLink(p_raw)
c_norm_amp = p_norm.plot(pen=pg.mkPen((80, 120, 255), width=1))
# Row 3: normalized phase
p_ph = win.addPlot(row=3, col=0, title="Нормированная фаза arg(S)")
p_ph.showGrid(x=True, y=True, alpha=0.3)
p_ph.setLabel("bottom", "Частота", units="ГГц")
p_ph.setLabel("left", "Фаза", units="рад")
p_ph.setXLink(p_raw)
c_ph = p_ph.plot(pen=pg.mkPen((230, 180, 40), width=1))
# Row 4: FFT distance
p_fft = win.addPlot(row=4, col=0, title="FFT — расстояние")
p_fft.showGrid(x=True, y=True, alpha=0.3)
p_fft.setLabel("bottom", "Расстояние", units="м")
p_fft.setLabel("left", "Магнитуда", units="дБ")
c_fft = p_fft.plot(pen=pg.mkPen((60, 200, 80), width=1))
return app, win, (c_main_amp, c_ref_amp, c_norm_ch1, c_norm_ch2, c_norm_amp, c_ph, c_fft)
# ---------------------------------------------------------------------------
# Update loop
# ---------------------------------------------------------------------------
def make_update(reader, accumulator, curves):
c_main_amp, c_ref_amp, c_norm_ch1, c_norm_ch2, c_norm_amp, c_ph, c_fft = curves
state = {"ref_phase_first": None}
queue = deque(maxlen=64)
def update():
# Read and parse — push completed sweeps into queue
data = reader.read_available()
if data:
for sw in accumulator.feed(data):
queue.append(sw)
# Draw exactly one sweep per tick
if not queue:
return
sweep = queue.popleft()
n = sweep["num_points"]
print(f"[VNA] queue={len(queue)} points={n} "
f"main=[{sweep['main_ch1'].min():.0f}..{sweep['main_ch1'].max():.0f}] "
f"ref=[{sweep['ref_ch1'].min():.0f}..{sweep['ref_ch1'].max():.0f}]")
if n < 2:
return
freqs_ghz = np.linspace(F_START_HZ / 1e9, F_STOP_HZ / 1e9, n)
freqs_hz = freqs_ghz * 1e9
ref_amp, ref_phase, state["ref_phase_first"] = process_reference(
sweep["ref_ch1"], sweep["ref_ch2"], state["ref_phase_first"], freqs_hz
)
main_amp, ref_amplitude, norm_ch1, norm_ch2, norm_amp, phase, fft_mag, fft_dist = process_main(
sweep["main_ch1"], sweep["main_ch2"], ref_amp, ref_phase
)
c_main_amp.setData(freqs_ghz, main_amp)
c_ref_amp.setData(freqs_ghz, ref_amplitude)
c_norm_ch1.setData(freqs_ghz, norm_ch1)
c_norm_ch2.setData(freqs_ghz, norm_ch2)
c_norm_amp.setData(freqs_ghz, norm_amp)
c_ph.setData(freqs_ghz, phase)
fft_db = 20.0 * np.log10(fft_mag + 1e-12)
c_fft.setData(fft_dist, fft_db)
return update
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
path = sys.argv[1] if len(sys.argv) > 1 else DATA_PATH
reader = DataReader(path)
accumulator = SweepAccumulator()
app, win, curves = build_gui()
update = make_update(reader, accumulator, curves)
timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(TIMER_MS)
# Allow Ctrl+C to work inside Qt event loop
signal.signal(signal.SIGINT, lambda *_: app.quit())
kick = QtCore.QTimer()
kick.start(200)
kick.timeout.connect(lambda: None)
try:
sys.exit(app.exec_())
finally:
reader.close()
if __name__ == "__main__":
main()