From d9685456178f25f0462cd8907cd74fe8c8148fda Mon Sep 17 00:00:00 2001 From: awe Date: Tue, 9 Jun 2026 16:11:45 +0300 Subject: [PATCH] new test --- rfg_vna_viewer.py | 320 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 320 insertions(+) create mode 100644 rfg_vna_viewer.py diff --git a/rfg_vna_viewer.py b/rfg_vna_viewer.py new file mode 100644 index 0000000..a4d074f --- /dev/null +++ b/rfg_vna_viewer.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python3 +"""Standalone VNA-style viewer for dual-channel ADC data. + +Reads raw binary from /tmp/ttyADC_data, parses 8-byte records with +markers 0x000A (main) and 0x00A8 (reference), performs complex +normalization and plots amplitude, phase, and FFT in real time. +""" + +import os +import signal +import sys + +import numpy as np +import pyqtgraph as pg +from pyqtgraph.Qt import QtCore, QtWidgets + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +F_START_HZ = 2.046e9 +F_STOP_HZ = 5.612e9 +BW_HZ = F_STOP_HZ - F_START_HZ # 3.566 GHz + +TTY_SCALE = 5.0 / 32767.0 # int16 code → volts (±5 V ADC range) +C_M_S = 299_792_458.0 +FFT_LEN = 2048 +TIMER_MS = 50 # GUI refresh period +READ_CHUNK = 65536 + +MARKER_MAIN = 0x000A +MARKER_REF = 0x00A8 + +DATA_PATH = "/tmp/ttyADC_data" + + +def _u16(buf, off): + """Little-endian uint16 from buffer.""" + return buf[off] | (buf[off + 1] << 8) + + +def _i16(v): + """Unsigned uint16 → signed int16.""" + return v - 0x10000 if (v & 0x8000) else v + + +# --------------------------------------------------------------------------- +# DataReader +# --------------------------------------------------------------------------- +class DataReader: + """Non-blocking raw reader for /tmp/ttyADC_data.""" + + def __init__(self, path=DATA_PATH): + self._fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) + os.set_blocking(self._fd, False) + + def read_available(self): + try: + return os.read(self._fd, READ_CHUNK) + except BlockingIOError: + return b"" + + def close(self): + os.close(self._fd) + + +# --------------------------------------------------------------------------- +# SweepAccumulator (inline parser) +# --------------------------------------------------------------------------- +class SweepAccumulator: + """Parse 8-byte binary records and assemble main/ref sweep pairs.""" + + def __init__(self): + self._buf = bytearray() + self._main = {} # step → (ch1_i16, ch2_i16) + self._ref = {} + self._last_main_step = -1 + + def feed(self, data): + """Feed raw bytes. Return list of completed sweep dicts.""" + if data: + self._buf += data + sweeps = [] + buf = self._buf + while len(buf) >= 8: + w0 = _u16(buf, 0) + w1 = _u16(buf, 2) + w2 = _u16(buf, 4) + w3 = _u16(buf, 6) + + # Start marker: new sweep begins + if w0 == MARKER_MAIN and w1 == 0xFFFF and w2 == 0xFFFF and w3 == 0xFFFF: + sw = self._finalize() + if sw is not None: + sweeps.append(sw) + del buf[:8] + continue + + # Main channel data point + if w0 == MARKER_MAIN and w1 != 0xFFFF: + step = w1 + # Detect sweep wrap (step regression) without start marker + if self._last_main_step >= 0 and step < self._last_main_step - 10: + sw = self._finalize() + if sw is not None: + sweeps.append(sw) + self._main[step] = (_i16(w2), _i16(w3)) + self._last_main_step = step + del buf[:8] + continue + + # Reference channel data point + if w0 == MARKER_REF and w1 != 0xFFFF: + self._ref[w1] = (_i16(w2), _i16(w3)) + del buf[:8] + continue + + # Unrecognized — advance 1 byte to resync + del buf[:1] + + return sweeps + + def _finalize(self): + """Build numpy arrays from accumulated points and reset.""" + if not self._main or not self._ref: + self._main.clear() + self._ref.clear() + self._last_main_step = -1 + return None + + all_steps = set(self._main) | set(self._ref) + n = max(all_steps) + 1 + if n < 2: + self._main.clear() + self._ref.clear() + self._last_main_step = -1 + return None + + main_ch1 = np.full(n, np.nan) + main_ch2 = np.full(n, np.nan) + ref_ch1 = np.full(n, np.nan) + ref_ch2 = np.full(n, np.nan) + + for s, (c1, c2) in self._main.items(): + if s < n: + main_ch1[s] = c1 + main_ch2[s] = c2 + for s, (c1, c2) in self._ref.items(): + if s < n: + ref_ch1[s] = c1 + ref_ch2[s] = c2 + + self._main.clear() + self._ref.clear() + self._last_main_step = -1 + + # Keep only points present in both channels + valid = np.isfinite(main_ch1) & np.isfinite(ref_ch1) + if valid.sum() < 2: + return None + + return { + "main_ch1": main_ch1[valid], + "main_ch2": main_ch2[valid], + "ref_ch1": ref_ch1[valid], + "ref_ch2": ref_ch2[valid], + "num_points": int(valid.sum()), + } + + +# --------------------------------------------------------------------------- +# Signal processing +# --------------------------------------------------------------------------- +def process_reference(ref_ch1, ref_ch2, ref_phase_first, freqs_hz): + """Process reference channel: amplitude, phase with linear alignment. + + Returns (amplitude, aligned_phase, ref_phase_first). + """ + ch1_v = ref_ch1 * TTY_SCALE + ch2_v = ref_ch2 * TTY_SCALE + + amplitude = np.sqrt(ch1_v ** 2 + ch2_v ** 2) + phase = np.unwrap(np.arctan2(ch2_v, ch1_v)) + + if ref_phase_first is None: + return amplitude, phase, phase.copy() + + # Linear correction: Δφ at first point, scaled by f/f₁ + delta_phi = phase[0] - ref_phase_first[0] + correction = delta_phi * (freqs_hz / freqs_hz[0]) + aligned = phase - correction + + return amplitude, aligned, ref_phase_first + + +def process_main(main_ch1, main_ch2, ref_amplitude, ref_phase_aligned): + """Normalize main channel by reference. Return (amp, phase, fft_mag, fft_dist).""" + ch1_v = main_ch1 * TTY_SCALE + ch2_v = main_ch2 * TTY_SCALE + z_main = ch1_v + 1j * ch2_v + + z_ref = ref_amplitude * np.exp(1j * ref_phase_aligned) + z_ref_safe = np.where(np.abs(z_ref) > 1e-12, z_ref, 1e-12 + 0j) + z_norm = z_main / z_ref_safe + + amp_norm = np.abs(z_norm) + phase_norm = np.unwrap(np.angle(z_norm)) + + # FFT → distance domain + n = len(z_norm) + window = np.hanning(n) + spectrum = np.fft.fft(z_norm * window, n=FFT_LEN) + fft_mag = np.abs(spectrum[: FFT_LEN // 2]) + + df_hz = BW_HZ / max(1, n - 1) + dist_step = C_M_S / (2.0 * FFT_LEN * df_hz) + fft_dist = np.arange(FFT_LEN // 2) * dist_step + + return amp_norm, phase_norm, fft_mag, fft_dist + + +# --------------------------------------------------------------------------- +# GUI +# --------------------------------------------------------------------------- +def build_gui(): + app = QtWidgets.QApplication.instance() or QtWidgets.QApplication(sys.argv) + + win = pg.GraphicsLayoutWidget(show=True, title="RFG VNA Viewer") + win.resize(1200, 800) + + p_amp = win.addPlot(row=0, col=0, title="Нормированная амплитуда |S|") + p_amp.showGrid(x=True, y=True, alpha=0.3) + p_amp.setLabel("bottom", "Частота", units="ГГц") + p_amp.setLabel("left", "Амплитуда") + c_amp = p_amp.plot(pen=pg.mkPen((80, 120, 255), width=1)) + + p_ph = win.addPlot(row=1, col=0, title="Нормированная фаза arg(S)") + p_ph.showGrid(x=True, y=True, alpha=0.3) + p_ph.setLabel("bottom", "Частота", units="ГГц") + p_ph.setLabel("left", "Фаза", units="рад") + p_ph.setXLink(p_amp) + c_ph = p_ph.plot(pen=pg.mkPen((230, 180, 40), width=1)) + + p_fft = win.addPlot(row=2, col=0, title="FFT — расстояние") + p_fft.showGrid(x=True, y=True, alpha=0.3) + p_fft.setLabel("bottom", "Расстояние", units="м") + p_fft.setLabel("left", "Магнитуда", units="дБ") + c_fft = p_fft.plot(pen=pg.mkPen((60, 200, 80), width=1)) + + return app, win, (c_amp, c_ph, c_fft) + + +# --------------------------------------------------------------------------- +# Update loop +# --------------------------------------------------------------------------- +def make_update(reader, accumulator, curves): + c_amp, c_ph, c_fft = curves + state = {"ref_phase_first": None} + + def update(): + data = reader.read_available() + if not data: + return + + sweeps = accumulator.feed(data) + if not sweeps: + return + + sweep = sweeps[-1] # latest complete sweep + n = sweep["num_points"] + if n < 2: + return + + freqs_ghz = np.linspace(F_START_HZ / 1e9, F_STOP_HZ / 1e9, n) + freqs_hz = freqs_ghz * 1e9 + + ref_amp, ref_phase, state["ref_phase_first"] = process_reference( + sweep["ref_ch1"], sweep["ref_ch2"], state["ref_phase_first"], freqs_hz + ) + + amp, phase, fft_mag, fft_dist = process_main( + sweep["main_ch1"], sweep["main_ch2"], ref_amp, ref_phase + ) + + c_amp.setData(freqs_ghz, amp) + c_ph.setData(freqs_ghz, phase) + + fft_db = 20.0 * np.log10(fft_mag + 1e-12) + c_fft.setData(fft_dist, fft_db) + + return update + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- +def main(): + path = sys.argv[1] if len(sys.argv) > 1 else DATA_PATH + reader = DataReader(path) + accumulator = SweepAccumulator() + app, win, curves = build_gui() + + update = make_update(reader, accumulator, curves) + timer = QtCore.QTimer() + timer.timeout.connect(update) + timer.start(TIMER_MS) + + # Allow Ctrl+C to work inside Qt event loop + signal.signal(signal.SIGINT, lambda *_: app.quit()) + kick = QtCore.QTimer() + kick.start(200) + kick.timeout.connect(lambda: None) + + try: + sys.exit(app.exec_()) + finally: + reader.close() + + +if __name__ == "__main__": + main()