new project structure
This commit is contained in:
0
rfg_adc_plotter/data_acquisition/__init__.py
Normal file
0
rfg_adc_plotter/data_acquisition/__init__.py
Normal file
204
rfg_adc_plotter/data_acquisition/serial_io.py
Normal file
204
rfg_adc_plotter/data_acquisition/serial_io.py
Normal file
@ -0,0 +1,204 @@
|
||||
"""
|
||||
Модули для работы с serial портом: чтение данных через pyserial или raw TTY.
|
||||
"""
|
||||
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def try_open_pyserial(path: str, baud: int, timeout: float):
|
||||
"""Попытка открыть порт через pyserial."""
|
||||
try:
|
||||
import serial # type: ignore
|
||||
except Exception:
|
||||
return None
|
||||
try:
|
||||
ser = serial.Serial(path, baudrate=baud, timeout=timeout)
|
||||
# ВРЕМЕННО ОТКЛЮЧЕН: hardware flow control для проверки
|
||||
# ser.rtscts = True
|
||||
# Увеличиваем буфер приема ядра до 64KB
|
||||
try:
|
||||
ser.set_buffer_size(rx_size=65536, tx_size=4096)
|
||||
except (AttributeError, NotImplementedError):
|
||||
# Не все платформы/версии pyserial поддерживают set_buffer_size
|
||||
pass
|
||||
return ser
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class FDReader:
|
||||
"""Простой враппер чтения строк из файлового дескриптора TTY."""
|
||||
|
||||
def __init__(self, fd: int):
|
||||
# Отдельно буферизуем для корректной readline()
|
||||
self._fd = fd
|
||||
raw = os.fdopen(fd, "rb", closefd=False)
|
||||
self._file = raw
|
||||
# Увеличен размер буфера до 256KB для предотвращения потерь
|
||||
self._buf = io.BufferedReader(raw, buffer_size=262144)
|
||||
|
||||
def fileno(self) -> int:
|
||||
return self._fd
|
||||
|
||||
def readline(self) -> bytes:
|
||||
return self._buf.readline()
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self._buf.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def open_raw_tty(path: str, baud: int) -> Optional[FDReader]:
|
||||
"""Открыть TTY без pyserial и настроить порт через termios.
|
||||
|
||||
Возвращает FDReader или None при ошибке.
|
||||
"""
|
||||
try:
|
||||
import termios
|
||||
import tty
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
try:
|
||||
fd = os.open(path, os.O_RDONLY | os.O_NOCTTY)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
try:
|
||||
attrs = termios.tcgetattr(fd)
|
||||
# Установим «сырое» состояние
|
||||
tty.setraw(fd)
|
||||
|
||||
# Скорость
|
||||
baud_map = {
|
||||
9600: termios.B9600,
|
||||
19200: termios.B19200,
|
||||
38400: termios.B38400,
|
||||
57600: termios.B57600,
|
||||
115200: termios.B115200,
|
||||
230400: getattr(termios, "B230400", None),
|
||||
460800: getattr(termios, "B460800", None),
|
||||
}
|
||||
b = baud_map.get(baud) or termios.B115200
|
||||
|
||||
attrs[4] = b # ispeed
|
||||
attrs[5] = b # ospeed
|
||||
|
||||
# VMIN=1, VTIME=0 — блокирующее чтение по байту
|
||||
cc = attrs[6]
|
||||
cc[termios.VMIN] = 1
|
||||
cc[termios.VTIME] = 0
|
||||
attrs[6] = cc
|
||||
|
||||
termios.tcsetattr(fd, termios.TCSANOW, attrs)
|
||||
except Exception:
|
||||
try:
|
||||
os.close(fd)
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
return FDReader(fd)
|
||||
|
||||
|
||||
class SerialLineSource:
|
||||
"""Единый интерфейс для чтения строк из порта (pyserial или raw TTY)."""
|
||||
|
||||
def __init__(self, path: str, baud: int, timeout: float = 1.0):
|
||||
self._pyserial = try_open_pyserial(path, baud, timeout)
|
||||
self._fdreader = None
|
||||
self._using = "pyserial" if self._pyserial is not None else "raw"
|
||||
if self._pyserial is None:
|
||||
self._fdreader = open_raw_tty(path, baud)
|
||||
if self._fdreader is None:
|
||||
msg = f"Не удалось открыть порт '{path}' (pyserial и raw TTY не сработали)"
|
||||
if sys.platform.startswith("win"):
|
||||
msg += ". На Windows нужен pyserial: pip install pyserial"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
def readline(self) -> bytes:
|
||||
if self._pyserial is not None:
|
||||
try:
|
||||
return self._pyserial.readline()
|
||||
except Exception:
|
||||
return b""
|
||||
else:
|
||||
try:
|
||||
return self._fdreader.readline() # type: ignore[union-attr]
|
||||
except Exception:
|
||||
return b""
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
if self._pyserial is not None:
|
||||
self._pyserial.close()
|
||||
elif self._fdreader is not None:
|
||||
self._fdreader.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class SerialChunkReader:
|
||||
"""Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера."""
|
||||
|
||||
def __init__(self, src: SerialLineSource, error_counter: Optional[list] = None):
|
||||
self._src = src
|
||||
self._ser = src._pyserial
|
||||
self._fd: Optional[int] = None
|
||||
self._error_counter = error_counter # Список с 1 элементом для передачи по ссылке
|
||||
if self._ser is not None:
|
||||
# Неблокирующий режим для быстрой откачки
|
||||
try:
|
||||
self._ser.timeout = 0
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
self._fd = src._fdreader.fileno() # type: ignore[union-attr]
|
||||
try:
|
||||
os.set_blocking(self._fd, False)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
self._fd = None
|
||||
|
||||
def read_available(self) -> bytes:
|
||||
"""Вернёт доступные байты (b"" если данных нет)."""
|
||||
if self._ser is not None:
|
||||
try:
|
||||
n = int(getattr(self._ser, "in_waiting", 0))
|
||||
except Exception:
|
||||
if self._error_counter:
|
||||
self._error_counter[0] += 1
|
||||
n = 0
|
||||
if n > 0:
|
||||
try:
|
||||
return self._ser.read(n)
|
||||
except Exception:
|
||||
if self._error_counter:
|
||||
self._error_counter[0] += 1
|
||||
return b""
|
||||
return b""
|
||||
if self._fd is None:
|
||||
return b""
|
||||
out = bytearray()
|
||||
while True:
|
||||
try:
|
||||
chunk = os.read(self._fd, 65536)
|
||||
if not chunk:
|
||||
break
|
||||
out += chunk
|
||||
if len(chunk) < 65536:
|
||||
break
|
||||
except BlockingIOError:
|
||||
break
|
||||
except Exception:
|
||||
if self._error_counter:
|
||||
self._error_counter[0] += 1
|
||||
break
|
||||
return bytes(out)
|
||||
249
rfg_adc_plotter/data_acquisition/sweep_reader.py
Normal file
249
rfg_adc_plotter/data_acquisition/sweep_reader.py
Normal file
@ -0,0 +1,249 @@
|
||||
"""
|
||||
Фоновый поток для чтения и сборки свипов из serial порта.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
from queue import Queue, Full
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ..config import DATA_INVERSION_THRASHOLD, SweepInfo, SweepPacket
|
||||
from .serial_io import SerialChunkReader, SerialLineSource
|
||||
|
||||
|
||||
class SweepReader(threading.Thread):
|
||||
"""Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
port_path: str,
|
||||
baud: int,
|
||||
out_queue: Queue[SweepPacket],
|
||||
stop_event: threading.Event,
|
||||
fancy: bool = False,
|
||||
):
|
||||
super().__init__(daemon=True)
|
||||
self._port_path = port_path
|
||||
self._baud = baud
|
||||
self._q = out_queue
|
||||
self._stop = stop_event
|
||||
self._src: SerialLineSource | None = None
|
||||
self._fancy = bool(fancy)
|
||||
self._max_width: int = 0
|
||||
self._sweep_idx: int = 0
|
||||
self._last_sweep_ts: float | None = None
|
||||
self._n_valid_hist = deque()
|
||||
# Счетчик потерь данных (выброшенных свипов из-за переполнения очереди)
|
||||
self._dropped_sweeps: int = 0
|
||||
# Диагностика потери точек внутри свипа
|
||||
self._total_lines_received: int = 0 # Всего принято строк с данными
|
||||
self._total_parse_errors: int = 0 # Ошибок парсинга строк
|
||||
self._total_empty_lines: int = 0 # Пустых строк
|
||||
self._max_buf_size: int = 0 # Максимальный размер буфера парсинга
|
||||
self._read_errors: int = 0 # Ошибок чтения из порта
|
||||
self._last_diag_time: float = 0.0 # Время последнего вывода диагностики
|
||||
|
||||
def _finalize_current(self, xs, ys):
|
||||
if not xs:
|
||||
return
|
||||
max_x = max(xs)
|
||||
width = max_x + 1
|
||||
self._max_width = max(self._max_width, width)
|
||||
target_width = self._max_width if self._fancy else width
|
||||
# Быстрый векторизованный путь
|
||||
sweep = np.full((target_width,), np.nan, dtype=np.float32)
|
||||
try:
|
||||
idx = np.asarray(xs, dtype=np.int64)
|
||||
vals = np.asarray(ys, dtype=np.float32)
|
||||
sweep[idx] = vals
|
||||
except Exception:
|
||||
# Запасной путь
|
||||
for x, y in zip(xs, ys):
|
||||
if 0 <= x < target_width:
|
||||
sweep[x] = float(y)
|
||||
# Метрики валидных точек до заполнения пропусков
|
||||
finite_pre = np.isfinite(sweep)
|
||||
n_valid_cur = int(np.count_nonzero(finite_pre))
|
||||
|
||||
# Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины
|
||||
if self._fancy:
|
||||
try:
|
||||
known = ~np.isnan(sweep)
|
||||
if np.any(known):
|
||||
known_idx = np.nonzero(known)[0]
|
||||
# Для каждой пары соседних известных индексов заполним промежуток средним значением
|
||||
for i0, i1 in zip(known_idx[:-1], known_idx[1:]):
|
||||
if i1 - i0 > 1:
|
||||
avg = (sweep[i0] + sweep[i1]) * 0.5
|
||||
sweep[i0 + 1 : i1] = avg
|
||||
first_idx = int(known_idx[0])
|
||||
last_idx = int(known_idx[-1])
|
||||
if first_idx > 0:
|
||||
sweep[:first_idx] = sweep[first_idx]
|
||||
if last_idx < sweep.size - 1:
|
||||
sweep[last_idx + 1 :] = sweep[last_idx]
|
||||
except Exception:
|
||||
# В случае ошибки просто оставляем как есть
|
||||
pass
|
||||
# Инверсия данных при «отрицательном» уровне (среднее ниже порога)
|
||||
try:
|
||||
m = float(np.nanmean(sweep))
|
||||
if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD:
|
||||
sweep *= -1.0
|
||||
except Exception:
|
||||
pass
|
||||
sweep -= float(np.nanmean(sweep))
|
||||
|
||||
# Метрики для статусной строки (вид словаря: переменная -> значение)
|
||||
self._sweep_idx += 1
|
||||
now = time.time()
|
||||
if self._last_sweep_ts is None:
|
||||
dt_ms = float("nan")
|
||||
else:
|
||||
dt_ms = (now - self._last_sweep_ts) * 1000.0
|
||||
self._last_sweep_ts = now
|
||||
self._n_valid_hist.append((now, n_valid_cur))
|
||||
while self._n_valid_hist and (now - self._n_valid_hist[0][0]) > 1.0:
|
||||
self._n_valid_hist.popleft()
|
||||
if self._n_valid_hist:
|
||||
n_valid = float(sum(v for _t, v in self._n_valid_hist) / len(self._n_valid_hist))
|
||||
else:
|
||||
n_valid = float(n_valid_cur)
|
||||
|
||||
if n_valid_cur > 0:
|
||||
vmin = float(np.nanmin(sweep))
|
||||
vmax = float(np.nanmax(sweep))
|
||||
mean = float(np.nanmean(sweep))
|
||||
std = float(np.nanstd(sweep))
|
||||
else:
|
||||
vmin = vmax = mean = std = float("nan")
|
||||
info: SweepInfo = {
|
||||
"sweep": self._sweep_idx,
|
||||
"n_valid": n_valid,
|
||||
"min": vmin,
|
||||
"max": vmax,
|
||||
"mean": mean,
|
||||
"std": std,
|
||||
"dt_ms": dt_ms,
|
||||
"dropped": self._dropped_sweeps,
|
||||
"lines": self._total_lines_received,
|
||||
"parse_err": self._total_parse_errors,
|
||||
"read_err": self._read_errors,
|
||||
"max_buf": self._max_buf_size,
|
||||
}
|
||||
|
||||
# Периодический вывод детальной диагностики в stderr (каждые 10 секунд)
|
||||
now = time.time()
|
||||
if now - self._last_diag_time > 10.0:
|
||||
self._last_diag_time = now
|
||||
sys.stderr.write(
|
||||
f"[DIAG] sweep={self._sweep_idx} n_valid={n_valid:.1f} "
|
||||
f"lines={self._total_lines_received} parse_err={self._total_parse_errors} "
|
||||
f"read_err={self._read_errors} max_buf={self._max_buf_size} "
|
||||
f"dropped={self._dropped_sweeps}\n"
|
||||
)
|
||||
sys.stderr.flush()
|
||||
|
||||
# Кладём готовый свип (если очередь полна — выбрасываем самый старый)
|
||||
try:
|
||||
self._q.put_nowait((sweep, info))
|
||||
except Full:
|
||||
# Счетчик потерь для диагностики
|
||||
self._dropped_sweeps += 1
|
||||
try:
|
||||
_ = self._q.get_nowait()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
self._q.put_nowait((sweep, info))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
# Состояние текущего свипа
|
||||
xs: list[int] = []
|
||||
ys: list[int] = []
|
||||
|
||||
try:
|
||||
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
|
||||
sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n")
|
||||
except Exception as e:
|
||||
sys.stderr.write(f"[error] {e}\n")
|
||||
return
|
||||
|
||||
try:
|
||||
# Быстрый неблокирующий дренаж порта с разбором по байтам
|
||||
# Передаем счетчик ошибок чтения как список для изменения по ссылке
|
||||
error_counter = [0]
|
||||
chunk_reader = SerialChunkReader(self._src, error_counter)
|
||||
buf = bytearray()
|
||||
while not self._stop.is_set():
|
||||
data = chunk_reader.read_available()
|
||||
# Обновляем счетчик ошибок чтения
|
||||
self._read_errors = error_counter[0]
|
||||
if data:
|
||||
buf += data
|
||||
# Отслеживаем максимальный размер буфера парсинга
|
||||
if len(buf) > self._max_buf_size:
|
||||
self._max_buf_size = len(buf)
|
||||
else:
|
||||
# Короткая уступка CPU, если нет новых данных (уменьшена до 0.1ms)
|
||||
time.sleep(0.0001)
|
||||
continue
|
||||
|
||||
# Обрабатываем все полные строки
|
||||
while True:
|
||||
nl = buf.find(b"\n")
|
||||
if nl == -1:
|
||||
break
|
||||
line = bytes(buf[:nl])
|
||||
del buf[: nl + 1]
|
||||
if line.endswith(b"\r"):
|
||||
line = line[:-1]
|
||||
if not line:
|
||||
self._total_empty_lines += 1
|
||||
continue
|
||||
|
||||
if line.startswith(b"Sweep_start"):
|
||||
self._finalize_current(xs, ys)
|
||||
xs.clear()
|
||||
ys.clear()
|
||||
continue
|
||||
|
||||
# s X Y (оба целые со знаком). Разделяем по любым пробелам/табам.
|
||||
if len(line) >= 3:
|
||||
parts = line.split()
|
||||
if len(parts) >= 3 and parts[0].lower() == b"s":
|
||||
try:
|
||||
x = int(parts[1], 10)
|
||||
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
|
||||
except Exception:
|
||||
self._total_parse_errors += 1
|
||||
continue
|
||||
xs.append(x)
|
||||
ys.append(y)
|
||||
self._total_lines_received += 1
|
||||
else:
|
||||
# Строка не в формате "s X Y"
|
||||
self._total_parse_errors += 1
|
||||
else:
|
||||
# Строка слишком короткая
|
||||
self._total_parse_errors += 1
|
||||
|
||||
# Защита от переполнения буфера при отсутствии переводов строки (снижен порог)
|
||||
if len(buf) > 262144:
|
||||
del buf[:-131072]
|
||||
finally:
|
||||
try:
|
||||
# Завершаем оставшийся свип
|
||||
self._finalize_current(xs, ys)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if self._src is not None:
|
||||
self._src.close()
|
||||
except Exception:
|
||||
pass
|
||||
Reference in New Issue
Block a user