diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py index 1df1484..7c8b63a 100755 --- a/RFG_ADC_dataplotter.py +++ b/RFG_ADC_dataplotter.py @@ -36,6 +36,9 @@ FFT_LEN = 1024 # длина БПФ для спектра/водопада сп # Порог для инверсии сырых данных: если среднее значение свипа ниже порога — # считаем, что сигнал «меньше нуля» и домножаем свип на -1 DATA_INVERSION_THRASHOLD = 10.0 +LOG_DETECTOR_OFFSET = 0.0 +LOG_DETECTOR_SCALER = 1.0 +LOG_DETECTOR_BASE = 2.0 Number = Union[int, float] SweepInfo = Dict[str, Any] @@ -59,7 +62,8 @@ def _format_status_kv(data: Mapping[str, Any]) -> str: return f"{fv:.3g}" return f"{fv:.3f}".rstrip("0").rstrip(".") - parts = [f"{k}:{_fmt(v)}" for k, v in data.items()] + hidden_keys = {"pre_exp_sweep", "sweep_1", "sweep_2"} + parts = [f"{k}:{_fmt(v)}" for k, v in data.items() if k not in hidden_keys] return " ".join(parts) @@ -384,6 +388,8 @@ class SweepReader(threading.Thread): out_queue: Queue[SweepPacket], stop_event: threading.Event, fancy: bool = False, + bin_mode: bool = False, + logdetector: bool = False, ): super().__init__(daemon=True) self._port_path = port_path @@ -392,12 +398,26 @@ class SweepReader(threading.Thread): self._stop = stop_event self._src: Optional[SerialLineSource] = None self._fancy = bool(fancy) + self._bin_mode = bool(bin_mode) + self._logdetector = bool(logdetector) self._max_width: int = 0 self._sweep_idx: int = 0 self._last_sweep_ts: Optional[float] = None self._n_valid_hist = deque() - def _finalize_current(self, xs, ys, channels: Optional[set[int]]): + @staticmethod + def _u32_to_i32(v: int) -> int: + """Преобразование 32-bit слова в знаковое значение.""" + return v - 0x1_0000_0000 if (v & 0x8000_0000) else v + + def _finalize_current( + self, + xs, + ys, + channels: Optional[set[int]], + ys1: Optional[list[int]] = None, + ys2: Optional[list[int]] = None, + ): if not xs: return ch_list = sorted(channels) if channels else [0] @@ -406,17 +426,26 @@ class SweepReader(threading.Thread): width = max_x + 1 self._max_width = max(self._max_width, width) target_width = self._max_width if self._fancy else width - # Быстрый векторизованный путь - sweep = np.full((target_width,), np.nan, dtype=np.float32) - try: - idx = np.asarray(xs, dtype=np.int64) - vals = np.asarray(ys, dtype=np.float32) - sweep[idx] = vals - except Exception: - # Запасной путь - for x, y in zip(xs, ys): - if 0 <= x < target_width: - sweep[x] = float(y) + def _build_sweep(values) -> np.ndarray: + arr = np.full((target_width,), np.nan, dtype=np.float32) + try: + idx = np.asarray(xs, dtype=np.int64) + vals = np.asarray(values, dtype=np.float32) + arr[idx] = vals + except Exception: + for x, y in zip(xs, values): + if 0 <= x < target_width: + arr[x] = float(y) + return arr + + sweep_1: Optional[np.ndarray] = None + sweep_2: Optional[np.ndarray] = None + if ys1 is not None and ys2 is not None and len(ys1) == len(xs) and len(ys2) == len(xs): + sweep_1 = _build_sweep(ys1) + sweep_2 = _build_sweep(ys2) + sweep = sweep_1 - sweep_2 + else: + sweep = _build_sweep(ys) # Метрики валидных точек до заполнения пропусков finite_pre = np.isfinite(sweep) n_valid_cur = int(np.count_nonzero(finite_pre)) @@ -448,6 +477,19 @@ class SweepReader(threading.Thread): sweep *= -1.0 except Exception: pass + + pre_exp_sweep: Optional[np.ndarray] = None + if self._logdetector: + try: + sweep = (sweep - LOG_DETECTOR_OFFSET) * LOG_DETECTOR_SCALER + pre_exp_sweep = sweep.copy() + with np.errstate(over="ignore", invalid="ignore"): + sweep = np.power(LOG_DETECTOR_BASE, np.asarray(sweep, dtype=np.float64)).astype(np.float32) + sweep[~np.isfinite(sweep)] = np.nan + except Exception: + pass + + #print(sweep) #sweep -= float(np.nanmean(sweep)) # Метрики для статусной строки (вид словаря: переменная -> значение) @@ -488,6 +530,11 @@ class SweepReader(threading.Thread): "std": std, "dt_ms": dt_ms, } + if pre_exp_sweep is not None: + info["pre_exp_sweep"] = pre_exp_sweep + if sweep_1 is not None and sweep_2 is not None: + info["sweep_1"] = sweep_1 + info["sweep_2"] = sweep_2 # Кладём готовый свип (если очередь полна — выбрасываем самый старый) try: @@ -502,13 +549,149 @@ class SweepReader(threading.Thread): except Exception: pass - def run(self): - # Состояние текущего свипа + def _run_ascii_stream(self, chunk_reader: SerialChunkReader): xs: list[int] = [] ys: list[int] = [] cur_channel: Optional[int] = None cur_channels: set[int] = set() + buf = bytearray() + while not self._stop.is_set(): + data = chunk_reader.read_available() + if data: + buf += data + else: + time.sleep(0.0005) + continue + + while True: + nl = buf.find(b"\n") + if nl == -1: + break + line = bytes(buf[:nl]) + del buf[: nl + 1] + if line.endswith(b"\r"): + line = line[:-1] + if not line: + continue + + if line.startswith(b"Sweep_start"): + self._finalize_current(xs, ys, cur_channels) + xs.clear() + ys.clear() + cur_channel = None + cur_channels.clear() + continue + + if len(line) >= 3: + parts = line.split() + if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): + try: + if parts[0].lower() == b"s": + if len(parts) >= 4: + ch = int(parts[1], 10) + x = int(parts[2], 10) + y = int(parts[3], 10) + else: + ch = 0 + x = int(parts[1], 10) + y = int(parts[2], 10) + else: + ch = int(parts[0][1:], 10) + x = int(parts[1], 10) + y = int(parts[2], 10) + except Exception: + continue + if cur_channel is None: + cur_channel = ch + cur_channels.add(ch) + xs.append(x) + ys.append(y) + + if len(buf) > 1_000_000: + del buf[:-262144] + + self._finalize_current(xs, ys, cur_channels) + + def _run_binary_stream(self, chunk_reader: SerialChunkReader): + xs: list[int] = [] + ys: list[int] = [] + ys1: list[int] = [] + ys2: list[int] = [] + cur_channel: Optional[int] = None + cur_channels: set[int] = set() + words = deque() + + buf = bytearray() + while not self._stop.is_set(): + data = chunk_reader.read_available() + if data: + buf += data + else: + time.sleep(0.0005) + continue + + usable = len(buf) & ~1 + if usable == 0: + continue + + i = 0 + while i < usable: + w = int(buf[i]) | (int(buf[i + 1]) << 8) + words.append(w) + i += 2 + + # Новый бинарный формат: + # - старт: FFFF,FFFF,FFFF,FFFF,FFFF,(CH<<8)|0x0A + # - точка: X,avg1_hi,avg1_lo,avg2_hi,avg2_lo,0x000A + while len(words) >= 6: + w0 = int(words[0]) + w1 = int(words[1]) + w2 = int(words[2]) + w3 = int(words[3]) + w4 = int(words[4]) + w5 = int(words[5]) + + if ( + w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF + and w3 == 0xFFFF and w4 == 0xFFFF and (w5 & 0x00FF) == 0x000A + ): + self._finalize_current(xs, ys, cur_channels, ys1=ys1, ys2=ys2) + xs.clear() + ys.clear() + ys1.clear() + ys2.clear() + cur_channels.clear() + cur_channel = (w5 >> 8) & 0x00FF + cur_channels.add(cur_channel) + for _ in range(6): + words.popleft() + continue + + if w5 == 0x000A: + if cur_channel is not None: + cur_channels.add(cur_channel) + xs.append(w0) + avg1_u32 = (w1 << 16) | w2 + avg2_u32 = (w3 << 16) | w4 + avg1 = self._u32_to_i32(avg1_u32) + avg2 = self._u32_to_i32(avg2_u32) + ys1.append(avg1) + ys2.append(avg2) + ys.append(avg1 - avg2) + for _ in range(6): + words.popleft() + continue + + words.popleft() + + del buf[:usable] + if len(buf) > 1_000_000: + del buf[:-262144] + + self._finalize_current(xs, ys, cur_channels, ys1=ys1, ys2=ys2) + + def run(self): try: self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") @@ -517,74 +700,12 @@ class SweepReader(threading.Thread): return try: - # Быстрый неблокирующий дренаж порта с разбором по байтам chunk_reader = SerialChunkReader(self._src) - buf = bytearray() - while not self._stop.is_set(): - data = chunk_reader.read_available() - if data: - buf += data - else: - # Короткая уступка CPU, если нет новых данных - time.sleep(0.0005) - continue - - # Обрабатываем все полные строки - while True: - nl = buf.find(b"\n") - if nl == -1: - break - line = bytes(buf[:nl]) - del buf[: nl + 1] - if line.endswith(b"\r"): - line = line[:-1] - if not line: - continue - - if line.startswith(b"Sweep_start"): - self._finalize_current(xs, ys, cur_channels) - xs.clear() - ys.clear() - cur_channel = None - cur_channels.clear() - continue - - # sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам. - if len(line) >= 3: - parts = line.split() - if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")): - try: - if parts[0].lower() == b"s": - if len(parts) >= 4: - ch = int(parts[1], 10) - x = int(parts[2], 10) - y = int(parts[3], 10) # поддержка знака: "+…" и "-…" - else: - ch = 0 - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - else: - # формат вида "s0" - ch = int(parts[0][1:], 10) - x = int(parts[1], 10) - y = int(parts[2], 10) # поддержка знака: "+…" и "-…" - except Exception: - continue - if cur_channel is None: - cur_channel = ch - cur_channels.add(ch) - xs.append(x) - ys.append(y) - - # Защита от переполнения буфера при отсутствии переводов строки - if len(buf) > 1_000_000: - del buf[:-262144] + if self._bin_mode: + self._run_binary_stream(chunk_reader) + else: + self._run_ascii_stream(chunk_reader) finally: - try: - # Завершаем оставшийся свип - self._finalize_current(xs, ys, cur_channels) - except Exception: - pass try: if self._src is not None: self._src.close() @@ -611,7 +732,7 @@ def main(): "--spec-clip", default="2,98", help=( - "Процентильная обрезка уровней водопада спектров, % (min,max). " + "Процентильная обрезка уровней водопада спектров, %% (min,max). " "Напр. 2,98. 'off' — отключить" ), ) @@ -648,6 +769,21 @@ def main(): default="projector", help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)", ) + parser.add_argument( + "--bin", + dest="bin_mode", + action="store_true", + default=True, + help=( + "Бинарный протокол: старт FFFFx5,(CH<<8)|0x0A; " + "точки X,avg1_hi,avg1_lo,avg2_hi,avg2_lo,0x000A (sweep=avg1-avg2)" + ), + ) + parser.add_argument( + "--logdetector", + action="store_true", + help="Лог-детектор: после инверсии ((sweep-OFFSET)*SCALER) и затем BASE**sweep", + ) args = parser.parse_args() @@ -673,7 +809,15 @@ def main(): # Очередь завершённых свипов и поток чтения q: Queue[SweepPacket] = Queue(maxsize=1000) stop_event = threading.Event() - reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) + reader = SweepReader( + args.port, + args.baud, + q, + stop_event, + fancy=bool(args.fancy), + bin_mode=bool(getattr(args, "bin_mode", False)), + logdetector=bool(getattr(args, "logdetector", False)), + ) reader.start() # Графика @@ -685,6 +829,9 @@ def main(): # Состояние для отображения current_sweep_raw: Optional[np.ndarray] = None + current_sweep_1: Optional[np.ndarray] = None + current_sweep_2: Optional[np.ndarray] = None + current_sweep_pre_exp: Optional[np.ndarray] = None current_sweep_norm: Optional[np.ndarray] = None last_calib_sweep: Optional[np.ndarray] = None current_info: Optional[SweepInfo] = None @@ -709,6 +856,7 @@ def main(): contrast_slider = None calib_enabled = False norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() + logdetector_enabled = bool(getattr(args, "logdetector", False)) cb = None # Статусная строка (внизу окна) @@ -724,7 +872,7 @@ def main(): # Линейный график последнего свипа line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") - line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") + line_calib_obj, = ax_line.plot([], [], lw=1, color="gold") line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") ax_line.set_title("Сырые данные", pad=1) ax_line.set_xlabel("ГГц") @@ -942,7 +1090,7 @@ def main(): y_max_fft = float(fr_max) def drain_queue(): - nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep + nonlocal current_sweep_raw, current_sweep_1, current_sweep_2, current_sweep_pre_exp, current_sweep_norm, current_info, last_calib_sweep drained = 0 while True: try: @@ -952,6 +1100,12 @@ def main(): drained += 1 current_sweep_raw = s current_info = info + s1 = info.get("sweep_1") if isinstance(info, dict) else None + s2 = info.get("sweep_2") if isinstance(info, dict) else None + current_sweep_1 = s1 if isinstance(s1, np.ndarray) else None + current_sweep_2 = s2 if isinstance(s2, np.ndarray) else None + pre = info.get("pre_exp_sweep") if isinstance(info, dict) else None + current_sweep_pre_exp = pre if isinstance(pre, np.ndarray) else None ch = 0 try: ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 @@ -1020,20 +1174,36 @@ def main(): else: xs = np.arange(current_sweep_raw.size, dtype=np.int32) line_obj.set_data(xs, current_sweep_raw) - if last_calib_sweep is not None: - line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep) - else: + if current_sweep_1 is not None and current_sweep_2 is not None: + line_calib_obj.set_data(xs[: current_sweep_1.size], current_sweep_1) + line_norm_obj.set_data(xs[: current_sweep_2.size], current_sweep_2) + elif logdetector_enabled: line_calib_obj.set_data([], []) - if current_sweep_norm is not None: - line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm) + if current_sweep_pre_exp is not None: + line_norm_obj.set_data(xs[: current_sweep_pre_exp.size], current_sweep_pre_exp) + else: + line_norm_obj.set_data([], []) else: - line_norm_obj.set_data([], []) + if last_calib_sweep is not None: + line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep) + else: + line_calib_obj.set_data([], []) + if current_sweep_norm is not None: + line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm) + else: + line_norm_obj.set_data([], []) # Лимиты по X: 3.3 ГГц .. 14.3 ГГц ax_line.set_xlim(3.3, 14.3) # Адаптивные Y-лимиты (если не задан --ylim) if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep_raw)) - y1 = float(np.nanmax(current_sweep_raw)) + y_candidates = [current_sweep_raw] + if current_sweep_1 is not None and current_sweep_2 is not None: + y_candidates.extend([current_sweep_1, current_sweep_2]) + elif logdetector_enabled and current_sweep_pre_exp is not None: + y_candidates.append(current_sweep_pre_exp) + y_concat = np.concatenate([np.asarray(v, dtype=np.float32) for v in y_candidates]) + y0 = float(np.nanmin(y_concat)) + y1 = float(np.nanmax(y_concat)) if np.isfinite(y0) and np.isfinite(y1): if y0 == y1: pad = max(1.0, abs(y0) * 0.05) @@ -1168,7 +1338,15 @@ def run_pyqtgraph(args): # Очередь завершённых свипов и поток чтения q: Queue[SweepPacket] = Queue(maxsize=1000) stop_event = threading.Event() - reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) + reader = SweepReader( + args.port, + args.baud, + q, + stop_event, + fancy=bool(args.fancy), + bin_mode=bool(getattr(args, "bin_mode", False)), + logdetector=bool(getattr(args, "logdetector", False)), + ) reader.start() # Настройки скорости @@ -1186,7 +1364,7 @@ def run_pyqtgraph(args): p_line = win.addPlot(row=0, col=0, title="Сырые данные") p_line.showGrid(x=True, y=True, alpha=0.3) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) - curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) + curve_calib = p_line.plot(pen=pg.mkPen((220, 200, 60), width=1)) curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) p_line.setLabel("bottom", "ГГц") p_line.setLabel("left", "Y") @@ -1244,6 +1422,9 @@ def run_pyqtgraph(args): width: Optional[int] = None x_shared: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None + current_sweep_1: Optional[np.ndarray] = None + current_sweep_2: Optional[np.ndarray] = None + current_sweep_pre_exp: Optional[np.ndarray] = None current_sweep_norm: Optional[np.ndarray] = None last_calib_sweep: Optional[np.ndarray] = None current_info: Optional[SweepInfo] = None @@ -1258,6 +1439,7 @@ def run_pyqtgraph(args): spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) calib_enabled = False norm_type = str(getattr(args, "norm_type", "projector")).strip().lower() + logdetector_enabled = bool(getattr(args, "logdetector", False)) # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) fixed_ylim: Optional[Tuple[float, float]] = None if args.ylim: @@ -1377,7 +1559,7 @@ def run_pyqtgraph(args): y_max_fft = float(fr_max) def drain_queue(): - nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep + nonlocal current_sweep_raw, current_sweep_1, current_sweep_2, current_sweep_pre_exp, current_sweep_norm, current_info, last_calib_sweep drained = 0 while True: try: @@ -1387,6 +1569,12 @@ def run_pyqtgraph(args): drained += 1 current_sweep_raw = s current_info = info + s1 = info.get("sweep_1") if isinstance(info, dict) else None + s2 = info.get("sweep_2") if isinstance(info, dict) else None + current_sweep_1 = s1 if isinstance(s1, np.ndarray) else None + current_sweep_2 = s2 if isinstance(s2, np.ndarray) else None + pre = info.get("pre_exp_sweep") if isinstance(info, dict) else None + current_sweep_pre_exp = pre if isinstance(pre, np.ndarray) else None ch = 0 try: ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0 @@ -1424,17 +1612,33 @@ def run_pyqtgraph(args): else: xs = np.arange(current_sweep_raw.size) curve.setData(xs, current_sweep_raw, autoDownsample=True) - if last_calib_sweep is not None: - curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True) - else: + if current_sweep_1 is not None and current_sweep_2 is not None: + curve_calib.setData(xs[: current_sweep_1.size], current_sweep_1, autoDownsample=True) + curve_norm.setData(xs[: current_sweep_2.size], current_sweep_2, autoDownsample=True) + elif logdetector_enabled: curve_calib.setData([], []) - if current_sweep_norm is not None: - curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True) + if current_sweep_pre_exp is not None: + curve_norm.setData(xs[: current_sweep_pre_exp.size], current_sweep_pre_exp, autoDownsample=True) + else: + curve_norm.setData([], []) else: - curve_norm.setData([], []) + if last_calib_sweep is not None: + curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True) + else: + curve_calib.setData([], []) + if current_sweep_norm is not None: + curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True) + else: + curve_norm.setData([], []) if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep_raw)) - y1 = float(np.nanmax(current_sweep_raw)) + y_candidates = [current_sweep_raw] + if current_sweep_1 is not None and current_sweep_2 is not None: + y_candidates.extend([current_sweep_1, current_sweep_2]) + elif logdetector_enabled and current_sweep_pre_exp is not None: + y_candidates.append(current_sweep_pre_exp) + y_concat = np.concatenate([np.asarray(v, dtype=np.float32) for v in y_candidates]) + y0 = float(np.nanmin(y_concat)) + y1 = float(np.nanmax(y_concat)) if np.isfinite(y0) and np.isfinite(y1): margin = 0.05 * max(1.0, (y1 - y0)) p_line.setYRange(y0 - margin, y1 + margin, padding=0)