Now tty parser is faster a lot. Data is processed online

This commit is contained in:
2025-12-19 21:49:02 +03:00
parent 718bff0c4b
commit 356ca99d12

View File

@ -50,9 +50,14 @@ class FDReader:
def __init__(self, fd: int):
# Отдельно буферизуем для корректной readline()
self._fd = fd
raw = os.fdopen(fd, "rb", closefd=False)
self._file = raw
self._buf = io.BufferedReader(raw, buffer_size=65536)
def fileno(self) -> int:
return self._fd
def readline(self) -> bytes:
return self._buf.readline()
@ -152,6 +157,60 @@ class SerialLineSource:
pass
class SerialChunkReader:
"""Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера."""
def __init__(self, src: SerialLineSource):
self._src = src
self._ser = src._pyserial
self._fd: Optional[int] = None
if self._ser is not None:
# Неблокирующий режим для быстрой откачки
try:
self._ser.timeout = 0
except Exception:
pass
else:
try:
self._fd = src._fdreader.fileno() # type: ignore[union-attr]
try:
os.set_blocking(self._fd, False)
except Exception:
pass
except Exception:
self._fd = None
def read_available(self) -> bytes:
"""Вернёт доступные байты (b"" если данных нет)."""
if self._ser is not None:
try:
n = int(getattr(self._ser, "in_waiting", 0))
except Exception:
n = 0
if n > 0:
try:
return self._ser.read(n)
except Exception:
return b""
return b""
if self._fd is None:
return b""
out = bytearray()
while True:
try:
chunk = os.read(self._fd, 65536)
if not chunk:
break
out += chunk
if len(chunk) < 65536:
break
except BlockingIOError:
break
except Exception:
break
return bytes(out)
class SweepReader(threading.Thread):
"""Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь."""
@ -166,13 +225,16 @@ class SweepReader(threading.Thread):
def _finalize_current(self, xs, ys):
if not xs:
return
try:
max_x = max(xs)
except ValueError:
return
width = max_x + 1
# Быстрый векторизованный путь
sweep = np.full((width,), np.nan, dtype=np.float32)
# Заполнение известными точками
try:
idx = np.asarray(xs, dtype=np.int64)
vals = np.asarray(ys, dtype=np.float32)
sweep[idx] = vals
except Exception:
# Запасной путь
for x, y in zip(xs, ys):
if 0 <= x < width:
sweep[x] = float(y)
@ -202,38 +264,59 @@ class SweepReader(threading.Thread):
return
try:
# Быстрый неблокирующий дренаж порта с разбором по байтам
chunk_reader = SerialChunkReader(self._src)
buf = bytearray()
while not self._stop.is_set():
raw = self._src.readline()
if not raw:
# timeout/ошибка/EOF — небольшой сон, чтобы не крутить CPU
time.sleep(0.001)
continue
try:
line = raw.decode("ascii", errors="ignore").strip()
except Exception:
data = chunk_reader.read_available()
if data:
buf += data
else:
# Короткая уступка CPU, если нет новых данных
time.sleep(0.0005)
continue
# Обрабатываем все полные строки
while True:
nl = buf.find(b"\n")
if nl == -1:
break
line = bytes(buf[:nl])
del buf[: nl + 1]
if line.endswith(b"\r"):
line = line[:-1]
if not line:
continue
if line.startswith("Sweep_start"):
# Завершаем предыдущий, начинаем новый
if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys)
xs.clear()
ys.clear()
continue
# stp X Y
# Разрешим как с пробелами, так и табами
parts = line.split()
if len(parts) >= 3 and parts[0].lower() == "stp":
if len(line) >= 5 and (line[:3] == b"stp" or line[:3] == b"STP"):
sp1 = line.find(b" ", 3)
if sp1 == -1:
sp1 = line.find(b"\t", 3)
if sp1 == -1:
continue
sp2 = line.find(b" ", sp1 + 1)
if sp2 == -1:
sp2 = line.find(b"\t", sp1 + 1)
if sp2 == -1:
continue
try:
x = int(parts[1], 10)
y = int(parts[2], 10)
x = int(line[sp1 + 1 : sp2])
y = int(line[sp2 + 1 :])
except Exception:
continue
xs.append(x)
ys.append(y)
# Защита от переполнения буфера при отсутствии переводов строки
if len(buf) > 1_000_000:
del buf[:-262144]
finally:
try:
# Завершаем оставшийся свип