new test
This commit is contained in:
320
rfg_vna_viewer.py
Normal file
320
rfg_vna_viewer.py
Normal file
@ -0,0 +1,320 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Standalone VNA-style viewer for dual-channel ADC data.
|
||||
|
||||
Reads raw binary from /tmp/ttyADC_data, parses 8-byte records with
|
||||
markers 0x000A (main) and 0x00A8 (reference), performs complex
|
||||
normalization and plots amplitude, phase, and FFT in real time.
|
||||
"""
|
||||
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
from pyqtgraph.Qt import QtCore, QtWidgets
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
F_START_HZ = 2.046e9
|
||||
F_STOP_HZ = 5.612e9
|
||||
BW_HZ = F_STOP_HZ - F_START_HZ # 3.566 GHz
|
||||
|
||||
TTY_SCALE = 5.0 / 32767.0 # int16 code → volts (±5 V ADC range)
|
||||
C_M_S = 299_792_458.0
|
||||
FFT_LEN = 2048
|
||||
TIMER_MS = 50 # GUI refresh period
|
||||
READ_CHUNK = 65536
|
||||
|
||||
MARKER_MAIN = 0x000A
|
||||
MARKER_REF = 0x00A8
|
||||
|
||||
DATA_PATH = "/tmp/ttyADC_data"
|
||||
|
||||
|
||||
def _u16(buf, off):
|
||||
"""Little-endian uint16 from buffer."""
|
||||
return buf[off] | (buf[off + 1] << 8)
|
||||
|
||||
|
||||
def _i16(v):
|
||||
"""Unsigned uint16 → signed int16."""
|
||||
return v - 0x10000 if (v & 0x8000) else v
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DataReader
|
||||
# ---------------------------------------------------------------------------
|
||||
class DataReader:
|
||||
"""Non-blocking raw reader for /tmp/ttyADC_data."""
|
||||
|
||||
def __init__(self, path=DATA_PATH):
|
||||
self._fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
|
||||
os.set_blocking(self._fd, False)
|
||||
|
||||
def read_available(self):
|
||||
try:
|
||||
return os.read(self._fd, READ_CHUNK)
|
||||
except BlockingIOError:
|
||||
return b""
|
||||
|
||||
def close(self):
|
||||
os.close(self._fd)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SweepAccumulator (inline parser)
|
||||
# ---------------------------------------------------------------------------
|
||||
class SweepAccumulator:
|
||||
"""Parse 8-byte binary records and assemble main/ref sweep pairs."""
|
||||
|
||||
def __init__(self):
|
||||
self._buf = bytearray()
|
||||
self._main = {} # step → (ch1_i16, ch2_i16)
|
||||
self._ref = {}
|
||||
self._last_main_step = -1
|
||||
|
||||
def feed(self, data):
|
||||
"""Feed raw bytes. Return list of completed sweep dicts."""
|
||||
if data:
|
||||
self._buf += data
|
||||
sweeps = []
|
||||
buf = self._buf
|
||||
while len(buf) >= 8:
|
||||
w0 = _u16(buf, 0)
|
||||
w1 = _u16(buf, 2)
|
||||
w2 = _u16(buf, 4)
|
||||
w3 = _u16(buf, 6)
|
||||
|
||||
# Start marker: new sweep begins
|
||||
if w0 == MARKER_MAIN and w1 == 0xFFFF and w2 == 0xFFFF and w3 == 0xFFFF:
|
||||
sw = self._finalize()
|
||||
if sw is not None:
|
||||
sweeps.append(sw)
|
||||
del buf[:8]
|
||||
continue
|
||||
|
||||
# Main channel data point
|
||||
if w0 == MARKER_MAIN and w1 != 0xFFFF:
|
||||
step = w1
|
||||
# Detect sweep wrap (step regression) without start marker
|
||||
if self._last_main_step >= 0 and step < self._last_main_step - 10:
|
||||
sw = self._finalize()
|
||||
if sw is not None:
|
||||
sweeps.append(sw)
|
||||
self._main[step] = (_i16(w2), _i16(w3))
|
||||
self._last_main_step = step
|
||||
del buf[:8]
|
||||
continue
|
||||
|
||||
# Reference channel data point
|
||||
if w0 == MARKER_REF and w1 != 0xFFFF:
|
||||
self._ref[w1] = (_i16(w2), _i16(w3))
|
||||
del buf[:8]
|
||||
continue
|
||||
|
||||
# Unrecognized — advance 1 byte to resync
|
||||
del buf[:1]
|
||||
|
||||
return sweeps
|
||||
|
||||
def _finalize(self):
|
||||
"""Build numpy arrays from accumulated points and reset."""
|
||||
if not self._main or not self._ref:
|
||||
self._main.clear()
|
||||
self._ref.clear()
|
||||
self._last_main_step = -1
|
||||
return None
|
||||
|
||||
all_steps = set(self._main) | set(self._ref)
|
||||
n = max(all_steps) + 1
|
||||
if n < 2:
|
||||
self._main.clear()
|
||||
self._ref.clear()
|
||||
self._last_main_step = -1
|
||||
return None
|
||||
|
||||
main_ch1 = np.full(n, np.nan)
|
||||
main_ch2 = np.full(n, np.nan)
|
||||
ref_ch1 = np.full(n, np.nan)
|
||||
ref_ch2 = np.full(n, np.nan)
|
||||
|
||||
for s, (c1, c2) in self._main.items():
|
||||
if s < n:
|
||||
main_ch1[s] = c1
|
||||
main_ch2[s] = c2
|
||||
for s, (c1, c2) in self._ref.items():
|
||||
if s < n:
|
||||
ref_ch1[s] = c1
|
||||
ref_ch2[s] = c2
|
||||
|
||||
self._main.clear()
|
||||
self._ref.clear()
|
||||
self._last_main_step = -1
|
||||
|
||||
# Keep only points present in both channels
|
||||
valid = np.isfinite(main_ch1) & np.isfinite(ref_ch1)
|
||||
if valid.sum() < 2:
|
||||
return None
|
||||
|
||||
return {
|
||||
"main_ch1": main_ch1[valid],
|
||||
"main_ch2": main_ch2[valid],
|
||||
"ref_ch1": ref_ch1[valid],
|
||||
"ref_ch2": ref_ch2[valid],
|
||||
"num_points": int(valid.sum()),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Signal processing
|
||||
# ---------------------------------------------------------------------------
|
||||
def process_reference(ref_ch1, ref_ch2, ref_phase_first, freqs_hz):
|
||||
"""Process reference channel: amplitude, phase with linear alignment.
|
||||
|
||||
Returns (amplitude, aligned_phase, ref_phase_first).
|
||||
"""
|
||||
ch1_v = ref_ch1 * TTY_SCALE
|
||||
ch2_v = ref_ch2 * TTY_SCALE
|
||||
|
||||
amplitude = np.sqrt(ch1_v ** 2 + ch2_v ** 2)
|
||||
phase = np.unwrap(np.arctan2(ch2_v, ch1_v))
|
||||
|
||||
if ref_phase_first is None:
|
||||
return amplitude, phase, phase.copy()
|
||||
|
||||
# Linear correction: Δφ at first point, scaled by f/f₁
|
||||
delta_phi = phase[0] - ref_phase_first[0]
|
||||
correction = delta_phi * (freqs_hz / freqs_hz[0])
|
||||
aligned = phase - correction
|
||||
|
||||
return amplitude, aligned, ref_phase_first
|
||||
|
||||
|
||||
def process_main(main_ch1, main_ch2, ref_amplitude, ref_phase_aligned):
|
||||
"""Normalize main channel by reference. Return (amp, phase, fft_mag, fft_dist)."""
|
||||
ch1_v = main_ch1 * TTY_SCALE
|
||||
ch2_v = main_ch2 * TTY_SCALE
|
||||
z_main = ch1_v + 1j * ch2_v
|
||||
|
||||
z_ref = ref_amplitude * np.exp(1j * ref_phase_aligned)
|
||||
z_ref_safe = np.where(np.abs(z_ref) > 1e-12, z_ref, 1e-12 + 0j)
|
||||
z_norm = z_main / z_ref_safe
|
||||
|
||||
amp_norm = np.abs(z_norm)
|
||||
phase_norm = np.unwrap(np.angle(z_norm))
|
||||
|
||||
# FFT → distance domain
|
||||
n = len(z_norm)
|
||||
window = np.hanning(n)
|
||||
spectrum = np.fft.fft(z_norm * window, n=FFT_LEN)
|
||||
fft_mag = np.abs(spectrum[: FFT_LEN // 2])
|
||||
|
||||
df_hz = BW_HZ / max(1, n - 1)
|
||||
dist_step = C_M_S / (2.0 * FFT_LEN * df_hz)
|
||||
fft_dist = np.arange(FFT_LEN // 2) * dist_step
|
||||
|
||||
return amp_norm, phase_norm, fft_mag, fft_dist
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GUI
|
||||
# ---------------------------------------------------------------------------
|
||||
def build_gui():
|
||||
app = QtWidgets.QApplication.instance() or QtWidgets.QApplication(sys.argv)
|
||||
|
||||
win = pg.GraphicsLayoutWidget(show=True, title="RFG VNA Viewer")
|
||||
win.resize(1200, 800)
|
||||
|
||||
p_amp = win.addPlot(row=0, col=0, title="Нормированная амплитуда |S|")
|
||||
p_amp.showGrid(x=True, y=True, alpha=0.3)
|
||||
p_amp.setLabel("bottom", "Частота", units="ГГц")
|
||||
p_amp.setLabel("left", "Амплитуда")
|
||||
c_amp = p_amp.plot(pen=pg.mkPen((80, 120, 255), width=1))
|
||||
|
||||
p_ph = win.addPlot(row=1, col=0, title="Нормированная фаза arg(S)")
|
||||
p_ph.showGrid(x=True, y=True, alpha=0.3)
|
||||
p_ph.setLabel("bottom", "Частота", units="ГГц")
|
||||
p_ph.setLabel("left", "Фаза", units="рад")
|
||||
p_ph.setXLink(p_amp)
|
||||
c_ph = p_ph.plot(pen=pg.mkPen((230, 180, 40), width=1))
|
||||
|
||||
p_fft = win.addPlot(row=2, col=0, title="FFT — расстояние")
|
||||
p_fft.showGrid(x=True, y=True, alpha=0.3)
|
||||
p_fft.setLabel("bottom", "Расстояние", units="м")
|
||||
p_fft.setLabel("left", "Магнитуда", units="дБ")
|
||||
c_fft = p_fft.plot(pen=pg.mkPen((60, 200, 80), width=1))
|
||||
|
||||
return app, win, (c_amp, c_ph, c_fft)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Update loop
|
||||
# ---------------------------------------------------------------------------
|
||||
def make_update(reader, accumulator, curves):
|
||||
c_amp, c_ph, c_fft = curves
|
||||
state = {"ref_phase_first": None}
|
||||
|
||||
def update():
|
||||
data = reader.read_available()
|
||||
if not data:
|
||||
return
|
||||
|
||||
sweeps = accumulator.feed(data)
|
||||
if not sweeps:
|
||||
return
|
||||
|
||||
sweep = sweeps[-1] # latest complete sweep
|
||||
n = sweep["num_points"]
|
||||
if n < 2:
|
||||
return
|
||||
|
||||
freqs_ghz = np.linspace(F_START_HZ / 1e9, F_STOP_HZ / 1e9, n)
|
||||
freqs_hz = freqs_ghz * 1e9
|
||||
|
||||
ref_amp, ref_phase, state["ref_phase_first"] = process_reference(
|
||||
sweep["ref_ch1"], sweep["ref_ch2"], state["ref_phase_first"], freqs_hz
|
||||
)
|
||||
|
||||
amp, phase, fft_mag, fft_dist = process_main(
|
||||
sweep["main_ch1"], sweep["main_ch2"], ref_amp, ref_phase
|
||||
)
|
||||
|
||||
c_amp.setData(freqs_ghz, amp)
|
||||
c_ph.setData(freqs_ghz, phase)
|
||||
|
||||
fft_db = 20.0 * np.log10(fft_mag + 1e-12)
|
||||
c_fft.setData(fft_dist, fft_db)
|
||||
|
||||
return update
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
def main():
|
||||
path = sys.argv[1] if len(sys.argv) > 1 else DATA_PATH
|
||||
reader = DataReader(path)
|
||||
accumulator = SweepAccumulator()
|
||||
app, win, curves = build_gui()
|
||||
|
||||
update = make_update(reader, accumulator, curves)
|
||||
timer = QtCore.QTimer()
|
||||
timer.timeout.connect(update)
|
||||
timer.start(TIMER_MS)
|
||||
|
||||
# Allow Ctrl+C to work inside Qt event loop
|
||||
signal.signal(signal.SIGINT, lambda *_: app.quit())
|
||||
kick = QtCore.QTimer()
|
||||
kick.start(200)
|
||||
kick.timeout.connect(lambda: None)
|
||||
|
||||
try:
|
||||
sys.exit(app.exec_())
|
||||
finally:
|
||||
reader.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user