From 1e098ffa89683da97b71fb0c13527b4c5dc6f0c7 Mon Sep 17 00:00:00 2001 From: Theodor Chikin Date: Tue, 3 Mar 2026 19:50:44 +0300 Subject: [PATCH] implemented new binary mode (--logscale): 2 32-bit values: avg_1, avg_2. Also implemented log-detector mode: avg_1,2 are processed as lg(signal_power) in def _log_pair_to_sweep. Tuning variables: LOG_BASE, LOG_SCALER, LOG_POSTSCALER. --- RFG_ADC_dataplotter.py | 306 +++++++++++++++++++++++++++++++++-------- 1 file changed, 249 insertions(+), 57 deletions(-) diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py index 15c5bec..bedc901 100755 --- a/RFG_ADC_dataplotter.py +++ b/RFG_ADC_dataplotter.py @@ -33,13 +33,18 @@ import numpy as np WF_WIDTH = 1000 # максимальное число точек в ряду водопада FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров +LOG_BASE = 10.0 +LOG_SCALER = 0.001 # int32 значения приходят в fixed-point лог-шкале с шагом 1e-3 +LOG_POSTSCALER = 1000 +LOG_EXP_LIMIT = 300.0 # запас до переполнения float64 при возведении LOG_BASE в степень # Порог для инверсии сырых данных: если среднее значение свипа ниже порога — # считаем, что сигнал «меньше нуля» и домножаем свип на -1 DATA_INVERSION_THRASHOLD = 10.0 Number = Union[int, float] SweepInfo = Dict[str, Any] -SweepPacket = Tuple[np.ndarray, SweepInfo] +SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]] +SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves] def _format_status_kv(data: Mapping[str, Any]) -> str: @@ -85,6 +90,44 @@ def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: return None +def _log_value_to_linear(value: int) -> float: + """Преобразовать fixed-point логарифмическое значение в линейную шкалу.""" + exponent = max(-LOG_EXP_LIMIT, min(LOG_EXP_LIMIT, float(value) * LOG_SCALER)) + return float(LOG_BASE ** exponent) + + +def _log_pair_to_sweep(avg_1: int, avg_2: int) -> float: + """Разность двух логарифмических усреднений в линейной шкале.""" + return (_log_value_to_linear(avg_1) - _log_value_to_linear(avg_2))*LOG_POSTSCALER + + +def _compute_auto_ylim(*series_list: Optional[np.ndarray]) -> Optional[Tuple[float, float]]: + """Общий Y-диапазон по всем переданным кривым с небольшим запасом.""" + y_min: Optional[float] = None + y_max: Optional[float] = None + for series in series_list: + if series is None: + continue + arr = np.asarray(series) + if arr.size == 0: + continue + finite = arr[np.isfinite(arr)] + if finite.size == 0: + continue + cur_min = float(np.min(finite)) + cur_max = float(np.max(finite)) + y_min = cur_min if y_min is None else min(y_min, cur_min) + y_max = cur_max if y_max is None else max(y_max, cur_max) + + if y_min is None or y_max is None: + return None + if y_min == y_max: + pad = max(1.0, abs(y_min) * 0.05) + else: + pad = 0.05 * (y_max - y_min) + return (y_min - pad, y_max + pad) + + def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: """Простая нормировка: поэлементное деление raw/calib.""" w = min(raw.size, calib.size) @@ -385,6 +428,7 @@ class SweepReader(threading.Thread): stop_event: threading.Event, fancy: bool = False, bin_mode: bool = False, + logscale: bool = False, ): super().__init__(daemon=True) self._port_path = port_path @@ -394,6 +438,7 @@ class SweepReader(threading.Thread): self._src: Optional[SerialLineSource] = None self._fancy = bool(fancy) self._bin_mode = bool(bin_mode) + self._logscale = bool(logscale) self._max_width: int = 0 self._sweep_idx: int = 0 self._last_sweep_ts: Optional[float] = None @@ -404,7 +449,14 @@ class SweepReader(threading.Thread): """Преобразование 32-bit слова в знаковое значение.""" return v - 0x1_0000_0000 if (v & 0x8000_0000) else v - def _finalize_current(self, xs, ys, channels: Optional[set[int]]): + def _finalize_current( + self, + xs, + ys, + channels: Optional[set[int]], + raw_curves: Optional[Tuple[list[int], list[int]]] = None, + apply_inversion: bool = True, + ): if not xs: return ch_list = sorted(channels) if channels else [0] @@ -413,17 +465,43 @@ class SweepReader(threading.Thread): width = max_x + 1 self._max_width = max(self._max_width, width) target_width = self._max_width if self._fancy else width + + def _scatter(values, dtype) -> np.ndarray: + series = np.full((target_width,), np.nan, dtype=dtype) + try: + idx = np.asarray(xs, dtype=np.int64) + vals = np.asarray(values, dtype=dtype) + series[idx] = vals + except Exception: + for x, y in zip(xs, values): + if 0 <= x < target_width: + series[x] = y + return series + + def _fill_missing(series: np.ndarray): + known = ~np.isnan(series) + if not np.any(known): + return + known_idx = np.nonzero(known)[0] + for i0, i1 in zip(known_idx[:-1], known_idx[1:]): + if i1 - i0 > 1: + avg = (series[i0] + series[i1]) * 0.5 + series[i0 + 1 : i1] = avg + first_idx = int(known_idx[0]) + last_idx = int(known_idx[-1]) + if first_idx > 0: + series[:first_idx] = series[first_idx] + if last_idx < series.size - 1: + series[last_idx + 1 :] = series[last_idx] + # Быстрый векторизованный путь - sweep = np.full((target_width,), np.nan, dtype=np.float32) - try: - idx = np.asarray(xs, dtype=np.int64) - vals = np.asarray(ys, dtype=np.float32) - sweep[idx] = vals - except Exception: - # Запасной путь - for x, y in zip(xs, ys): - if 0 <= x < target_width: - sweep[x] = float(y) + sweep = _scatter(ys, np.float32) + aux_curves: SweepAuxCurves = None + if raw_curves is not None: + aux_curves = ( + _scatter(raw_curves[0], np.float32), + _scatter(raw_curves[1], np.float32), + ) # Метрики валидных точек до заполнения пропусков finite_pre = np.isfinite(sweep) n_valid_cur = int(np.count_nonzero(finite_pre)) @@ -431,31 +509,21 @@ class SweepReader(threading.Thread): # Дополнительная обработка пропусков: при --fancy заполняем внутренние разрывы, края и дотягиваем до максимальной длины if self._fancy: try: - known = ~np.isnan(sweep) - if np.any(known): - known_idx = np.nonzero(known)[0] - # Для каждой пары соседних известных индексов заполним промежуток средним значением - for i0, i1 in zip(known_idx[:-1], known_idx[1:]): - if i1 - i0 > 1: - avg = (sweep[i0] + sweep[i1]) * 0.5 - sweep[i0 + 1 : i1] = avg - first_idx = int(known_idx[0]) - last_idx = int(known_idx[-1]) - if first_idx > 0: - sweep[:first_idx] = sweep[first_idx] - if last_idx < sweep.size - 1: - sweep[last_idx + 1 :] = sweep[last_idx] + _fill_missing(sweep) + if aux_curves is not None: + _fill_missing(aux_curves[0]) + _fill_missing(aux_curves[1]) except Exception: # В случае ошибки просто оставляем как есть pass # Инверсия данных при «отрицательном» уровне (среднее ниже порога) - try: - m = float(np.nanmean(sweep)) - if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD: - sweep *= -1.0 - - except Exception: - pass + if apply_inversion: + try: + m = float(np.nanmean(sweep)) + if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD: + sweep *= -1.0 + except Exception: + pass #sweep = np.abs(sweep) #sweep -= float(np.nanmean(sweep)) @@ -500,14 +568,14 @@ class SweepReader(threading.Thread): # Кладём готовый свип (если очередь полна — выбрасываем самый старый) try: - self._q.put_nowait((sweep, info)) + self._q.put_nowait((sweep, info, aux_curves)) except Full: try: _ = self._q.get_nowait() except Exception: pass try: - self._q.put_nowait((sweep, info)) + self._q.put_nowait((sweep, info, aux_curves)) except Exception: pass @@ -653,6 +721,99 @@ class SweepReader(threading.Thread): self._finalize_current(xs, ys, cur_channels) + def _run_logscale_binary_stream(self, chunk_reader: SerialChunkReader): + xs: list[int] = [] + ys: list[float] = [] + avg_1_vals: list[int] = [] + avg_2_vals: list[int] = [] + cur_channel: Optional[int] = None + cur_channels: set[int] = set() + words = deque() + + buf = bytearray() + while not self._stop.is_set(): + data = chunk_reader.read_available() + if data: + buf += data + else: + time.sleep(0.0005) + continue + + usable = len(buf) & ~1 + if usable == 0: + continue + + i = 0 + while i < usable: + w = int(buf[i]) | (int(buf[i + 1]) << 8) + words.append(w) + i += 2 + + # Бинарный logscale-протокол: + # старт свипа: 0xFFFF x5, затем (ch<<8)|0x0A + # точка: step, avg1_hi, avg1_lo, avg2_hi, avg2_lo, 0x000A + while len(words) >= 6: + w0 = int(words[0]) + w1 = int(words[1]) + w2 = int(words[2]) + w3 = int(words[3]) + w4 = int(words[4]) + w5 = int(words[5]) + + if ( + w0 == 0xFFFF + and w1 == 0xFFFF + and w2 == 0xFFFF + and w3 == 0xFFFF + and w4 == 0xFFFF + and (w5 & 0x00FF) == 0x000A + ): + self._finalize_current( + xs, + ys, + cur_channels, + raw_curves=(avg_1_vals, avg_2_vals), + apply_inversion=False, + ) + xs.clear() + ys.clear() + avg_1_vals.clear() + avg_2_vals.clear() + cur_channels.clear() + cur_channel = (w5 >> 8) & 0x00FF + cur_channels.add(cur_channel) + for _ in range(6): + words.popleft() + continue + + if w5 == 0x000A: + if cur_channel is not None: + cur_channels.add(cur_channel) + avg_1 = self._u32_to_i32((w1 << 16) | w2) + avg_2 = self._u32_to_i32((w3 << 16) | w4) + xs.append(w0) + avg_1_vals.append(avg_1) + avg_2_vals.append(avg_2) + ys.append(_log_pair_to_sweep(avg_1, avg_2)) + #ys.append(LOG_BASE**(avg_1/LOG_SCALER) - LOG_BASE**(avg_2/LOG_SCALER)) + for _ in range(6): + words.popleft() + continue + + words.popleft() + + del buf[:usable] + if len(buf) > 1_000_000: + del buf[:-262144] + + self._finalize_current( + xs, + ys, + cur_channels, + raw_curves=(avg_1_vals, avg_2_vals), + apply_inversion=False, + ) + def run(self): try: self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) @@ -663,7 +824,9 @@ class SweepReader(threading.Thread): try: chunk_reader = SerialChunkReader(self._src) - if self._bin_mode: + if self._logscale: + self._run_logscale_binary_stream(chunk_reader) + elif self._bin_mode: self._run_binary_stream(chunk_reader) else: self._run_ascii_stream(chunk_reader) @@ -740,6 +903,14 @@ def main(): "точки step,uint32(hi16,lo16),0x000A" ), ) + parser.add_argument( + "--logscale", + action="store_true", + help=( + "Новый бинарный протокол: точка несёт пару int32 (avg_1, avg_2), " + "а свип считается как 10**(avg_1*0.001) - 10**(avg_2*0.001)" + ), + ) args = parser.parse_args() @@ -772,6 +943,7 @@ def main(): stop_event, fancy=bool(args.fancy), bin_mode=bool(args.bin_mode), + logscale=bool(args.logscale), ) reader.start() @@ -784,6 +956,7 @@ def main(): # Состояние для отображения current_sweep_raw: Optional[np.ndarray] = None + current_aux_curves: SweepAuxCurves = None current_sweep_norm: Optional[np.ndarray] = None last_calib_sweep: Optional[np.ndarray] = None current_info: Optional[SweepInfo] = None @@ -822,6 +995,8 @@ def main(): ) # Линейный график последнего свипа + line_avg1_obj, = ax_line.plot([], [], lw=1, color="0.65") + line_avg2_obj, = ax_line.plot([], [], lw=1, color="0.45") line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") @@ -1041,15 +1216,16 @@ def main(): y_max_fft = float(fr_max) def drain_queue(): - nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep + nonlocal current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep drained = 0 while True: try: - s, info = q.get_nowait() + s, info, aux_curves = q.get_nowait() except Empty: break drained += 1 current_sweep_raw = s + current_aux_curves = aux_curves current_info = info ch = 0 try: @@ -1119,6 +1295,13 @@ def main(): else: xs = np.arange(current_sweep_raw.size, dtype=np.int32) line_obj.set_data(xs, current_sweep_raw) + if current_aux_curves is not None: + avg_1_curve, avg_2_curve = current_aux_curves + line_avg1_obj.set_data(xs[: avg_1_curve.size], avg_1_curve) + line_avg2_obj.set_data(xs[: avg_2_curve.size], avg_2_curve) + else: + line_avg1_obj.set_data([], []) + line_avg2_obj.set_data([], []) if last_calib_sweep is not None: line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep) else: @@ -1131,18 +1314,12 @@ def main(): ax_line.set_xlim(3.3, 14.3) # Адаптивные Y-лимиты (если не задан --ylim) if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep_raw)) - y1 = float(np.nanmax(current_sweep_raw)) - if np.isfinite(y0) and np.isfinite(y1): - if y0 == y1: - pad = max(1.0, abs(y0) * 0.05) - y0 -= pad - y1 += pad - else: - pad = 0.05 * (y1 - y0) - y0 -= pad - y1 += pad - ax_line.set_ylim(y0, y1) + y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm] + if current_aux_curves is not None: + y_series.extend(current_aux_curves) + y_limits = _compute_auto_ylim(*y_series) + if y_limits is not None: + ax_line.set_ylim(y_limits[0], y_limits[1]) # Обновление спектра текущего свипа sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw @@ -1232,6 +1409,8 @@ def main(): # Возвращаем обновлённые артисты return ( line_obj, + line_avg1_obj, + line_avg2_obj, line_calib_obj, line_norm_obj, img_obj, @@ -1274,6 +1453,7 @@ def run_pyqtgraph(args): stop_event, fancy=bool(args.fancy), bin_mode=bool(args.bin_mode), + logscale=bool(args.logscale), ) reader.start() @@ -1291,6 +1471,8 @@ def run_pyqtgraph(args): # Плот последнего свипа (слева-сверху) p_line = win.addPlot(row=0, col=0, title="Сырые данные") p_line.showGrid(x=True, y=True, alpha=0.3) + curve_avg1 = p_line.plot(pen=pg.mkPen((170, 170, 170), width=1)) + curve_avg2 = p_line.plot(pen=pg.mkPen((110, 110, 110), width=1)) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) @@ -1350,6 +1532,7 @@ def run_pyqtgraph(args): width: Optional[int] = None x_shared: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None + current_aux_curves: SweepAuxCurves = None current_sweep_norm: Optional[np.ndarray] = None last_calib_sweep: Optional[np.ndarray] = None current_info: Optional[SweepInfo] = None @@ -1483,15 +1666,16 @@ def run_pyqtgraph(args): y_max_fft = float(fr_max) def drain_queue(): - nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep + nonlocal current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep drained = 0 while True: try: - s, info = q.get_nowait() + s, info, aux_curves = q.get_nowait() except Empty: break drained += 1 current_sweep_raw = s + current_aux_curves = aux_curves current_info = info ch = 0 try: @@ -1530,6 +1714,13 @@ def run_pyqtgraph(args): else: xs = np.arange(current_sweep_raw.size) curve.setData(xs, current_sweep_raw, autoDownsample=True) + if current_aux_curves is not None: + avg_1_curve, avg_2_curve = current_aux_curves + curve_avg1.setData(xs[: avg_1_curve.size], avg_1_curve, autoDownsample=True) + curve_avg2.setData(xs[: avg_2_curve.size], avg_2_curve, autoDownsample=True) + else: + curve_avg1.setData([], []) + curve_avg2.setData([], []) if last_calib_sweep is not None: curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True) else: @@ -1539,11 +1730,12 @@ def run_pyqtgraph(args): else: curve_norm.setData([], []) if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep_raw)) - y1 = float(np.nanmax(current_sweep_raw)) - if np.isfinite(y0) and np.isfinite(y1): - margin = 0.05 * max(1.0, (y1 - y0)) - p_line.setYRange(y0 - margin, y1 + margin, padding=0) + y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm] + if current_aux_curves is not None: + y_series.extend(current_aux_curves) + y_limits = _compute_auto_ylim(*y_series) + if y_limits is not None: + p_line.setYRange(y_limits[0], y_limits[1], padding=0) # Обновим спектр sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw