diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a58df3d --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.venv/ +env/ +__pycache__/ +*.py[cod] +.pytest_cache/ +.Python +my_picocom_logfile.txt +sample_data/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..8e7e028 --- /dev/null +++ b/README.md @@ -0,0 +1,185 @@ +# RFG STM32 ADC Receiver GUI + +PyQtGraph-приложение для чтения свипов из последовательного порта и отображения: + +- текущего свипа +- водопада по свипам +- FFT текущего свипа +- B-scan по FFT + +После рефакторинга проект разделен на пакет `rfg_adc_plotter`. Старый запуск через `RFG_ADC_dataplotter.py` сохранен как совместимый wrapper. + +## Структура + +- `RFG_ADC_dataplotter.py` — совместимый entrypoint +- `rfg_adc_plotter/cli.py` — CLI-аргументы +- `rfg_adc_plotter/io/` — чтение порта и парсеры протоколов +- `rfg_adc_plotter/processing/` — FFT, нормировка, калибровка, поиск пиков +- `rfg_adc_plotter/state/` — runtime state и кольцевые буферы +- `rfg_adc_plotter/gui/pyqtgraph_backend.py` — GUI на PyQtGraph +- `replay_pty.py` — воспроизведение захвата через виртуальный PTY + +## Зависимости + +Минимально нужны: + +```bash +python3 -m venv .venv +. .venv/bin/activate +pip install numpy pyqtgraph PyQt5 +``` + +Если `pyserial` не установлен, приложение попробует открыть порт через raw TTY. + +## Быстрый старт + +Запуск через старый entrypoint: + +```bash +.venv/bin/python RFG_ADC_dataplotter.py /dev/ttyACM0 +``` + +Запуск напрямую через пакет: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 +``` + +Показать справку: + +```bash +.venv/bin/python RFG_ADC_dataplotter.py --help +``` + +## Примеры запуска + +Обычный запуск с живого порта: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --baud 115200 +``` + +Больше истории в водопаде и ограничение FPS: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --max-sweeps 400 --max-fps 20 +``` + +Фиксированный диапазон по оси Y: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --ylim -1000,1000 +``` + +С включенной нормировкой `simple`: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --norm-type simple +``` + +Режим измерения ширины главного пика FFT: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --calibrate +``` + +Поиск топ-3 пиков относительно rolling median reference: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --peak_search --peak_ref_window 1.5 +``` + +Вычитание среднего спектра по последним секундам: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --spec-mean-sec 3 +``` + +## Протоколы ввода + +ASCII-протокол по умолчанию: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 +``` + +Legacy binary: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --bin +``` + +Logscale binary с парой `int32`: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --logscale +``` + +Logscale binary `16-bit x2`: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --parser_16_bit_x2 +``` + +Тестовый парсер для экспериментального `16-bit x2` потока: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --parser_test +``` + +## Локальная проверка через replay_pty + +Если есть лог-файл захвата, его можно воспроизвести как виртуальный последовательный порт. + +В первом терминале: + +```bash +.venv/bin/python replay_pty.py my_picocom_logfile.txt --pty /tmp/ttyVIRT0 --speed 1.0 +``` + +Во втором терминале: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /tmp/ttyVIRT0 +``` + +Максимально быстрый replay: + +```bash +.venv/bin/python replay_pty.py my_picocom_logfile.txt --pty /tmp/ttyVIRT0 --speed 0 +``` + +## Удаленный захват по SSH + +В приложении SSH-источник не встроен. Для удаленной проверки нужно сначала получить поток или лог на локальную машину, а затем либо: + +- запускать GUI напрямую на локальном PTY +- сохранять поток в файл и воспроизводить его через `replay_pty.py` + +Пример команды для ручной диагностики удаленного устройства: + +```bash +ssh 192.148.0.148 'ls -l /dev/ttyACM0' +``` + +Если на удаленной машине есть доступ к потоку, удобнее сохранять его в файл и уже этот файл гонять локально через `replay_pty.py`. + +## Проверка и тесты + +Синтаксическая проверка: + +```bash +python3 -m compileall RFG_ADC_dataplotter.py replay_pty.py rfg_adc_plotter tests +``` + +Запуск тестов: + +```bash +.venv/bin/python -m unittest discover -s tests -v +``` + +## Замечания + +- Поддерживается только PyQtGraph backend. +- `--backend mpl` оставлен только для совместимости CLI и завершится ошибкой. +- Каталоги `sample_data/` и локальные логи добавлены в `.gitignore` и не считаются частью обязательного tracked-состояния репозитория. diff --git a/replay_pty.py b/replay_pty.py new file mode 100644 index 0000000..0c7590d --- /dev/null +++ b/replay_pty.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +"""Replay a capture file through a pseudo-TTY for local GUI verification.""" + +from __future__ import annotations + +import argparse +import os +import sys +import time + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Воспроизводит лог-файл через PTY как виртуальный серийный порт." + ) + parser.add_argument("file", help="Путь к лог-файлу (например my_picocom_logfile.txt)") + parser.add_argument( + "--pty", + default="/tmp/ttyVIRT0", + help="Путь симлинка PTY (по умолчанию /tmp/ttyVIRT0)", + ) + parser.add_argument( + "--speed", + type=float, + default=1.0, + help=( + "Множитель скорости воспроизведения: " + "1.0 = реальное время при --baud, " + "2.0 = вдвое быстрее, " + "0 = максимально быстро" + ), + ) + parser.add_argument( + "--baud", + type=int, + default=115200, + help="Скорость (бод) для расчета задержек (по умолчанию 115200)", + ) + args = parser.parse_args() + + if not os.path.isfile(args.file): + sys.stderr.write(f"[error] Файл не найден: {args.file}\n") + raise SystemExit(1) + + master_fd, slave_fd = os.openpty() + slave_path = os.ttyname(slave_fd) + os.close(slave_fd) + + try: + os.unlink(args.pty) + except FileNotFoundError: + pass + os.symlink(slave_path, args.pty) + + print(f"PTY slave : {slave_path}") + print(f"Симлинк : {args.pty} -> {slave_path}") + print(f"Запустите : python3 -m rfg_adc_plotter.main {args.pty}") + print("Ctrl+C для остановки.\n") + + if args.speed > 0: + bytes_per_sec = args.baud / 10.0 * args.speed + delay_per_byte = 1.0 / bytes_per_sec + else: + delay_per_byte = 0.0 + + chunk_size = 4096 + loop = 0 + try: + while True: + loop += 1 + print(f"[loop {loop}] {args.file}") + with open(args.file, "rb") as handle: + while True: + chunk = handle.read(chunk_size) + if not chunk: + break + os.write(master_fd, chunk) + if delay_per_byte > 0: + time.sleep(delay_per_byte * len(chunk)) + except KeyboardInterrupt: + print("\nОстановлено.") + finally: + try: + os.unlink(args.pty) + except Exception: + pass + try: + os.close(master_fd) + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/rfg_adc_plotter/__init__.py b/rfg_adc_plotter/__init__.py new file mode 100644 index 0000000..1a108d6 --- /dev/null +++ b/rfg_adc_plotter/__init__.py @@ -0,0 +1,3 @@ +"""RFG ADC plotter package.""" + +__all__ = [] diff --git a/rfg_adc_plotter/cli.py b/rfg_adc_plotter/cli.py new file mode 100644 index 0000000..edd50c4 --- /dev/null +++ b/rfg_adc_plotter/cli.py @@ -0,0 +1,120 @@ +"""Command-line parser for the ADC plotter.""" + +from __future__ import annotations + +import argparse + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=( + "Читает свипы из виртуального COM-порта и рисует: " + "последний свип и водопад (реалтайм)." + ) + ) + parser.add_argument( + "port", + help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)", + ) + parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)") + parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде") + parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с") + parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада") + parser.add_argument( + "--spec-clip", + default="2,98", + help=( + "Процентильная обрезка уровней водопада спектров, %% (min,max). " + "Напр. 2,98. 'off' — отключить" + ), + ) + parser.add_argument( + "--spec-mean-sec", + type=float, + default=0.0, + help=( + "Вычитание среднего по каждой частоте за последние N секунд " + "в водопаде спектров (0 — отключить)" + ), + ) + parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") + parser.add_argument( + "--fancy", + action="store_true", + help="Заполнять выпавшие точки средними значениями между соседними", + ) + parser.add_argument( + "--ylim", + type=str, + default=None, + help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто", + ) + parser.add_argument( + "--backend", + choices=["auto", "pg", "mpl"], + default="pg", + help="Совместимый флаг. Поддерживаются только auto и pg; mpl удален.", + ) + parser.add_argument( + "--norm-type", + choices=["projector", "simple"], + default="projector", + help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)", + ) + parser.add_argument( + "--bin", + dest="bin_mode", + action="store_true", + help=( + "Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; " + "точки step,uint32(hi16,lo16),0x000A" + ), + ) + parser.add_argument( + "--logscale", + action="store_true", + default=True, + help=( + "Новый бинарный протокол: точка несет пару int32 (avg_1, avg_2), " + "а свип считается как 10**(avg_1*0.001) - 10**(avg_2*0.001)" + ), + ) + parser.add_argument( + "--parser_16_bit_x2", + action="store_true", + help=( + "Бинарный logscale-протокол c парой int16 (avg_1, avg_2): " + "старт 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; точка step,avg1_lo16,avg2_lo16,0xFFFF" + ), + ) + parser.add_argument( + "--parser_test", + action="store_true", + help=( + "Тестовый парсер для формата 16-bit x2: " + "одиночный 0xFFFF завершает точку, серия 0xFFFF начинает новый свип" + ), + ) + parser.add_argument( + "--calibrate", + action="store_true", + help=( + "Режим измерения ширины главного пика FFT: рисует красные маркеры " + "границ и фона и выводит ширину пика в статус" + ), + ) + parser.add_argument( + "--peak_search", + action="store_true", + help=( + "Поиск топ-3 пиков на FFT относительно референса (скользящая медиана) " + "с отрисовкой bounding box и параметров пиков" + ), + ) + parser.add_argument( + "--peak_ref_window", + type=float, + default=1.0, + help="Ширина окна скользящей медианы для --peak_search, ГГц/м по оси FFT (по умолчанию 1.0)", + ) + return parser diff --git a/rfg_adc_plotter/constants.py b/rfg_adc_plotter/constants.py new file mode 100644 index 0000000..5ac0f42 --- /dev/null +++ b/rfg_adc_plotter/constants.py @@ -0,0 +1,16 @@ +"""Shared constants for sweep parsing and visualization.""" + +WF_WIDTH = 1000 +FFT_LEN = 1024 + +SWEEP_FREQ_MIN_GHZ = 3.3 +SWEEP_FREQ_MAX_GHZ = 14.3 + +LOG_BASE = 10.0 +LOG_SCALER = 0.001 +LOG_POSTSCALER = 10.0 +LOG_EXP_LIMIT = 300.0 + +C_M_S = 299_792_458.0 + +DATA_INVERSION_THRESHOLD = 10.0 diff --git a/rfg_adc_plotter/gui/__init__.py b/rfg_adc_plotter/gui/__init__.py new file mode 100644 index 0000000..249114a --- /dev/null +++ b/rfg_adc_plotter/gui/__init__.py @@ -0,0 +1,5 @@ +"""GUI backends.""" + +from rfg_adc_plotter.gui.pyqtgraph_backend import run_pyqtgraph + +__all__ = ["run_pyqtgraph"] diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py new file mode 100644 index 0000000..c3f1027 --- /dev/null +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -0,0 +1,913 @@ +"""PyQtGraph realtime backend for the ADC sweep plotter.""" + +from __future__ import annotations + +import signal +import threading +import time +from queue import Empty, Queue +from typing import Dict, List, Optional, Tuple + +import numpy as np + +from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ +from rfg_adc_plotter.io.sweep_reader import SweepReader +from rfg_adc_plotter.processing.calibration import ( + calibrate_freqs, + get_calibration_base, + get_calibration_coeffs, + set_calibration_base_value, +) +from rfg_adc_plotter.processing.fft import compute_fft_row, fft_mag_to_db +from rfg_adc_plotter.processing.formatting import compute_auto_ylim, format_status_kv, parse_spec_clip +from rfg_adc_plotter.processing.normalization import normalize_by_calib +from rfg_adc_plotter.processing.peaks import ( + find_peak_width_markers, + find_top_peaks_over_ref, + rolling_median_ref, +) +from rfg_adc_plotter.state import RingBuffer, RuntimeState +from rfg_adc_plotter.types import SweepAuxCurves, SweepInfo, SweepPacket + + +def _visible_levels_pyqtgraph(data: np.ndarray, plot_item) -> Optional[Tuple[float, float]]: + """Compute vmin/vmax from the currently visible part of an ImageItem.""" + if data.size == 0: + return None + ny, nx = data.shape[0], data.shape[1] + try: + (x0, x1), (y0, y1) = plot_item.viewRange() + except Exception: + x0, x1 = 0.0, float(nx - 1) + y0, y1 = 0.0, float(ny - 1) + xmin, xmax = sorted((float(x0), float(x1))) + ymin, ymax = sorted((float(y0), float(y1))) + ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) + ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) + iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) + iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) + if ix1 < ix0: + ix1 = ix0 + if iy1 < iy0: + iy1 = iy0 + sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1] + finite = np.isfinite(sub) + if not finite.any(): + return None + vals = sub[finite] + vmin = float(np.nanpercentile(vals, 5)) + vmax = float(np.nanpercentile(vals, 95)) + if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: + return None + return (vmin, vmax) + + +def run_pyqtgraph(args) -> None: + """Start the PyQtGraph GUI.""" + peak_calibrate_mode = bool(getattr(args, "calibrate", False)) + peak_search_enabled = bool(getattr(args, "peak_search", False)) + try: + import pyqtgraph as pg + from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore + except Exception as exc: + raise RuntimeError( + "pyqtgraph и совместимый Qt backend не найдены. Установите: pip install pyqtgraph PyQt5" + ) from exc + + queue: Queue[SweepPacket] = Queue(maxsize=1000) + stop_event = threading.Event() + reader = SweepReader( + args.port, + args.baud, + queue, + stop_event, + fancy=bool(args.fancy), + bin_mode=bool(args.bin_mode), + logscale=bool(args.logscale), + parser_16_bit_x2=bool(args.parser_16_bit_x2), + parser_test=bool(args.parser_test), + ) + reader.start() + + max_sweeps = int(max(10, args.max_sweeps)) + max_fps = max(1.0, float(args.max_fps)) + interval_ms = int(1000.0 / max_fps) + fft_bins = FFT_LEN // 2 + 1 + spec_clip = parse_spec_clip(getattr(args, "spec_clip", None)) + spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) + norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() + runtime = RuntimeState(ring=RingBuffer(max_sweeps)) + + pg.setConfigOptions( + useOpenGL=not peak_calibrate_mode, + antialias=False, + imageAxisOrder="row-major", + ) + app = QtWidgets.QApplication.instance() + if app is None: + app = QtWidgets.QApplication([]) + try: + app.setApplicationName(str(args.title)) + app.setQuitOnLastWindowClosed(True) + except Exception: + pass + + main_window = QtWidgets.QWidget() + try: + main_window.setWindowTitle(str(args.title)) + except Exception: + pass + + main_layout = QtWidgets.QHBoxLayout(main_window) + main_layout.setContentsMargins(6, 6, 6, 6) + main_layout.setSpacing(6) + + win = pg.GraphicsLayoutWidget(show=False, title=args.title) + main_layout.addWidget(win) + + settings_widget = QtWidgets.QWidget() + settings_layout = QtWidgets.QVBoxLayout(settings_widget) + settings_layout.setContentsMargins(6, 6, 6, 6) + settings_layout.setSpacing(8) + try: + settings_widget.setMinimumWidth(130) + settings_widget.setMaximumWidth(150) + except Exception: + pass + main_layout.addWidget(settings_widget) + + p_line = win.addPlot(row=0, col=0, title="Сырые данные") + p_line.showGrid(x=True, y=True, alpha=0.3) + curve_avg1 = p_line.plot(pen=pg.mkPen((170, 170, 170), width=1)) + curve_avg2 = p_line.plot(pen=pg.mkPen((110, 110, 110), width=1)) + curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) + curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) + curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) + p_line.setLabel("bottom", "ГГц") + p_line.setLabel("left", "Y") + ch_text = pg.TextItem("", anchor=(1, 1)) + ch_text.setZValue(10) + p_line.addItem(ch_text) + + p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад") + p_img.invertY(False) + p_img.showGrid(x=False, y=False) + p_img.setLabel("bottom", "Время, с (новое справа)") + try: + p_img.getAxis("bottom").setStyle(showValues=False) + except Exception: + pass + p_img.setLabel("left", "ГГц") + img = pg.ImageItem() + p_img.addItem(img) + + p_fft = win.addPlot(row=1, col=0, title="FFT") + p_fft.showGrid(x=True, y=True, alpha=0.3) + curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) + curve_fft_ref = p_fft.plot(pen=pg.mkPen((255, 0, 0), width=1)) + peak_pen = pg.mkPen((255, 0, 0), width=1) + peak_box_pen = pg.mkPen((0, 170, 0), width=3) + fft_peak_boxes = [p_fft.plot(pen=peak_box_pen) for _ in range(3)] + fft_bg_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen) + fft_left_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen) + fft_right_line = pg.InfiniteLine(angle=90, movable=False, pen=peak_pen) + curve_fft_ref.setVisible(False) + for box in fft_peak_boxes: + box.setVisible(False) + p_fft.addItem(fft_bg_line) + p_fft.addItem(fft_left_line) + p_fft.addItem(fft_right_line) + fft_bg_line.setVisible(False) + fft_left_line.setVisible(False) + fft_right_line.setVisible(False) + p_fft.setLabel("bottom", "Расстояние, м") + p_fft.setLabel("left", "дБ") + + p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") + p_spec.invertY(False) + p_spec.showGrid(x=False, y=False) + p_spec.setLabel("bottom", "Время, с (новое справа)") + try: + p_spec.getAxis("bottom").setStyle(showValues=False) + except Exception: + pass + p_spec.setLabel("left", "Расстояние, м") + img_fft = pg.ImageItem() + p_spec.addItem(img_fft) + spec_left_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen) + spec_right_line = pg.InfiniteLine(angle=0, movable=False, pen=peak_pen) + p_spec.addItem(spec_left_line) + p_spec.addItem(spec_right_line) + spec_left_line.setVisible(False) + spec_right_line.setVisible(False) + + calib_cb = QtWidgets.QCheckBox("нормировка") + bg_compute_cb = QtWidgets.QCheckBox("расчет фона") + bg_subtract_cb = QtWidgets.QCheckBox("вычет фона") + fft_bg_subtract_cb = QtWidgets.QCheckBox("FFT вычет фона") + peak_search_cb = QtWidgets.QCheckBox("поиск пиков") + try: + settings_layout.addWidget(QtWidgets.QLabel("Настройки")) + except Exception: + pass + settings_layout.addWidget(calib_cb) + settings_layout.addWidget(bg_compute_cb) + settings_layout.addWidget(bg_subtract_cb) + settings_layout.addWidget(fft_bg_subtract_cb) + settings_layout.addWidget(peak_search_cb) + + status = pg.LabelItem(justify="left") + win.addItem(status, row=3, col=0, colspan=2) + + calib_enabled = False + bg_compute_enabled = True + bg_subtract_enabled = False + fft_bg_subtract_enabled = False + fixed_ylim: Optional[Tuple[float, float]] = None + if args.ylim: + try: + y0, y1 = args.ylim.split(",") + fixed_ylim = (float(y0), float(y1)) + except Exception: + fixed_ylim = None + if fixed_ylim is not None: + p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) + + def ensure_buffer(sweep_width: int) -> None: + changed = runtime.ring.ensure_init(sweep_width) + if not changed: + return + img.setImage(runtime.ring.get_display_raw(), autoLevels=False) + img.setRect(0, SWEEP_FREQ_MIN_GHZ, max_sweeps, SWEEP_FREQ_MAX_GHZ - SWEEP_FREQ_MIN_GHZ) + p_img.setRange( + xRange=(0, max_sweeps - 1), + yRange=(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ), + padding=0, + ) + p_line.setXRange(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, padding=0) + img_fft.setImage(runtime.ring.get_display_fft_linear(), autoLevels=False) + img_fft.setRect(0, 0.0, max_sweeps, 1.0) + p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0.0, 1.0), padding=0) + p_fft.setXRange(0.0, 1.0, padding=0) + + def update_physical_axes() -> None: + if runtime.current_freqs is not None and runtime.current_freqs.size > 0: + finite_f = runtime.current_freqs[np.isfinite(runtime.current_freqs)] + if finite_f.size > 0: + f_min = float(np.min(finite_f)) + f_max = float(np.max(finite_f)) + if f_max <= f_min: + f_max = f_min + 1.0 + img.setRect(0, f_min, max_sweeps, f_max - f_min) + p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0) + + distance_axis = runtime.ring.distance_axis + if distance_axis is not None and distance_axis.size > 0: + d_min = float(distance_axis[0]) + d_max = float(distance_axis[-1]) if distance_axis.size > 1 else float(distance_axis[0] + 1.0) + if d_max <= d_min: + d_max = d_min + 1.0 + img_fft.setRect(0, d_min, max_sweeps, d_max - d_min) + p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0) + + def normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + return normalize_by_calib(raw, calib, norm_type=norm_type) + + def set_calib_enabled() -> None: + nonlocal calib_enabled + try: + calib_enabled = bool(calib_cb.isChecked()) + except Exception: + calib_enabled = False + if calib_enabled and runtime.current_sweep_raw is not None and runtime.last_calib_sweep is not None: + runtime.current_sweep_norm = normalize_sweep(runtime.current_sweep_raw, runtime.last_calib_sweep) + else: + runtime.current_sweep_norm = None + runtime.mark_dirty() + + def set_bg_compute_enabled() -> None: + nonlocal bg_compute_enabled + try: + bg_compute_enabled = bool(bg_compute_cb.isChecked()) + except Exception: + bg_compute_enabled = False + runtime.mark_dirty() + + def set_bg_subtract_enabled() -> None: + nonlocal bg_subtract_enabled + try: + bg_subtract_enabled = bool(bg_subtract_cb.isChecked()) + except Exception: + bg_subtract_enabled = False + runtime.mark_dirty() + + def set_fft_bg_subtract_enabled() -> None: + nonlocal fft_bg_subtract_enabled + try: + fft_bg_subtract_enabled = bool(fft_bg_subtract_cb.isChecked()) + except Exception: + fft_bg_subtract_enabled = False + runtime.mark_dirty() + + try: + bg_compute_cb.setChecked(True) + except Exception: + pass + set_bg_compute_enabled() + + try: + calib_cb.stateChanged.connect(lambda _v: set_calib_enabled()) + bg_compute_cb.stateChanged.connect(lambda _v: set_bg_compute_enabled()) + bg_subtract_cb.stateChanged.connect(lambda _v: set_bg_subtract_enabled()) + fft_bg_subtract_cb.stateChanged.connect(lambda _v: set_fft_bg_subtract_enabled()) + except Exception: + pass + + peak_group = None + peak_window_edit = None + peak_params_label = None + peak_ref_window = float(getattr(args, "peak_ref_window", 1.0)) + if (not np.isfinite(peak_ref_window)) or peak_ref_window <= 0.0: + peak_ref_window = 1.0 + + def refresh_peak_params_label(peaks: List[Dict[str, float]]) -> None: + if peak_params_label is None: + return + lines = [] + for idx in range(3): + if idx < len(peaks): + peak = peaks[idx] + lines.append(f"peak {idx + 1}:") + lines.append(f" X: {peak['x']:.4g} m") + lines.append(f" H: {peak['height']:.4g} dB") + lines.append(f" W: {peak['width']:.4g} m") + else: + lines.append(f"peak {idx + 1}:") + lines.append(" X: - m") + lines.append(" H: - dB") + lines.append(" W: - m") + if idx != 2: + lines.append("") + peak_params_label.setText("\n".join(lines)) + + try: + peak_group = QtWidgets.QGroupBox("Поиск пиков") + peak_layout = QtWidgets.QFormLayout(peak_group) + peak_layout.setContentsMargins(6, 6, 6, 6) + peak_layout.setSpacing(6) + peak_window_edit = QtWidgets.QLineEdit(f"{peak_ref_window:.6g}") + peak_layout.addRow("Окно медианы, ГГц", peak_window_edit) + peak_params_label = QtWidgets.QLabel("") + try: + peak_params_label.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse) + except Exception: + pass + peak_layout.addRow("Параметры", peak_params_label) + settings_layout.addWidget(peak_group) + + def apply_peak_window() -> None: + nonlocal peak_ref_window + if peak_window_edit is None: + return + try: + value = float(peak_window_edit.text().strip()) + if np.isfinite(value) and value > 0.0: + peak_ref_window = value + runtime.mark_dirty() + except Exception: + pass + try: + peak_window_edit.setText(f"{peak_ref_window:.6g}") + except Exception: + pass + + peak_window_edit.editingFinished.connect(apply_peak_window) + refresh_peak_params_label([]) + except Exception: + peak_group = None + peak_window_edit = None + peak_params_label = None + + def set_peak_search_enabled() -> None: + nonlocal peak_search_enabled + try: + peak_search_enabled = bool(peak_search_cb.isChecked()) + except Exception: + peak_search_enabled = False + try: + if peak_group is not None: + peak_group.setEnabled(peak_search_enabled) + except Exception: + pass + if not peak_search_enabled: + runtime.peak_candidates = [] + refresh_peak_params_label([]) + runtime.mark_dirty() + + try: + peak_search_cb.setChecked(peak_search_enabled) + peak_search_cb.stateChanged.connect(lambda _v: set_peak_search_enabled()) + except Exception: + pass + set_peak_search_enabled() + + calib_window = None + c_edits = [] + c_value_labels = [] + if peak_calibrate_mode: + try: + calib_window = QtWidgets.QWidget() + try: + calib_window.setWindowTitle(f"{args.title} freq calibration") + except Exception: + pass + calib_layout = QtWidgets.QFormLayout(calib_window) + calib_layout.setContentsMargins(8, 8, 8, 8) + + def refresh_c_value_labels() -> None: + coeffs = get_calibration_coeffs() + for idx, label in enumerate(c_value_labels): + try: + label.setText(f"{float(coeffs[idx]):.6g}") + except Exception: + pass + + def apply_c_value(idx: int, edit) -> None: + try: + set_calibration_base_value(idx, float(edit.text().strip())) + runtime.mark_dirty() + except Exception: + try: + edit.setText(f"{float(get_calibration_base()[idx]):.6g}") + except Exception: + pass + refresh_c_value_labels() + + def apply_all_c_values() -> None: + for idx, edit in enumerate(c_edits): + apply_c_value(idx, edit) + + for idx in range(3): + edit = QtWidgets.QLineEdit(f"{float(get_calibration_base()[idx]):.6g}") + try: + edit.setMaximumWidth(120) + edit.editingFinished.connect(lambda i=idx, e=edit: apply_c_value(i, e)) + except Exception: + pass + calib_layout.addRow(f"C{idx}", edit) + c_edits.append(edit) + try: + update_btn = QtWidgets.QPushButton("Update") + update_btn.clicked.connect(lambda _checked=False: apply_all_c_values()) + calib_layout.addRow(update_btn) + calib_layout.addRow(QtWidgets.QLabel("Working C"), QtWidgets.QLabel("")) + except Exception: + pass + for idx in range(3): + label = QtWidgets.QLabel(f"{float(get_calibration_coeffs()[idx]):.6g}") + calib_layout.addRow(f"C*{idx}", label) + c_value_labels.append(label) + refresh_c_value_labels() + try: + calib_window.show() + except Exception: + pass + except Exception: + calib_window = None + + try: + settings_layout.addStretch(1) + except Exception: + pass + + def visible_bg_fft(disp_fft: np.ndarray, force: bool = False) -> Optional[np.ndarray]: + nonlocal bg_compute_enabled, bg_subtract_enabled + need_bg = bool(bg_subtract_enabled or force) + if (not need_bg) or disp_fft.size == 0: + return None + ny, nx = disp_fft.shape + if ny <= 0 or nx <= 0: + return runtime.bg_spec_cache + if runtime.bg_spec_cache is not None and runtime.bg_spec_cache.size != ny: + runtime.bg_spec_cache = None + if not bg_compute_enabled: + return runtime.bg_spec_cache + try: + (x0, x1), _ = p_spec.viewRange() + except Exception: + x0, x1 = 0.0, float(nx - 1) + xmin, xmax = sorted((float(x0), float(x1))) + ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) + ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) + if ix1 < ix0: + ix1 = ix0 + window = disp_fft[:, ix0 : ix1 + 1] + if window.size == 0: + return runtime.bg_spec_cache + try: + bg_spec = np.nanmedian(window, axis=1) + except Exception: + return runtime.bg_spec_cache + if not np.any(np.isfinite(bg_spec)): + return runtime.bg_spec_cache + runtime.bg_spec_cache = np.nan_to_num(bg_spec, nan=0.0).astype(np.float32, copy=False) + return runtime.bg_spec_cache + + def drain_queue() -> int: + drained = 0 + while True: + try: + sweep, info, aux_curves = queue.get_nowait() + except Empty: + break + drained += 1 + calibrated = calibrate_freqs( + { + "F": np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, sweep.size, dtype=np.float64), + "I": sweep, + } + ) + runtime.current_freqs = calibrated["F"] + runtime.current_sweep_raw = calibrated["I"] + runtime.current_aux_curves = aux_curves + runtime.current_info = info + + channel = 0 + try: + channel = int(info.get("ch", 0)) if isinstance(info, dict) else 0 + except Exception: + channel = 0 + + if channel == 0: + runtime.last_calib_sweep = runtime.current_sweep_raw + runtime.current_sweep_norm = None + sweep_for_processing = runtime.current_sweep_raw + else: + if calib_enabled and runtime.last_calib_sweep is not None: + runtime.current_sweep_norm = normalize_sweep(runtime.current_sweep_raw, runtime.last_calib_sweep) + sweep_for_processing = runtime.current_sweep_norm + else: + runtime.current_sweep_norm = None + sweep_for_processing = runtime.current_sweep_raw + + ensure_buffer(runtime.current_sweep_raw.size) + runtime.ring.push(sweep_for_processing, runtime.current_freqs) + runtime.current_distances = runtime.ring.distance_axis + runtime.current_fft_db = runtime.ring.last_fft_db + if drained > 0: + update_physical_axes() + return drained + + try: + cm_mod = getattr(pg, "colormap", None) + if cm_mod is not None: + colormap = cm_mod.get(args.cmap) + lut = colormap.getLookupTable(0.0, 1.0, 256) + img.setLookupTable(lut) + img_fft.setLookupTable(lut) + except Exception: + pass + + def update() -> None: + nonlocal peak_ref_window + if peak_calibrate_mode and any(edit.hasFocus() for edit in c_edits): + return + if peak_search_enabled and peak_window_edit is not None and peak_window_edit.hasFocus(): + return + + changed = drain_queue() > 0 + redraw_needed = changed or runtime.plot_dirty + bg_fft_for_line = None + if redraw_needed and fft_bg_subtract_enabled and runtime.ring.ring_fft is not None: + try: + bg_fft_for_line = visible_bg_fft(runtime.ring.get_display_fft_linear(), force=True) + except Exception: + bg_fft_for_line = None + + if redraw_needed and runtime.current_sweep_raw is not None: + xs = None + if runtime.current_freqs is not None and runtime.current_freqs.size == runtime.current_sweep_raw.size: + xs = runtime.current_freqs + elif runtime.ring.x_shared is not None and runtime.current_sweep_raw.size <= runtime.ring.x_shared.size: + xs = runtime.ring.x_shared[: runtime.current_sweep_raw.size] + else: + xs = np.arange(runtime.current_sweep_raw.size) + + curve.setData(xs, runtime.current_sweep_raw, autoDownsample=True) + if runtime.current_aux_curves is not None: + avg_1_curve, avg_2_curve = runtime.current_aux_curves + curve_avg1.setData(xs[: avg_1_curve.size], avg_1_curve, autoDownsample=True) + curve_avg2.setData(xs[: avg_2_curve.size], avg_2_curve, autoDownsample=True) + else: + curve_avg1.setData([], []) + curve_avg2.setData([], []) + + if runtime.last_calib_sweep is not None: + curve_calib.setData(xs[: runtime.last_calib_sweep.size], runtime.last_calib_sweep, autoDownsample=True) + else: + curve_calib.setData([], []) + + if runtime.current_sweep_norm is not None: + curve_norm.setData(xs[: runtime.current_sweep_norm.size], runtime.current_sweep_norm, autoDownsample=True) + else: + curve_norm.setData([], []) + + if fixed_ylim is None: + y_series = [runtime.current_sweep_raw, runtime.last_calib_sweep, runtime.current_sweep_norm] + if runtime.current_aux_curves is not None: + y_series.extend(runtime.current_aux_curves) + y_limits = compute_auto_ylim(*y_series) + if y_limits is not None: + p_line.setYRange(y_limits[0], y_limits[1], padding=0) + + if isinstance(xs, np.ndarray) and xs.size > 0: + finite_x = xs[np.isfinite(xs)] + if finite_x.size > 0: + p_line.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0) + + sweep_for_fft = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw + distance_axis = runtime.current_distances if runtime.current_distances is not None else runtime.ring.distance_axis + if sweep_for_fft.size > 0 and distance_axis is not None: + if runtime.current_fft_db is None or runtime.current_fft_db.size != distance_axis.size or runtime.plot_dirty: + runtime.current_fft_db = compute_fft_row(sweep_for_fft, runtime.current_freqs, distance_axis.size) + fft_vals = runtime.current_fft_db + xs_fft = distance_axis[: fft_vals.size] + if fft_bg_subtract_enabled and bg_fft_for_line is not None: + n_bg = int(min(fft_vals.size, bg_fft_for_line.size)) + if n_bg > 0: + num = np.maximum( + np.power(10.0, np.asarray(fft_vals[:n_bg], dtype=np.float64) / 20.0), + 0.0, + ) + den = np.maximum(np.asarray(bg_fft_for_line[:n_bg], dtype=np.float64), 0.0) + fft_vals = (20.0 * np.log10((num + 1e-9) / (den + 1e-9))).astype(np.float32, copy=False) + xs_fft = xs_fft[:n_bg] + curve_fft.setData(xs_fft, fft_vals) + finite_x = xs_fft[np.isfinite(xs_fft)] + if finite_x.size > 0: + p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0) + + finite_fft = np.isfinite(xs_fft) & np.isfinite(fft_vals) + y_for_range = fft_vals[finite_fft] + if peak_search_enabled: + fft_ref = rolling_median_ref(xs_fft, fft_vals, peak_ref_window) + finite_ref = np.isfinite(xs_fft) & np.isfinite(fft_ref) + if np.any(finite_ref): + curve_fft_ref.setData(xs_fft[finite_ref], fft_ref[finite_ref]) + curve_fft_ref.setVisible(True) + y_for_range = np.concatenate((y_for_range, fft_ref[finite_ref])) + else: + curve_fft_ref.setVisible(False) + runtime.peak_candidates = find_top_peaks_over_ref(xs_fft, fft_vals, fft_ref, top_n=3) + refresh_peak_params_label(runtime.peak_candidates) + for idx, box in enumerate(fft_peak_boxes): + if idx < len(runtime.peak_candidates): + peak = runtime.peak_candidates[idx] + box.setData( + [peak["left"], peak["left"], peak["right"], peak["right"], peak["left"]], + [peak["ref"], peak["peak_y"], peak["peak_y"], peak["ref"], peak["ref"]], + ) + box.setVisible(True) + else: + box.setVisible(False) + else: + runtime.peak_candidates = [] + refresh_peak_params_label([]) + curve_fft_ref.setVisible(False) + for box in fft_peak_boxes: + box.setVisible(False) + + if fft_bg_subtract_enabled and bg_fft_for_line is not None: + p_fft.setYRange(-10.0, 30.0, padding=0) + else: + finite_y = y_for_range[np.isfinite(y_for_range)] + if finite_y.size > 0: + y0 = float(np.min(finite_y)) + y1 = float(np.max(finite_y)) + if y1 <= y0: + y1 = y0 + 1e-3 + p_fft.setYRange(y0, y1, padding=0) + + if peak_calibrate_mode: + markers = find_peak_width_markers(xs_fft, fft_vals) + if markers is not None: + fft_bg_line.setValue(markers["background"]) + fft_left_line.setValue(markers["left"]) + fft_right_line.setValue(markers["right"]) + spec_left_line.setValue(markers["left"]) + spec_right_line.setValue(markers["right"]) + fft_bg_line.setVisible(True) + fft_left_line.setVisible(True) + fft_right_line.setVisible(True) + spec_left_line.setVisible(True) + spec_right_line.setVisible(True) + runtime.current_peak_width = markers["width"] + runtime.current_peak_amplitude = markers["amplitude"] + else: + fft_bg_line.setVisible(False) + fft_left_line.setVisible(False) + fft_right_line.setVisible(False) + spec_left_line.setVisible(False) + spec_right_line.setVisible(False) + runtime.current_peak_width = None + runtime.current_peak_amplitude = None + else: + fft_bg_line.setVisible(False) + fft_left_line.setVisible(False) + fft_right_line.setVisible(False) + spec_left_line.setVisible(False) + spec_right_line.setVisible(False) + runtime.current_peak_width = None + runtime.current_peak_amplitude = None + else: + curve_fft_ref.setVisible(False) + for box in fft_peak_boxes: + box.setVisible(False) + runtime.peak_candidates = [] + refresh_peak_params_label([]) + runtime.plot_dirty = False + + if changed and runtime.ring.ring is not None: + disp = runtime.ring.get_display_raw() + levels = _visible_levels_pyqtgraph(disp, p_img) + if levels is not None: + img.setImage(disp, autoLevels=False, levels=levels) + else: + img.setImage(disp, autoLevels=False) + + if changed and runtime.current_info: + try: + status_payload = dict(runtime.current_info) + if peak_calibrate_mode and runtime.current_peak_width is not None: + status_payload["peak_w"] = runtime.current_peak_width + if peak_calibrate_mode and runtime.current_peak_amplitude is not None: + status_payload["peak_a"] = runtime.current_peak_amplitude + status.setText(format_status_kv(status_payload)) + except Exception: + pass + try: + chs = runtime.current_info.get("chs") if isinstance(runtime.current_info, dict) else None + if chs is None: + chs = runtime.current_info.get("ch") if isinstance(runtime.current_info, dict) else None + if chs is None: + ch_text.setText("") + else: + if isinstance(chs, (list, tuple, set)): + ch_list = sorted(int(v) for v in chs) + ch_text_val = ", ".join(str(v) for v in ch_list) + else: + ch_text_val = str(int(chs)) + ch_text.setText(f"chs {ch_text_val}") + (x0, x1), (y0, y1) = p_line.viewRange() + dx = 0.01 * max(1.0, float(x1 - x0)) + dy = 0.01 * max(1.0, float(y1 - y0)) + ch_text.setPos(float(x1 - dx), float(y1 - dy)) + except Exception: + pass + + if changed and runtime.ring.ring_fft is not None: + disp_fft_lin = runtime.ring.get_display_fft_linear() + if spec_mean_sec > 0.0: + disp_times = runtime.ring.get_display_times() + if disp_times is not None: + now_t = time.time() + mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec)) + if np.any(mask): + try: + mean_spec = np.nanmean(disp_fft_lin[:, mask], axis=1) + mean_spec = np.nan_to_num(mean_spec, nan=0.0) + disp_fft_lin = disp_fft_lin - mean_spec[:, None] + except Exception: + pass + + bg_spec = visible_bg_fft(disp_fft_lin) + if bg_spec is not None: + num = np.maximum(disp_fft_lin, 0.0).astype(np.float32, copy=False) + 1e-9 + den = bg_spec[:, None] + 1e-9 + disp_fft = (20.0 * np.log10(num / den)).astype(np.float32, copy=False) + else: + disp_fft = fft_mag_to_db(disp_fft_lin) + + levels = None + if bg_spec is not None: + try: + p5 = float(np.nanpercentile(disp_fft, 5)) + p95 = float(np.nanpercentile(disp_fft, 95)) + span = max(abs(p5), abs(p95)) + if np.isfinite(span) and span > 0.0: + levels = (-span, span) + except Exception: + levels = None + else: + try: + mean_spec = np.nanmean(disp_fft, axis=1) + vmin_v = float(np.nanmin(mean_spec)) + vmax_v = float(np.nanmax(mean_spec)) + if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: + levels = (vmin_v, vmax_v) + except Exception: + levels = None + if levels is None and spec_clip is not None: + try: + vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0])) + vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1])) + if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v: + levels = (vmin_v, vmax_v) + except Exception: + levels = None + if ( + levels is None + and runtime.ring.y_min_fft is not None + and runtime.ring.y_max_fft is not None + and np.isfinite(runtime.ring.y_min_fft) + and np.isfinite(runtime.ring.y_max_fft) + and runtime.ring.y_min_fft != runtime.ring.y_max_fft + ): + levels = (runtime.ring.y_min_fft, runtime.ring.y_max_fft) + if levels is not None: + img_fft.setImage(disp_fft, autoLevels=False, levels=levels) + else: + img_fft.setImage(disp_fft, autoLevels=False) + + timer = pg.QtCore.QTimer() + timer.timeout.connect(update) + timer.start(interval_ms) + + sigint_requested = threading.Event() + sigint_timer = pg.QtCore.QTimer() + sigint_timer.setInterval(50) + sigint_timer.timeout.connect(lambda: app.quit() if sigint_requested.is_set() else None) + sigint_timer.start() + + cleanup_done = False + + def on_quit() -> None: + nonlocal cleanup_done + if cleanup_done: + return + cleanup_done = True + try: + timer.stop() + sigint_timer.stop() + except Exception: + pass + stop_event.set() + reader.join(timeout=1.0) + try: + main_window.close() + except Exception: + pass + if calib_window is not None: + try: + calib_window.close() + except Exception: + pass + + def handle_sigint(_signum, _frame) -> None: + sigint_requested.set() + + prev_sigint = signal.getsignal(signal.SIGINT) + try: + signal.signal(signal.SIGINT, handle_sigint) + except Exception: + prev_sigint = None + + orig_close_event = getattr(main_window, "closeEvent", None) + + def close_event(event) -> None: + try: + if callable(orig_close_event): + orig_close_event(event) + else: + event.accept() + except Exception: + try: + event.accept() + except Exception: + pass + try: + app.quit() + except Exception: + pass + + try: + main_window.closeEvent = close_event # type: ignore[method-assign] + except Exception: + pass + + app.aboutToQuit.connect(on_quit) + try: + main_window.resize(1200, 680) + except Exception: + pass + main_window.show() + exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None) + try: + exec_fn() + finally: + on_quit() + if prev_sigint is not None: + try: + signal.signal(signal.SIGINT, prev_sigint) + except Exception: + pass diff --git a/rfg_adc_plotter/io/__init__.py b/rfg_adc_plotter/io/__init__.py new file mode 100644 index 0000000..4fd0912 --- /dev/null +++ b/rfg_adc_plotter/io/__init__.py @@ -0,0 +1,6 @@ +"""I/O helpers for serial sources and sweep parsing.""" + +from rfg_adc_plotter.io.serial_source import SerialChunkReader, SerialLineSource +from rfg_adc_plotter.io.sweep_reader import SweepReader + +__all__ = ["SerialChunkReader", "SerialLineSource", "SweepReader"] diff --git a/rfg_adc_plotter/io/serial_source.py b/rfg_adc_plotter/io/serial_source.py new file mode 100644 index 0000000..d91d797 --- /dev/null +++ b/rfg_adc_plotter/io/serial_source.py @@ -0,0 +1,177 @@ +"""Serial input helpers with pyserial and raw TTY fallbacks.""" + +from __future__ import annotations + +import io +import os +import sys +from typing import Optional + + +def try_open_pyserial(path: str, baud: int, timeout: float): + try: + import serial # type: ignore + except Exception: + return None + try: + return serial.Serial(path, baudrate=baud, timeout=timeout) + except Exception: + return None + + +class FDReader: + """Buffered wrapper around a raw TTY file descriptor.""" + + def __init__(self, fd: int): + self._fd = fd + raw = os.fdopen(fd, "rb", closefd=False) + self._file = raw + self._buf = io.BufferedReader(raw, buffer_size=65536) + + def fileno(self) -> int: + return self._fd + + def readline(self) -> bytes: + return self._buf.readline() + + def close(self) -> None: + try: + self._buf.close() + except Exception: + pass + + +def open_raw_tty(path: str, baud: int) -> Optional[FDReader]: + """Open a TTY without pyserial and configure it via termios.""" + try: + import termios + import tty + except Exception: + return None + + try: + fd = os.open(path, os.O_RDONLY | os.O_NOCTTY) + except Exception: + return None + + try: + attrs = termios.tcgetattr(fd) + tty.setraw(fd) + + baud_map = { + 9600: termios.B9600, + 19200: termios.B19200, + 38400: termios.B38400, + 57600: termios.B57600, + 115200: termios.B115200, + 230400: getattr(termios, "B230400", None), + 460800: getattr(termios, "B460800", None), + } + speed = baud_map.get(baud) or termios.B115200 + + attrs[4] = speed + attrs[5] = speed + cc = attrs[6] + cc[termios.VMIN] = 1 + cc[termios.VTIME] = 0 + attrs[6] = cc + termios.tcsetattr(fd, termios.TCSANOW, attrs) + except Exception: + try: + os.close(fd) + except Exception: + pass + return None + + return FDReader(fd) + + +class SerialLineSource: + """Unified line-oriented wrapper for pyserial and raw TTY readers.""" + + def __init__(self, path: str, baud: int, timeout: float = 1.0): + self._pyserial = try_open_pyserial(path, baud, timeout) + self._fdreader: Optional[FDReader] = None + self._using = "pyserial" if self._pyserial is not None else "raw" + if self._pyserial is None: + self._fdreader = open_raw_tty(path, baud) + if self._fdreader is None: + msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)" + if sys.platform.startswith("win"): + msg += ". На Windows нужен pyserial: pip install pyserial" + raise RuntimeError(msg) + + def readline(self) -> bytes: + if self._pyserial is not None: + try: + return self._pyserial.readline() + except Exception: + return b"" + try: + return self._fdreader.readline() # type: ignore[union-attr] + except Exception: + return b"" + + def close(self) -> None: + try: + if self._pyserial is not None: + self._pyserial.close() + elif self._fdreader is not None: + self._fdreader.close() + except Exception: + pass + + +class SerialChunkReader: + """Fast non-blocking chunk reader for serial sources.""" + + def __init__(self, src: SerialLineSource): + self._src = src + self._ser = src._pyserial + self._fd: Optional[int] = None + if self._ser is not None: + try: + self._ser.timeout = 0 + except Exception: + pass + else: + try: + self._fd = src._fdreader.fileno() # type: ignore[union-attr] + try: + os.set_blocking(self._fd, False) + except Exception: + pass + except Exception: + self._fd = None + + def read_available(self) -> bytes: + """Return currently available bytes or b"" when nothing is ready.""" + if self._ser is not None: + try: + available = int(getattr(self._ser, "in_waiting", 0)) + except Exception: + available = 0 + if available > 0: + try: + return self._ser.read(available) + except Exception: + return b"" + return b"" + + if self._fd is None: + return b"" + + out = bytearray() + while True: + try: + chunk = os.read(self._fd, 65536) + if not chunk: + break + out += chunk + if len(chunk) < 65536: + break + except BlockingIOError: + break + except Exception: + break + return bytes(out) diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py new file mode 100644 index 0000000..e9bc9b2 --- /dev/null +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -0,0 +1,425 @@ +"""Reusable sweep parsers and sweep assembly helpers.""" + +from __future__ import annotations + +import math +import time +from collections import deque +from typing import List, Optional, Sequence, Set + +import numpy as np + +from rfg_adc_plotter.constants import DATA_INVERSION_THRESHOLD, LOG_BASE, LOG_EXP_LIMIT, LOG_POSTSCALER, LOG_SCALER +from rfg_adc_plotter.types import ParserEvent, PointEvent, StartEvent, SweepAuxCurves, SweepInfo, SweepPacket + + +def u32_to_i32(value: int) -> int: + return value - 0x1_0000_0000 if (value & 0x8000_0000) else value + + +def u16_to_i16(value: int) -> int: + return value - 0x1_0000 if (value & 0x8000) else value + + +def log_value_to_linear(value: int) -> float: + exponent = max(-LOG_EXP_LIMIT, min(LOG_EXP_LIMIT, float(value) * LOG_SCALER)) + return float(LOG_BASE ** exponent) + + +def log_pair_to_sweep(avg_1: int, avg_2: int) -> float: + return (log_value_to_linear(avg_1) - log_value_to_linear(avg_2)) * LOG_POSTSCALER + + +class AsciiSweepParser: + """Incremental parser for ASCII sweep streams.""" + + def __init__(self): + self._buf = bytearray() + + def feed(self, data: bytes) -> List[ParserEvent]: + if data: + self._buf += data + events: List[ParserEvent] = [] + while True: + nl = self._buf.find(b"\n") + if nl == -1: + break + line = bytes(self._buf[:nl]) + del self._buf[: nl + 1] + if line.endswith(b"\r"): + line = line[:-1] + if not line: + continue + if line.startswith(b"Sweep_start"): + events.append(StartEvent()) + continue + + parts = line.split() + if len(parts) < 3: + continue + head = parts[0].lower() + try: + if head == b"s": + if len(parts) >= 4: + ch = int(parts[1], 10) + x = int(parts[2], 10) + y = int(parts[3], 10) + else: + ch = 0 + x = int(parts[1], 10) + y = int(parts[2], 10) + elif head.startswith(b"s"): + ch = int(head[1:], 10) + x = int(parts[1], 10) + y = int(parts[2], 10) + else: + continue + except Exception: + continue + events.append(PointEvent(ch=int(ch), x=int(x), y=float(y))) + return events + + +class LegacyBinaryParser: + """Byte-resynchronizing parser for legacy 8-byte binary records.""" + + def __init__(self): + self._buf = bytearray() + + @staticmethod + def _u16_at(buf: bytearray, offset: int) -> int: + return int(buf[offset]) | (int(buf[offset + 1]) << 8) + + def feed(self, data: bytes) -> List[ParserEvent]: + if data: + self._buf += data + events: List[ParserEvent] = [] + while len(self._buf) >= 8: + w0 = self._u16_at(self._buf, 0) + w1 = self._u16_at(self._buf, 2) + w2 = self._u16_at(self._buf, 4) + if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and self._buf[6] == 0x0A: + events.append(StartEvent(ch=int(self._buf[7]))) + del self._buf[:8] + continue + if self._buf[6] == 0x0A: + ch = int(self._buf[7]) + value = u32_to_i32((w1 << 16) | w2) + events.append(PointEvent(ch=ch, x=int(w0), y=float(value))) + del self._buf[:8] + continue + del self._buf[:1] + return events + + +class LogScaleBinaryParser32: + """Byte-resynchronizing parser for 32-bit logscale pair records.""" + + def __init__(self): + self._buf = bytearray() + + @staticmethod + def _u16_at(buf: bytearray, offset: int) -> int: + return int(buf[offset]) | (int(buf[offset + 1]) << 8) + + def feed(self, data: bytes) -> List[ParserEvent]: + if data: + self._buf += data + events: List[ParserEvent] = [] + while len(self._buf) >= 12: + words = [self._u16_at(self._buf, idx * 2) for idx in range(6)] + if words[0:5] == [0xFFFF] * 5 and (words[5] & 0x00FF) == 0x000A: + events.append(StartEvent(ch=int((words[5] >> 8) & 0x00FF))) + del self._buf[:12] + continue + if (words[5] & 0x00FF) == 0x000A and words[0] != 0xFFFF: + ch = int((words[5] >> 8) & 0x00FF) + avg_1 = u32_to_i32((words[1] << 16) | words[2]) + avg_2 = u32_to_i32((words[3] << 16) | words[4]) + events.append( + PointEvent( + ch=ch, + x=int(words[0]), + y=log_pair_to_sweep(avg_1, avg_2), + aux=(float(avg_1), float(avg_2)), + ) + ) + del self._buf[:12] + continue + del self._buf[:1] + return events + + +class LogScale16BitX2BinaryParser: + """Byte-resynchronizing parser for 16-bit x2 logscale records.""" + + def __init__(self): + self._buf = bytearray() + self._current_channel = 0 + + @staticmethod + def _u16_at(buf: bytearray, offset: int) -> int: + return int(buf[offset]) | (int(buf[offset + 1]) << 8) + + def feed(self, data: bytes) -> List[ParserEvent]: + if data: + self._buf += data + events: List[ParserEvent] = [] + while len(self._buf) >= 8: + words = [self._u16_at(self._buf, idx * 2) for idx in range(4)] + if words[0:3] == [0xFFFF, 0xFFFF, 0xFFFF] and (words[3] & 0x00FF) == 0x000A: + self._current_channel = int((words[3] >> 8) & 0x00FF) + events.append(StartEvent(ch=self._current_channel)) + del self._buf[:8] + continue + if words[3] == 0xFFFF and words[0] != 0xFFFF: + avg_1 = u16_to_i16(words[1]) + avg_2 = u16_to_i16(words[2]) + events.append( + PointEvent( + ch=self._current_channel, + x=int(words[0]), + y=log_pair_to_sweep(avg_1, avg_2), + aux=(float(avg_1), float(avg_2)), + ) + ) + del self._buf[:8] + continue + del self._buf[:1] + return events + + +class ParserTestStreamParser: + """Parser for the special test 16-bit x2 stream format.""" + + def __init__(self): + self._buf = bytearray() + self._buf_pos = 0 + self._point_buf: list[int] = [] + self._ffff_run = 0 + self._current_channel = 0 + self._expected_step: Optional[int] = None + self._in_sweep = False + self._local_resync = False + + def _consume_point(self) -> Optional[PointEvent]: + if len(self._point_buf) != 3: + return None + step = int(self._point_buf[0]) + if step <= 0: + return None + if self._expected_step is not None and step < self._expected_step: + return None + avg_1 = u16_to_i16(int(self._point_buf[1])) + avg_2 = u16_to_i16(int(self._point_buf[2])) + self._expected_step = step + 1 + return PointEvent( + ch=self._current_channel, + x=step, + y=log_pair_to_sweep(avg_1, avg_2), + aux=(float(avg_1), float(avg_2)), + ) + + def feed(self, data: bytes) -> List[ParserEvent]: + if data: + self._buf += data + events: List[ParserEvent] = [] + + while (self._buf_pos + 1) < len(self._buf): + word = int(self._buf[self._buf_pos]) | (int(self._buf[self._buf_pos + 1]) << 8) + self._buf_pos += 2 + + if word == 0xFFFF: + self._ffff_run += 1 + continue + + if self._ffff_run > 0: + bad_point_on_delim = False + if self._in_sweep and self._point_buf and not self._local_resync: + point = self._consume_point() + if point is None: + self._local_resync = True + bad_point_on_delim = True + else: + events.append(point) + self._point_buf.clear() + + if self._ffff_run >= 2: + if (word & 0x00FF) == 0x000A: + self._current_channel = (word >> 8) & 0x00FF + self._in_sweep = True + self._expected_step = 1 + self._local_resync = False + self._point_buf.clear() + events.append(StartEvent(ch=self._current_channel)) + self._ffff_run = 0 + continue + if self._in_sweep: + self._local_resync = True + self._ffff_run = 0 + continue + + if self._local_resync and not bad_point_on_delim: + self._local_resync = False + self._point_buf.clear() + self._ffff_run = 0 + + if self._in_sweep and not self._local_resync: + self._point_buf.append(word) + if len(self._point_buf) > 3: + self._point_buf.clear() + self._local_resync = True + + if self._buf_pos >= 262144: + del self._buf[: self._buf_pos] + self._buf_pos = 0 + if (len(self._buf) - self._buf_pos) > 1_000_000: + tail = self._buf[self._buf_pos :] + if len(tail) > 262144: + tail = tail[-262144:] + self._buf = bytearray(tail) + self._buf_pos = 0 + return events + + +class SweepAssembler: + """Collect parser events into sweep packets matching runtime expectations.""" + + def __init__(self, fancy: bool = False, apply_inversion: bool = True): + self._fancy = bool(fancy) + self._apply_inversion = bool(apply_inversion) + self._max_width = 0 + self._sweep_idx = 0 + self._last_sweep_ts: Optional[float] = None + self._n_valid_hist = deque() + self._xs: list[int] = [] + self._ys: list[float] = [] + self._aux_1: list[float] = [] + self._aux_2: list[float] = [] + self._cur_channel: Optional[int] = None + self._cur_channels: set[int] = set() + + def _reset_current(self) -> None: + self._xs.clear() + self._ys.clear() + self._aux_1.clear() + self._aux_2.clear() + self._cur_channel = None + self._cur_channels.clear() + + def _scatter(self, xs: Sequence[int], values: Sequence[float], width: int) -> np.ndarray: + series = np.full((width,), np.nan, dtype=np.float32) + try: + idx = np.asarray(xs, dtype=np.int64) + vals = np.asarray(values, dtype=np.float32) + series[idx] = vals + except Exception: + for x, y in zip(xs, values): + xi = int(x) + if 0 <= xi < width: + series[xi] = float(y) + return series + + @staticmethod + def _fill_missing(series: np.ndarray) -> None: + known = ~np.isnan(series) + if not np.any(known): + return + known_idx = np.nonzero(known)[0] + for i0, i1 in zip(known_idx[:-1], known_idx[1:]): + if i1 - i0 > 1: + avg = (series[i0] + series[i1]) * 0.5 + series[i0 + 1 : i1] = avg + first_idx = int(known_idx[0]) + last_idx = int(known_idx[-1]) + if first_idx > 0: + series[:first_idx] = series[first_idx] + if last_idx < series.size - 1: + series[last_idx + 1 :] = series[last_idx] + + def consume(self, event: ParserEvent) -> Optional[SweepPacket]: + if isinstance(event, StartEvent): + packet = self.finalize_current() + self._reset_current() + if event.ch is not None: + self._cur_channel = int(event.ch) + self._cur_channels.add(int(event.ch)) + return packet + + if self._cur_channel is None: + self._cur_channel = int(event.ch) + self._cur_channels.add(int(event.ch)) + self._xs.append(int(event.x)) + self._ys.append(float(event.y)) + if event.aux is not None: + self._aux_1.append(float(event.aux[0])) + self._aux_2.append(float(event.aux[1])) + return None + + def finalize_current(self) -> Optional[SweepPacket]: + if not self._xs: + return None + + ch_list = sorted(self._cur_channels) if self._cur_channels else [0] + ch_primary = ch_list[0] if ch_list else 0 + width = max(int(max(self._xs)) + 1, 1) + self._max_width = max(self._max_width, width) + target_width = self._max_width if self._fancy else width + + sweep = self._scatter(self._xs, self._ys, target_width) + aux_curves: SweepAuxCurves = None + if self._aux_1 and self._aux_2 and len(self._aux_1) == len(self._xs): + aux_curves = ( + self._scatter(self._xs, self._aux_1, target_width), + self._scatter(self._xs, self._aux_2, target_width), + ) + + n_valid_cur = int(np.count_nonzero(np.isfinite(sweep))) + + if self._fancy: + self._fill_missing(sweep) + if aux_curves is not None: + self._fill_missing(aux_curves[0]) + self._fill_missing(aux_curves[1]) + + if self._apply_inversion: + try: + mean_value = float(np.nanmean(sweep)) + if np.isfinite(mean_value) and mean_value < DATA_INVERSION_THRESHOLD: + sweep *= -1.0 + except Exception: + pass + + self._sweep_idx += 1 + now = time.time() + if self._last_sweep_ts is None: + dt_ms = float("nan") + else: + dt_ms = (now - self._last_sweep_ts) * 1000.0 + self._last_sweep_ts = now + + self._n_valid_hist.append((now, n_valid_cur)) + while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0: + self._n_valid_hist.popleft() + n_valid = float(sum(value for _ts, value in self._n_valid_hist) / len(self._n_valid_hist)) + + if n_valid_cur > 0: + vmin = float(np.nanmin(sweep)) + vmax = float(np.nanmax(sweep)) + mean = float(np.nanmean(sweep)) + std = float(np.nanstd(sweep)) + else: + vmin = vmax = mean = std = float("nan") + + info: SweepInfo = { + "sweep": self._sweep_idx, + "ch": ch_primary, + "chs": ch_list, + "n_valid": n_valid, + "min": vmin, + "max": vmax, + "mean": mean, + "std": std, + "dt_ms": dt_ms, + } + return (sweep, info, aux_curves) diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py new file mode 100644 index 0000000..3c3639f --- /dev/null +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -0,0 +1,102 @@ +"""Background sweep reader thread.""" + +from __future__ import annotations + +import sys +import threading +import time +from queue import Full, Queue + +from rfg_adc_plotter.io.serial_source import SerialChunkReader, SerialLineSource +from rfg_adc_plotter.io.sweep_parser_core import ( + AsciiSweepParser, + LegacyBinaryParser, + LogScale16BitX2BinaryParser, + LogScaleBinaryParser32, + ParserTestStreamParser, + SweepAssembler, +) +from rfg_adc_plotter.types import SweepPacket + + +class SweepReader(threading.Thread): + """Read a serial source in the background and emit completed sweep packets.""" + + def __init__( + self, + port_path: str, + baud: int, + out_queue: "Queue[SweepPacket]", + stop_event: threading.Event, + fancy: bool = False, + bin_mode: bool = False, + logscale: bool = False, + parser_16_bit_x2: bool = False, + parser_test: bool = False, + ): + super().__init__(daemon=True) + self._port_path = port_path + self._baud = int(baud) + self._queue = out_queue + self._stop = stop_event + self._fancy = bool(fancy) + self._bin_mode = bool(bin_mode) + self._logscale = bool(logscale) + self._parser_16_bit_x2 = bool(parser_16_bit_x2) + self._parser_test = bool(parser_test) + self._src: SerialLineSource | None = None + + def _build_parser(self): + if self._parser_test: + return ParserTestStreamParser(), SweepAssembler(fancy=self._fancy, apply_inversion=False) + if self._parser_16_bit_x2: + return LogScale16BitX2BinaryParser(), SweepAssembler(fancy=self._fancy, apply_inversion=False) + if self._logscale: + return LogScaleBinaryParser32(), SweepAssembler(fancy=self._fancy, apply_inversion=False) + if self._bin_mode: + return LegacyBinaryParser(), SweepAssembler(fancy=self._fancy, apply_inversion=True) + return AsciiSweepParser(), SweepAssembler(fancy=self._fancy, apply_inversion=True) + + def _enqueue(self, packet: SweepPacket) -> None: + try: + self._queue.put_nowait(packet) + except Full: + try: + _ = self._queue.get_nowait() + except Exception: + pass + try: + self._queue.put_nowait(packet) + except Exception: + pass + + def run(self) -> None: + try: + self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) + sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") + except Exception as exc: + sys.stderr.write(f"[error] {exc}\n") + return + + parser, assembler = self._build_parser() + + try: + chunk_reader = SerialChunkReader(self._src) + while not self._stop.is_set(): + data = chunk_reader.read_available() + if not data: + time.sleep(0.0005) + continue + for event in parser.feed(data): + packet = assembler.consume(event) + if packet is not None: + self._enqueue(packet) + packet = assembler.finalize_current() + if packet is not None: + self._enqueue(packet) + finally: + try: + if self._src is not None: + self._src.close() + except Exception: + pass diff --git a/rfg_adc_plotter/main.py b/rfg_adc_plotter/main.py new file mode 100644 index 0000000..f2a5310 --- /dev/null +++ b/rfg_adc_plotter/main.py @@ -0,0 +1,26 @@ +"""Main entrypoint for the modularized ADC plotter.""" + +from __future__ import annotations + +import sys + +from rfg_adc_plotter.cli import build_parser + + +def main() -> None: + args = build_parser().parse_args() + if args.backend == "mpl": + sys.stderr.write("[error] Matplotlib backend removed. Use --backend pg or --backend auto.\n") + raise SystemExit(2) + + from rfg_adc_plotter.gui.pyqtgraph_backend import run_pyqtgraph + + try: + run_pyqtgraph(args) + except Exception as exc: + sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {exc}\n") + raise SystemExit(1) from exc + + +if __name__ == "__main__": + main() diff --git a/rfg_adc_plotter/processing/__init__.py b/rfg_adc_plotter/processing/__init__.py new file mode 100644 index 0000000..7d36236 --- /dev/null +++ b/rfg_adc_plotter/processing/__init__.py @@ -0,0 +1,49 @@ +"""Pure sweep-processing helpers.""" + +from rfg_adc_plotter.processing.calibration import ( + calibrate_freqs, + get_calibration_base, + get_calibration_coeffs, + recalculate_calibration_c, + set_calibration_base_value, +) +from rfg_adc_plotter.processing.fft import ( + compute_distance_axis, + compute_fft_mag_row, + compute_fft_row, + fft_mag_to_db, +) +from rfg_adc_plotter.processing.formatting import ( + compute_auto_ylim, + format_status_kv, + parse_spec_clip, +) +from rfg_adc_plotter.processing.normalization import ( + build_calib_envelopes, + normalize_by_calib, +) +from rfg_adc_plotter.processing.peaks import ( + find_peak_width_markers, + find_top_peaks_over_ref, + rolling_median_ref, +) + +__all__ = [ + "build_calib_envelopes", + "calibrate_freqs", + "compute_auto_ylim", + "compute_distance_axis", + "compute_fft_mag_row", + "compute_fft_row", + "fft_mag_to_db", + "find_peak_width_markers", + "find_top_peaks_over_ref", + "format_status_kv", + "get_calibration_base", + "get_calibration_coeffs", + "normalize_by_calib", + "parse_spec_clip", + "recalculate_calibration_c", + "rolling_median_ref", + "set_calibration_base_value", +] diff --git a/rfg_adc_plotter/processing/calibration.py b/rfg_adc_plotter/processing/calibration.py new file mode 100644 index 0000000..236ee2e --- /dev/null +++ b/rfg_adc_plotter/processing/calibration.py @@ -0,0 +1,81 @@ +"""Frequency-axis calibration helpers.""" + +from __future__ import annotations + +from typing import Any, Mapping + +import numpy as np + +from rfg_adc_plotter.constants import SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ +from rfg_adc_plotter.types import SweepData + + +def recalculate_calibration_c( + base_coeffs: np.ndarray, + f_min: float = SWEEP_FREQ_MIN_GHZ, + f_max: float = SWEEP_FREQ_MAX_GHZ, +) -> np.ndarray: + """Recalculate coefficients while preserving sweep edges.""" + coeffs = np.asarray(base_coeffs, dtype=np.float64).reshape(-1) + if coeffs.size < 3: + out = np.zeros((3,), dtype=np.float64) + out[: coeffs.size] = coeffs + coeffs = out + c0, c1, c2 = float(coeffs[0]), float(coeffs[1]), float(coeffs[2]) + x0 = float(f_min) + x1 = float(f_max) + y0 = c0 + c1 * x0 + c2 * (x0 ** 2) + y1 = c0 + c1 * x1 + c2 * (x1 ** 2) + if not (np.isfinite(y0) and np.isfinite(y1)) or y1 == y0: + return np.asarray([c0, c1, c2], dtype=np.float64) + scale = (x1 - x0) / (y1 - y0) + shift = x0 - scale * y0 + return np.asarray( + [ + shift + scale * c0, + scale * c1, + scale * c2, + ], + dtype=np.float64, + ) + + +CALIBRATION_C_BASE = np.asarray([0.0, 1.0, 0.025], dtype=np.float64) +CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE) + + +def get_calibration_base() -> np.ndarray: + return np.asarray(CALIBRATION_C_BASE, dtype=np.float64).copy() + + +def get_calibration_coeffs() -> np.ndarray: + return np.asarray(CALIBRATION_C, dtype=np.float64).copy() + + +def set_calibration_base_value(index: int, value: float) -> np.ndarray: + """Update one base coefficient and recalculate the working coefficients.""" + global CALIBRATION_C + CALIBRATION_C_BASE[int(index)] = float(value) + CALIBRATION_C = recalculate_calibration_c(CALIBRATION_C_BASE) + return get_calibration_coeffs() + + +def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData: + """Return a sweep copy with calibrated and resampled frequency axis.""" + freqs = np.asarray(sweep["F"], dtype=np.float64).copy() + values = np.asarray(sweep["I"], dtype=np.float64).copy() + coeffs = np.asarray(CALIBRATION_C, dtype=np.float64) + if freqs.size > 0: + freqs = coeffs[0] + coeffs[1] * freqs + coeffs[2] * (freqs * freqs) + + if freqs.size >= 2: + freqs_cal = np.linspace(float(freqs[0]), float(freqs[-1]), freqs.size, dtype=np.float64) + values_cal = np.interp(freqs_cal, freqs, values).astype(np.float64) + else: + freqs_cal = freqs.copy() + values_cal = values.copy() + + return { + "F": freqs_cal, + "I": values_cal, + } diff --git a/rfg_adc_plotter/processing/fft.py b/rfg_adc_plotter/processing/fft.py new file mode 100644 index 0000000..3d35123 --- /dev/null +++ b/rfg_adc_plotter/processing/fft.py @@ -0,0 +1,109 @@ +"""FFT helpers for line and waterfall views.""" + +from __future__ import annotations + +from typing import Optional, Tuple + +import numpy as np + +from rfg_adc_plotter.constants import C_M_S, FFT_LEN + + +def prepare_fft_segment( + sweep: np.ndarray, + freqs: Optional[np.ndarray], + fft_len: int = FFT_LEN, +) -> Optional[Tuple[np.ndarray, int]]: + """Prepare a sweep segment for FFT on a uniform frequency grid.""" + take_fft = min(int(sweep.size), int(fft_len)) + if take_fft <= 0: + return None + + sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32) + fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(np.float32, copy=False) + if freqs is None: + return fallback, take_fft + + freq_arr = np.asarray(freqs) + if freq_arr.size < take_fft: + return fallback, take_fft + + freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64) + valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg) + if int(np.count_nonzero(valid)) < 2: + return fallback, take_fft + + x_valid = freq_seg[valid] + y_valid = sweep_seg[valid] + order = np.argsort(x_valid, kind="mergesort") + x_valid = x_valid[order] + y_valid = y_valid[order] + x_unique, unique_idx = np.unique(x_valid, return_index=True) + y_unique = y_valid[unique_idx] + if x_unique.size < 2 or x_unique[-1] <= x_unique[0]: + return fallback, take_fft + + x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64) + resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32) + return resampled, take_fft + + +def fft_mag_to_db(mag: np.ndarray) -> np.ndarray: + """Convert magnitude to dB with safe zero handling.""" + mag_arr = np.asarray(mag, dtype=np.float32) + safe_mag = np.maximum(mag_arr, 0.0) + return (20.0 * np.log10(safe_mag + 1e-9)).astype(np.float32, copy=False) + + +def compute_fft_mag_row( + sweep: np.ndarray, + freqs: Optional[np.ndarray], + bins: int, +) -> np.ndarray: + """Compute a linear FFT magnitude row.""" + if bins <= 0: + return np.zeros((0,), dtype=np.float32) + + prepared = prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN) + if prepared is None: + return np.full((bins,), np.nan, dtype=np.float32) + + fft_seg, take_fft = prepared + fft_in = np.zeros((FFT_LEN,), dtype=np.float32) + window = np.hanning(take_fft).astype(np.float32) + fft_in[:take_fft] = fft_seg * window + spec = np.fft.ifft(fft_in) + mag = np.abs(spec).astype(np.float32) + if mag.shape[0] != bins: + mag = mag[:bins] + return mag + + +def compute_fft_row( + sweep: np.ndarray, + freqs: Optional[np.ndarray], + bins: int, +) -> np.ndarray: + """Compute a dB FFT row.""" + return fft_mag_to_db(compute_fft_mag_row(sweep, freqs, bins)) + + +def compute_distance_axis(freqs: Optional[np.ndarray], bins: int) -> np.ndarray: + """Compute the one-way distance axis for IFFT output.""" + if bins <= 0: + return np.zeros((0,), dtype=np.float64) + if freqs is None: + return np.arange(bins, dtype=np.float64) + + freq_arr = np.asarray(freqs, dtype=np.float64) + finite = freq_arr[np.isfinite(freq_arr)] + if finite.size < 2: + return np.arange(bins, dtype=np.float64) + + df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1)) + df_hz = abs(df_ghz) * 1e9 + if not np.isfinite(df_hz) or df_hz <= 0.0: + return np.arange(bins, dtype=np.float64) + + step_m = C_M_S / (2.0 * FFT_LEN * df_hz) + return np.arange(bins, dtype=np.float64) * step_m diff --git a/rfg_adc_plotter/processing/formatting.py b/rfg_adc_plotter/processing/formatting.py new file mode 100644 index 0000000..1f4c9a2 --- /dev/null +++ b/rfg_adc_plotter/processing/formatting.py @@ -0,0 +1,71 @@ +"""Formatting and display-range helpers.""" + +from __future__ import annotations + +from typing import Any, Mapping, Optional, Tuple + +import numpy as np + + +def format_status_kv(data: Mapping[str, Any]) -> str: + """Convert status metrics into a compact single-line representation.""" + + def _fmt(value: Any) -> str: + if value is None: + return "NA" + try: + f_value = float(value) + except Exception: + return str(value) + if not np.isfinite(f_value): + return "nan" + if abs(f_value) >= 1000 or (0 < abs(f_value) < 0.01): + return f"{f_value:.3g}" + return f"{f_value:.3f}".rstrip("0").rstrip(".") + + return " ".join(f"{key}:{_fmt(value)}" for key, value in data.items()) + + +def parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: + """Parse a waterfall percentile clip specification.""" + if not spec: + return None + value = str(spec).strip().lower() + if value in ("off", "none", "no"): + return None + try: + p0, p1 = value.replace(";", ",").split(",") + low = float(p0) + high = float(p1) + if not (0.0 <= low < high <= 100.0): + return None + return (low, high) + except Exception: + return None + + +def compute_auto_ylim(*series_list: Optional[np.ndarray]) -> Optional[Tuple[float, float]]: + """Compute a common Y-range with a small padding.""" + y_min: Optional[float] = None + y_max: Optional[float] = None + for series in series_list: + if series is None: + continue + arr = np.asarray(series) + if arr.size == 0: + continue + finite = arr[np.isfinite(arr)] + if finite.size == 0: + continue + cur_min = float(np.min(finite)) + cur_max = float(np.max(finite)) + y_min = cur_min if y_min is None else min(y_min, cur_min) + y_max = cur_max if y_max is None else max(y_max, cur_max) + + if y_min is None or y_max is None: + return None + if y_min == y_max: + pad = max(1.0, abs(y_min) * 0.05) + else: + pad = 0.05 * (y_max - y_min) + return (y_min - pad, y_max + pad) diff --git a/rfg_adc_plotter/processing/normalization.py b/rfg_adc_plotter/processing/normalization.py new file mode 100644 index 0000000..4b03683 --- /dev/null +++ b/rfg_adc_plotter/processing/normalization.py @@ -0,0 +1,116 @@ +"""Sweep normalization helpers.""" + +from __future__ import annotations + +from typing import Tuple + +import numpy as np + + +def normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + """Simple element-wise raw/calib normalization.""" + width = min(raw.size, calib.size) + if width <= 0: + return raw + out = np.full_like(raw, np.nan, dtype=np.float32) + with np.errstate(divide="ignore", invalid="ignore"): + out[:width] = raw[:width] / calib[:width] + out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) + return out + + +def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """Estimate lower and upper envelopes of a calibration curve.""" + n = int(calib.size) + if n <= 0: + empty = np.zeros((0,), dtype=np.float32) + return empty, empty + + values = np.asarray(calib, dtype=np.float32) + finite = np.isfinite(values) + if not np.any(finite): + zeros = np.zeros_like(values, dtype=np.float32) + return zeros, zeros + + if not np.all(finite): + x = np.arange(n, dtype=np.float32) + values = values.copy() + values[~finite] = np.interp(x[~finite], x[finite], values[finite]).astype(np.float32) + + if n < 3: + return values.copy(), values.copy() + + dy = np.diff(values) + signs = np.sign(dy).astype(np.int8, copy=False) + + if np.any(signs == 0): + for i in range(1, signs.size): + if signs[i] == 0: + signs[i] = signs[i - 1] + for i in range(signs.size - 2, -1, -1): + if signs[i] == 0: + signs[i] = signs[i + 1] + signs[signs == 0] = 1 + + max_idx = np.where((signs[:-1] > 0) & (signs[1:] < 0))[0] + 1 + min_idx = np.where((signs[:-1] < 0) & (signs[1:] > 0))[0] + 1 + + x = np.arange(n, dtype=np.float32) + + def _interp_nodes(nodes: np.ndarray) -> np.ndarray: + if nodes.size == 0: + idx = np.array([0, n - 1], dtype=np.int64) + else: + idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64) + return np.interp(x, idx.astype(np.float32), values[idx]).astype(np.float32) + + upper = _interp_nodes(max_idx) + lower = _interp_nodes(min_idx) + + swap = lower > upper + if np.any(swap): + tmp = upper[swap].copy() + upper[swap] = lower[swap] + lower[swap] = tmp + + return lower, upper + + +def normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: + """Project raw values between calibration envelopes into [-1000, 1000].""" + width = min(raw.size, calib.size) + if width <= 0: + return raw + + out = np.full_like(raw, np.nan, dtype=np.float32) + raw_seg = np.asarray(raw[:width], dtype=np.float32) + lower, upper = build_calib_envelopes(np.asarray(calib[:width], dtype=np.float32)) + span = upper - lower + + finite_span = span[np.isfinite(span) & (span > 0)] + if finite_span.size > 0: + eps = max(float(np.median(finite_span)) * 1e-6, 1e-9) + else: + eps = 1e-9 + + valid = ( + np.isfinite(raw_seg) + & np.isfinite(lower) + & np.isfinite(upper) + & (span > eps) + ) + if np.any(valid): + proj = np.empty_like(raw_seg, dtype=np.float32) + proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0 + proj[valid] = np.clip(proj[valid], -1000.0, 1000.0) + proj[~valid] = np.nan + out[:width] = proj + return out + + +def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray: + """Apply the selected normalization method.""" + norm = str(norm_type).strip().lower() + if norm == "simple": + return normalize_sweep_simple(raw, calib) + return normalize_sweep_projector(raw, calib) diff --git a/rfg_adc_plotter/processing/peaks.py b/rfg_adc_plotter/processing/peaks.py new file mode 100644 index 0000000..e4b9b68 --- /dev/null +++ b/rfg_adc_plotter/processing/peaks.py @@ -0,0 +1,209 @@ +"""Peak-search helpers for FFT visualizations.""" + +from __future__ import annotations + +from typing import Dict, List, Optional + +import numpy as np + + +def find_peak_width_markers(xs: np.ndarray, ys: np.ndarray) -> Optional[Dict[str, float]]: + """Find the dominant non-zero peak and its half-height width.""" + x_arr = np.asarray(xs, dtype=np.float64) + y_arr = np.asarray(ys, dtype=np.float64) + valid = np.isfinite(x_arr) & np.isfinite(y_arr) & (x_arr > 0.0) + if int(np.count_nonzero(valid)) < 3: + return None + + x = x_arr[valid] + y = y_arr[valid] + x_min = float(x[0]) + x_max = float(x[-1]) + x_span = x_max - x_min + central_mask = (x >= (x_min + 0.25 * x_span)) & (x <= (x_min + 0.75 * x_span)) + if int(np.count_nonzero(central_mask)) > 0: + central_idx = np.flatnonzero(central_mask) + peak_idx = int(central_idx[int(np.argmax(y[central_mask]))]) + else: + peak_idx = int(np.argmax(y)) + peak_y = float(y[peak_idx]) + shoulder_gap = max(1, min(8, y.size // 64 if y.size > 0 else 1)) + shoulder_width = max(4, min(32, y.size // 16 if y.size > 0 else 4)) + left_lo = max(0, peak_idx - shoulder_gap - shoulder_width) + left_hi = max(0, peak_idx - shoulder_gap) + right_lo = min(y.size, peak_idx + shoulder_gap + 1) + right_hi = min(y.size, right_lo + shoulder_width) + background_parts = [] + if left_hi > left_lo: + background_parts.append(float(np.nanmedian(y[left_lo:left_hi]))) + if right_hi > right_lo: + background_parts.append(float(np.nanmedian(y[right_lo:right_hi]))) + if background_parts: + background = float(np.mean(background_parts)) + else: + background = float(np.nanpercentile(y, 10)) + if not np.isfinite(peak_y) or not np.isfinite(background) or peak_y <= background: + return None + + half_level = background + 0.5 * (peak_y - background) + + def _interp_cross(x0: float, y0: float, x1: float, y1: float) -> float: + if not (np.isfinite(x0) and np.isfinite(y0) and np.isfinite(x1) and np.isfinite(y1)): + return x1 + dy = y1 - y0 + if dy == 0.0: + return x1 + t = (half_level - y0) / dy + t = min(1.0, max(0.0, t)) + return x0 + t * (x1 - x0) + + left_x = float(x[0]) + for i in range(peak_idx, 0, -1): + if y[i - 1] <= half_level <= y[i]: + left_x = _interp_cross(float(x[i - 1]), float(y[i - 1]), float(x[i]), float(y[i])) + break + + right_x = float(x[-1]) + for i in range(peak_idx, x.size - 1): + if y[i] >= half_level >= y[i + 1]: + right_x = _interp_cross(float(x[i]), float(y[i]), float(x[i + 1]), float(y[i + 1])) + break + + width = right_x - left_x + if not np.isfinite(width) or width <= 0.0: + return None + + return { + "background": background, + "left": left_x, + "right": right_x, + "width": width, + "amplitude": peak_y, + } + + +def rolling_median_ref(xs: np.ndarray, ys: np.ndarray, window_ghz: float) -> np.ndarray: + """Compute a rolling median reference on a fixed-width X window.""" + x = np.asarray(xs, dtype=np.float64) + y = np.asarray(ys, dtype=np.float64) + out = np.full(y.shape, np.nan, dtype=np.float64) + if x.size == 0 or y.size == 0 or x.size != y.size: + return out + width = float(window_ghz) + if not np.isfinite(width) or width <= 0.0: + return out + half = 0.5 * width + for i in range(x.size): + xi = x[i] + if not np.isfinite(xi): + continue + left = np.searchsorted(x, xi - half, side="left") + right = np.searchsorted(x, xi + half, side="right") + if right <= left: + continue + segment = y[left:right] + finite = np.isfinite(segment) + if not np.any(finite): + continue + out[i] = float(np.nanmedian(segment)) + return out + + +def find_top_peaks_over_ref( + xs: np.ndarray, + ys: np.ndarray, + ref: np.ndarray, + top_n: int = 3, +) -> List[Dict[str, float]]: + """Find the top-N non-overlapping peaks above a reference curve.""" + x = np.asarray(xs, dtype=np.float64) + y = np.asarray(ys, dtype=np.float64) + r = np.asarray(ref, dtype=np.float64) + if x.size < 3 or y.size != x.size or r.size != x.size: + return [] + + valid = np.isfinite(x) & np.isfinite(y) & np.isfinite(r) + if not np.any(valid): + return [] + delta = np.full_like(y, np.nan, dtype=np.float64) + delta[valid] = y[valid] - r[valid] + + candidates: List[int] = [] + for i in range(1, x.size - 1): + if not (np.isfinite(delta[i - 1]) and np.isfinite(delta[i]) and np.isfinite(delta[i + 1])): + continue + if delta[i] <= 0.0: + continue + left_ok = delta[i] > delta[i - 1] + right_ok = delta[i] >= delta[i + 1] + alt_left_ok = delta[i] >= delta[i - 1] + alt_right_ok = delta[i] > delta[i + 1] + if (left_ok and right_ok) or (alt_left_ok and alt_right_ok): + candidates.append(i) + if not candidates: + return [] + + candidates.sort(key=lambda i: float(delta[i]), reverse=True) + + def _interp_cross(x0: float, y0: float, x1: float, y1: float, y_cross: float) -> float: + dy = y1 - y0 + if not np.isfinite(dy) or dy == 0.0: + return x1 + t = (y_cross - y0) / dy + t = min(1.0, max(0.0, t)) + return x0 + t * (x1 - x0) + + picked: List[Dict[str, float]] = [] + for idx in candidates: + peak_y = float(y[idx]) + peak_ref = float(r[idx]) + peak_h = float(delta[idx]) + if not (np.isfinite(peak_y) and np.isfinite(peak_ref) and np.isfinite(peak_h)) or peak_h <= 0.0: + continue + + half_level = peak_ref + 0.5 * peak_h + + left_x = float(x[0]) + for i in range(idx, 0, -1): + y0 = float(y[i - 1]) + y1 = float(y[i]) + if np.isfinite(y0) and np.isfinite(y1) and (y0 <= half_level <= y1): + left_x = _interp_cross(float(x[i - 1]), y0, float(x[i]), y1, half_level) + break + + right_x = float(x[-1]) + for i in range(idx, x.size - 1): + y0 = float(y[i]) + y1 = float(y[i + 1]) + if np.isfinite(y0) and np.isfinite(y1) and (y0 >= half_level >= y1): + right_x = _interp_cross(float(x[i]), y0, float(x[i + 1]), y1, half_level) + break + + width = float(right_x - left_x) + if not np.isfinite(width) or width <= 0.0: + continue + + overlap = False + for peak in picked: + if not (right_x <= peak["left"] or left_x >= peak["right"]): + overlap = True + break + if overlap: + continue + + picked.append( + { + "x": float(x[idx]), + "peak_y": peak_y, + "ref": peak_ref, + "height": peak_h, + "left": left_x, + "right": right_x, + "width": width, + } + ) + if len(picked) >= int(max(1, top_n)): + break + + picked.sort(key=lambda peak: peak["x"]) + return picked diff --git a/rfg_adc_plotter/state/__init__.py b/rfg_adc_plotter/state/__init__.py new file mode 100644 index 0000000..500f041 --- /dev/null +++ b/rfg_adc_plotter/state/__init__.py @@ -0,0 +1,6 @@ +"""Runtime state helpers.""" + +from rfg_adc_plotter.state.ring_buffer import RingBuffer +from rfg_adc_plotter.state.runtime_state import RuntimeState + +__all__ = ["RingBuffer", "RuntimeState"] diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py new file mode 100644 index 0000000..e789595 --- /dev/null +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -0,0 +1,106 @@ +"""Ring buffers for raw sweeps and FFT waterfall rows.""" + +from __future__ import annotations + +import time +from typing import Optional + +import numpy as np + +from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ, WF_WIDTH +from rfg_adc_plotter.processing.fft import compute_distance_axis, compute_fft_mag_row, fft_mag_to_db + + +class RingBuffer: + """Store raw sweeps, FFT rows, and matching time markers.""" + + def __init__(self, max_sweeps: int): + self.max_sweeps = int(max_sweeps) + self.fft_bins = FFT_LEN // 2 + 1 + self.width = 0 + self.head = 0 + self.ring: Optional[np.ndarray] = None + self.ring_time: Optional[np.ndarray] = None + self.ring_fft: Optional[np.ndarray] = None + self.x_shared: Optional[np.ndarray] = None + self.distance_axis: Optional[np.ndarray] = None + self.last_fft_db: Optional[np.ndarray] = None + self.y_min_fft: Optional[float] = None + self.y_max_fft: Optional[float] = None + + @property + def is_ready(self) -> bool: + return self.ring is not None and self.ring_fft is not None + + def ensure_init(self, sweep_width: int) -> bool: + """Allocate or resize buffers. Returns True when geometry changed.""" + target_width = max(int(sweep_width), int(WF_WIDTH)) + changed = False + if self.ring is None or self.ring_time is None or self.ring_fft is None: + self.width = target_width + self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32) + self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64) + self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32) + self.head = 0 + changed = True + elif target_width != self.width: + new_ring = np.full((self.max_sweeps, target_width), np.nan, dtype=np.float32) + take = min(self.width, target_width) + new_ring[:, :take] = self.ring[:, :take] + self.ring = new_ring + self.width = target_width + changed = True + + if self.x_shared is None or self.x_shared.size != self.width: + self.x_shared = np.linspace( + SWEEP_FREQ_MIN_GHZ, + SWEEP_FREQ_MAX_GHZ, + self.width, + dtype=np.float32, + ) + changed = True + return changed + + def push(self, sweep: np.ndarray, freqs: Optional[np.ndarray] = None) -> None: + """Push a processed sweep and refresh raw/FFT buffers.""" + if sweep is None or sweep.size == 0: + return + self.ensure_init(int(sweep.size)) + if self.ring is None or self.ring_time is None or self.ring_fft is None: + return + + row = np.full((self.width,), np.nan, dtype=np.float32) + take = min(self.width, int(sweep.size)) + row[:take] = np.asarray(sweep[:take], dtype=np.float32) + self.ring[self.head, :] = row + self.ring_time[self.head] = time.time() + + fft_mag = compute_fft_mag_row(sweep, freqs, self.fft_bins) + self.ring_fft[self.head, :] = fft_mag + self.last_fft_db = fft_mag_to_db(fft_mag) + + if self.last_fft_db.size > 0: + fr_min = float(np.nanmin(self.last_fft_db)) + fr_max = float(np.nanmax(self.last_fft_db)) + self.y_min_fft = fr_min if self.y_min_fft is None else min(self.y_min_fft, fr_min) + self.y_max_fft = fr_max if self.y_max_fft is None else max(self.y_max_fft, fr_max) + + self.distance_axis = compute_distance_axis(freqs, self.fft_bins) + self.head = (self.head + 1) % self.max_sweeps + + def get_display_raw(self) -> np.ndarray: + if self.ring is None: + return np.zeros((1, 1), dtype=np.float32) + base = self.ring if self.head == 0 else np.roll(self.ring, -self.head, axis=0) + return base.T + + def get_display_fft_linear(self) -> np.ndarray: + if self.ring_fft is None: + return np.zeros((1, 1), dtype=np.float32) + base = self.ring_fft if self.head == 0 else np.roll(self.ring_fft, -self.head, axis=0) + return base.T + + def get_display_times(self) -> Optional[np.ndarray]: + if self.ring_time is None: + return None + return self.ring_time if self.head == 0 else np.roll(self.ring_time, -self.head) diff --git a/rfg_adc_plotter/state/runtime_state.py b/rfg_adc_plotter/state/runtime_state.py new file mode 100644 index 0000000..e0633b0 --- /dev/null +++ b/rfg_adc_plotter/state/runtime_state.py @@ -0,0 +1,32 @@ +"""Mutable state container for the PyQtGraph backend.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +import numpy as np + +from rfg_adc_plotter.state.ring_buffer import RingBuffer +from rfg_adc_plotter.types import SweepAuxCurves, SweepInfo + + +@dataclass +class RuntimeState: + ring: RingBuffer + current_freqs: Optional[np.ndarray] = None + current_distances: Optional[np.ndarray] = None + current_sweep_raw: Optional[np.ndarray] = None + current_aux_curves: SweepAuxCurves = None + current_sweep_norm: Optional[np.ndarray] = None + current_fft_db: Optional[np.ndarray] = None + last_calib_sweep: Optional[np.ndarray] = None + current_info: Optional[SweepInfo] = None + bg_spec_cache: Optional[np.ndarray] = None + current_peak_width: Optional[float] = None + current_peak_amplitude: Optional[float] = None + peak_candidates: List[Dict[str, float]] = field(default_factory=list) + plot_dirty: bool = False + + def mark_dirty(self) -> None: + self.plot_dirty = True diff --git a/rfg_adc_plotter/types.py b/rfg_adc_plotter/types.py new file mode 100644 index 0000000..660b5d5 --- /dev/null +++ b/rfg_adc_plotter/types.py @@ -0,0 +1,31 @@ +"""Shared runtime and parser types.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, Optional, Tuple, TypeAlias, Union + +import numpy as np + + +Number = Union[int, float] +SweepInfo = Dict[str, Any] +SweepData = Dict[str, np.ndarray] +SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]] +SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves] + + +@dataclass(frozen=True) +class StartEvent: + ch: Optional[int] = None + + +@dataclass(frozen=True) +class PointEvent: + ch: int + x: int + y: float + aux: Optional[Tuple[float, float]] = None + + +ParserEvent: TypeAlias = Union[StartEvent, PointEvent] diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..a67b802 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import subprocess +import sys +import unittest +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] + + +def _run(*args: str) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [sys.executable, *args], + cwd=ROOT, + text=True, + capture_output=True, + check=False, + ) + + +class CliTests(unittest.TestCase): + def test_wrapper_help_works(self): + proc = _run("RFG_ADC_dataplotter.py", "--help") + self.assertEqual(proc.returncode, 0) + self.assertIn("usage:", proc.stdout) + self.assertIn("--peak_search", proc.stdout) + + def test_module_help_works(self): + proc = _run("-m", "rfg_adc_plotter.main", "--help") + self.assertEqual(proc.returncode, 0) + self.assertIn("usage:", proc.stdout) + self.assertIn("--parser_16_bit_x2", proc.stdout) + + def test_backend_mpl_reports_removal(self): + proc = _run("-m", "rfg_adc_plotter.main", "/dev/null", "--backend", "mpl") + self.assertNotEqual(proc.returncode, 0) + self.assertIn("Matplotlib backend removed", proc.stderr) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_processing.py b/tests/test_processing.py new file mode 100644 index 0000000..425263f --- /dev/null +++ b/tests/test_processing.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import numpy as np +import unittest + +from rfg_adc_plotter.processing.calibration import calibrate_freqs, recalculate_calibration_c +from rfg_adc_plotter.processing.fft import compute_distance_axis, compute_fft_mag_row, compute_fft_row +from rfg_adc_plotter.processing.normalization import build_calib_envelopes, normalize_by_calib +from rfg_adc_plotter.processing.peaks import find_peak_width_markers, find_top_peaks_over_ref, rolling_median_ref + + +class ProcessingTests(unittest.TestCase): + def test_recalculate_calibration_preserves_requested_edges(self): + coeffs = recalculate_calibration_c(np.asarray([0.0, 1.0, 0.025], dtype=np.float64), 3.3, 14.3) + y0 = coeffs[0] + coeffs[1] * 3.3 + coeffs[2] * (3.3 ** 2) + y1 = coeffs[0] + coeffs[1] * 14.3 + coeffs[2] * (14.3 ** 2) + self.assertTrue(np.isclose(y0, 3.3)) + self.assertTrue(np.isclose(y1, 14.3)) + + def test_calibrate_freqs_returns_monotonic_axis_and_same_shape(self): + sweep = {"F": np.linspace(3.3, 14.3, 32), "I": np.linspace(-1.0, 1.0, 32)} + calibrated = calibrate_freqs(sweep) + self.assertEqual(calibrated["F"].shape, (32,)) + self.assertEqual(calibrated["I"].shape, (32,)) + self.assertTrue(np.all(np.diff(calibrated["F"]) >= 0.0)) + + def test_normalizers_and_envelopes_return_finite_ranges(self): + calib = (np.sin(np.linspace(0.0, 4.0 * np.pi, 64)) * 5.0).astype(np.float32) + raw = calib * 0.75 + lower, upper = build_calib_envelopes(calib) + self.assertEqual(lower.shape, calib.shape) + self.assertEqual(upper.shape, calib.shape) + self.assertTrue(np.all(lower <= upper)) + + simple = normalize_by_calib(raw, calib + 10.0, norm_type="simple") + projector = normalize_by_calib(raw, calib, norm_type="projector") + self.assertEqual(simple.shape, raw.shape) + self.assertEqual(projector.shape, raw.shape) + self.assertTrue(np.any(np.isfinite(simple))) + self.assertTrue(np.any(np.isfinite(projector))) + + def test_fft_helpers_return_expected_shapes(self): + sweep = np.sin(np.linspace(0.0, 4.0 * np.pi, 128)).astype(np.float32) + freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64) + mag = compute_fft_mag_row(sweep, freqs, 513) + row = compute_fft_row(sweep, freqs, 513) + axis = compute_distance_axis(freqs, 513) + self.assertEqual(mag.shape, (513,)) + self.assertEqual(row.shape, (513,)) + self.assertEqual(axis.shape, (513,)) + self.assertTrue(np.all(np.diff(axis) >= 0.0)) + + def test_peak_helpers_find_reference_and_peak_boxes(self): + xs = np.linspace(0.0, 10.0, 200) + ys = np.exp(-((xs - 5.0) ** 2) / 0.4) * 10.0 + 1.0 + ref = rolling_median_ref(xs, ys, 2.0) + peaks = find_top_peaks_over_ref(xs, ys, ref, top_n=3) + width = find_peak_width_markers(xs, ys) + self.assertEqual(ref.shape, ys.shape) + self.assertEqual(len(peaks), 1) + self.assertGreater(peaks[0]["x"], 4.0) + self.assertLess(peaks[0]["x"], 6.0) + self.assertIsNotNone(width) + self.assertGreater(width["width"], 0.0) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_ring_buffer.py b/tests/test_ring_buffer.py new file mode 100644 index 0000000..16c08fc --- /dev/null +++ b/tests/test_ring_buffer.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +import numpy as np +import unittest + +from rfg_adc_plotter.state.ring_buffer import RingBuffer + + +class RingBufferTests(unittest.TestCase): + def test_ring_buffer_initializes_on_first_push(self): + ring = RingBuffer(max_sweeps=4) + sweep = np.linspace(-1.0, 1.0, 64, dtype=np.float32) + ring.push(sweep, np.linspace(3.3, 14.3, 64)) + self.assertIsNotNone(ring.ring) + self.assertIsNotNone(ring.ring_fft) + self.assertIsNotNone(ring.ring_time) + self.assertIsNotNone(ring.distance_axis) + self.assertIsNotNone(ring.last_fft_db) + self.assertEqual(ring.ring.shape[0], 4) + self.assertEqual(ring.ring_fft.shape, (4, ring.fft_bins)) + + def test_ring_buffer_reallocates_when_sweep_width_grows(self): + ring = RingBuffer(max_sweeps=3) + ring.push(np.ones((32,), dtype=np.float32), np.linspace(3.3, 14.3, 32)) + first_width = ring.width + ring.push(np.ones((2048,), dtype=np.float32), np.linspace(3.3, 14.3, 2048)) + self.assertGreater(ring.width, first_width) + self.assertIsNotNone(ring.ring) + self.assertEqual(ring.ring.shape, (3, ring.width)) + + def test_ring_buffer_tracks_latest_fft_and_display_arrays(self): + ring = RingBuffer(max_sweeps=2) + ring.push(np.linspace(0.0, 1.0, 64, dtype=np.float32), np.linspace(3.3, 14.3, 64)) + ring.push(np.linspace(1.0, 0.0, 64, dtype=np.float32), np.linspace(3.3, 14.3, 64)) + raw = ring.get_display_raw() + fft = ring.get_display_fft_linear() + self.assertEqual(raw.shape[1], 2) + self.assertEqual(fft.shape[1], 2) + self.assertIsNotNone(ring.last_fft_db) + self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py new file mode 100644 index 0000000..73a81a5 --- /dev/null +++ b/tests/test_sweep_parser_core.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import math +import unittest + +from rfg_adc_plotter.io.sweep_parser_core import ( + AsciiSweepParser, + LegacyBinaryParser, + LogScale16BitX2BinaryParser, + LogScaleBinaryParser32, + ParserTestStreamParser, + PointEvent, + StartEvent, + SweepAssembler, + log_pair_to_sweep, +) + + +def _u16le(word: int) -> bytes: + w = int(word) & 0xFFFF + return bytes((w & 0xFF, (w >> 8) & 0xFF)) + + +def _pack_legacy_start(ch: int) -> bytes: + return b"\xff\xff" * 3 + bytes((0x0A, int(ch) & 0xFF)) + + +def _pack_legacy_point(ch: int, step: int, value_i32: int) -> bytes: + value = int(value_i32) & 0xFFFF_FFFF + return b"".join( + [ + _u16le(step), + _u16le((value >> 16) & 0xFFFF), + _u16le(value & 0xFFFF), + bytes((0x0A, int(ch) & 0xFF)), + ] + ) + + +def _pack_log_start(ch: int) -> bytes: + return b"\xff\xff" * 5 + bytes((0x0A, int(ch) & 0xFF)) + + +def _pack_log_point(step: int, avg1: int, avg2: int, ch: int = 0) -> bytes: + a1 = int(avg1) & 0xFFFF_FFFF + a2 = int(avg2) & 0xFFFF_FFFF + return b"".join( + [ + _u16le(step), + _u16le((a1 >> 16) & 0xFFFF), + _u16le(a1 & 0xFFFF), + _u16le((a2 >> 16) & 0xFFFF), + _u16le(a2 & 0xFFFF), + bytes((0x0A, int(ch) & 0xFF)), + ] + ) + + +def _pack_log16_start(ch: int) -> bytes: + return b"\xff\xff" * 3 + bytes((0x0A, int(ch) & 0xFF)) + + +def _pack_log16_point(step: int, avg1: int, avg2: int) -> bytes: + return b"".join( + [ + _u16le(step), + _u16le(avg1), + _u16le(avg2), + _u16le(0xFFFF), + ] + ) + + +class SweepParserCoreTests(unittest.TestCase): + def test_ascii_parser_emits_start_and_points(self): + parser = AsciiSweepParser() + events = parser.feed(b"Sweep_start\ns 1 2 -3\ns2 4 5\n") + self.assertIsInstance(events[0], StartEvent) + self.assertIsInstance(events[1], PointEvent) + self.assertIsInstance(events[2], PointEvent) + self.assertEqual(events[1].ch, 1) + self.assertEqual(events[1].x, 2) + self.assertEqual(events[1].y, -3.0) + self.assertEqual(events[2].ch, 2) + self.assertEqual(events[2].x, 4) + self.assertEqual(events[2].y, 5.0) + + def test_legacy_binary_parser_resynchronizes_after_garbage(self): + parser = LegacyBinaryParser() + stream = b"\x00junk" + _pack_legacy_start(3) + _pack_legacy_point(3, 1, -2) + events = parser.feed(stream) + self.assertIsInstance(events[0], StartEvent) + self.assertEqual(events[0].ch, 3) + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].ch, 3) + self.assertEqual(events[1].x, 1) + self.assertEqual(events[1].y, -2.0) + + def test_logscale_32_parser_keeps_channel_and_aux_values(self): + parser = LogScaleBinaryParser32() + stream = _pack_log_start(5) + _pack_log_point(7, 1500, 700, ch=5) + events = parser.feed(stream) + self.assertIsInstance(events[0], StartEvent) + self.assertEqual(events[0].ch, 5) + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].ch, 5) + self.assertEqual(events[1].x, 7) + self.assertAlmostEqual(events[1].y, log_pair_to_sweep(1500, 700), places=6) + self.assertEqual(events[1].aux, (1500.0, 700.0)) + + def test_logscale_16bit_parser_uses_last_start_channel(self): + parser = LogScale16BitX2BinaryParser() + stream = _pack_log16_start(2) + _pack_log16_point(1, 100, 90) + events = parser.feed(stream) + self.assertIsInstance(events[0], StartEvent) + self.assertEqual(events[0].ch, 2) + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].ch, 2) + self.assertEqual(events[1].aux, (100.0, 90.0)) + + def test_parser_test_stream_parser_recovers_point_after_single_separator(self): + parser = ParserTestStreamParser() + stream = b"".join( + [ + b"\xff\xff\xff\xff", + bytes((0x0A, 4)), + _u16le(1), + _u16le(100), + _u16le(90), + _u16le(0xFFFF), + ] + ) + events = parser.feed(stream) + events.extend(parser.feed(_u16le(2))) + self.assertIsInstance(events[0], StartEvent) + self.assertEqual(events[0].ch, 4) + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].ch, 4) + self.assertEqual(events[1].x, 1) + self.assertTrue(math.isfinite(events[1].y)) + + def test_sweep_assembler_builds_aux_curves_without_inversion(self): + assembler = SweepAssembler(fancy=False, apply_inversion=False) + self.assertIsNone(assembler.consume(StartEvent(ch=1))) + assembler.consume(PointEvent(ch=1, x=1, y=10.0, aux=(100.0, 90.0))) + assembler.consume(PointEvent(ch=1, x=2, y=20.0, aux=(110.0, 95.0))) + sweep, info, aux = assembler.finalize_current() + self.assertEqual(sweep.shape[0], 3) + self.assertEqual(info["ch"], 1) + self.assertIsNotNone(aux) + self.assertEqual(aux[0][1], 100.0) + self.assertEqual(aux[1][2], 95.0) + + +if __name__ == "__main__": + unittest.main()