From ea57f879200faf3de5e8aad74f5aa26f6a54e2c1 Mon Sep 17 00:00:00 2001 From: awe Date: Wed, 11 Feb 2026 18:27:12 +0300 Subject: [PATCH] new graph style --- RFG_ADC_dataplotter.py | 41 ++++++------- rfg_adc_plotter/gui/matplotlib_backend.py | 54 ++++++++++------- rfg_adc_plotter/gui/pyqtgraph_backend.py | 72 +++++++++++++++++------ rfg_adc_plotter/processing/normalizer.py | 40 ++++++------- rfg_adc_plotter/state/ring_buffer.py | 21 +++---- 5 files changed, 131 insertions(+), 97 deletions(-) diff --git a/RFG_ADC_dataplotter.py b/RFG_ADC_dataplotter.py index 80fbf32..d4cade9 100755 --- a/RFG_ADC_dataplotter.py +++ b/RFG_ADC_dataplotter.py @@ -1019,31 +1019,24 @@ def main(): xs = x_shared[: current_sweep_raw.size] else: xs = np.arange(current_sweep_raw.size, dtype=np.int32) - line_obj.set_data(xs, current_sweep_raw) + def _norm_to_max(data): + m = float(np.nanmax(np.abs(data))) + return data / m if m > 0.0 else data + line_obj.set_data(xs, _norm_to_max(current_sweep_raw)) if last_calib_sweep is not None: - line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep) + line_calib_obj.set_data(xs[: last_calib_sweep.size], _norm_to_max(last_calib_sweep)) else: line_calib_obj.set_data([], []) if current_sweep_norm is not None: - line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm) + line_norm_obj.set_data(xs[: current_sweep_norm.size], _norm_to_max(current_sweep_norm)) else: line_norm_obj.set_data([], []) # Лимиты по X постоянные под текущую ширину ax_line.set_xlim(0, max(1, current_sweep_raw.size - 1)) - # Адаптивные Y-лимиты (если не задан --ylim) + # Фиксированные Y-лимиты после нормировки на максимум if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep_raw)) - y1 = float(np.nanmax(current_sweep_raw)) - if np.isfinite(y0) and np.isfinite(y1): - if y0 == y1: - pad = max(1.0, abs(y0) * 0.05) - y0 -= pad - y1 += pad - else: - pad = 0.05 * (y1 - y0) - y0 -= pad - y1 += pad - ax_line.set_ylim(y0, y1) + ax_line.set_ylim(-1.05, 1.05) + ax_line.set_ylabel("/ max") # Обновление спектра текущего свипа sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw @@ -1422,21 +1415,21 @@ def run_pyqtgraph(args): xs = x_shared[: current_sweep_raw.size] else: xs = np.arange(current_sweep_raw.size) - curve.setData(xs, current_sweep_raw, autoDownsample=True) + def _norm_to_max(data): + m = float(np.nanmax(np.abs(data))) + return data / m if m > 0.0 else data + curve.setData(xs, _norm_to_max(current_sweep_raw), autoDownsample=True) if last_calib_sweep is not None: - curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True) + curve_calib.setData(xs[: last_calib_sweep.size], _norm_to_max(last_calib_sweep), autoDownsample=True) else: curve_calib.setData([], []) if current_sweep_norm is not None: - curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True) + curve_norm.setData(xs[: current_sweep_norm.size], _norm_to_max(current_sweep_norm), autoDownsample=True) else: curve_norm.setData([], []) if fixed_ylim is None: - y0 = float(np.nanmin(current_sweep_raw)) - y1 = float(np.nanmax(current_sweep_raw)) - if np.isfinite(y0) and np.isfinite(y1): - margin = 0.05 * max(1.0, (y1 - y0)) - p_line.setYRange(y0 - margin, y1 + margin, padding=0) + p_line.setYRange(-1.05, 1.05, padding=0) + p_line.setLabel("left", "/ max") # Обновим спектр sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw diff --git a/rfg_adc_plotter/gui/matplotlib_backend.py b/rfg_adc_plotter/gui/matplotlib_backend.py index c6664b0..1f80a8a 100644 --- a/rfg_adc_plotter/gui/matplotlib_backend.py +++ b/rfg_adc_plotter/gui/matplotlib_backend.py @@ -9,6 +9,7 @@ import numpy as np from rfg_adc_plotter.constants import FFT_LEN from rfg_adc_plotter.io.sweep_reader import SweepReader +from rfg_adc_plotter.processing.normalizer import build_calib_envelopes from rfg_adc_plotter.state.app_state import AppState, format_status from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepPacket @@ -112,10 +113,11 @@ def run_matplotlib(args): # График последнего свипа line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") - line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red") line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") + line_env_lo, = ax_line.plot([], [], lw=1, color="tab:orange", linestyle="--", alpha=0.7) + line_env_hi, = ax_line.plot([], [], lw=1, color="tab:orange", linestyle="--", alpha=0.7) ax_line.set_title("Сырые данные", pad=1) - ax_line.set_xlabel("F") + ax_line.set_xlabel("Частота, ГГц") channel_text = ax_line.text( 0.98, 0.98, "", transform=ax_line.transAxes, ha="right", va="top", fontsize=9, family="monospace", @@ -184,15 +186,18 @@ def run_matplotlib(args): except Exception: calib_cb = None + FREQ_MIN = 3.323 + FREQ_MAX = 14.323 + # --- Инициализация imshow при первом свипе --- def _init_imshow_extents(): w = ring.width ms = ring.max_sweeps fb = ring.fft_bins img_obj.set_data(np.zeros((w, ms), dtype=np.float32)) - img_obj.set_extent((0, ms - 1, 0, w - 1 if w > 0 else 1)) + img_obj.set_extent((0, ms - 1, FREQ_MIN, FREQ_MAX)) ax_img.set_xlim(0, ms - 1) - ax_img.set_ylim(0, max(1, w - 1)) + ax_img.set_ylim(FREQ_MIN, FREQ_MAX) img_fft_obj.set_data(np.zeros((fb, ms), dtype=np.float32)) img_fft_obj.set_extent((0, ms - 1, 0, fb - 1)) ax_spec.set_xlim(0, ms - 1) @@ -214,29 +219,29 @@ def run_matplotlib(args): xs = ring.x_shared[: raw.size] else: xs = np.arange(raw.size, dtype=np.int32) - line_obj.set_data(xs, raw) + def _norm_to_max(data): + m = float(np.nanmax(np.abs(data))) + return data / m if m > 0.0 else data + line_obj.set_data(xs, _norm_to_max(raw)) if state.last_calib_sweep is not None: - line_calib_obj.set_data(xs[: state.last_calib_sweep.size], state.last_calib_sweep) + calib = state.last_calib_sweep + m_calib = float(np.nanmax(np.abs(calib))) + if m_calib <= 0.0: + m_calib = 1.0 + lower, upper = build_calib_envelopes(calib) + line_env_lo.set_data(xs[: calib.size], lower / m_calib) + line_env_hi.set_data(xs[: calib.size], upper / m_calib) else: - line_calib_obj.set_data([], []) + line_env_lo.set_data([], []) + line_env_hi.set_data([], []) if state.current_sweep_norm is not None: - line_norm_obj.set_data(xs[: state.current_sweep_norm.size], state.current_sweep_norm) + line_norm_obj.set_data(xs[: state.current_sweep_norm.size], _norm_to_max(state.current_sweep_norm)) else: line_norm_obj.set_data([], []) - ax_line.set_xlim(0, max(1, raw.size - 1)) + ax_line.set_xlim(FREQ_MIN, FREQ_MAX) if fixed_ylim is None: - y0 = float(np.nanmin(raw)) - y1 = float(np.nanmax(raw)) - if np.isfinite(y0) and np.isfinite(y1): - if y0 == y1: - pad = max(1.0, abs(y0) * 0.05) - y0 -= pad - y1 += pad - else: - pad = 0.05 * (y1 - y0) - y0 -= pad - y1 += pad - ax_line.set_ylim(y0, y1) + ax_line.set_ylim(-1.05, 1.05) + ax_line.set_ylabel("/ max") # Спектр — используем уже вычисленный в ring FFT if ring.last_fft_vals is not None and ring.freq_shared is not None: @@ -252,7 +257,12 @@ def run_matplotlib(args): # Водопад сырых данных if changed and ring.is_ready: disp = ring.get_display_ring() + if ring.x_shared is not None: + n = ring.x_shared.size + disp = disp[:n, :] img_obj.set_data(disp) + img_obj.set_extent((0, ring.max_sweeps - 1, FREQ_MIN, FREQ_MAX)) + ax_img.set_ylim(FREQ_MIN, FREQ_MAX) levels = _visible_levels(disp, ax_img) if levels is not None: img_obj.set_clim(vmin=levels[0], vmax=levels[1]) @@ -276,7 +286,7 @@ def run_matplotlib(args): status_text.set_text(format_status(state.current_info)) channel_text.set_text(state.format_channel_label()) - return (line_obj, line_calib_obj, line_norm_obj, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text) + return (line_obj, line_norm_obj, line_env_lo, line_env_hi, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text) ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) plt.show() diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index afe01eb..54136be 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -8,6 +8,7 @@ from typing import Optional, Tuple import numpy as np from rfg_adc_plotter.io.sweep_reader import SweepReader +from rfg_adc_plotter.processing.normalizer import build_calib_envelopes from rfg_adc_plotter.state.app_state import AppState, format_status from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.types import SweepPacket @@ -39,8 +40,17 @@ def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]: return None -def _visible_levels(data: np.ndarray, plot_item) -> Optional[Tuple[float, float]]: - """(vmin, vmax) по текущей видимой области ImageItem.""" +def _visible_levels( + data: np.ndarray, + plot_item, + freq_min: Optional[float] = None, + freq_max: Optional[float] = None, +) -> Optional[Tuple[float, float]]: + """(vmin, vmax) по текущей видимой области ImageItem. + + Если freq_min/freq_max заданы, ось Y трактуется как частота [freq_min..freq_max] + и пересчитывается в индексы строк данных. + """ if data.size == 0: return None ny, nx = data.shape[0], data.shape[1] @@ -53,8 +63,13 @@ def _visible_levels(data: np.ndarray, plot_item) -> Optional[Tuple[float, float] ymin, ymax = sorted((float(y0), float(y1))) ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) - iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) - iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) + if freq_min is not None and freq_max is not None and freq_max > freq_min: + span = freq_max - freq_min + iy0 = max(0, min(ny - 1, int(np.floor((ymin - freq_min) / span * ny)))) + iy1 = max(0, min(ny - 1, int(np.ceil((ymax - freq_min) / span * ny)))) + else: + iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) + iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) if ix1 < ix0: ix1 = ix0 if iy1 < iy0: @@ -111,10 +126,13 @@ def run_pyqtgraph(args): p_line = win.addPlot(row=0, col=0, title="Сырые данные") p_line.showGrid(x=True, y=True, alpha=0.3) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) - curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1)) curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) - p_line.setLabel("bottom", "X") + curve_env_lo = p_line.plot(pen=pg.mkPen((255, 165, 0), width=1, style=QtCore.Qt.DashLine)) + curve_env_hi = p_line.plot(pen=pg.mkPen((255, 165, 0), width=1, style=QtCore.Qt.DashLine)) + p_line.setLabel("bottom", "Частота, ГГц") p_line.setLabel("left", "Y") + p_line.setXRange(3.323, 14.323, padding=0) + p_line.enableAutoRange(axis="x", enable=False) ch_text = pg.TextItem("", anchor=(1, 1)) ch_text.setZValue(10) p_line.addItem(ch_text) @@ -130,7 +148,8 @@ def run_pyqtgraph(args): p_img.getAxis("bottom").setStyle(showValues=False) except Exception: pass - p_img.setLabel("left", "X (0 снизу)") + p_img.setLabel("left", "Частота, ГГц") + p_img.enableAutoRange(enable=False) img = pg.ImageItem() p_img.addItem(img) @@ -174,17 +193,24 @@ def run_pyqtgraph(args): _imshow_initialized = [False] + FREQ_MIN = 3.323 + FREQ_MAX = 14.323 + def _init_imshow_extents(): w = ring.width ms = ring.max_sweeps fb = ring.fft_bins img.setImage(ring.ring.T, autoLevels=False) - p_img.setRange(xRange=(0, ms - 1), yRange=(0, max(1, w - 1)), padding=0) - p_line.setXRange(0, max(1, w - 1), padding=0) + img.setRect(pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN)) + p_img.setRange(xRange=(0, ms - 1), yRange=(FREQ_MIN, FREQ_MAX), padding=0) + p_line.setXRange(FREQ_MIN, FREQ_MAX, padding=0) img_fft.setImage(ring.ring_fft.T, autoLevels=False) p_spec.setRange(xRange=(0, ms - 1), yRange=(0, max(1, fb - 1)), padding=0) p_fft.setXRange(0, max(1, fb - 1), padding=0) + def _img_rect(ms: int) -> "pg.QtCore.QRectF": + return pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN) + def update(): changed = state.drain_queue(q, ring) > 0 @@ -196,21 +222,28 @@ def run_pyqtgraph(args): if state.current_sweep_raw is not None and ring.x_shared is not None: raw = state.current_sweep_raw xs = ring.x_shared[: raw.size] if raw.size <= ring.x_shared.size else np.arange(raw.size) - curve.setData(xs, raw, autoDownsample=True) + def _norm_to_max(data): + m = float(np.nanmax(np.abs(data))) + return data / m if m > 0.0 else data + curve.setData(xs, _norm_to_max(raw), autoDownsample=True) if state.last_calib_sweep is not None: - curve_calib.setData(xs[: state.last_calib_sweep.size], state.last_calib_sweep, autoDownsample=True) + calib = state.last_calib_sweep + m_calib = float(np.nanmax(np.abs(calib))) + if m_calib <= 0.0: + m_calib = 1.0 + lower, upper = build_calib_envelopes(calib) + curve_env_lo.setData(xs[: calib.size], lower / m_calib, autoDownsample=True) + curve_env_hi.setData(xs[: calib.size], upper / m_calib, autoDownsample=True) else: - curve_calib.setData([], []) + curve_env_lo.setData([], []) + curve_env_hi.setData([], []) if state.current_sweep_norm is not None: - curve_norm.setData(xs[: state.current_sweep_norm.size], state.current_sweep_norm, autoDownsample=True) + curve_norm.setData(xs[: state.current_sweep_norm.size], _norm_to_max(state.current_sweep_norm), autoDownsample=True) else: curve_norm.setData([], []) if fixed_ylim is None: - y0 = float(np.nanmin(raw)) - y1 = float(np.nanmax(raw)) - if np.isfinite(y0) and np.isfinite(y1): - margin = 0.05 * max(1.0, (y1 - y0)) - p_line.setYRange(y0 - margin, y1 + margin, padding=0) + p_line.setYRange(-1.05, 1.05, padding=0) + p_line.setLabel("left", "/ max") # Спектр — используем уже вычисленный в ring FFT if ring.last_fft_vals is not None and ring.freq_shared is not None: @@ -233,11 +266,12 @@ def run_pyqtgraph(args): # Водопад сырых данных — новые данные справа (без реверса) if changed and ring.is_ready: disp = ring.get_display_ring() # (width, time), новые справа - levels = _visible_levels(disp, p_img) + levels = _visible_levels(disp, p_img, FREQ_MIN, FREQ_MAX) if levels is not None: img.setImage(disp, autoLevels=False, levels=levels) else: img.setImage(disp, autoLevels=False) + img.setRect(_img_rect(ring.max_sweeps)) # Статус и подпись канала if changed and state.current_info: diff --git a/rfg_adc_plotter/processing/normalizer.py b/rfg_adc_plotter/processing/normalizer.py index 10780d5..5d9c675 100644 --- a/rfg_adc_plotter/processing/normalizer.py +++ b/rfg_adc_plotter/processing/normalizer.py @@ -18,7 +18,11 @@ def normalize_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: - """Оценить нижнюю/верхнюю огибающие калибровочной кривой.""" + """Оценить огибающую по модулю сигнала. + + Возвращает (lower, upper) = (-envelope, +envelope), где envelope — + интерполяция через локальные максимумы |calib|. + """ n = int(calib.size) if n <= 0: empty = np.zeros((0,), dtype=np.float32) @@ -35,11 +39,14 @@ def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: y = y.copy() y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32) - if n < 3: - return y.copy(), y.copy() + a = np.abs(y) - dy = np.diff(y) - s = np.sign(dy).astype(np.int8, copy=False) + if n < 3: + env = a.copy() + return -env, env + + da = np.diff(a) + s = np.sign(da).astype(np.int8, copy=False) if np.any(s == 0): for i in range(1, s.size): @@ -51,27 +58,16 @@ def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: s[s == 0] = 1 max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1 - min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1 x = np.arange(n, dtype=np.float32) - def _interp_nodes(nodes: np.ndarray) -> np.ndarray: - if nodes.size == 0: - idx = np.array([0, n - 1], dtype=np.int64) - else: - idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64) - return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32) + if max_idx.size == 0: + idx = np.array([0, n - 1], dtype=np.int64) + else: + idx = np.unique(np.concatenate(([0], max_idx, [n - 1]))).astype(np.int64) + env = np.interp(x, idx.astype(np.float32), a[idx]).astype(np.float32) - upper = _interp_nodes(max_idx) - lower = _interp_nodes(min_idx) - - swap = lower > upper - if np.any(swap): - tmp = upper[swap].copy() - upper[swap] = lower[swap] - lower[swap] = tmp - - return lower, upper + return -env, env def normalize_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py index 56d3334..1b9ecd5 100644 --- a/rfg_adc_plotter/state/ring_buffer.py +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -37,16 +37,17 @@ class RingBuffer: return self.ring is not None def ensure_init(self, sweep_width: int): - """Инициализировать буферы при первом свипе. Повторные вызовы — no-op.""" - if self.ring is not None: - return - self.width = WF_WIDTH - self.x_shared = np.arange(self.width, dtype=np.int32) - self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32) - self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64) - self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32) - self.freq_shared = np.arange(self.fft_bins, dtype=np.int32) - self.head = 0 + """Инициализировать буферы при первом свипе. Повторные вызовы — no-op (кроме x_shared).""" + if self.ring is None: + self.width = WF_WIDTH + self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32) + self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64) + self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32) + self.freq_shared = np.arange(self.fft_bins, dtype=np.int32) + self.head = 0 + # Обновляем x_shared если пришёл свип большего размера + if self.x_shared is None or sweep_width > self.x_shared.size: + self.x_shared = np.linspace(3.323, 14.323, sweep_width, dtype=np.float32) def push(self, s: np.ndarray): """Добавить строку свипа в кольцевой буфер, вычислить FFT-строку."""