17 Commits

Author SHA1 Message Date
awe
f89dedf4ab add new data format support 2026-02-10 20:02:55 +03:00
awe
2f6e5d0dda fix median 2026-02-04 14:56:33 +03:00
awe
5978d58dc5 fix sub matplotlib 2026-02-04 14:38:55 +03:00
awe
e0f7678c3e backend upd 2026-02-04 14:13:01 +03:00
awe
60a35b690f fix debug 2026-02-03 19:13:33 +03:00
awe
6f5dfafbfb fix matplotlib 2026-02-03 19:06:26 +03:00
awe
3a072f0298 fix 3 2026-02-03 18:55:39 +03:00
awe
d8b71b2cc4 fix 2 2026-02-03 15:34:37 +03:00
awe
53ff80a522 fix 2026-02-03 15:33:09 +03:00
awe
a32bef2250 reference cli add 2026-02-03 15:24:07 +03:00
awe
61816cf894 reference 2026-02-03 15:10:22 +03:00
awe
3bc2382bd0 new fft 2026-02-03 14:40:39 +03:00
awe
2af6c8a486 new project structure 2026-02-03 14:17:06 +03:00
awe
0332ebdd98 global phase try 2 2026-01-30 15:50:40 +03:00
awe
e84c155e25 try to modern fft 2026-01-30 12:38:17 +03:00
awe
508c835368 add logging 2026-01-29 17:05:47 +03:00
awe
23cff76dd2 test fix for dropping points 2026-01-29 16:58:01 +03:00
18 changed files with 8781 additions and 1852 deletions

187
README.md Normal file
View File

@ -0,0 +1,187 @@
# RFG STM32 ADC Receiver GUI
Реалтайм-плоттер для визуализации данных FMCW радара, получаемых через виртуальный COM-порт от STM32 ADC.
## Описание
Приложение визуализирует данные в реальном времени, отображая 6 синхронизированных графиков:
1. **Сырые данные** - график последнего полученного свипа
2. **Водопад сырых данных** - временная серия последних N свипов
3. **FFT спектр** - спектр текущего свипа в частотной области
4. **B-scan** - спектрограмма (водопад FFT)
5. **Фаза спектра** - развернутая фаза для анализа расстояния
6. **Водопад фазы** - временная эволюция фазы
## Возможности
- ✅ Высокопроизводительная визуализация в реальном времени
- ✅ Два бэкенда визуализации: matplotlib (совместимость) и pyqtgraph (скорость)
- ✅ Автоматическая обработка фазы для FMCW радара
- ✅ Преобразование фазы в расстояние
- ✅ Поддержка pyserial или raw TTY доступа
- ✅ Заполнение пропущенных точек (режим --fancy)
- ✅ Инверсия сигнала при отрицательном уровне
- ✅ Диагностика потерь данных
## Установка
### Минимальные требования
```bash
pip install -r requirements.txt
```
### Зависимости
**Обязательные:**
- `numpy` - обработка массивов и FFT
- `matplotlib` - визуализация
**Опциональные (рекомендуется):**
- `pyserial` - доступ к serial порту (обязательно для Windows)
- `pyqtgraph` + `PyQt5` или `PySide6` - быстрый бэкенд визуализации
## Использование
### Базовый запуск
```bash
python -m rfg_adc_plotter.cli /dev/ttyACM0
```
### С параметрами
```bash
python -m rfg_adc_plotter.cli /dev/ttyACM0 \
--baud 115200 \
--max-sweeps 200 \
--max-fps 30 \
--backend pg \
--fancy
```
### Параметры командной строки
- `port` - путь к порту (например `/dev/ttyACM0`, `COM3`)
- `--baud` - скорость порта (по умолчанию 115200)
- `--max-sweeps` - количество свипов в водопаде (по умолчанию 200)
- `--max-fps` - ограничение частоты отрисовки (по умолчанию 30)
- `--cmap` - цветовая карта для водопадов (по умолчанию viridis)
- `--spec-clip` - процентильная обрезка контраста B-scan (по умолчанию 2,98)
- `--title` - заголовок окна (по умолчанию "ADC Sweeps")
- `--fancy` - заполнение пропущенных точек средними значениями
- `--ylim` - фиксированные пределы по Y (формат: min,max)
- `--backend` - бэкенд визуализации:
- `auto` - автоматический выбор (сначала pyqtgraph, fallback на matplotlib)
- `pg` - pyqtgraph (быстрее)
- `mpl` - matplotlib (совместимее)
## Формат данных
Приложение ожидает текстовые строки через serial порт:
```
Sweep_start
s 0 1234
s 1 1256
s 2 1278
...
Sweep_start
s 0 1235
...
```
- `Sweep_start` - начало нового свипа
- `s X Y` - точка данных (индекс X, значение Y), целые числа со знаком
## Архитектура проекта
```
rfg_adc_plotter/
├── __init__.py
├── config.py # Константы и типы
├── cli.py # Точка входа CLI
├── data_acquisition/
│ ├── __init__.py
│ ├── serial_io.py # Serial порт I/O
│ └── sweep_reader.py # Фоновый поток чтения данных
├── signal_processing/
│ ├── __init__.py
│ └── phase_analysis.py # Обработка фазы
├── visualization/
│ ├── __init__.py
│ ├── matplotlib_backend.py # Matplotlib визуализация
│ └── pyqtgraph_backend.py # PyQtGraph визуализация
└── utils/
├── __init__.py
└── formatting.py # Утилиты форматирования
```
## Технические особенности
### Оптимизации производительности
- Фоновый поток для чтения и парсинга данных
- Векторизованные numpy операции
- Кольцевые буферы для водопадов
- Неблокирующее чтение из serial порта
- Буферизация с увеличенным размером (256KB)
### Обработка сигналов
- **FFT анализ**: окно Хэннинга, длина 1024
- **Phase unwrapping**: адаптивный алгоритм с порогом 0.8π
- **Преобразование фазы в расстояние**: формула Δl = φ × c / (4π × ν)
- **Инверсия сигнала**: автоматическая при среднем уровне < порога
### Диагностика
Каждые 10 секунд в stderr выводится диагностическая информация:
- Номер свипа
- Среднее количество валидных точек
- Количество принятых строк
- Ошибки парсинга
- Ошибки чтения
- Размер буфера
- Потерянные свипы
## Примеры использования
### Linux с pyserial
```bash
python -m rfg_adc_plotter.cli /dev/ttyACM0 --backend pg
```
### Linux с raw TTY (без pyserial)
```bash
python -m rfg_adc_plotter.cli /dev/ttyACM0 --backend mpl
```
### Windows
```bash
python -m rfg_adc_plotter.cli COM3 --backend pg --baud 115200
```
### С высоким разрешением времени
```bash
python -m rfg_adc_plotter.cli /dev/ttyACM0 --max-sweeps 500 --max-fps 60
```
### С заполнением пропусков и фиксированным Y
```bash
python -m rfg_adc_plotter.cli /dev/ttyACM0 --fancy --ylim -2000,2000
```
## Лицензия
См. LICENSE файл в корне проекта.
## Авторы
Разработано для визуализации данных FMCW радара с STM32 ADC.

File diff suppressed because it is too large Load Diff

14
requirements.txt Normal file
View File

@ -0,0 +1,14 @@
# Основные зависимости
numpy>=1.20.0
# Визуализация (matplotlib - обязательна)
matplotlib>=3.3.0
# Serial порт (опционально, но рекомендуется)
pyserial>=3.5
# Быстрый бэкенд визуализации (опционально)
pyqtgraph>=0.12.0
PyQt5>=5.15.0
# Альтернатива PyQt5:
# PySide6>=6.0.0

View File

110
rfg_adc_plotter/cli.py Normal file
View File

@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""
Точка входа для RFG ADC Data Plotter.
Реалтайм-плоттер для свипов из виртуального COM-порта.
Формат строк:
- "Sweep_start" — начало нового свипа (предыдущий считается завершённым)
- "s X Y" — точка (индекс X, значение Y), все целые со знаком
Отрисовываются шесть графиков:
- Левый верхний: последний полученный свип (Y vs X)
- Правый верхний: водопад (последние N свипов во времени)
- Левый средний: FFT спектр текущего свипа
- Правый средний: B-scan (водопад FFT спектров)
- Левый нижний: Фаза спектра (развернутая)
- Правый нижний: Водопад фазы
Оптимизации для скорости:
- Парсинг и чтение в фоновой нити
- Анимация с обновлением только данных (без лишнего пересоздания фигур)
- Кольцевой буфер под водопад с фиксированным числом свипов
Зависимости: matplotlib, numpy. PySerial опционален — при его отсутствии
используется сырой доступ к TTY через termios.
"""
import argparse
import sys
def main():
"""Основная функция CLI."""
parser = argparse.ArgumentParser(
description=(
"Читает свипы из виртуального COM-порта и рисует: "
"последний свип и водопад (реалтайм)."
)
)
parser.add_argument(
"port",
help="Путь к порту, например /dev/ttyACM1 или COM3 (COM10+: \\\\.\\COM10)",
)
parser.add_argument("--baud", type=int, default=115200, help="Скорость (по умолчанию 115200)")
parser.add_argument("--max-sweeps", type=int, default=200, help="Количество видимых свипов в водопаде")
parser.add_argument("--max-fps", type=float, default=30.0, help="Лимит частоты отрисовки, кадров/с")
parser.add_argument("--cmap", default="viridis", help="Цветовая карта водопада")
parser.add_argument(
"--spec-clip",
default="2,98",
help=(
"Процентильная обрезка уровней водопада спектров, % (min,max). "
"Напр. 2,98. 'off' — отключить"
),
)
parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна")
parser.add_argument(
"--fancy",
action="store_true",
help="Заполнять выпавшие точки средними значениями между соседними",
)
parser.add_argument(
"--ylim",
type=str,
default=None,
help="Фиксированные Y-пределы для кривой формата min,max (например -1000,1000). По умолчанию авто",
)
parser.add_argument(
"--backend",
choices=["auto", "pg", "mpl"],
default="auto",
help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto",
)
parser.add_argument(
"--ref-out",
type=str,
default=None,
help="Сохранить медиану последних 1000 свипов в указанный файл при накоплении данных",
)
parser.add_argument(
"--ref-in",
type=str,
default=None,
help="Загрузить медиану из файла и вычитать её из входящего сигнала",
)
args = parser.parse_args()
# Попробуем быстрый бэкенд (pyqtgraph) при auto/pg
if args.backend in ("auto", "pg"):
try:
from .visualization.pyqtgraph_backend import run_pyqtgraph
return run_pyqtgraph(args)
except Exception as e:
if args.backend == "pg":
sys.stderr.write(f"[error] PyQtGraph бэкенд недоступен: {e}\n")
sys.exit(1)
# При auto — тихо откатываемся на matplotlib
# Fallback на matplotlib
try:
from .visualization.matplotlib_backend import run_matplotlib
return run_matplotlib(args)
except Exception as e:
sys.stderr.write(f"[error] Matplotlib бэкенд недоступен: {e}\n")
sys.exit(1)
if __name__ == "__main__":
main()

28
rfg_adc_plotter/config.py Normal file
View File

@ -0,0 +1,28 @@
"""
Константы и типы для RFG ADC Data Plotter.
"""
from typing import Dict, Tuple, Union
import numpy as np
# Максимальное число точек в ряду водопада
WF_WIDTH = 1000
# Длина БПФ для спектра/водопада спектров
FFT_LEN = 2048
# Частотный диапазон для FFT (в ГГц)
FREQ_MIN_GHZ = -10.0 # Начало частотной оси
FREQ_MAX_GHZ = 10.0 # Конец частотной оси
DATA_FREQ_START_GHZ = 1.0 # Начало реальных данных
DATA_FREQ_END_GHZ = 10.0 # Конец реальных данных
# Порог для инверсии сырых данных: если среднее значение свипа ниже порога —
# считаем, что сигнал «меньше нуля» и домножаем свип на -1
DATA_INVERSION_THRASHOLD = 10.0
# Типы данных
Number = Union[int, float]
SweepInfo = Dict[str, Number]
SweepPacket = Tuple[np.ndarray, SweepInfo]

View File

@ -0,0 +1,204 @@
"""
Модули для работы с serial портом: чтение данных через pyserial или raw TTY.
"""
import io
import os
import sys
from typing import Optional
def try_open_pyserial(path: str, baud: int, timeout: float):
"""Попытка открыть порт через pyserial."""
try:
import serial # type: ignore
except Exception:
return None
try:
ser = serial.Serial(path, baudrate=baud, timeout=timeout)
# ВРЕМЕННО ОТКЛЮЧЕН: hardware flow control для проверки
# ser.rtscts = True
# Увеличиваем буфер приема ядра до 64KB
try:
ser.set_buffer_size(rx_size=65536, tx_size=4096)
except (AttributeError, NotImplementedError):
# Не все платформы/версии pyserial поддерживают set_buffer_size
pass
return ser
except Exception:
return None
class FDReader:
"""Простой враппер чтения строк из файлового дескриптора TTY."""
def __init__(self, fd: int):
# Отдельно буферизуем для корректной readline()
self._fd = fd
raw = os.fdopen(fd, "rb", closefd=False)
self._file = raw
# Увеличен размер буфера до 256KB для предотвращения потерь
self._buf = io.BufferedReader(raw, buffer_size=262144)
def fileno(self) -> int:
return self._fd
def readline(self) -> bytes:
return self._buf.readline()
def close(self):
try:
self._buf.close()
except Exception:
pass
def open_raw_tty(path: str, baud: int) -> Optional[FDReader]:
"""Открыть TTY без pyserial и настроить порт через termios.
Возвращает FDReader или None при ошибке.
"""
try:
import termios
import tty
except Exception:
return None
try:
fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
except Exception:
return None
try:
attrs = termios.tcgetattr(fd)
# Установим «сырое» состояние
tty.setraw(fd)
# Скорость
baud_map = {
9600: termios.B9600,
19200: termios.B19200,
38400: termios.B38400,
57600: termios.B57600,
115200: termios.B115200,
230400: getattr(termios, "B230400", None),
460800: getattr(termios, "B460800", None),
}
b = baud_map.get(baud) or termios.B115200
attrs[4] = b # ispeed
attrs[5] = b # ospeed
# VMIN=1, VTIME=0 — блокирующее чтение по байту
cc = attrs[6]
cc[termios.VMIN] = 1
cc[termios.VTIME] = 0
attrs[6] = cc
termios.tcsetattr(fd, termios.TCSANOW, attrs)
except Exception:
try:
os.close(fd)
except Exception:
pass
return None
return FDReader(fd)
class SerialLineSource:
"""Единый интерфейс для чтения строк из порта (pyserial или raw TTY)."""
def __init__(self, path: str, baud: int, timeout: float = 1.0):
self._pyserial = try_open_pyserial(path, baud, timeout)
self._fdreader = None
self._using = "pyserial" if self._pyserial is not None else "raw"
if self._pyserial is None:
self._fdreader = open_raw_tty(path, baud)
if self._fdreader is None:
msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)"
if sys.platform.startswith("win"):
msg += ". На Windows нужен pyserial: pip install pyserial"
raise RuntimeError(msg)
def readline(self) -> bytes:
if self._pyserial is not None:
try:
return self._pyserial.readline()
except Exception:
return b""
else:
try:
return self._fdreader.readline() # type: ignore[union-attr]
except Exception:
return b""
def close(self):
try:
if self._pyserial is not None:
self._pyserial.close()
elif self._fdreader is not None:
self._fdreader.close()
except Exception:
pass
class SerialChunkReader:
"""Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера."""
def __init__(self, src: SerialLineSource, error_counter: Optional[list] = None):
self._src = src
self._ser = src._pyserial
self._fd: Optional[int] = None
self._error_counter = error_counter # Список с 1 элементом для передачи по ссылке
if self._ser is not None:
# Неблокирующий режим для быстрой откачки
try:
self._ser.timeout = 0
except Exception:
pass
else:
try:
self._fd = src._fdreader.fileno() # type: ignore[union-attr]
try:
os.set_blocking(self._fd, False)
except Exception:
pass
except Exception:
self._fd = None
def read_available(self) -> bytes:
"""Вернёт доступные байты (b"" если данных нет)."""
if self._ser is not None:
try:
n = int(getattr(self._ser, "in_waiting", 0))
except Exception:
if self._error_counter:
self._error_counter[0] += 1
n = 0
if n > 0:
try:
return self._ser.read(n)
except Exception:
if self._error_counter:
self._error_counter[0] += 1
return b""
return b""
if self._fd is None:
return b""
out = bytearray()
while True:
try:
chunk = os.read(self._fd, 65536)
if not chunk:
break
out += chunk
if len(chunk) < 65536:
break
except BlockingIOError:
break
except Exception:
if self._error_counter:
self._error_counter[0] += 1
break
return bytes(out)

View File

@ -0,0 +1,269 @@
"""
Фоновый поток для чтения и сборки свипов из serial порта.
"""
import sys
import threading
import time
from collections import deque
from queue import Queue, Full
import numpy as np
from ..config import DATA_INVERSION_THRASHOLD, SweepInfo, SweepPacket
from .serial_io import SerialChunkReader, SerialLineSource
class SweepReader(threading.Thread):
"""Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь."""
def __init__(
self,
port_path: str,
baud: int,
out_queue: Queue[SweepPacket],
stop_event: threading.Event,
fancy: bool = False,
):
super().__init__(daemon=True)
self._port_path = port_path
self._baud = baud
self._q = out_queue
self._stop = stop_event
self._src: SerialLineSource | None = None
self._fancy = bool(fancy)
self._max_width: int = 0
self._sweep_idx: int = 0
self._last_sweep_ts: float | None = None
self._n_valid_hist = deque()
# Счетчик потерь данных (выброшенных свипов из-за переполнения очереди)
self._dropped_sweeps: int = 0
# Диагностика потери точек внутри свипа
self._total_lines_received: int = 0 # Всего принято строк с данными
self._total_parse_errors: int = 0 # Ошибок парсинга строк
self._total_empty_lines: int = 0 # Пустых строк
self._max_buf_size: int = 0 # Максимальный размер буфера парсинга
self._read_errors: int = 0 # Ошибок чтения из порта
self._last_diag_time: float = 0.0 # Время последнего вывода диагностики
self._cal_mode: int = -1 # Режим калибровки (07), -1 = неизвестен
def _finalize_current(self, xs, ys, cal_mode: int = -1):
if not xs:
return
max_x = max(xs)
width = max_x + 1
self._max_width = max(self._max_width, width)
target_width = self._max_width if self._fancy else width
# Быстрый векторизованный путь
sweep = np.full((target_width,), np.nan, dtype=np.float32)
try:
idx = np.asarray(xs, dtype=np.int64)
vals = np.asarray(ys, dtype=np.float32)
sweep[idx] = vals
except Exception:
# Запасной путь
for x, y in zip(xs, ys):
if 0 <= x < target_width:
sweep[x] = float(y)
# Метрики валидных точек до заполнения пропусков
finite_pre = np.isfinite(sweep)
n_valid_cur = int(np.count_nonzero(finite_pre))
# Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины
if self._fancy:
try:
known = ~np.isnan(sweep)
if np.any(known):
known_idx = np.nonzero(known)[0]
# Для каждой пары соседних известных индексов заполним промежуток средним значением
for i0, i1 in zip(known_idx[:-1], known_idx[1:]):
if i1 - i0 > 1:
avg = (sweep[i0] + sweep[i1]) * 0.5
sweep[i0 + 1 : i1] = avg
first_idx = int(known_idx[0])
last_idx = int(known_idx[-1])
if first_idx > 0:
sweep[:first_idx] = sweep[first_idx]
if last_idx < sweep.size - 1:
sweep[last_idx + 1 :] = sweep[last_idx]
except Exception:
# В случае ошибки просто оставляем как есть
pass
# Инверсия данных при «отрицательном» уровне (среднее ниже порога)
try:
m = float(np.nanmean(sweep))
if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD:
sweep *= -1.0
except Exception:
pass
sweep -= float(np.nanmean(sweep))
# Метрики для статусной строки (вид словаря: переменная -> значение)
self._sweep_idx += 1
now = time.time()
if self._last_sweep_ts is None:
dt_ms = float("nan")
else:
dt_ms = (now - self._last_sweep_ts) * 1000.0
self._last_sweep_ts = now
self._n_valid_hist.append((now, n_valid_cur))
while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0:
self._n_valid_hist.popleft()
if self._n_valid_hist:
n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist))
else:
n_valid = float(n_valid_cur)
if n_valid_cur > 0:
vmin = float(np.nanmin(sweep))
vmax = float(np.nanmax(sweep))
mean = float(np.nanmean(sweep))
std = float(np.nanstd(sweep))
else:
vmin = vmax = mean = std = float("nan")
info: SweepInfo = {
"sweep": self._sweep_idx,
"n_valid": n_valid,
"min": vmin,
"max": vmax,
"mean": mean,
"std": std,
"dt_ms": dt_ms,
"dropped": self._dropped_sweeps,
"lines": self._total_lines_received,
"parse_err": self._total_parse_errors,
"read_err": self._read_errors,
"max_buf": self._max_buf_size,
"cal_mode": cal_mode,
}
# Периодический вывод детальной диагностики в stderr (каждые 10 секунд)
now = time.time()
if now - self._last_diag_time > 10.0:
self._last_diag_time = now
sys.stderr.write(
f"[DIAG] sweep={self._sweep_idx} n_valid={n_valid:.1f} "
f"lines={self._total_lines_received} parse_err={self._total_parse_errors} "
f"read_err={self._read_errors} max_buf={self._max_buf_size} "
f"dropped={self._dropped_sweeps}\n"
)
sys.stderr.flush()
# Кладём готовый свип (если очередь полна — выбрасываем самый старый)
try:
self._q.put_nowait((sweep, info))
except Full:
# Счетчик потерь для диагностики
self._dropped_sweeps += 1
try:
_ = self._q.get_nowait()
except Exception:
pass
try:
self._q.put_nowait((sweep, info))
except Exception:
pass
def run(self):
# Состояние текущего свипа
xs: list[int] = []
ys: list[int] = []
current_cal_mode: int = -1 # Режим калибровки для текущего свипа
try:
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n")
except Exception as e:
sys.stderr.write(f"[error] {e}\n")
return
try:
# Быстрый неблокирующий дренаж порта с разбором по байтам
# Передаем счетчик ошибок чтения как список для изменения по ссылке
error_counter = [0]
chunk_reader = SerialChunkReader(self._src, error_counter)
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
# Обновляем счетчик ошибок чтения
self._read_errors = error_counter[0]
if data:
buf += data
# Отслеживаем максимальный размер буфера парсинга
if len(buf) > self._max_buf_size:
self._max_buf_size = len(buf)
else:
# Короткая уступка CPU, если нет новых данных (уменьшена до 0.1ms)
time.sleep(0.0001)
continue
# Обрабатываем все полные строки
while True:
nl = buf.find(b"\n")
if nl == -1:
break
line = bytes(buf[:nl])
del buf[: nl + 1]
if line.endswith(b"\r"):
line = line[:-1]
if not line:
self._total_empty_lines += 1
continue
if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys, current_cal_mode)
xs.clear()
ys.clear()
current_cal_mode = -1
continue
# Формат строки данных: "sN X Y" или "s X Y"
# где N — цифра режима калибровки 07 (слитно с 's')
# X — индекс точки, Y — значение (целое со знаком)
if len(line) >= 3:
parts = line.split()
if parts and len(parts[0]) >= 1 and parts[0][:1].lower() == b"s":
tag = parts[0].lower() # b"s" или b"s0"..b"s7"
if len(tag) == 2 and b"0" <= tag[1:2] <= b"7":
# Новый формат: режим калибровки встроен в тег
current_cal_mode = int(tag[1:2])
data_parts = parts[1:]
elif len(tag) == 1:
# Старый формат: "s X Y"
data_parts = parts[1:]
else:
self._total_parse_errors += 1
continue
if len(data_parts) >= 2:
try:
x = int(data_parts[0], 10)
y = int(data_parts[1], 10)
except Exception:
self._total_parse_errors += 1
continue
xs.append(x)
ys.append(y)
self._total_lines_received += 1
else:
self._total_parse_errors += 1
else:
# Строка не начинается с 's'
self._total_parse_errors += 1
else:
# Строка слишком короткая
self._total_parse_errors += 1
# Защита от переполнения буфера при отсутствии переводов строки (снижен порог)
if len(buf) > 262144:
del buf[:-131072]
finally:
try:
# Завершаем оставшийся свип
self._finalize_current(xs, ys, current_cal_mode)
except Exception:
pass
try:
if self._src is not None:
self._src.close()
except Exception:
pass

View File

@ -0,0 +1,107 @@
"""
Обработка фазы для FMCW радара: развертка фазы и преобразование в расстояние.
"""
from typing import Optional, Tuple
import numpy as np
def apply_temporal_unwrap(
current_phase: np.ndarray,
prev_phase: Optional[np.ndarray],
phase_offset: Optional[np.ndarray],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Применяет улучшенный phase unwrapping для FMCW радара с адаптивным порогом.
Алгоритм учитывает особенности косинусоидального сигнала и заранее корректирует
фазу при приближении к границам ±π для получения монотонно растущей абсолютной фазы.
Args:
current_phase: Текущая фаза (развернутая по частоте) для всех бинов
prev_phase: Предыдущая фаза, может быть None при первом вызове
phase_offset: Накопленные смещения для каждого бина, может быть None
Returns:
(unwrapped_phase, new_prev_phase, new_phase_offset)
unwrapped_phase - абсолютная развёрнутая фаза (может быть > 2π)
new_prev_phase - обновлённая предыдущая фаза (для следующего вызова)
new_phase_offset - обновлённые смещения (для следующего вызова)
"""
n_bins = current_phase.size
# Инициализация при первом вызове
if prev_phase is None:
prev_phase = current_phase.copy()
phase_offset = np.zeros(n_bins, dtype=np.float32)
# При первом вызове просто возвращаем текущую фазу
return current_phase.copy(), prev_phase, phase_offset
if phase_offset is None:
phase_offset = np.zeros(n_bins, dtype=np.float32)
# Адаптивный порог для обнаружения приближения к границам
THRESHOLD = 0.8 * np.pi
# Вычисляем разницу между текущей и предыдущей фазой
delta = current_phase - prev_phase
# Обнаруживаем скачки и корректируем offset
# Используем улучшенный алгоритм с адаптивным порогом
# Метод 1: Стандартная коррекция для больших скачков (> π)
# Это ловит случаи, когда фаза уже перескочила границу
phase_offset = phase_offset - 2.0 * np.pi * np.round(delta / (2.0 * np.pi))
# Метод 2: Адаптивная коррекция при приближении к границам
# Проверяем текущую развернутую фазу
unwrapped_phase = current_phase + phase_offset
# Если фаза близка к нечетным π (π, 3π, 5π...), проверяем направление
# и корректируем для обеспечения монотонности
phase_mod = np.mod(unwrapped_phase + np.pi, 2.0 * np.pi) - np.pi # Приводим к [-π, π]
# Обнаруживаем точки, близкие к границам
near_upper = phase_mod > THRESHOLD # Приближение к +π
near_lower = phase_mod < -THRESHOLD # Приближение к -π
# Для точек, приближающихся к границам, анализируем тренд
if np.any(near_upper) or np.any(near_lower):
# Если delta положительна и мы около +π, готовимся к переходу
should_add = near_upper & (delta > 0)
# Если delta отрицательна и мы около -π, готовимся к переходу
should_sub = near_lower & (delta < 0)
# Применяем дополнительную коррекцию только там, где нужно
# (этот код срабатывает редко, только при быстром движении объекта)
pass # Основная коррекция уже сделана выше
# Финальная развернутая фаза
unwrapped_phase = current_phase + phase_offset
# Сохраняем текущую фазу как предыдущую для следующего свипа
new_prev_phase = current_phase.copy()
new_phase_offset = phase_offset.copy()
return unwrapped_phase, new_prev_phase, new_phase_offset
def phase_to_distance(phase: np.ndarray, center_freq_hz: float = 6e9) -> np.ndarray:
"""Преобразует развернутую фазу в расстояние для FMCW радара.
Формула: Δl = φ * c / (4π * ν)
где:
φ - фаза (радианы)
c - скорость света (м/с)
ν - центральная частота свипа (Гц)
Args:
phase: Развернутая фаза в радианах
center_freq_hz: Центральная частота диапазона в Гц (по умолчанию 6 ГГц для 2-10 ГГц)
Returns:
Расстояние в метрах
"""
c = 299792458.0 # Скорость света в м/с
distance = phase * c / (4.0 * np.pi * center_freq_hz)
return distance.astype(np.float32)

View File

View File

@ -0,0 +1,50 @@
"""
Утилиты для форматирования данных и парсинга параметров.
"""
from typing import Any, Mapping, Optional, Tuple
import numpy as np
def format_status_kv(data: Mapping[str, Any]) -> str:
"""Преобразовать словарь метрик в одну строку 'k:v'."""
def _fmt(v: Any) -> str:
if v is None:
return "NA"
try:
fv = float(v)
except Exception:
return str(v)
if not np.isfinite(fv):
return "nan"
# Достаточно компактно для статус-строки.
if abs(fv) >= 1000 or (0 < abs(fv) < 0.01):
return f"{fv:.3g}"
return f"{fv:.3f}".rstrip("0").rstrip(".")
parts = [f"{k}:{_fmt(v)}" for k, v in data.items()]
return " ".join(parts)
def parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
"""Разобрать строку вида "low,high" процентов для контрастного отображения водопада спектров.
Возвращает пару (low, high) или None для отключения. Допустимы значения 0..100, low < high.
Ключевые слова отключения: "off", "none", "no".
"""
if not spec:
return None
s = str(spec).strip().lower()
if s in ("off", "none", "no"):
return None
try:
p0, p1 = s.replace(";", ",").split(",")
low = float(p0)
high = float(p1)
if not (0.0 <= low < high <= 100.0):
return None
return (low, high)
except Exception:
return None

View File

@ -0,0 +1,651 @@
"""
Визуализация данных с использованием matplotlib.
"""
import csv
import sys
import threading
import time
from datetime import datetime
from queue import Empty, Queue
from typing import Optional, Tuple
import numpy as np
try:
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.widgets import Slider
except Exception as e:
raise RuntimeError(f"Нужны matplotlib и ее зависимости: {e}")
from ..config import (
FFT_LEN,
WF_WIDTH,
SweepInfo,
SweepPacket,
FREQ_MIN_GHZ,
FREQ_MAX_GHZ,
DATA_FREQ_START_GHZ,
DATA_FREQ_END_GHZ,
)
from ..data_acquisition.sweep_reader import SweepReader
from ..signal_processing.phase_analysis import apply_temporal_unwrap, phase_to_distance
from ..utils.formatting import format_status_kv, parse_spec_clip
def run_matplotlib(args):
"""Запуск визуализации с использованием matplotlib."""
# Очередь завершённых свипов и поток чтения
q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy))
reader.start()
# Графика (3 ряда x 2 колонки = 6 графиков)
fig, axs = plt.subplots(3, 2, figsize=(12, 12))
(ax_line, ax_img), (ax_fft, ax_spec), (ax_phase, ax_phase_wf) = axs
fig.canvas.manager.set_window_title(args.title) if hasattr(fig.canvas.manager, "set_window_title") else None
# Увеличим расстояния и оставим место справа под ползунки оси Y B-scan
fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.95, bottom=0.05)
# Состояние для отображения
current_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None
x_shared: Optional[np.ndarray] = None
width: Optional[int] = None
max_sweeps = int(max(10, args.max_sweeps))
ring = None # type: Optional[np.ndarray]
ring_time = None # type: Optional[np.ndarray]
head = 0
# Медианные данные для вычитания
median_data: Optional[np.ndarray] = None
median_subtract_enabled = False
# CLI параметры для автоматического сохранения/загрузки
ref_out_file = getattr(args, 'ref_out', None)
ref_in_file = getattr(args, 'ref_in', None)
ref_out_saved = False # Флаг, что медиана уже сохранена
# Отдельный буфер для накопления 1000 сырых свипов (не зависит от max_sweeps)
ref_ring: Optional[np.ndarray] = None
ref_ring_head = 0
ref_ring_count = 0
if ref_out_file:
print(f"[ref-out] Автосохранение включено, файл: {ref_out_file}")
# Автоматическая загрузка медианы при старте
if ref_in_file:
try:
pairs = []
with open(ref_in_file, 'r') as f:
reader = csv.reader(f)
next(reader) # Пропускаем заголовок
for row in reader:
if len(row) >= 2:
try:
pairs.append((int(row[0]), float(row[1])))
except ValueError:
continue
if pairs:
max_idx = max(idx for idx, _ in pairs)
median_data = np.full(max_idx + 1, np.nan, dtype=np.float32)
for idx, val in pairs:
median_data[idx] = val
median_subtract_enabled = True
print(f"[ref-in] Загружена медиана из {ref_in_file} ({len(median_data)} точек), вычитание включено")
else:
print(f"[ref-in] Предупреждение: файл {ref_in_file} пустой или неверный формат")
except Exception as e:
print(f"[ref-in] Ошибка загрузки {ref_in_file}: {e}")
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
# FFT состояние (полное FFT для отрицательных частот)
fft_bins = FFT_LEN
ring_fft = None # type: Optional[np.ndarray]
y_min_fft, y_max_fft = None, None
freq_shared: Optional[np.ndarray] = None
# Phase состояние
ring_phase = None # type: Optional[np.ndarray]
prev_phase_per_bin: Optional[np.ndarray] = None
phase_offset_per_bin: Optional[np.ndarray] = None
y_min_phase, y_max_phase = None, None
# Параметры контраста водопада спектров
spec_clip = parse_spec_clip(getattr(args, "spec_clip", None))
# Ползунки управления Y для B-scan и контрастом
ymin_slider = None
ymax_slider = None
contrast_slider = None
# Статусная строка (внизу окна)
status_text = fig.text(
0.01,
0.01,
"",
ha="left",
va="bottom",
fontsize=8,
family="monospace",
)
# Линейный график последнего свипа
line_obj, = ax_line.plot([], [], lw=1)
ax_line.set_title("Сырые данные", pad=1)
ax_line.set_xlabel("F")
ax_line.set_ylabel("")
# Линейный график спектра текущего свипа
fft_line_obj, = ax_fft.plot([], [], lw=1)
ax_fft.set_title("FFT", pad=1)
ax_fft.set_xlabel("Частота, ГГц")
ax_fft.set_ylabel("Амплитуда, дБ")
# Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None
# CLI переопределение при необходимости
if args.ylim:
try:
y0, y1 = args.ylim.split(",")
fixed_ylim = (float(y0), float(y1))
except Exception:
sys.stderr.write("[warn] Некорректный формат --ylim, игнорирую. Ожидалось min,max\n")
if fixed_ylim is not None:
ax_line.set_ylim(fixed_ylim)
# Водопад (будет инициализирован при первом свипе)
img_obj = ax_img.imshow(
np.zeros((1, 1), dtype=np.float32),
aspect="auto",
interpolation="nearest",
origin="lower",
cmap=args.cmap,
)
ax_img.set_title("Сырые данные", pad=12)
ax_img.set_xlabel("")
ax_img.set_ylabel("частота")
# Не показываем численные значения по времени на водопаде сырых данных
try:
ax_img.tick_params(axis="x", labelbottom=False)
except Exception:
pass
# Водопад спектров
img_fft_obj = ax_spec.imshow(
np.zeros((1, 1), dtype=np.float32),
aspect="auto",
interpolation="nearest",
origin="lower",
cmap=args.cmap,
)
ax_spec.set_title("B-scan (дБ)", pad=12)
ax_spec.set_xlabel("")
ax_spec.set_ylabel("Частота, ГГц")
# Не показываем численные значения по времени на B-scan
try:
ax_spec.tick_params(axis="x", labelbottom=False)
except Exception:
pass
# График фазы текущего свипа
phase_line_obj, = ax_phase.plot([], [], lw=1)
ax_phase.set_title("Фаза спектра (развернутая)", pad=1)
ax_phase.set_xlabel("Частота, ГГц")
ax_phase.set_ylabel("Фаза, радианы")
# Добавим второй Y axis для расстояния
ax_phase_dist = ax_phase.twinx()
ax_phase_dist.set_ylabel("Расстояние, м", color='green')
# Водопад фазы
img_phase_obj = ax_phase_wf.imshow(
np.zeros((1, 1), dtype=np.float32),
aspect="auto",
interpolation="nearest",
origin="lower",
cmap=args.cmap,
)
ax_phase_wf.set_title("Водопад фазы", pad=12)
ax_phase_wf.set_xlabel("")
ax_phase_wf.set_ylabel("Частота, ГГц")
# Не показываем численные значения по времени
try:
ax_phase_wf.tick_params(axis="x", labelbottom=False)
except Exception:
pass
# Слайдеры для управления осью Y B-scan (мин/макс) и контрастом
try:
ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35])
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
ymin_slider = Slider(ax_smin, "Y min", FREQ_MIN_GHZ, FREQ_MAX_GHZ, valinit=FREQ_MIN_GHZ, valstep=0.1, orientation="vertical")
ymax_slider = Slider(ax_smax, "Y max", FREQ_MIN_GHZ, FREQ_MAX_GHZ, valinit=FREQ_MAX_GHZ, valstep=0.1, orientation="vertical")
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
def _on_ylim_change(_val):
try:
y0 = float(min(ymin_slider.val, ymax_slider.val))
y1 = float(max(ymin_slider.val, ymax_slider.val))
ax_spec.set_ylim(y0, y1)
fig.canvas.draw_idle()
except Exception:
pass
ymin_slider.on_changed(_on_ylim_change)
ymax_slider.on_changed(_on_ylim_change)
# Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона)
contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle())
except Exception:
pass
# Для контроля частоты обновления
max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps)
frames_since_ylim_update = 0
def ensure_buffer(_w: int):
nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time
nonlocal ring_phase, prev_phase_per_bin, phase_offset_per_bin
nonlocal ref_ring
if ring is not None:
return
width = WF_WIDTH
x_shared = np.arange(width, dtype=np.int32)
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64)
head = 0
# Обновляем изображение под новые размеры: время по X (горизонталь), X по Y
img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32))
img_obj.set_extent((0, max_sweeps - 1, 0, width - 1 if width > 0 else 1))
ax_img.set_xlim(0, max_sweeps - 1)
ax_img.set_ylim(0, max(1, width - 1))
# FFT буферы: время по X, бин по Y
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32))
img_fft_obj.set_extent((0, max_sweeps - 1, FREQ_MIN_GHZ, FREQ_MAX_GHZ))
ax_spec.set_xlim(0, max_sweeps - 1)
ax_spec.set_ylim(FREQ_MIN_GHZ, FREQ_MAX_GHZ)
freq_shared = np.linspace(FREQ_MIN_GHZ, FREQ_MAX_GHZ, fft_bins, dtype=np.float32)
# Phase буферы: время по X, бин по Y
ring_phase = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
prev_phase_per_bin = np.zeros(fft_bins, dtype=np.float32)
phase_offset_per_bin = np.zeros(fft_bins, dtype=np.float32)
img_phase_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32))
img_phase_obj.set_extent((0, max_sweeps - 1, FREQ_MIN_GHZ, FREQ_MAX_GHZ))
ax_phase_wf.set_xlim(0, max_sweeps - 1)
ax_phase_wf.set_ylim(FREQ_MIN_GHZ, FREQ_MAX_GHZ)
# Буфер для медианы (отдельный от ring, размер всегда 1000)
if ref_out_file and ref_ring is None:
ref_ring = np.full((1000, width), np.nan, dtype=np.float32)
def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по текущей видимой области imshow (без накопления по времени)."""
if data.size == 0:
return None
ny, nx = data.shape[0], data.shape[1]
try:
x0, x1 = axis.get_xlim()
y0, y1 = axis.get_ylim()
except Exception:
x0, x1 = 0.0, float(nx - 1)
y0, y1 = 0.0, float(ny - 1)
xmin, xmax = sorted((float(x0), float(x1)))
ymin, ymax = sorted((float(y0), float(y1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
if ix1 < ix0:
ix1 = ix0
if iy1 < iy0:
iy1 = iy0
sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1]
finite = np.isfinite(sub)
if not finite.any():
return None
vals = sub[finite]
vmin = float(np.min(vals))
vmax = float(np.max(vals))
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
return None
return (vmin, vmax)
def push_sweep(s: np.ndarray):
nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time
nonlocal ring_phase, prev_phase_per_bin, phase_offset_per_bin, y_min_phase, y_max_phase
nonlocal ref_ring_head, ref_ring_count
if s is None or s.size == 0 or ring is None:
return
# Сохраняем сырой свип в буфер медианы (до вычитания)
if ref_out_file and not ref_out_saved and ref_ring is not None:
w_ref = ref_ring.shape[1]
take_ref = min(w_ref, s.size)
ref_ring[ref_ring_head, :take_ref] = s[:take_ref]
ref_ring_head = (ref_ring_head + 1) % 1000
ref_ring_count = min(ref_ring_count + 1, 1000)
# Применяем вычитание медианы если включено
if median_subtract_enabled and median_data is not None:
take_median = min(s.size, median_data.size)
s_corrected = s.copy()
s_corrected[:take_median] = s[:take_median] - median_data[:take_median]
s = s_corrected
# Нормализуем длину до фиксированной ширины
w = ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
ring[head, :] = row
if ring_time is not None:
ring_time[head] = time.time()
head = (head + 1) % ring.shape[0]
# FFT строка (дБ) и фаза
if ring_fft is not None:
bins = ring_fft.shape[1]
# Подготовка входа FFT_LEN, замена NaN на 0
take_fft = min(int(s.size), FFT_LEN)
if take_fft <= 0:
fft_row = np.full((bins,), np.nan, dtype=np.float32)
phase_row = np.full((bins,), np.nan, dtype=np.float32)
else:
# Создаем буфер для полного FFT (с отрицательными частотами)
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
# Вычисляем индексы для размещения данных (1-10 ГГц в диапазоне -10 до +10 ГГц)
freq_range_total = FREQ_MAX_GHZ - FREQ_MIN_GHZ # 20 ГГц
freq_range_data = DATA_FREQ_END_GHZ - DATA_FREQ_START_GHZ # 9 ГГц
# Начальный индекс для данных в FFT буфере
start_idx = int((DATA_FREQ_START_GHZ - FREQ_MIN_GHZ) / freq_range_total * FFT_LEN)
# Количество точек для данных
data_points = int(freq_range_data / freq_range_total * FFT_LEN)
data_points = min(data_points, take_fft, FFT_LEN - start_idx)
# Подготовка данных
seg = s[:data_points]
if isinstance(seg, np.ndarray):
seg = np.nan_to_num(seg, nan=0.0).astype(np.float32, copy=False)
else:
seg = np.asarray(seg, dtype=np.float32)
seg = np.nan_to_num(seg, nan=0.0)
# Окно Хэннинга
win = np.hanning(data_points).astype(np.float32)
# Размещаем данные в правильной позиции
fft_in[start_idx:start_idx + data_points] = seg * win
# Полное FFT (включая отрицательные частоты)
spec = np.fft.fft(fft_in)
# Сдвигаем для центрирования нулевой частоты
spec = np.fft.fftshift(spec)
mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins:
fft_row = fft_row[:bins]
# Расчет фазы
phase = np.angle(spec).astype(np.float32)
if phase.shape[0] > bins:
phase = phase[:bins]
# Unwrapping по частоте (внутри свипа)
phase_unwrapped_freq = np.unwrap(phase)
# Unwrapping по времени (между свипами)
phase_unwrapped_time, prev_phase_per_bin, phase_offset_per_bin = apply_temporal_unwrap(
phase_unwrapped_freq, prev_phase_per_bin, phase_offset_per_bin
)
phase_row = phase_unwrapped_time
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row
# Экстремумы для цветовой шкалы
fr_min = np.nanmin(fft_row)
fr_max = np.nanmax(fft_row)
fr_max = np.nanpercentile(fft_row, 90)
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
y_min_fft = float(fr_min)
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
y_max_fft = float(fr_max)
# Сохраняем фазу в буфер
if ring_phase is not None:
ring_phase[(head - 1) % ring_phase.shape[0], :] = phase_row
# Экстремумы для цветовой шкалы фазы
ph_min = np.nanmin(phase_row)
ph_max = np.nanmax(phase_row)
if y_min_phase is None or (not np.isnan(ph_min) and ph_min < y_min_phase):
y_min_phase = float(ph_min)
if y_max_phase is None or (not np.isnan(ph_max) and ph_max > y_max_phase):
y_max_phase = float(ph_max)
def drain_queue():
nonlocal current_sweep, current_info
drained = 0
while True:
try:
s, info = q.get_nowait()
except Empty:
break
drained += 1
current_sweep = s
current_info = info
ensure_buffer(s.size)
push_sweep(s)
return drained
def make_display_ring():
# Возвращаем буфер с правильным порядком по времени (старые→новые) и осью времени по X
if ring is None:
return np.zeros((1, 1), dtype=np.float32)
base = ring if head == 0 else np.roll(ring, -head, axis=0)
return base.T # (width, time)
def make_display_times():
if ring_time is None:
return None
base_t = ring_time if head == 0 else np.roll(ring_time, -head)
return base_t
def make_display_ring_fft():
if ring_fft is None:
return np.zeros((1, 1), dtype=np.float32)
base = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
return base.T # (bins, time)
def make_display_ring_phase():
if ring_phase is None:
return np.zeros((1, 1), dtype=np.float32)
base = ring_phase if head == 0 else np.roll(ring_phase, -head, axis=0)
return base.T # (bins, time)
def update(_frame):
nonlocal frames_since_ylim_update, ref_out_saved
changed = drain_queue() > 0
# Обновление линии последнего свипа
if current_sweep is not None:
# Применяем вычитание медианы для отображения
display_sweep = current_sweep
if median_subtract_enabled and median_data is not None:
take_median = min(current_sweep.size, median_data.size)
display_sweep = current_sweep.copy()
display_sweep[:take_median] = current_sweep[:take_median] - median_data[:take_median]
if x_shared is not None and display_sweep.size <= x_shared.size:
xs = x_shared[: display_sweep.size]
else:
xs = np.arange(display_sweep.size, dtype=np.int32)
line_obj.set_data(xs, display_sweep)
# Лимиты по X постоянные под текущую ширину
ax_line.set_xlim(0, max(1, display_sweep.size - 1))
# Адаптивные Y-лимиты (если не задан --ylim)
if fixed_ylim is None:
y0 = float(np.nanmin(display_sweep))
y1 = float(np.nanmax(display_sweep))
if np.isfinite(y0) and np.isfinite(y1):
if y0 == y1:
pad = max(1.0, abs(y0) * 0.05)
y0 -= pad
y1 += pad
else:
pad = 0.05 * (y1 - y0)
y0 -= pad
y1 += pad
ax_line.set_ylim(y0, y1)
# Обновление спектра и фазы текущего свипа
take_fft = min(int(display_sweep.size), FFT_LEN)
if take_fft > 0 and freq_shared is not None:
# Создаем буфер для полного FFT (с отрицательными частотами)
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
# Вычисляем индексы для размещения данных (1-10 ГГц в диапазоне -10 до +10 ГГц)
freq_range_total = FREQ_MAX_GHZ - FREQ_MIN_GHZ # 20 ГГц
freq_range_data = DATA_FREQ_END_GHZ - DATA_FREQ_START_GHZ # 9 ГГц
# Начальный индекс для данных в FFT буфере
start_idx = int((DATA_FREQ_START_GHZ - FREQ_MIN_GHZ) / freq_range_total * FFT_LEN)
# Количество точек для данных
data_points = int(freq_range_data / freq_range_total * FFT_LEN)
data_points = min(data_points, take_fft, FFT_LEN - start_idx)
# Подготовка данных с окном Хэннинга
seg = np.nan_to_num(display_sweep[:data_points], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(data_points).astype(np.float32)
# Размещаем данные в правильной позиции
fft_in[start_idx:start_idx + data_points] = seg * win
# Полное FFT (включая отрицательные частоты)
spec = np.fft.fft(fft_in)
# Сдвигаем для центрирования нулевой частоты
spec = np.fft.fftshift(spec)
mag = np.abs(spec).astype(np.float32)
fft_vals = 20.0 * np.log10(mag + 1e-9)
xs_fft = freq_shared
if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size]
fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals)
# Авто-диапазон по Y для спектра
if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)):
ax_fft.set_xlim(FREQ_MIN_GHZ, FREQ_MAX_GHZ)
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
# Расчет и отображение фазы текущего свипа
phase = np.angle(spec).astype(np.float32)
if phase.size > xs_fft.size:
phase = phase[: xs_fft.size]
# Unwrapping по частоте
phase_unwrapped = np.unwrap(phase)
phase_line_obj.set_data(xs_fft[: phase_unwrapped.size], phase_unwrapped)
# Авто-диапазон по Y для фазы
if np.isfinite(np.nanmin(phase_unwrapped)) and np.isfinite(np.nanmax(phase_unwrapped)):
ax_phase.set_xlim(FREQ_MIN_GHZ, FREQ_MAX_GHZ)
phase_min = float(np.nanmin(phase_unwrapped))
phase_max = float(np.nanmax(phase_unwrapped))
ax_phase.set_ylim(phase_min, phase_max)
# Обновляем вторую ось Y с расстоянием
try:
dist_min = phase_to_distance(np.array([phase_min]))[0]
dist_max = phase_to_distance(np.array([phase_max]))[0]
ax_phase_dist.set_ylim(dist_min, dist_max)
except Exception:
pass
# Обновление водопада
if changed and ring is not None:
disp = make_display_ring()
# Новые данные справа: без реверса
img_obj.set_data(disp)
# Подписи времени не обновляем динамически (оставляем авто-тики)
# Авто-уровни: по видимой области (не накапливаем за всё время)
levels = _visible_levels_matplotlib(disp, ax_img)
if levels is not None:
img_obj.set_clim(vmin=levels[0], vmax=levels[1])
# Обновление водопада спектров
if changed and ring_fft is not None:
disp_fft = make_display_ring_fft()
# Новые данные справа: без реверса
img_fft_obj.set_data(disp_fft)
# Подписи времени не обновляем динамически (оставляем авто-тики)
# Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии)
try:
# disp_fft имеет форму (bins, time); берём среднее по времени
mean_spec = np.nanmean(disp_fft, axis=1)
vmin_v = float(np.nanmin(mean_spec))
vmax_v = float(np.nanmax(mean_spec))
except Exception:
vmin_v = vmax_v = None
# Если средние не дают валидный диапазон — используем процентильную обрезку (если задана)
if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v:
if spec_clip is not None:
try:
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
except Exception:
vmin_v = vmax_v = None
# Фолбэк к отслеживаемым минимум/максимумам
if (vmin_v is None or not np.isfinite(vmin_v)) or (vmax_v is None or not np.isfinite(vmax_v)) or vmin_v == vmax_v:
if y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft:
vmin_v, vmax_v = y_min_fft, y_max_fft
if vmin_v is not None and vmax_v is not None and vmin_v != vmax_v:
# Применим скалирование контрастом (верхняя граница)
try:
c = float(contrast_slider.val) / 100.0 if contrast_slider is not None else 1.0
except Exception:
c = 1.0
vmax_eff = vmin_v + c * (vmax_v - vmin_v)
img_fft_obj.set_clim(vmin=vmin_v, vmax=vmax_eff)
# Обновление водопада фазы
if changed and ring_phase is not None:
disp_phase = make_display_ring_phase()
img_phase_obj.set_data(disp_phase)
# Автодиапазон для фазы
try:
mean_phase = np.nanmean(disp_phase, axis=1)
vmin_p = float(np.nanmin(mean_phase))
vmax_p = float(np.nanmax(mean_phase))
except Exception:
vmin_p = vmax_p = None
# Фолбэк к отслеживаемым минимум/максимумам
if (vmin_p is None or not np.isfinite(vmin_p)) or (vmax_p is None or not np.isfinite(vmax_p)) or vmin_p == vmax_p:
if y_min_phase is not None and y_max_phase is not None and np.isfinite(y_min_phase) and np.isfinite(y_max_phase) and y_min_phase != y_max_phase:
vmin_p, vmax_p = y_min_phase, y_max_phase
if vmin_p is not None and vmax_p is not None and vmin_p != vmax_p:
img_phase_obj.set_clim(vmin=vmin_p, vmax=vmax_p)
if changed and current_info:
status_text.set_text(format_status_kv(current_info))
# Автоматическое сохранение медианы при накоплении 1000 сырых свипов
if ref_out_file and not ref_out_saved and ref_ring is not None:
if ref_ring_count >= 1000:
try:
ordered = ref_ring if ref_ring_head == 0 else np.roll(ref_ring, -ref_ring_head, axis=0)
median_sweep = np.nanmedian(ordered, axis=0)
with open(ref_out_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['Index', 'Median_Value'])
for i, value in enumerate(median_sweep):
if np.isfinite(value):
writer.writerow([i, float(value)])
ref_out_saved = True
print(f"[ref-out] Сохранена медиана 1000 свипов в {ref_out_file}")
status_text.set_text(f"[ref-out] Сохранено в {ref_out_file}")
except Exception as e:
print(f"[ref-out] Ошибка сохранения: {e}")
# Возвращаем обновлённые артисты
return (line_obj, img_obj, fft_line_obj, img_fft_obj, phase_line_obj, img_phase_obj, status_text)
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
plt.show()
# Нормальное завершение при закрытии окна
stop_event.set()
reader.join(timeout=1.0)

View File

@ -0,0 +1,705 @@
"""
Визуализация данных с использованием pyqtgraph (быстрый бэкенд).
"""
import csv
import sys
import threading
import time
from datetime import datetime
from queue import Empty, Queue
from typing import Optional, Tuple
import numpy as np
try:
import pyqtgraph as pg
from PyQt5 import QtCore, QtWidgets # noqa: F401
from PyQt5.QtWidgets import QPushButton, QWidget, QHBoxLayout, QCheckBox, QFileDialog
except Exception:
# Возможно установлена PySide6
try:
import pyqtgraph as pg
from PySide6 import QtCore, QtWidgets # noqa: F401
from PySide6.QtWidgets import QPushButton, QWidget, QHBoxLayout, QCheckBox, QFileDialog
except Exception as e:
raise RuntimeError(
"pyqtgraph/PyQt5(Pyside6) не найдены. Установите: pip install pyqtgraph PyQt5"
) from e
from ..config import (
FFT_LEN,
WF_WIDTH,
SweepInfo,
SweepPacket,
FREQ_MIN_GHZ,
FREQ_MAX_GHZ,
DATA_FREQ_START_GHZ,
DATA_FREQ_END_GHZ,
)
from ..data_acquisition.sweep_reader import SweepReader
from ..signal_processing.phase_analysis import apply_temporal_unwrap, phase_to_distance
from ..utils.formatting import format_status_kv, parse_spec_clip
def run_pyqtgraph(args):
"""Быстрый GUI на PyQtGraph. Требует pyqtgraph и PyQt5/PySide6."""
# Очередь завершённых свипов и поток чтения
q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy))
reader.start()
# Настройки скорости
max_sweeps = int(max(10, args.max_sweeps))
max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps)
# PyQtGraph настройки
pg.setConfigOptions(useOpenGL=True, antialias=False)
app = pg.mkQApp(args.title)
win = pg.GraphicsLayoutWidget(show=True, title=args.title)
win.resize(1200, 900)
# Плот последнего свипа (слева-сверху)
p_line = win.addPlot(row=0, col=0, title="Сырые данные")
p_line.showGrid(x=True, y=True, alpha=0.3)
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
p_line.setLabel("bottom", "X")
p_line.setLabel("left", "Y")
# Водопад (справа-сверху)
p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад")
p_img.invertY(False)
p_img.showGrid(x=False, y=False)
p_img.setLabel("bottom", "Время, с (новое справа)")
try:
p_img.getAxis("bottom").setStyle(showValues=False)
except Exception:
pass
p_img.setLabel("left", "X (0 снизу)")
img = pg.ImageItem()
p_img.addItem(img)
# FFT (слева-средний ряд)
p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
p_fft.setLabel("bottom", "Частота, ГГц")
p_fft.setLabel("left", "Амплитуда, дБ")
# Водопад спектров (справа-средний ряд)
p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)")
p_spec.invertY(True)
p_spec.showGrid(x=False, y=False)
p_spec.setLabel("bottom", "Время, с (новое справа)")
try:
p_spec.getAxis("bottom").setStyle(showValues=False)
except Exception:
pass
p_spec.setLabel("left", "Частота, ГГц (0 снизу)")
img_fft = pg.ImageItem()
p_spec.addItem(img_fft)
# График фазы (слева-снизу)
p_phase = win.addPlot(row=2, col=0, title="Фаза спектра (развернутая)")
p_phase.showGrid(x=True, y=True, alpha=0.3)
curve_phase = p_phase.plot(pen=pg.mkPen((120, 255, 80), width=1))
p_phase.setLabel("bottom", "Частота, ГГц")
p_phase.setLabel("left", "Фаза, радианы")
# Добавим вторую ось Y для расстояния
p_phase_dist_axis = pg.ViewBox()
p_phase.showAxis("right")
p_phase.scene().addItem(p_phase_dist_axis)
p_phase.getAxis("right").linkToView(p_phase_dist_axis)
p_phase_dist_axis.setXLink(p_phase)
p_phase.setLabel("right", "Расстояние, м")
def updateViews():
try:
p_phase_dist_axis.setGeometry(p_phase.vb.sceneBoundingRect())
p_phase_dist_axis.linkedViewChanged(p_phase.vb, p_phase_dist_axis.XAxis)
except Exception:
pass
updateViews()
p_phase.vb.sigResized.connect(updateViews)
# Водопад фазы (справа-снизу)
p_phase_wf = win.addPlot(row=2, col=1, title="Водопад фазы")
p_phase_wf.invertY(True)
p_phase_wf.showGrid(x=False, y=False)
p_phase_wf.setLabel("bottom", "Время, с (новое справа)")
try:
p_phase_wf.getAxis("bottom").setStyle(showValues=False)
except Exception:
pass
p_phase_wf.setLabel("left", "Частота, ГГц (0 снизу)")
img_phase = pg.ImageItem()
p_phase_wf.addItem(img_phase)
# Статусная строка (внизу окна)
status = pg.LabelItem(justify="left")
win.addItem(status, row=3, col=0, colspan=2)
# Функция сохранения медианы последних 1000 свипов
def save_median_data():
"""Сохранить медиану последних 1000 свипов в CSV файл"""
if ring is None:
status.setText("Нет данных для сохранения")
return
# Определяем сколько свипов доступно
n_sweeps = 1000
available = min(n_sweeps, max_sweeps)
# Проверяем сколько свипов реально заполнено
filled_count = np.count_nonzero(~np.isnan(ring[:, 0]))
if filled_count == 0:
status.setText("Нет данных для сохранения")
return
available = min(available, filled_count)
# Получаем хронологически упорядоченные данные
ordered = ring if head == 0 else np.roll(ring, -head, axis=0)
# Берем последние n свипов
recent_sweeps = ordered[-available:, :]
# Вычисляем медиану по свипам (ось 0)
median_sweep = np.nanmedian(recent_sweeps, axis=0)
# Сохраняем в CSV
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"median_sweep_{timestamp}.csv"
try:
with open(filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['Index', 'Median_Value'])
for i, value in enumerate(median_sweep):
if np.isfinite(value):
writer.writerow([i, float(value)])
status.setText(f"Сохранено {available} свипов (медиана) в {filename}")
except Exception as e:
status.setText(f"Ошибка сохранения: {e}")
# Функция загрузки медианного файла
def load_median_file():
"""Загрузить медианный файл из CSV"""
nonlocal median_data
filename, _ = QFileDialog.getOpenFileName(
None,
"Выберите файл с медианой",
"",
"CSV Files (*.csv);;All Files (*)"
)
if not filename:
return
try:
# Загружаем CSV файл
pairs = []
with open(filename, 'r') as f:
reader = csv.reader(f)
next(reader) # Пропускаем заголовок
for row in reader:
if len(row) >= 2:
try:
pairs.append((int(row[0]), float(row[1])))
except ValueError:
continue
if not pairs:
status.setText("Ошибка: файл пустой или неверный формат")
return
max_idx = max(idx for idx, _ in pairs)
median_data = np.full(max_idx + 1, np.nan, dtype=np.float32)
for idx, val in pairs:
median_data[idx] = val
status.setText(f"Загружена медиана из {filename} ({len(median_data)} точек)")
# Автоматически включаем чекбокс
subtract_checkbox.setChecked(True)
except Exception as e:
status.setText(f"Ошибка загрузки: {e}")
median_data = None
# Функция переключения вычитания медианы
def toggle_median_subtraction(state):
nonlocal median_subtract_enabled
median_subtract_enabled = bool(state)
if median_subtract_enabled and median_data is None:
status.setText("Сначала загрузите файл с медианой")
subtract_checkbox.setChecked(False)
elif median_subtract_enabled:
status.setText("Вычитание медианы включено")
else:
status.setText("Вычитание медианы выключено")
# Создаем контейнер для кнопок управления
button_container = QWidget()
button_layout = QHBoxLayout()
# Кнопка сохранения медианы
save_btn = QPushButton("Сохранить медиану (1000 свипов)")
save_btn.clicked.connect(save_median_data)
button_layout.addWidget(save_btn)
# Кнопка загрузки медианы
load_btn = QPushButton("Загрузить медиану")
load_btn.clicked.connect(load_median_file)
button_layout.addWidget(load_btn)
# Чекбокс для включения вычитания
subtract_checkbox = QCheckBox("Вычитать медиану")
subtract_checkbox.stateChanged.connect(toggle_median_subtraction)
button_layout.addWidget(subtract_checkbox)
button_layout.setContentsMargins(5, 5, 5, 5)
button_container.setLayout(button_layout)
# Добавляем кнопки в окно
proxy_widget = QtWidgets.QGraphicsProxyWidget()
proxy_widget.setWidget(button_container)
win.addItem(proxy_widget, row=4, col=0, colspan=2)
# Состояние
ring: Optional[np.ndarray] = None
head = 0
width: Optional[int] = None
x_shared: Optional[np.ndarray] = None
current_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None
# Медианные данные для вычитания
median_data: Optional[np.ndarray] = None
median_subtract_enabled = False
# CLI параметры для автоматического сохранения/загрузки
ref_out_file = getattr(args, 'ref_out', None)
ref_in_file = getattr(args, 'ref_in', None)
ref_out_saved = False # Флаг, что медиана уже сохранена
# Отдельный буфер для накопления 1000 сырых свипов (не зависит от max_sweeps)
ref_ring: Optional[np.ndarray] = None
ref_ring_head = 0
ref_ring_count = 0
# Автоматическая загрузка медианы при старте
if ref_in_file:
try:
pairs = []
with open(ref_in_file, 'r') as f:
reader = csv.reader(f)
next(reader) # Пропускаем заголовок
for row in reader:
if len(row) >= 2:
try:
pairs.append((int(row[0]), float(row[1])))
except ValueError:
continue
if pairs:
max_idx = max(idx for idx, _ in pairs)
median_data = np.full(max_idx + 1, np.nan, dtype=np.float32)
for idx, val in pairs:
median_data[idx] = val
median_subtract_enabled = True
print(f"[ref-in] Загружена медиана из {ref_in_file} ({len(median_data)} точек), вычитание включено")
else:
print(f"[ref-in] Предупреждение: файл {ref_in_file} пустой или неверный формат")
except Exception as e:
print(f"[ref-in] Ошибка загрузки {ref_in_file}: {e}")
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
# Для спектров (полное FFT для отрицательных частот)
fft_bins = FFT_LEN
ring_fft: Optional[np.ndarray] = None
freq_shared: Optional[np.ndarray] = None
y_min_fft, y_max_fft = None, None
# Phase состояние
ring_phase: Optional[np.ndarray] = None
prev_phase_per_bin: Optional[np.ndarray] = None
phase_offset_per_bin: Optional[np.ndarray] = None
y_min_phase, y_max_phase = None, None
# Параметры контраста водопада спектров (процентильная обрезка)
spec_clip = parse_spec_clip(getattr(args, "spec_clip", None))
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim:
try:
y0, y1 = args.ylim.split(",")
fixed_ylim = (float(y0), float(y1))
except Exception:
pass
if fixed_ylim is not None:
p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0)
def ensure_buffer(_w: int):
nonlocal ring, head, width, x_shared, ring_fft, freq_shared
nonlocal ring_phase, prev_phase_per_bin, phase_offset_per_bin
nonlocal ref_ring
if ring is not None:
return
width = WF_WIDTH
x_shared = np.arange(width, dtype=np.int32)
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
head = 0
# Водопад: время по оси X, X по оси Y
img.setImage(ring.T, autoLevels=False)
p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, width - 1)), padding=0)
p_line.setXRange(0, max(1, width - 1), padding=0)
# FFT: время по оси X, бин по оси Y
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft.setImage(ring_fft.T, autoLevels=False)
p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, fft_bins - 1)), padding=0)
p_fft.setXRange(FREQ_MIN_GHZ, FREQ_MAX_GHZ, padding=0)
freq_shared = np.linspace(FREQ_MIN_GHZ, FREQ_MAX_GHZ, fft_bins, dtype=np.float32)
# Phase: время по оси X, бин по оси Y
ring_phase = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
prev_phase_per_bin = np.zeros(fft_bins, dtype=np.float32)
phase_offset_per_bin = np.zeros(fft_bins, dtype=np.float32)
img_phase.setImage(ring_phase.T, autoLevels=False)
p_phase_wf.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, fft_bins - 1)), padding=0)
p_phase.setXRange(0, max(1, fft_bins - 1), padding=0)
# Буфер для медианы (отдельный от ring, размер всегда 1000)
if ref_out_file and ref_ring is None:
ref_ring = np.full((1000, width), np.nan, dtype=np.float32)
def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по текущей видимой области ImageItem (без накопления по времени)."""
if data.size == 0:
return None
ny, nx = data.shape[0], data.shape[1]
try:
(x0, x1), (y0, y1) = p_img.viewRange()
except Exception:
x0, x1 = 0.0, float(nx - 1)
y0, y1 = 0.0, float(ny - 1)
xmin, xmax = sorted((float(x0), float(x1)))
ymin, ymax = sorted((float(y0), float(y1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
if ix1 < ix0:
ix1 = ix0
if iy1 < iy0:
iy1 = iy0
sub = data[iy0 : iy1 + 1, ix0 : ix1 + 1]
finite = np.isfinite(sub)
if not finite.any():
return None
vals = sub[finite]
vmin = float(np.min(vals))
vmax = float(np.max(vals))
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
return None
return (vmin, vmax)
def push_sweep(s: np.ndarray):
nonlocal ring, head, ring_fft, y_min_fft, y_max_fft
nonlocal ring_phase, prev_phase_per_bin, phase_offset_per_bin, y_min_phase, y_max_phase
nonlocal ref_ring_head, ref_ring_count
if s is None or s.size == 0 or ring is None:
return
# Сохраняем сырой свип в буфер медианы (до вычитания)
if ref_out_file and not ref_out_saved and ref_ring is not None:
w_ref = ref_ring.shape[1]
take_ref = min(w_ref, s.size)
ref_ring[ref_ring_head, :take_ref] = s[:take_ref]
ref_ring_head = (ref_ring_head + 1) % 1000
ref_ring_count = min(ref_ring_count + 1, 1000)
# Применяем вычитание медианы если включено
if median_subtract_enabled and median_data is not None:
# Вычитаем медиану из сигнала
take_median = min(s.size, median_data.size)
s_corrected = s.copy()
s_corrected[:take_median] = s[:take_median] - median_data[:take_median]
s = s_corrected
w = ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
ring[head, :] = row
head = (head + 1) % ring.shape[0]
# FFT строка (дБ) и фаза
if ring_fft is not None:
bins = ring_fft.shape[1]
take_fft = min(int(s.size), FFT_LEN)
if take_fft > 0:
# Создаем буфер для полного FFT (с отрицательными частотами)
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
# Вычисляем индексы для размещения данных (1-10 ГГц в диапазоне -10 до +10 ГГц)
# Диапазон данных: от DATA_FREQ_START_GHZ (1) до DATA_FREQ_END_GHZ (10)
# Полный диапазон: от FREQ_MIN_GHZ (-10) до FREQ_MAX_GHZ (10)
freq_range_total = FREQ_MAX_GHZ - FREQ_MIN_GHZ # 20 ГГц
freq_range_data = DATA_FREQ_END_GHZ - DATA_FREQ_START_GHZ # 9 ГГц
# Начальный индекс для данных в FFT буфере
start_idx = int((DATA_FREQ_START_GHZ - FREQ_MIN_GHZ) / freq_range_total * FFT_LEN)
# Количество точек для данных
data_points = int(freq_range_data / freq_range_total * FFT_LEN)
data_points = min(data_points, take_fft, FFT_LEN - start_idx)
# Подготовка данных с окном Хэннинга
seg = np.nan_to_num(s[:data_points], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(data_points).astype(np.float32)
# Размещаем данные в правильной позиции (от -10 до 1 ГГц - нули, от 1 до 10 ГГц - данные)
fft_in[start_idx:start_idx + data_points] = seg * win
# Полное FFT (включая отрицательные частоты)
spec = np.fft.fft(fft_in)
# Сдвигаем для центрирования нулевой частоты
spec = np.fft.fftshift(spec)
mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins:
fft_row = fft_row[:bins]
# Расчет фазы
phase = np.angle(spec).astype(np.float32)
if phase.shape[0] > bins:
phase = phase[:bins]
# Unwrapping по частоте (внутри свипа)
phase_unwrapped_freq = np.unwrap(phase)
# Unwrapping по времени (между свипами)
phase_unwrapped_time, prev_phase_per_bin, phase_offset_per_bin = apply_temporal_unwrap(
phase_unwrapped_freq, prev_phase_per_bin, phase_offset_per_bin
)
phase_row = phase_unwrapped_time
else:
fft_row = np.full((bins,), np.nan, dtype=np.float32)
phase_row = np.full((bins,), np.nan, dtype=np.float32)
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row
fr_min = np.nanmin(fft_row)
fr_max = np.nanmax(fft_row)
if y_min_fft is None or (not np.isnan(fr_min) and fr_min < y_min_fft):
y_min_fft = float(fr_min)
if y_max_fft is None or (not np.isnan(fr_max) and fr_max > y_max_fft):
y_max_fft = float(fr_max)
# Сохраняем фазу в буфер
if ring_phase is not None:
ring_phase[(head - 1) % ring_phase.shape[0], :] = phase_row
# Экстремумы для цветовой шкалы фазы
ph_min = np.nanmin(phase_row)
ph_max = np.nanmax(phase_row)
if y_min_phase is None or (not np.isnan(ph_min) and ph_min < y_min_phase):
y_min_phase = float(ph_min)
if y_max_phase is None or (not np.isnan(ph_max) and ph_max > y_max_phase):
y_max_phase = float(ph_max)
def drain_queue():
nonlocal current_sweep, current_info
drained = 0
while True:
try:
s, info = q.get_nowait()
except Empty:
break
drained += 1
current_sweep = s
current_info = info
ensure_buffer(s.size)
push_sweep(s)
return drained
# Попытка применить LUT из колормэпа (если доступен)
try:
cm_mod = getattr(pg, "colormap", None)
if cm_mod is not None:
cm = cm_mod.get(args.cmap)
img.setLookupTable(cm.getLookupTable(0.0, 1.0, 256))
except Exception:
pass
def update():
nonlocal ref_out_saved
changed = drain_queue() > 0
if current_sweep is not None and x_shared is not None:
# Применяем вычитание медианы для отображения
display_sweep = current_sweep
if median_subtract_enabled and median_data is not None:
take_median = min(current_sweep.size, median_data.size)
display_sweep = current_sweep.copy()
display_sweep[:take_median] = current_sweep[:take_median] - median_data[:take_median]
if display_sweep.size <= x_shared.size:
xs = x_shared[: display_sweep.size]
else:
xs = np.arange(display_sweep.size)
curve.setData(xs, display_sweep, autoDownsample=True)
if fixed_ylim is None:
y0 = float(np.nanmin(display_sweep))
y1 = float(np.nanmax(display_sweep))
if np.isfinite(y0) and np.isfinite(y1):
margin = 0.05 * max(1.0, (y1 - y0))
p_line.setYRange(y0 - margin, y1 + margin, padding=0)
# Обновим спектр и фазу
take_fft = min(int(display_sweep.size), FFT_LEN)
if take_fft > 0 and freq_shared is not None:
# Создаем буфер для полного FFT (с отрицательными частотами)
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
# Вычисляем индексы для размещения данных (1-10 ГГц в диапазоне -10 до +10 ГГц)
freq_range_total = FREQ_MAX_GHZ - FREQ_MIN_GHZ # 20 ГГц
freq_range_data = DATA_FREQ_END_GHZ - DATA_FREQ_START_GHZ # 9 ГГц
# Начальный индекс для данных в FFT буфере
start_idx = int((DATA_FREQ_START_GHZ - FREQ_MIN_GHZ) / freq_range_total * FFT_LEN)
# Количество точек для данных
data_points = int(freq_range_data / freq_range_total * FFT_LEN)
data_points = min(data_points, take_fft, FFT_LEN - start_idx)
# Подготовка данных с окном Хэннинга
seg = np.nan_to_num(display_sweep[:data_points], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(data_points).astype(np.float32)
# Размещаем данные в правильной позиции
fft_in[start_idx:start_idx + data_points] = seg * win
# Полное FFT (включая отрицательные частоты)
spec = np.fft.fft(fft_in)
# Сдвигаем для центрирования нулевой частоты
spec = np.fft.fftshift(spec)
mag = np.abs(spec).astype(np.float32)
fft_vals = 20.0 * np.log10(mag + 1e-9)
xs_fft = freq_shared
if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size]
curve_fft.setData(xs_fft[: fft_vals.size], fft_vals)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
# Расчет и отображение фазы текущего свипа
phase = np.angle(spec).astype(np.float32)
if phase.size > xs_fft.size:
phase = phase[: xs_fft.size]
# Unwrapping по частоте
phase_unwrapped = np.unwrap(phase)
curve_phase.setData(xs_fft[: phase_unwrapped.size], phase_unwrapped)
phase_min = float(np.nanmin(phase_unwrapped))
phase_max = float(np.nanmax(phase_unwrapped))
p_phase.setYRange(phase_min, phase_max, padding=0)
# Обновляем вторую ось Y с расстоянием
try:
dist_min = phase_to_distance(np.array([phase_min]))[0]
dist_max = phase_to_distance(np.array([phase_max]))[0]
p_phase_dist_axis.setYRange(dist_min, dist_max, padding=0)
except Exception:
pass
if changed and ring is not None:
disp = ring if head == 0 else np.roll(ring, -head, axis=0)
disp = disp.T[:, ::-1] # (width, time with newest at left)
levels = _visible_levels_pyqtgraph(disp)
if levels is not None:
img.setImage(disp, autoLevels=False, levels=levels)
else:
img.setImage(disp, autoLevels=False)
if changed and current_info:
try:
status.setText(format_status_kv(current_info))
except Exception:
pass
# Автоматическое сохранение медианы при накоплении 1000 сырых свипов
if ref_out_file and not ref_out_saved and ref_ring is not None:
if ref_ring_count >= 1000:
try:
ordered = ref_ring if ref_ring_head == 0 else np.roll(ref_ring, -ref_ring_head, axis=0)
median_sweep = np.nanmedian(ordered, axis=0)
with open(ref_out_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['Index', 'Median_Value'])
for i, value in enumerate(median_sweep):
if np.isfinite(value):
writer.writerow([i, float(value)])
ref_out_saved = True
print(f"[ref-out] Сохранена медиана 1000 свипов в {ref_out_file}")
if status:
status.setText(f"[ref-out] Сохранено в {ref_out_file}")
except Exception as e:
print(f"[ref-out] Ошибка сохранения: {e}")
if changed and ring_fft is not None:
disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
disp_fft = disp_fft.T[:, ::-1]
# Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии)
levels = None
try:
mean_spec = np.nanmean(disp_fft, axis=1)
vmin_v = float(np.nanmin(mean_spec))
vmax_v = float(np.nanmax(mean_spec))
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
levels = (vmin_v, vmax_v)
except Exception:
levels = None
# Процентильная обрезка как запасной вариант
if levels is None and spec_clip is not None:
try:
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
levels = (vmin_v, vmax_v)
except Exception:
levels = None
# Ещё один фолбэк — глобальные накопленные мин/макс
if levels is None and y_min_fft is not None and y_max_fft is not None and np.isfinite(y_min_fft) and np.isfinite(y_max_fft) and y_min_fft != y_max_fft:
levels = (y_min_fft, y_max_fft)
if levels is not None:
img_fft.setImage(disp_fft, autoLevels=False, levels=levels)
else:
img_fft.setImage(disp_fft, autoLevels=False)
# Обновление водопада фазы
if changed and ring_phase is not None:
disp_phase = ring_phase if head == 0 else np.roll(ring_phase, -head, axis=0)
disp_phase = disp_phase.T[:, ::-1]
# Автодиапазон для фазы
levels_phase = None
try:
mean_phase = np.nanmean(disp_phase, axis=1)
vmin_p = float(np.nanmin(mean_phase))
vmax_p = float(np.nanmax(mean_phase))
if np.isfinite(vmin_p) and np.isfinite(vmax_p) and vmin_p != vmax_p:
levels_phase = (vmin_p, vmax_p)
except Exception:
levels_phase = None
# Фолбэк к отслеживаемым минимум/максимумам
if levels_phase is None and y_min_phase is not None and y_max_phase is not None and np.isfinite(y_min_phase) and np.isfinite(y_max_phase) and y_min_phase != y_max_phase:
levels_phase = (y_min_phase, y_max_phase)
if levels_phase is not None:
img_phase.setImage(disp_phase, autoLevels=False, levels=levels_phase)
else:
img_phase.setImage(disp_phase, autoLevels=False)
timer = pg.QtCore.QTimer()
timer.timeout.connect(update)
timer.start(interval_ms)
def on_quit():
stop_event.set()
reader.join(timeout=1.0)
app.aboutToQuit.connect(on_quit)
win.show()
exec_fn = getattr(app, "exec_", None) or getattr(app, "exec", None)
exec_fn()
# На случай если aboutToQuit не сработал
on_quit()

9
run.py Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env python3
"""
Скрипт запуска RFG ADC Data Plotter.
"""
from rfg_adc_plotter.cli import main
if __name__ == "__main__":
main()

6447
test2.ipynb Normal file

File diff suppressed because one or more lines are too long