new graph style
This commit is contained in:
@ -9,6 +9,7 @@ import numpy as np
|
||||
|
||||
from rfg_adc_plotter.constants import FFT_LEN
|
||||
from rfg_adc_plotter.io.sweep_reader import SweepReader
|
||||
from rfg_adc_plotter.processing.normalizer import build_calib_envelopes
|
||||
from rfg_adc_plotter.state.app_state import AppState, format_status
|
||||
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
||||
from rfg_adc_plotter.types import SweepPacket
|
||||
@ -112,10 +113,11 @@ def run_matplotlib(args):
|
||||
|
||||
# График последнего свипа
|
||||
line_obj, = ax_line.plot([], [], lw=1, color="tab:blue")
|
||||
line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red")
|
||||
line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green")
|
||||
line_env_lo, = ax_line.plot([], [], lw=1, color="tab:orange", linestyle="--", alpha=0.7)
|
||||
line_env_hi, = ax_line.plot([], [], lw=1, color="tab:orange", linestyle="--", alpha=0.7)
|
||||
ax_line.set_title("Сырые данные", pad=1)
|
||||
ax_line.set_xlabel("F")
|
||||
ax_line.set_xlabel("Частота, ГГц")
|
||||
channel_text = ax_line.text(
|
||||
0.98, 0.98, "", transform=ax_line.transAxes,
|
||||
ha="right", va="top", fontsize=9, family="monospace",
|
||||
@ -184,15 +186,18 @@ def run_matplotlib(args):
|
||||
except Exception:
|
||||
calib_cb = None
|
||||
|
||||
FREQ_MIN = 3.323
|
||||
FREQ_MAX = 14.323
|
||||
|
||||
# --- Инициализация imshow при первом свипе ---
|
||||
def _init_imshow_extents():
|
||||
w = ring.width
|
||||
ms = ring.max_sweeps
|
||||
fb = ring.fft_bins
|
||||
img_obj.set_data(np.zeros((w, ms), dtype=np.float32))
|
||||
img_obj.set_extent((0, ms - 1, 0, w - 1 if w > 0 else 1))
|
||||
img_obj.set_extent((0, ms - 1, FREQ_MIN, FREQ_MAX))
|
||||
ax_img.set_xlim(0, ms - 1)
|
||||
ax_img.set_ylim(0, max(1, w - 1))
|
||||
ax_img.set_ylim(FREQ_MIN, FREQ_MAX)
|
||||
img_fft_obj.set_data(np.zeros((fb, ms), dtype=np.float32))
|
||||
img_fft_obj.set_extent((0, ms - 1, 0, fb - 1))
|
||||
ax_spec.set_xlim(0, ms - 1)
|
||||
@ -214,29 +219,29 @@ def run_matplotlib(args):
|
||||
xs = ring.x_shared[: raw.size]
|
||||
else:
|
||||
xs = np.arange(raw.size, dtype=np.int32)
|
||||
line_obj.set_data(xs, raw)
|
||||
def _norm_to_max(data):
|
||||
m = float(np.nanmax(np.abs(data)))
|
||||
return data / m if m > 0.0 else data
|
||||
line_obj.set_data(xs, _norm_to_max(raw))
|
||||
if state.last_calib_sweep is not None:
|
||||
line_calib_obj.set_data(xs[: state.last_calib_sweep.size], state.last_calib_sweep)
|
||||
calib = state.last_calib_sweep
|
||||
m_calib = float(np.nanmax(np.abs(calib)))
|
||||
if m_calib <= 0.0:
|
||||
m_calib = 1.0
|
||||
lower, upper = build_calib_envelopes(calib)
|
||||
line_env_lo.set_data(xs[: calib.size], lower / m_calib)
|
||||
line_env_hi.set_data(xs[: calib.size], upper / m_calib)
|
||||
else:
|
||||
line_calib_obj.set_data([], [])
|
||||
line_env_lo.set_data([], [])
|
||||
line_env_hi.set_data([], [])
|
||||
if state.current_sweep_norm is not None:
|
||||
line_norm_obj.set_data(xs[: state.current_sweep_norm.size], state.current_sweep_norm)
|
||||
line_norm_obj.set_data(xs[: state.current_sweep_norm.size], _norm_to_max(state.current_sweep_norm))
|
||||
else:
|
||||
line_norm_obj.set_data([], [])
|
||||
ax_line.set_xlim(0, max(1, raw.size - 1))
|
||||
ax_line.set_xlim(FREQ_MIN, FREQ_MAX)
|
||||
if fixed_ylim is None:
|
||||
y0 = float(np.nanmin(raw))
|
||||
y1 = float(np.nanmax(raw))
|
||||
if np.isfinite(y0) and np.isfinite(y1):
|
||||
if y0 == y1:
|
||||
pad = max(1.0, abs(y0) * 0.05)
|
||||
y0 -= pad
|
||||
y1 += pad
|
||||
else:
|
||||
pad = 0.05 * (y1 - y0)
|
||||
y0 -= pad
|
||||
y1 += pad
|
||||
ax_line.set_ylim(y0, y1)
|
||||
ax_line.set_ylim(-1.05, 1.05)
|
||||
ax_line.set_ylabel("/ max")
|
||||
|
||||
# Спектр — используем уже вычисленный в ring FFT
|
||||
if ring.last_fft_vals is not None and ring.freq_shared is not None:
|
||||
@ -252,7 +257,12 @@ def run_matplotlib(args):
|
||||
# Водопад сырых данных
|
||||
if changed and ring.is_ready:
|
||||
disp = ring.get_display_ring()
|
||||
if ring.x_shared is not None:
|
||||
n = ring.x_shared.size
|
||||
disp = disp[:n, :]
|
||||
img_obj.set_data(disp)
|
||||
img_obj.set_extent((0, ring.max_sweeps - 1, FREQ_MIN, FREQ_MAX))
|
||||
ax_img.set_ylim(FREQ_MIN, FREQ_MAX)
|
||||
levels = _visible_levels(disp, ax_img)
|
||||
if levels is not None:
|
||||
img_obj.set_clim(vmin=levels[0], vmax=levels[1])
|
||||
@ -276,7 +286,7 @@ def run_matplotlib(args):
|
||||
status_text.set_text(format_status(state.current_info))
|
||||
channel_text.set_text(state.format_channel_label())
|
||||
|
||||
return (line_obj, line_calib_obj, line_norm_obj, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text)
|
||||
return (line_obj, line_norm_obj, line_env_lo, line_env_hi, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text)
|
||||
|
||||
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
|
||||
plt.show()
|
||||
|
||||
@ -8,6 +8,7 @@ from typing import Optional, Tuple
|
||||
import numpy as np
|
||||
|
||||
from rfg_adc_plotter.io.sweep_reader import SweepReader
|
||||
from rfg_adc_plotter.processing.normalizer import build_calib_envelopes
|
||||
from rfg_adc_plotter.state.app_state import AppState, format_status
|
||||
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
||||
from rfg_adc_plotter.types import SweepPacket
|
||||
@ -39,8 +40,17 @@ def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
|
||||
return None
|
||||
|
||||
|
||||
def _visible_levels(data: np.ndarray, plot_item) -> Optional[Tuple[float, float]]:
|
||||
"""(vmin, vmax) по текущей видимой области ImageItem."""
|
||||
def _visible_levels(
|
||||
data: np.ndarray,
|
||||
plot_item,
|
||||
freq_min: Optional[float] = None,
|
||||
freq_max: Optional[float] = None,
|
||||
) -> Optional[Tuple[float, float]]:
|
||||
"""(vmin, vmax) по текущей видимой области ImageItem.
|
||||
|
||||
Если freq_min/freq_max заданы, ось Y трактуется как частота [freq_min..freq_max]
|
||||
и пересчитывается в индексы строк данных.
|
||||
"""
|
||||
if data.size == 0:
|
||||
return None
|
||||
ny, nx = data.shape[0], data.shape[1]
|
||||
@ -53,8 +63,13 @@ def _visible_levels(data: np.ndarray, plot_item) -> Optional[Tuple[float, float]
|
||||
ymin, ymax = sorted((float(y0), float(y1)))
|
||||
ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
|
||||
ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
|
||||
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
|
||||
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
|
||||
if freq_min is not None and freq_max is not None and freq_max > freq_min:
|
||||
span = freq_max - freq_min
|
||||
iy0 = max(0, min(ny - 1, int(np.floor((ymin - freq_min) / span * ny))))
|
||||
iy1 = max(0, min(ny - 1, int(np.ceil((ymax - freq_min) / span * ny))))
|
||||
else:
|
||||
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
|
||||
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
|
||||
if ix1 < ix0:
|
||||
ix1 = ix0
|
||||
if iy1 < iy0:
|
||||
@ -111,10 +126,13 @@ def run_pyqtgraph(args):
|
||||
p_line = win.addPlot(row=0, col=0, title="Сырые данные")
|
||||
p_line.showGrid(x=True, y=True, alpha=0.3)
|
||||
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
|
||||
curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1))
|
||||
curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1))
|
||||
p_line.setLabel("bottom", "X")
|
||||
curve_env_lo = p_line.plot(pen=pg.mkPen((255, 165, 0), width=1, style=QtCore.Qt.DashLine))
|
||||
curve_env_hi = p_line.plot(pen=pg.mkPen((255, 165, 0), width=1, style=QtCore.Qt.DashLine))
|
||||
p_line.setLabel("bottom", "Частота, ГГц")
|
||||
p_line.setLabel("left", "Y")
|
||||
p_line.setXRange(3.323, 14.323, padding=0)
|
||||
p_line.enableAutoRange(axis="x", enable=False)
|
||||
ch_text = pg.TextItem("", anchor=(1, 1))
|
||||
ch_text.setZValue(10)
|
||||
p_line.addItem(ch_text)
|
||||
@ -130,7 +148,8 @@ def run_pyqtgraph(args):
|
||||
p_img.getAxis("bottom").setStyle(showValues=False)
|
||||
except Exception:
|
||||
pass
|
||||
p_img.setLabel("left", "X (0 снизу)")
|
||||
p_img.setLabel("left", "Частота, ГГц")
|
||||
p_img.enableAutoRange(enable=False)
|
||||
img = pg.ImageItem()
|
||||
p_img.addItem(img)
|
||||
|
||||
@ -174,17 +193,24 @@ def run_pyqtgraph(args):
|
||||
|
||||
_imshow_initialized = [False]
|
||||
|
||||
FREQ_MIN = 3.323
|
||||
FREQ_MAX = 14.323
|
||||
|
||||
def _init_imshow_extents():
|
||||
w = ring.width
|
||||
ms = ring.max_sweeps
|
||||
fb = ring.fft_bins
|
||||
img.setImage(ring.ring.T, autoLevels=False)
|
||||
p_img.setRange(xRange=(0, ms - 1), yRange=(0, max(1, w - 1)), padding=0)
|
||||
p_line.setXRange(0, max(1, w - 1), padding=0)
|
||||
img.setRect(pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN))
|
||||
p_img.setRange(xRange=(0, ms - 1), yRange=(FREQ_MIN, FREQ_MAX), padding=0)
|
||||
p_line.setXRange(FREQ_MIN, FREQ_MAX, padding=0)
|
||||
img_fft.setImage(ring.ring_fft.T, autoLevels=False)
|
||||
p_spec.setRange(xRange=(0, ms - 1), yRange=(0, max(1, fb - 1)), padding=0)
|
||||
p_fft.setXRange(0, max(1, fb - 1), padding=0)
|
||||
|
||||
def _img_rect(ms: int) -> "pg.QtCore.QRectF":
|
||||
return pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN)
|
||||
|
||||
def update():
|
||||
changed = state.drain_queue(q, ring) > 0
|
||||
|
||||
@ -196,21 +222,28 @@ def run_pyqtgraph(args):
|
||||
if state.current_sweep_raw is not None and ring.x_shared is not None:
|
||||
raw = state.current_sweep_raw
|
||||
xs = ring.x_shared[: raw.size] if raw.size <= ring.x_shared.size else np.arange(raw.size)
|
||||
curve.setData(xs, raw, autoDownsample=True)
|
||||
def _norm_to_max(data):
|
||||
m = float(np.nanmax(np.abs(data)))
|
||||
return data / m if m > 0.0 else data
|
||||
curve.setData(xs, _norm_to_max(raw), autoDownsample=True)
|
||||
if state.last_calib_sweep is not None:
|
||||
curve_calib.setData(xs[: state.last_calib_sweep.size], state.last_calib_sweep, autoDownsample=True)
|
||||
calib = state.last_calib_sweep
|
||||
m_calib = float(np.nanmax(np.abs(calib)))
|
||||
if m_calib <= 0.0:
|
||||
m_calib = 1.0
|
||||
lower, upper = build_calib_envelopes(calib)
|
||||
curve_env_lo.setData(xs[: calib.size], lower / m_calib, autoDownsample=True)
|
||||
curve_env_hi.setData(xs[: calib.size], upper / m_calib, autoDownsample=True)
|
||||
else:
|
||||
curve_calib.setData([], [])
|
||||
curve_env_lo.setData([], [])
|
||||
curve_env_hi.setData([], [])
|
||||
if state.current_sweep_norm is not None:
|
||||
curve_norm.setData(xs[: state.current_sweep_norm.size], state.current_sweep_norm, autoDownsample=True)
|
||||
curve_norm.setData(xs[: state.current_sweep_norm.size], _norm_to_max(state.current_sweep_norm), autoDownsample=True)
|
||||
else:
|
||||
curve_norm.setData([], [])
|
||||
if fixed_ylim is None:
|
||||
y0 = float(np.nanmin(raw))
|
||||
y1 = float(np.nanmax(raw))
|
||||
if np.isfinite(y0) and np.isfinite(y1):
|
||||
margin = 0.05 * max(1.0, (y1 - y0))
|
||||
p_line.setYRange(y0 - margin, y1 + margin, padding=0)
|
||||
p_line.setYRange(-1.05, 1.05, padding=0)
|
||||
p_line.setLabel("left", "/ max")
|
||||
|
||||
# Спектр — используем уже вычисленный в ring FFT
|
||||
if ring.last_fft_vals is not None and ring.freq_shared is not None:
|
||||
@ -233,11 +266,12 @@ def run_pyqtgraph(args):
|
||||
# Водопад сырых данных — новые данные справа (без реверса)
|
||||
if changed and ring.is_ready:
|
||||
disp = ring.get_display_ring() # (width, time), новые справа
|
||||
levels = _visible_levels(disp, p_img)
|
||||
levels = _visible_levels(disp, p_img, FREQ_MIN, FREQ_MAX)
|
||||
if levels is not None:
|
||||
img.setImage(disp, autoLevels=False, levels=levels)
|
||||
else:
|
||||
img.setImage(disp, autoLevels=False)
|
||||
img.setRect(_img_rect(ring.max_sweeps))
|
||||
|
||||
# Статус и подпись канала
|
||||
if changed and state.current_info:
|
||||
|
||||
@ -18,7 +18,11 @@ def normalize_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||||
|
||||
|
||||
def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
||||
"""Оценить нижнюю/верхнюю огибающие калибровочной кривой."""
|
||||
"""Оценить огибающую по модулю сигнала.
|
||||
|
||||
Возвращает (lower, upper) = (-envelope, +envelope), где envelope —
|
||||
интерполяция через локальные максимумы |calib|.
|
||||
"""
|
||||
n = int(calib.size)
|
||||
if n <= 0:
|
||||
empty = np.zeros((0,), dtype=np.float32)
|
||||
@ -35,11 +39,14 @@ def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
||||
y = y.copy()
|
||||
y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32)
|
||||
|
||||
if n < 3:
|
||||
return y.copy(), y.copy()
|
||||
a = np.abs(y)
|
||||
|
||||
dy = np.diff(y)
|
||||
s = np.sign(dy).astype(np.int8, copy=False)
|
||||
if n < 3:
|
||||
env = a.copy()
|
||||
return -env, env
|
||||
|
||||
da = np.diff(a)
|
||||
s = np.sign(da).astype(np.int8, copy=False)
|
||||
|
||||
if np.any(s == 0):
|
||||
for i in range(1, s.size):
|
||||
@ -51,27 +58,16 @@ def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
||||
s[s == 0] = 1
|
||||
|
||||
max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1
|
||||
min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1
|
||||
|
||||
x = np.arange(n, dtype=np.float32)
|
||||
|
||||
def _interp_nodes(nodes: np.ndarray) -> np.ndarray:
|
||||
if nodes.size == 0:
|
||||
idx = np.array([0, n - 1], dtype=np.int64)
|
||||
else:
|
||||
idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64)
|
||||
return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32)
|
||||
if max_idx.size == 0:
|
||||
idx = np.array([0, n - 1], dtype=np.int64)
|
||||
else:
|
||||
idx = np.unique(np.concatenate(([0], max_idx, [n - 1]))).astype(np.int64)
|
||||
env = np.interp(x, idx.astype(np.float32), a[idx]).astype(np.float32)
|
||||
|
||||
upper = _interp_nodes(max_idx)
|
||||
lower = _interp_nodes(min_idx)
|
||||
|
||||
swap = lower > upper
|
||||
if np.any(swap):
|
||||
tmp = upper[swap].copy()
|
||||
upper[swap] = lower[swap]
|
||||
lower[swap] = tmp
|
||||
|
||||
return lower, upper
|
||||
return -env, env
|
||||
|
||||
|
||||
def normalize_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
|
||||
|
||||
@ -37,16 +37,17 @@ class RingBuffer:
|
||||
return self.ring is not None
|
||||
|
||||
def ensure_init(self, sweep_width: int):
|
||||
"""Инициализировать буферы при первом свипе. Повторные вызовы — no-op."""
|
||||
if self.ring is not None:
|
||||
return
|
||||
self.width = WF_WIDTH
|
||||
self.x_shared = np.arange(self.width, dtype=np.int32)
|
||||
self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32)
|
||||
self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64)
|
||||
self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32)
|
||||
self.freq_shared = np.arange(self.fft_bins, dtype=np.int32)
|
||||
self.head = 0
|
||||
"""Инициализировать буферы при первом свипе. Повторные вызовы — no-op (кроме x_shared)."""
|
||||
if self.ring is None:
|
||||
self.width = WF_WIDTH
|
||||
self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32)
|
||||
self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64)
|
||||
self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32)
|
||||
self.freq_shared = np.arange(self.fft_bins, dtype=np.int32)
|
||||
self.head = 0
|
||||
# Обновляем x_shared если пришёл свип большего размера
|
||||
if self.x_shared is None or sweep_width > self.x_shared.size:
|
||||
self.x_shared = np.linspace(3.323, 14.323, sweep_width, dtype=np.float32)
|
||||
|
||||
def push(self, s: np.ndarray):
|
||||
"""Добавить строку свипа в кольцевой буфер, вычислить FFT-строку."""
|
||||
|
||||
Reference in New Issue
Block a user