diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py index 1df1484..15c5bec 100755 --- a/RFG_ADC_dataplotter.py +++ b/RFG_ADC_dataplotter.py @@ -384,6 +384,7 @@ class SweepReader(threading.Thread): out_queue: Queue[SweepPacket], stop_event: threading.Event, fancy: bool = False, + bin_mode: bool = False, ): super().__init__(daemon=True) self._port_path = port_path @@ -392,11 +393,17 @@ class SweepReader(threading.Thread): self._stop = stop_event self._src: Optional[SerialLineSource] = None self._fancy = bool(fancy) + self._bin_mode = bool(bin_mode) self._max_width: int = 0 self._sweep_idx: int = 0 self._last_sweep_ts: Optional[float] = None self._n_valid_hist = deque() + @staticmethod + def _u32_to_i32(v: int) -> int: + """Преобразование 32-bit слова в знаковое значение.""" + return v - 0x1_0000_0000 if (v & 0x8000_0000) else v + def _finalize_current(self, xs, ys, channels: Optional[set[int]]): if not xs: return @@ -446,8 +453,10 @@ class SweepReader(threading.Thread): m = float(np.nanmean(sweep)) if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD: sweep *= -1.0 + except Exception: pass + #sweep = np.abs(sweep) #sweep -= float(np.nanmean(sweep)) # Метрики для статусной строки (вид словаря: переменная -> значение) @@ -502,13 +511,149 @@ class SweepReader(threading.Thread): except Exception: pass - def run(self): - # Состояние текущего свипа + def _run_ascii_stream(self, chunk_reader: SerialChunkReader): xs: list[int] = [] ys: list[int] = [] cur_channel: Optional[int] = None cur_channels: set[int] = set() + buf = bytearray() + while not self._stop.is_set(): + data = chunk_reader.read_available() + if data: + buf += data + else: + time.sleep(0.0005) + continue + while True: + nl = buf.find(b"\n") + if nl == -1: + break + line = bytes(buf[:nl]) + del buf[: nl + 1] + if line.endswith(b"\r"): + line = line[:-1] + if not line: + continue + + if line.startswith(b"Sweep_start"): + self._finalize_current(xs, ys, cur_channels) + xs.clear() + ys.clear() + cur_channel = None + cur_channels.clear() + continue + + # sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам. + if len(line) >= 3: + parts = line.split() + if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): + try: + if parts[0].lower() == b"s": + if len(parts) >= 4: + ch = int(parts[1], 10) + x = int(parts[2], 10) + y = int(parts[3], 10) # поддержка знака: "+…" и "-…" + else: + ch = 0 + x = int(parts[1], 10) + y = int(parts[2], 10) # поддержка знака: "+…" и "-…" + else: + # формат вида "s0" + ch = int(parts[0][1:], 10) + x = int(parts[1], 10) + y = int(parts[2], 10) # поддержка знака: "+…" и "-…" + except Exception: + continue + if cur_channel is None: + cur_channel = ch + cur_channels.add(ch) + xs.append(x) + ys.append(y) + + if len(buf) > 1_000_000: + del buf[:-262144] + + self._finalize_current(xs, ys, cur_channels) + + def _run_binary_stream(self, chunk_reader: SerialChunkReader): + xs: list[int] = [] + ys: list[int] = [] + cur_channel: Optional[int] = None + cur_channels: set[int] = set() + words = deque() + + buf = bytearray() + while not self._stop.is_set(): + data = chunk_reader.read_available() + if data: + buf += data + else: + time.sleep(0.0005) + continue + + usable = len(buf) & ~1 + if usable == 0: + continue + + i = 0 + while i < usable: + w = int(buf[i]) | (int(buf[i + 1]) << 8) + words.append(w) + i += 2 + + # Бинарный протокол: + # старт свипа (актуальный): 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A + # старт свипа (legacy): 0xFFFF, 0xFFFF, channel, 0x0A0A + # точка: step, value_hi, value_lo, 0x000A + while len(words) >= 4: + w0 = int(words[0]) + w1 = int(words[1]) + w2 = int(words[2]) + w3 = int(words[3]) + + if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A: + self._finalize_current(xs, ys, cur_channels) + xs.clear() + ys.clear() + cur_channels.clear() + cur_channel = (w3 >> 8) & 0x00FF + cur_channels.add(cur_channel) + for _ in range(4): + words.popleft() + continue + + if w0 == 0xFFFF and w1 == 0xFFFF and w3 == 0x0A0A: + self._finalize_current(xs, ys, cur_channels) + xs.clear() + ys.clear() + cur_channels.clear() + cur_channel = w2 + cur_channels.add(cur_channel) + for _ in range(4): + words.popleft() + continue + + if w3 == 0x000A: + if cur_channel is not None: + cur_channels.add(cur_channel) + xs.append(w0) + value_u32 = (w1 << 16) | w2 + ys.append(self._u32_to_i32(value_u32)) + for _ in range(4): + words.popleft() + continue + + # Поток может начаться с середины пакета; сдвигаемся по слову до ресинхронизации. + words.popleft() + + del buf[:usable] + if len(buf) > 1_000_000: + del buf[:-262144] + + self._finalize_current(xs, ys, cur_channels) + + def run(self): try: self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") @@ -517,74 +662,12 @@ class SweepReader(threading.Thread): return try: - # Быстрый неблокирующий дренаж порта с разбором по байтам chunk_reader = SerialChunkReader(self._src) - buf = bytearray() - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - # Короткая уступка CPU, если нет новых данных - time.sleep(0.0005) - continue - - # Обрабатываем все полные строки - while True: - nl = buf.find(b"\n") - if nl == -1: - break - line = bytes(buf[:nl]) - del buf[: nl + 1] - if line.endswith(b"\r"): - line = line[:-1] - if not line: - continue - - if line.startswith(b"Sweep_start"): - self._finalize_current(xs, ys, cur_channels) - xs.clear() - ys.clear() - cur_channel = None - cur_channels.clear() - continue - - # sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам. - if len(line) >= 3: - parts = line.split() - if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): - try: - if parts[0].lower() == b"s": - if len(parts) >= 4: - ch = int(parts[1], 10) - x = int(parts[2], 10) - y = int(parts[3], 10) # поддержка знака: "+…" и "-…" - else: - ch = 0 - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - else: - # формат вида "s0" - ch = int(parts[0][1:], 10) - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - except Exception: - continue - if cur_channel is None: - cur_channel = ch - cur_channels.add(ch) - xs.append(x) - ys.append(y) - - # Защита от переполнения буфера при отсутствии переводов строки - if len(buf) > 1_000_000: - del buf[:-262144] + if self._bin_mode: + self._run_binary_stream(chunk_reader) + else: + self._run_ascii_stream(chunk_reader) finally: - try: - # Завершаем оставшийся свип - self._finalize_current(xs, ys, cur_channels) - except Exception: - pass try: if self._src is not None: self._src.close() @@ -648,6 +731,15 @@ def main(): default="projector", help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)", ) + parser.add_argument( + "--bin", + dest="bin_mode", + action="store_true", + help=( + "Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; " + "точки step,uint32(hi16,lo16),0x000A" + ), + ) args = parser.parse_args() @@ -673,7 +765,14 @@ def main(): # Очередь завершённых свипов и поток чтения q: Queue[SweepPacket] = Queue(maxsize=1000) stop_event = threading.Event() - reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) + reader = SweepReader( + args.port, + args.baud, + q, + stop_event, + fancy=bool(args.fancy), + bin_mode=bool(args.bin_mode), + ) reader.start() # Графика @@ -863,7 +962,7 @@ def main(): freq_shared = np.arange(fft_bins, dtype=np.int32) def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по текущей видимой области imshow (без накопления по времени).""" + """(vmin, vmax) по центральным 90% значений в видимой области imshow.""" if data.size == 0: return None ny, nx = data.shape[0], data.shape[1] @@ -888,8 +987,8 @@ def main(): if not finite.any(): return None vals = sub[finite] - vmin = float(np.min(vals)) - vmax = float(np.max(vals)) + vmin = float(np.nanpercentile(vals, 5)) + vmax = float(np.nanpercentile(vals, 95)) if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: return None return (vmin, vmax) @@ -1168,7 +1267,14 @@ def run_pyqtgraph(args): # Очередь завершённых свипов и поток чтения q: Queue[SweepPacket] = Queue(maxsize=1000) stop_event = threading.Event() - reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) + reader = SweepReader( + args.port, + args.baud, + q, + stop_event, + fancy=bool(args.fancy), + bin_mode=bool(args.bin_mode), + ) reader.start() # Настройки скорости @@ -1310,7 +1416,7 @@ def run_pyqtgraph(args): freq_shared = np.arange(fft_bins, dtype=np.int32) def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по текущей видимой области ImageItem (без накопления по времени).""" + """(vmin, vmax) по центральным 90% значений в видимой области ImageItem.""" if data.size == 0: return None ny, nx = data.shape[0], data.shape[1] @@ -1334,8 +1440,8 @@ def run_pyqtgraph(args): if not finite.any(): return None vals = sub[finite] - vmin = float(np.min(vals)) - vmax = float(np.max(vals)) + vmin = float(np.nanpercentile(vals, 5)) + vmax = float(np.nanpercentile(vals, 95)) if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: return None return (vmin, vmax)