From 356ca99d1202e21075f43f163c89f18fb31c175b Mon Sep 17 00:00:00 2001 From: Theodor Chikin Date: Fri, 19 Dec 2025 21:49:02 +0300 Subject: [PATCH] Now tty parser is faster a lot. Data is processed online --- RFG_ADC_dataplotter.py | 155 +++++++++++++++++++++++++++++++---------- 1 file changed, 119 insertions(+), 36 deletions(-) diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py index 633379e..9b9321e 100755 --- a/RFG_ADC_dataplotter.py +++ b/RFG_ADC_dataplotter.py @@ -50,9 +50,14 @@ class FDReader: def __init__(self, fd: int): # Отдельно буферизуем для корректной readline() + self._fd = fd raw = os.fdopen(fd, "rb", closefd=False) + self._file = raw self._buf = io.BufferedReader(raw, buffer_size=65536) + def fileno(self) -> int: + return self._fd + def readline(self) -> bytes: return self._buf.readline() @@ -152,6 +157,60 @@ class SerialLineSource: pass +class SerialChunkReader: + """Быстрое неблокирующее чтение чанков из serial/raw TTY для максимального дренажа буфера.""" + + def __init__(self, src: SerialLineSource): + self._src = src + self._ser = src._pyserial + self._fd: Optional[int] = None + if self._ser is not None: + # Неблокирующий режим для быстрой откачки + try: + self._ser.timeout = 0 + except Exception: + pass + else: + try: + self._fd = src._fdreader.fileno() # type: ignore[union-attr] + try: + os.set_blocking(self._fd, False) + except Exception: + pass + except Exception: + self._fd = None + + def read_available(self) -> bytes: + """Вернёт доступные байты (b"" если данных нет).""" + if self._ser is not None: + try: + n = int(getattr(self._ser, "in_waiting", 0)) + except Exception: + n = 0 + if n > 0: + try: + return self._ser.read(n) + except Exception: + return b"" + return b"" + if self._fd is None: + return b"" + out = bytearray() + while True: + try: + chunk = os.read(self._fd, 65536) + if not chunk: + break + out += chunk + if len(chunk) < 65536: + break + except BlockingIOError: + break + except Exception: + break + return bytes(out) + + class SweepReader(threading.Thread): """Фоновый поток: читает строки, формирует завершённые свипы и кладёт в очередь.""" @@ -166,16 +225,19 @@ class SweepReader(threading.Thread): def _finalize_current(self, xs, ys): if not xs: return - try: - max_x = max(xs) - except ValueError: - return + max_x = max(xs) width = max_x + 1 + # Быстрый векторизованный путь sweep = np.full((width,), np.nan, dtype=np.float32) - # Заполнение известными точками - for x, y in zip(xs, ys): - if 0 <= x < width: - sweep[x] = float(y) + try: + idx = np.asarray(xs, dtype=np.int64) + vals = np.asarray(ys, dtype=np.float32) + sweep[idx] = vals + except Exception: + # Запасной путь + for x, y in zip(xs, ys): + if 0 <= x < width: + sweep[x] = float(y) # Кладём готовый свип (если очередь полна — выбрасываем самый старый) try: self._q.put_nowait(sweep) @@ -202,38 +264,59 @@ class SweepReader(threading.Thread): return try: + # Быстрый неблокирующий дренаж порта с разбором по байтам + chunk_reader = SerialChunkReader(self._src) + buf = bytearray() while not self._stop.is_set(): - raw = self._src.readline() - if not raw: - # timeout/ошибка/EOF — небольшой сон, чтобы не крутить CPU - time.sleep(0.001) - continue - try: - line = raw.decode("ascii", errors="ignore").strip() - except Exception: + data = chunk_reader.read_available() + if data: + buf += data + else: + # Короткая уступка CPU, если нет новых данных + time.sleep(0.0005) continue - if not line: - continue - - if line.startswith("Sweep_start"): - # Завершаем предыдущий, начинаем новый - self._finalize_current(xs, ys) - xs.clear() - ys.clear() - continue - - # stp X Y - # Разрешим как с пробелами, так и табами - parts = line.split() - if len(parts) >= 3 and parts[0].lower() == "stp": - try: - x = int(parts[1], 10) - y = int(parts[2], 10) - except Exception: + # Обрабатываем все полные строки + while True: + nl = buf.find(b"\n") + if nl == -1: + break + line = bytes(buf[:nl]) + del buf[: nl + 1] + if line.endswith(b"\r"): + line = line[:-1] + if not line: continue - xs.append(x) - ys.append(y) + + if line.startswith(b"Sweep_start"): + self._finalize_current(xs, ys) + xs.clear() + ys.clear() + continue + + # stp X Y + if len(line) >= 5 and (line[:3] == b"stp" or line[:3] == b"STP"): + sp1 = line.find(b" ", 3) + if sp1 == -1: + sp1 = line.find(b"\t", 3) + if sp1 == -1: + continue + sp2 = line.find(b" ", sp1 + 1) + if sp2 == -1: + sp2 = line.find(b"\t", sp1 + 1) + if sp2 == -1: + continue + try: + x = int(line[sp1 + 1 : sp2]) + y = int(line[sp2 + 1 :]) + except Exception: + continue + xs.append(x) + ys.append(y) + + # Защита от переполнения буфера при отсутствии переводов строки + if len(buf) > 1_000_000: + del buf[:-262144] finally: try: # Завершаем оставшийся свип