Files
RFG_stm32_ADC_receiver_GUI/rfg_vna_viewer.py
2026-06-09 16:16:05 +03:00

340 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Standalone VNA-style viewer for dual-channel ADC data.
Reads raw binary from /tmp/ttyADC_data, parses 8-byte records with
markers 0x000A (main) and 0x00A8 (reference), performs complex
normalization and plots amplitude, phase, and FFT in real time.
"""
import os
import signal
import sys
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Frequency span assumed for every sweep: the update loop spreads the sweep
# points evenly between F_START_HZ and F_STOP_HZ.
F_START_HZ = 2.046e9
F_STOP_HZ = 5.612e9
BW_HZ = F_STOP_HZ - F_START_HZ  # 3.566 GHz
TTY_SCALE = 5.0 / 32767.0  # int16 code → volts (±5 V ADC range)
C_M_S = 299_792_458.0  # speed of light in vacuum, m/s
FFT_LEN = 2048  # FFT length used for the distance-domain plot
TIMER_MS = 50  # GUI refresh period, ms
READ_CHUNK = 65536  # maximum number of bytes requested per os.read() call
# First little-endian 16-bit word of each 8-byte record; selects the channel.
MARKER_MAIN = 0x000A
MARKER_REF = 0x00A8
DATA_PATH = "/tmp/ttyADC_data"  # default input path; argv[1] overrides it
def _u16(buf, off):
    """Decode the little-endian uint16 stored at ``buf[off]`` and ``buf[off + 1]``."""
    low_byte = buf[off]
    high_byte = buf[off + 1]
    return (high_byte << 8) | low_byte
def _i16(v):
    """Reinterpret a 16-bit unsigned value as a two's-complement signed one."""
    if v & 0x8000:
        # Sign bit set: shift the value down by 2**16.
        return v - 0x10000
    return v
# ---------------------------------------------------------------------------
# DataReader
# ---------------------------------------------------------------------------
class DataReader:
    """Non-blocking raw byte reader (default source: DATA_PATH).

    The path is opened read-only with ``O_NOCTTY`` and the descriptor is then
    switched to non-blocking mode, so :meth:`read_available` returns
    immediately even when no data is pending.  Instances may be used as
    context managers; the descriptor is closed on exit.
    """

    def __init__(self, path=DATA_PATH):
        """Open *path*; raises OSError if it cannot be opened or configured."""
        self._fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
        try:
            os.set_blocking(self._fd, False)
        except OSError:
            # Do not leak the descriptor when it cannot be made non-blocking.
            os.close(self._fd)
            self._fd = -1
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def read_available(self):
        """Return up to READ_CHUNK bytes, or ``b""`` if the read would block.

        ``os.read`` also returns ``b""`` at end of file.  After :meth:`close`
        the descriptor is invalid and ``os.read`` raises OSError.
        """
        try:
            return os.read(self._fd, READ_CHUNK)
        except BlockingIOError:
            return b""

    def close(self):
        """Close the descriptor.  Safe to call more than once."""
        # Invalidate first so a repeated close() can never hit a descriptor
        # number that the OS has since handed out to someone else.
        fd, self._fd = self._fd, -1
        if fd >= 0:
            os.close(fd)
# ---------------------------------------------------------------------------
# SweepAccumulator (inline parser)
# ---------------------------------------------------------------------------
class SweepAccumulator:
    """Incremental parser that groups 8-byte records into main/ref sweeps.

    Each record is four little-endian 16-bit words: marker, step, ch1, ch2.
    A record whose marker is MARKER_MAIN and whose other three words are all
    0xFFFF announces the start of a new sweep.
    """

    def __init__(self):
        self._buf = bytearray()    # bytes received but not yet parsed
        self._main = {}            # step → (ch1_i16, ch2_i16)
        self._ref = {}             # step → (ch1_i16, ch2_i16)
        self._last_main_step = -1  # -1 until a main point has been seen

    def feed(self, data):
        """Append *data* to the buffer and parse every complete record.

        Returns the list of sweep dicts completed by this call (possibly
        empty).  An incomplete trailing record stays buffered for the next
        call.
        """
        if data:
            self._buf += data
        completed = []
        pending = self._buf
        while len(pending) >= 8:
            marker, step, raw1, raw2 = (
                _u16(pending, off) for off in (0, 2, 4, 6)
            )

            if marker == MARKER_MAIN:
                if step != 0xFFFF:
                    # Main-channel sample.  A step far below the previous one
                    # means a new sweep started without a start marker.
                    previous = self._last_main_step
                    if previous >= 0 and step < previous - 10:
                        sweep = self._finalize()
                        if sweep is not None:
                            completed.append(sweep)
                    self._main[step] = (_i16(raw1), _i16(raw2))
                    self._last_main_step = step
                    del pending[:8]
                    continue
                if raw1 == 0xFFFF and raw2 == 0xFFFF:
                    # Explicit start marker: close the sweep collected so far.
                    sweep = self._finalize()
                    if sweep is not None:
                        completed.append(sweep)
                    del pending[:8]
                    continue
            elif marker == MARKER_REF and step != 0xFFFF:
                # Reference-channel sample.
                self._ref[step] = (_i16(raw1), _i16(raw2))
                del pending[:8]
                continue

            # Not a recognizable record: slide forward one byte to resync.
            del pending[:1]
        return completed

    def _reset(self):
        """Forget all accumulated points."""
        self._main.clear()
        self._ref.clear()
        self._last_main_step = -1

    def _finalize(self):
        """Turn the accumulated points into a sweep dict and reset.

        Only steps present in both channels are kept, in ascending step
        order.  Returns None when fewer than two such steps exist.
        """
        main_pts = self._main
        ref_pts = self._ref
        shared = sorted(main_pts.keys() & ref_pts.keys())
        if len(shared) < 2:
            self._reset()
            return None

        def column(points, channel):
            return np.array([points[s][channel] for s in shared], dtype=float)

        sweep = {
            "main_ch1": column(main_pts, 0),
            "main_ch2": column(main_pts, 1),
            "ref_ch1": column(ref_pts, 0),
            "ref_ch2": column(ref_pts, 1),
            "num_points": len(shared),
        }
        self._reset()
        return sweep
# ---------------------------------------------------------------------------
# Signal processing
# ---------------------------------------------------------------------------
def process_reference(ref_ch1, ref_ch2, ref_phase_first, freqs_hz):
    """Compute reference amplitude and phase, aligned to the first sweep.

    ``ref_ch1``/``ref_ch2`` are raw ADC codes; they are scaled by TTY_SCALE
    and treated as the two components of a vector.  ``ref_phase_first`` is
    the phase array remembered from the first processed sweep, or None when
    no sweep has been processed yet.

    Returns ``(amplitude, phase, ref_phase_first)``.  On the first call the
    phase is returned unaligned and a copy of it becomes the new baseline;
    afterwards the baseline is passed through unchanged.
    """
    volts_1 = ref_ch1 * TTY_SCALE
    volts_2 = ref_ch2 * TTY_SCALE
    amplitude = np.sqrt(volts_1 ** 2 + volts_2 ** 2)
    unwrapped = np.unwrap(np.arctan2(volts_2, volts_1))

    # First sweep: nothing to align against yet, so it becomes the baseline.
    if ref_phase_first is None:
        return amplitude, unwrapped, unwrapped.copy()

    # Remove the phase offset seen at the first point, scaled linearly with
    # frequency relative to the first frequency (Δφ · f / f₁).
    first_point_offset = unwrapped[0] - ref_phase_first[0]
    ramp = first_point_offset * (freqs_hz / freqs_hz[0])
    return amplitude, unwrapped - ramp, ref_phase_first
def process_main(main_ch1, main_ch2, ref_amplitude, ref_phase_aligned):
    """Normalize the main channel by the reference and build the FFT profile.

    Args:
        main_ch1, main_ch2: raw ADC codes of the main channel; after scaling
            by TTY_SCALE they form the complex sample ``ch1 + 1j * ch2``.
        ref_amplitude: reference amplitude per point.
        ref_phase_aligned: reference phase per point, in radians.

    All four arrays must have the same length.

    Returns:
        ``(main_amp, ref_amp, amp_norm, phase_norm, fft_mag, fft_dist)``
        where ``ref_amp`` is ``ref_amplitude`` passed through unchanged,
        ``amp_norm``/``phase_norm`` are magnitude and unwrapped phase of
        main/reference, ``fft_mag`` is the magnitude of the first half of the
        Hann-windowed FFT, and ``fft_dist`` is the matching distance axis.
        The FFT length is FFT_LEN, or the next power of two above the sweep
        length when the sweep has more than FFT_LEN points.
    """
    ch1_v = main_ch1 * TTY_SCALE
    ch2_v = main_ch2 * TTY_SCALE
    z_main = ch1_v + 1j * ch2_v
    main_amp = np.abs(z_main)

    z_ref = ref_amplitude * np.exp(1j * ref_phase_aligned)
    # Guard the division against (near-)zero reference samples.
    z_ref_safe = np.where(np.abs(z_ref) > 1e-12, z_ref, 1e-12 + 0j)
    z_norm = z_main / z_ref_safe
    amp_norm = np.abs(z_norm)
    phase_norm = np.unwrap(np.angle(z_norm))

    # FFT → distance domain
    n = len(z_norm)
    # np.fft.fft crops its input when it is longer than the requested length,
    # which would silently discard the tail of a long sweep together with the
    # falling half of the window.  Grow the transform instead of cropping.
    nfft = FFT_LEN if n <= FFT_LEN else 1 << (n - 1).bit_length()
    window = np.hanning(n)
    spectrum = np.fft.fft(z_norm * window, n=nfft)
    half = nfft // 2
    fft_mag = np.abs(spectrum[:half])

    # Frequency step between sweep points, then distance per FFT bin:
    # c / (2 · N_fft · Δf).
    df_hz = BW_HZ / max(1, n - 1)
    dist_step = C_M_S / (2.0 * nfft * df_hz)
    fft_dist = np.arange(half) * dist_step
    return main_amp, ref_amplitude, amp_norm, phase_norm, fft_mag, fft_dist
# ---------------------------------------------------------------------------
# GUI
# ---------------------------------------------------------------------------
def build_gui():
    """Create the viewer window with four stacked plots.

    Returns ``(app, win, curves)`` where ``curves`` is the tuple
    ``(main_amp, ref_amp, norm_amp, phase, fft)`` of plot curve items, in the
    order expected by ``make_update``.
    """
    app = QtWidgets.QApplication.instance()
    if not app:
        app = QtWidgets.QApplication(sys.argv)

    win = pg.GraphicsLayoutWidget(show=True, title="RFG VNA Viewer")
    win.resize(1200, 800)

    def add_panel(row, title, x_label, x_units, y_label, y_units=None):
        # One plot per layout row, with grid and labelled axes.
        panel = win.addPlot(row=row, col=0, title=title)
        panel.showGrid(x=True, y=True, alpha=0.3)
        panel.setLabel("bottom", x_label, units=x_units)
        if y_units is None:
            panel.setLabel("left", y_label)
        else:
            panel.setLabel("left", y_label, units=y_units)
        return panel

    def thin_pen(rgb):
        return pg.mkPen(rgb, width=1)

    # Row 0: raw amplitudes of both channels
    raw_panel = add_panel(0, "Амплитуды каналов", "Частота", "ГГц", "Амплитуда", "В")
    raw_panel.addLegend(offset=(10, 10))
    main_amp_curve = raw_panel.plot(pen=thin_pen((80, 120, 255)), name="Main (0a00)")
    ref_amp_curve = raw_panel.plot(pen=thin_pen((255, 80, 80)), name="Ref (a800)")

    # Row 1: normalized amplitude, X axis linked to row 0
    norm_panel = add_panel(1, "Нормированная амплитуда |S|", "Частота", "ГГц", "Амплитуда")
    norm_panel.setXLink(raw_panel)
    norm_amp_curve = norm_panel.plot(pen=thin_pen((80, 120, 255)))

    # Row 2: normalized phase, X axis linked to row 0
    phase_panel = add_panel(2, "Нормированная фаза arg(S)", "Частота", "ГГц", "Фаза", "рад")
    phase_panel.setXLink(raw_panel)
    phase_curve = phase_panel.plot(pen=thin_pen((230, 180, 40)))

    # Row 3: FFT magnitude versus distance (independent X axis)
    fft_panel = add_panel(3, "FFT — расстояние", "Расстояние", "м", "Магнитуда", "дБ")
    fft_curve = fft_panel.plot(pen=thin_pen((60, 200, 80)))

    curves = (main_amp_curve, ref_amp_curve, norm_amp_curve, phase_curve, fft_curve)
    return app, win, curves
# ---------------------------------------------------------------------------
# Update loop
# ---------------------------------------------------------------------------
def make_update(reader, accumulator, curves):
    """Build the periodic refresh callback.

    The returned ``update()`` reads whatever bytes are available from
    *reader*, feeds them to *accumulator*, and redraws *curves* from the most
    recent completed sweep.  It does nothing when no data or no completed
    sweep is available.
    """
    main_curve, ref_curve, norm_curve, phase_curve, fft_curve = curves
    # Reference phase of the first processed sweep; later sweeps are aligned
    # against it by process_reference.
    baseline_phase = None

    def update():
        nonlocal baseline_phase

        chunk = reader.read_available()
        if not chunk:
            return
        finished = accumulator.feed(chunk)
        if not finished:
            return

        # Only the newest sweep is drawn; older ones from this batch are dropped.
        latest = finished[-1]
        count = latest["num_points"]
        if count < 2:
            return

        # The sweep points are assumed to be evenly spaced across the band.
        freqs_ghz = np.linspace(F_START_HZ / 1e9, F_STOP_HZ / 1e9, count)
        freqs_hz = freqs_ghz * 1e9

        ref_amp, ref_phase, baseline_phase = process_reference(
            latest["ref_ch1"], latest["ref_ch2"], baseline_phase, freqs_hz
        )
        results = process_main(
            latest["main_ch1"], latest["main_ch2"], ref_amp, ref_phase
        )
        main_amp, ref_amplitude, norm_amp, phase, fft_mag, fft_dist = results

        main_curve.setData(freqs_ghz, main_amp)
        ref_curve.setData(freqs_ghz, ref_amplitude)
        norm_curve.setData(freqs_ghz, norm_amp)
        phase_curve.setData(freqs_ghz, phase)

        # Small offset keeps log10 finite for zero-magnitude bins.
        fft_curve.setData(fft_dist, 20.0 * np.log10(fft_mag + 1e-12))

    return update
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Run the viewer: open the data source, build the GUI, start the timers.

    The input path is taken from ``sys.argv[1]`` when given, otherwise
    DATA_PATH.  The process exits with the Qt event loop's return code.
    """
    if len(sys.argv) > 1:
        path = sys.argv[1]
    else:
        path = DATA_PATH

    reader = DataReader(path)
    accumulator = SweepAccumulator()
    # `win` is not used again here; the local keeps a reference to the window
    # for as long as the event loop below is running.
    app, win, curves = build_gui()

    refresh_timer = QtCore.QTimer()
    refresh_timer.timeout.connect(make_update(reader, accumulator, curves))
    refresh_timer.start(TIMER_MS)

    # Allow Ctrl+C to work inside the Qt event loop: SIGINT asks the app to
    # quit, and a no-op timer fires every 200 ms alongside it.
    def on_sigint(*_):
        app.quit()

    signal.signal(signal.SIGINT, on_sigint)
    wakeup_timer = QtCore.QTimer()
    wakeup_timer.start(200)
    wakeup_timer.timeout.connect(lambda: None)

    try:
        exit_code = app.exec_()
        sys.exit(exit_code)
    finally:
        reader.close()


if __name__ == "__main__":
    main()