10 Commits

Author SHA1 Message Date
awe
2e6ad24aaa ad to gitignore 2026-02-19 18:34:59 +03:00
02fa3645d7 Now software can be run by: run_dataplotter /dev/ttyACM0 2026-02-18 23:07:17 +03:00
ece30f1cd5 impoved tty parser binary mode: now it supports 32-bit values of intensity 2026-02-18 23:01:34 +03:00
8b1d424cbe New tty parser: accepts binary format. Enable arg: --bin 2026-02-17 18:51:12 +03:00
awe
34d151aef1 fix bug 2026-02-13 17:49:43 +03:00
awe
0ecb83751f add background remove 2026-02-13 17:45:14 +03:00
awe
66a318fff8 add calibration file 2026-02-13 17:32:04 +03:00
awe
d2d504f5b8 fix axis 2026-02-11 19:26:00 +03:00
awe
66b9eee230 right ifft implementation 2026-02-11 18:43:43 +03:00
awe
ea57f87920 new graph style 2026-02-11 18:27:12 +03:00
13 changed files with 656 additions and 1753 deletions

3
.gitignore vendored
View File

@ -5,4 +5,5 @@ __pycache__/
*.tmp *.tmp
*.bak *.bak
*.swp *.swp
*.swo *.swo
acm_9

File diff suppressed because it is too large Load Diff

BIN
background.npy Normal file

Binary file not shown.

BIN
calib_envelope.npy Normal file

Binary file not shown.

View File

@ -1,5 +1,13 @@
WF_WIDTH = 1000 # максимальное число точек в ряду водопада WF_WIDTH = 1000 # максимальное число точек в ряду водопада
FFT_LEN = 1024 # длина БПФ для спектра/водопада спектров FFT_LEN = 2048 # длина БПФ для спектра/водопада спектров
# Порог для инверсии сырых данных: если среднее значение свипа ниже порога — # Порог для инверсии сырых данных: если среднее значение свипа ниже порога —
# считаем, что сигнал «меньше нуля» и домножаем свип на -1 # считаем, что сигнал «меньше нуля» и домножаем свип на -1
DATA_INVERSION_THRESHOLD = 10.0 DATA_INVERSION_THRESHOLD = 10.0
# Параметры IFFT-спектра (временной профиль из спектра 3.2..14.3 ГГц)
# Двусторонний спектр формируется как: [нули -14.3..-3.2 | нули -3.2..+3.2 | данные +3.2..+14.3]
ZEROS_LOW = 758 # нули от -14.3 до -3.2 ГГц
ZEROS_MID = 437 # нули от -3.2 до +3.2 ГГц
SWEEP_LEN = 758 # ожидаемая длина свипа (3.2 → 14.3 ГГц)
FREQ_SPAN_GHZ = 28.6 # полная двусторонняя полоса (-14.3 .. +14.3 ГГц)
IFFT_LEN = ZEROS_LOW + ZEROS_MID + SWEEP_LEN # = 1953

View File

@ -7,9 +7,12 @@ from typing import Optional, Tuple
import numpy as np import numpy as np
from rfg_adc_plotter.constants import FFT_LEN from rfg_adc_plotter.constants import FFT_LEN, FREQ_SPAN_GHZ, IFFT_LEN
_IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9)
from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.io.sweep_reader import SweepReader
from rfg_adc_plotter.state.app_state import AppState, format_status from rfg_adc_plotter.processing.normalizer import build_calib_envelopes
from rfg_adc_plotter.state.app_state import BACKGROUND_PATH, CALIB_ENVELOPE_PATH, AppState, format_status
from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.state.ring_buffer import RingBuffer
from rfg_adc_plotter.types import SweepPacket from rfg_adc_plotter.types import SweepPacket
@ -86,7 +89,14 @@ def run_matplotlib(args):
q: Queue[SweepPacket] = Queue(maxsize=1000) q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event() stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) reader = SweepReader(
args.port,
args.baud,
q,
stop_event,
fancy=bool(args.fancy),
bin_mode=bool(getattr(args, "bin_mode", False)),
)
reader.start() reader.start()
max_sweeps = int(max(10, args.max_sweeps)) max_sweeps = int(max(10, args.max_sweeps))
@ -112,10 +122,11 @@ def run_matplotlib(args):
# График последнего свипа # График последнего свипа
line_obj, = ax_line.plot([], [], lw=1, color="tab:blue") line_obj, = ax_line.plot([], [], lw=1, color="tab:blue")
line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red")
line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green") line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green")
line_env_lo, = ax_line.plot([], [], lw=1, color="tab:orange", linestyle="--", alpha=0.7)
line_env_hi, = ax_line.plot([], [], lw=1, color="tab:orange", linestyle="--", alpha=0.7)
ax_line.set_title("Сырые данные", pad=1) ax_line.set_title("Сырые данные", pad=1)
ax_line.set_xlabel("F") ax_line.set_xlabel("Частота, ГГц")
channel_text = ax_line.text( channel_text = ax_line.text(
0.98, 0.98, "", transform=ax_line.transAxes, 0.98, 0.98, "", transform=ax_line.transAxes,
ha="right", va="top", fontsize=9, family="monospace", ha="right", va="top", fontsize=9, family="monospace",
@ -126,8 +137,8 @@ def run_matplotlib(args):
# График спектра # График спектра
fft_line_obj, = ax_fft.plot([], [], lw=1) fft_line_obj, = ax_fft.plot([], [], lw=1)
ax_fft.set_title("FFT", pad=1) ax_fft.set_title("FFT", pad=1)
ax_fft.set_xlabel("X") ax_fft.set_xlabel("Время, нс")
ax_fft.set_ylabel("Амплитуда, дБ") ax_fft.set_ylabel("Мощность, дБ")
# Водопад сырых данных # Водопад сырых данных
img_obj = ax_img.imshow( img_obj = ax_img.imshow(
@ -147,7 +158,7 @@ def run_matplotlib(args):
aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap, aspect="auto", interpolation="nearest", origin="lower", cmap=args.cmap,
) )
ax_spec.set_title("B-scan (дБ)", pad=12) ax_spec.set_title("B-scan (дБ)", pad=12)
ax_spec.set_ylabel("расстояние") ax_spec.set_ylabel("Время, нс")
try: try:
ax_spec.tick_params(axis="x", labelbottom=False) ax_spec.tick_params(axis="x", labelbottom=False)
except Exception: except Exception:
@ -161,10 +172,16 @@ def run_matplotlib(args):
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08])
ax_cb_file = fig.add_axes([0.92, 0.36, 0.08, 0.08])
ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical")
ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical")
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
calib_cb = CheckButtons(ax_cb, ["калибровка"], [False]) calib_cb = CheckButtons(ax_cb, ["калибровка"], [False])
calib_file_cb = CheckButtons(ax_cb_file, ["из файла"], [False])
import os as _os
if not _os.path.isfile(CALIB_ENVELOPE_PATH):
ax_cb_file.set_visible(False)
def _on_ylim_change(_val): def _on_ylim_change(_val):
try: try:
@ -175,28 +192,68 @@ def run_matplotlib(args):
except Exception: except Exception:
pass pass
def _on_calib_file_clicked(_v):
use_file = bool(calib_file_cb.get_status()[0])
if use_file:
ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH)
if ok:
state.set_calib_mode("file")
else:
calib_file_cb.set_active(0) # снять галочку
else:
state.set_calib_mode("live")
state.set_calib_enabled(bool(calib_cb.get_status()[0]))
def _on_calib_clicked(_v):
import os as _os2
if _os2.path.isfile(CALIB_ENVELOPE_PATH):
ax_cb_file.set_visible(True)
state.set_calib_enabled(bool(calib_cb.get_status()[0]))
fig.canvas.draw_idle()
ax_btn_bg = fig.add_axes([0.92, 0.27, 0.08, 0.05])
ax_cb_bg = fig.add_axes([0.92, 0.20, 0.08, 0.06])
from matplotlib.widgets import Button as MplButton
save_bg_btn = MplButton(ax_btn_bg, "Сохр. фон")
bg_cb = CheckButtons(ax_cb_bg, ["вычет фона"], [False])
def _on_save_bg(_event):
ok = state.save_background()
if ok:
state.load_background()
fig.canvas.draw_idle()
def _on_bg_clicked(_v):
state.set_background_enabled(bool(bg_cb.get_status()[0]))
save_bg_btn.on_clicked(_on_save_bg)
bg_cb.on_clicked(_on_bg_clicked)
ymin_slider.on_changed(_on_ylim_change) ymin_slider.on_changed(_on_ylim_change)
ymax_slider.on_changed(_on_ylim_change) ymax_slider.on_changed(_on_ylim_change)
contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle())
calib_cb.on_clicked(lambda _v: state.set_calib_enabled( calib_cb.on_clicked(_on_calib_clicked)
bool(calib_cb.get_status()[0]) calib_file_cb.on_clicked(_on_calib_file_clicked)
))
except Exception: except Exception:
calib_cb = None calib_cb = None
FREQ_MIN = 3.323
FREQ_MAX = 14.323
# --- Инициализация imshow при первом свипе --- # --- Инициализация imshow при первом свипе ---
def _init_imshow_extents(): def _init_imshow_extents():
w = ring.width w = ring.width
ms = ring.max_sweeps ms = ring.max_sweeps
fb = ring.fft_bins fb = ring.fft_bins
img_obj.set_data(np.zeros((w, ms), dtype=np.float32)) img_obj.set_data(np.zeros((w, ms), dtype=np.float32))
img_obj.set_extent((0, ms - 1, 0, w - 1 if w > 0 else 1)) img_obj.set_extent((0, ms - 1, FREQ_MIN, FREQ_MAX))
ax_img.set_xlim(0, ms - 1) ax_img.set_xlim(0, ms - 1)
ax_img.set_ylim(0, max(1, w - 1)) ax_img.set_ylim(FREQ_MIN, FREQ_MAX)
img_fft_obj.set_data(np.zeros((fb, ms), dtype=np.float32)) img_fft_obj.set_data(np.zeros((fb, ms), dtype=np.float32))
img_fft_obj.set_extent((0, ms - 1, 0, fb - 1)) img_fft_obj.set_extent((0, ms - 1, 0.0, _IFFT_T_MAX_NS))
ax_spec.set_xlim(0, ms - 1) ax_spec.set_xlim(0, ms - 1)
ax_spec.set_ylim(0, max(1, fb - 1)) ax_spec.set_ylim(0.0, _IFFT_T_MAX_NS)
ax_fft.set_xlim(0.0, _IFFT_T_MAX_NS)
_imshow_initialized = [False] _imshow_initialized = [False]
@ -214,45 +271,57 @@ def run_matplotlib(args):
xs = ring.x_shared[: raw.size] xs = ring.x_shared[: raw.size]
else: else:
xs = np.arange(raw.size, dtype=np.int32) xs = np.arange(raw.size, dtype=np.int32)
line_obj.set_data(xs, raw) def _norm_to_max(data):
if state.last_calib_sweep is not None: m = float(np.nanmax(np.abs(data)))
line_calib_obj.set_data(xs[: state.last_calib_sweep.size], state.last_calib_sweep) return data / m if m > 0.0 else data
line_obj.set_data(xs, _norm_to_max(raw))
if state.calib_mode == "file" and state.calib_file_envelope is not None:
upper = state.calib_file_envelope
lower = -upper
m_env = float(np.nanmax(np.abs(upper)))
if m_env <= 0.0:
m_env = 1.0
line_env_lo.set_data(xs[: upper.size], lower / m_env)
line_env_hi.set_data(xs[: upper.size], upper / m_env)
elif state.last_calib_sweep is not None:
calib = state.last_calib_sweep
m_calib = float(np.nanmax(np.abs(calib)))
if m_calib <= 0.0:
m_calib = 1.0
lower, upper = build_calib_envelopes(calib)
line_env_lo.set_data(xs[: calib.size], lower / m_calib)
line_env_hi.set_data(xs[: calib.size], upper / m_calib)
else: else:
line_calib_obj.set_data([], []) line_env_lo.set_data([], [])
line_env_hi.set_data([], [])
if state.current_sweep_norm is not None: if state.current_sweep_norm is not None:
line_norm_obj.set_data(xs[: state.current_sweep_norm.size], state.current_sweep_norm) line_norm_obj.set_data(xs[: state.current_sweep_norm.size], _norm_to_max(state.current_sweep_norm))
else: else:
line_norm_obj.set_data([], []) line_norm_obj.set_data([], [])
ax_line.set_xlim(0, max(1, raw.size - 1)) ax_line.set_xlim(FREQ_MIN, FREQ_MAX)
if fixed_ylim is None: if fixed_ylim is None:
y0 = float(np.nanmin(raw)) ax_line.set_ylim(-1.05, 1.05)
y1 = float(np.nanmax(raw)) ax_line.set_ylabel("/ max")
if np.isfinite(y0) and np.isfinite(y1):
if y0 == y1:
pad = max(1.0, abs(y0) * 0.05)
y0 -= pad
y1 += pad
else:
pad = 0.05 * (y1 - y0)
y0 -= pad
y1 += pad
ax_line.set_ylim(y0, y1)
# Спектр — используем уже вычисленный в ring FFT # Спектр — используем уже вычисленный в ring IFFT (временной профиль)
if ring.last_fft_vals is not None and ring.freq_shared is not None: if ring.last_fft_vals is not None and ring.fft_time_axis is not None:
fft_vals = ring.last_fft_vals fft_vals = ring.last_fft_vals
xs_fft = ring.freq_shared xs_fft = ring.fft_time_axis
if fft_vals.size > xs_fft.size: n = min(fft_vals.size, xs_fft.size)
fft_vals = fft_vals[: xs_fft.size] fft_line_obj.set_data(xs_fft[:n], fft_vals[:n])
fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals)
if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)):
ax_fft.set_xlim(0, max(1, xs_fft.size - 1)) ax_fft.set_xlim(0, float(xs_fft[n - 1]))
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
# Водопад сырых данных # Водопад сырых данных
if changed and ring.is_ready: if changed and ring.is_ready:
disp = ring.get_display_ring() disp = ring.get_display_ring()
if ring.x_shared is not None:
n = ring.x_shared.size
disp = disp[:n, :]
img_obj.set_data(disp) img_obj.set_data(disp)
img_obj.set_extent((0, ring.max_sweeps - 1, FREQ_MIN, FREQ_MAX))
ax_img.set_ylim(FREQ_MIN, FREQ_MAX)
levels = _visible_levels(disp, ax_img) levels = _visible_levels(disp, ax_img)
if levels is not None: if levels is not None:
img_obj.set_clim(vmin=levels[0], vmax=levels[1]) img_obj.set_clim(vmin=levels[0], vmax=levels[1])
@ -276,7 +345,7 @@ def run_matplotlib(args):
status_text.set_text(format_status(state.current_info)) status_text.set_text(format_status(state.current_info))
channel_text.set_text(state.format_channel_label()) channel_text.set_text(state.format_channel_label())
return (line_obj, line_calib_obj, line_norm_obj, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text) return (line_obj, line_norm_obj, line_env_lo, line_env_hi, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text)
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
plt.show() plt.show()

View File

@ -7,11 +7,16 @@ from typing import Optional, Tuple
import numpy as np import numpy as np
from rfg_adc_plotter.constants import FREQ_SPAN_GHZ, IFFT_LEN
from rfg_adc_plotter.io.sweep_reader import SweepReader from rfg_adc_plotter.io.sweep_reader import SweepReader
from rfg_adc_plotter.state.app_state import AppState, format_status from rfg_adc_plotter.processing.normalizer import build_calib_envelopes
from rfg_adc_plotter.state.app_state import BACKGROUND_PATH, CALIB_ENVELOPE_PATH, AppState, format_status
from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.state.ring_buffer import RingBuffer
from rfg_adc_plotter.types import SweepPacket from rfg_adc_plotter.types import SweepPacket
# Максимальное значение временной оси IFFT в нс
_IFFT_T_MAX_NS = float((IFFT_LEN - 1) / (FREQ_SPAN_GHZ * 1e9) * 1e9)
def _parse_ylim(ylim_str: Optional[str]) -> Optional[Tuple[float, float]]: def _parse_ylim(ylim_str: Optional[str]) -> Optional[Tuple[float, float]]:
if not ylim_str: if not ylim_str:
@ -39,8 +44,17 @@ def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
return None return None
def _visible_levels(data: np.ndarray, plot_item) -> Optional[Tuple[float, float]]: def _visible_levels(
"""(vmin, vmax) по текущей видимой области ImageItem.""" data: np.ndarray,
plot_item,
freq_min: Optional[float] = None,
freq_max: Optional[float] = None,
) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по текущей видимой области ImageItem.
Если freq_min/freq_max заданы, ось Y трактуется как частота [freq_min..freq_max]
и пересчитывается в индексы строк данных.
"""
if data.size == 0: if data.size == 0:
return None return None
ny, nx = data.shape[0], data.shape[1] ny, nx = data.shape[0], data.shape[1]
@ -53,8 +67,13 @@ def _visible_levels(data: np.ndarray, plot_item) -> Optional[Tuple[float, float]
ymin, ymax = sorted((float(y0), float(y1))) ymin, ymax = sorted((float(y0), float(y1)))
ix0 = max(0, min(nx - 1, int(np.floor(xmin)))) ix0 = max(0, min(nx - 1, int(np.floor(xmin))))
ix1 = max(0, min(nx - 1, int(np.ceil(xmax)))) ix1 = max(0, min(nx - 1, int(np.ceil(xmax))))
iy0 = max(0, min(ny - 1, int(np.floor(ymin)))) if freq_min is not None and freq_max is not None and freq_max > freq_min:
iy1 = max(0, min(ny - 1, int(np.ceil(ymax)))) span = freq_max - freq_min
iy0 = max(0, min(ny - 1, int(np.floor((ymin - freq_min) / span * ny))))
iy1 = max(0, min(ny - 1, int(np.ceil((ymax - freq_min) / span * ny))))
else:
iy0 = max(0, min(ny - 1, int(np.floor(ymin))))
iy1 = max(0, min(ny - 1, int(np.ceil(ymax))))
if ix1 < ix0: if ix1 < ix0:
ix1 = ix0 ix1 = ix0
if iy1 < iy0: if iy1 < iy0:
@ -87,7 +106,14 @@ def run_pyqtgraph(args):
q: Queue[SweepPacket] = Queue(maxsize=1000) q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event() stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) reader = SweepReader(
args.port,
args.baud,
q,
stop_event,
fancy=bool(args.fancy),
bin_mode=bool(getattr(args, "bin_mode", False)),
)
reader.start() reader.start()
max_sweeps = int(max(10, args.max_sweeps)) max_sweeps = int(max(10, args.max_sweeps))
@ -111,10 +137,13 @@ def run_pyqtgraph(args):
p_line = win.addPlot(row=0, col=0, title="Сырые данные") p_line = win.addPlot(row=0, col=0, title="Сырые данные")
p_line.showGrid(x=True, y=True, alpha=0.3) p_line.showGrid(x=True, y=True, alpha=0.3)
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1))
curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1)) curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1))
p_line.setLabel("bottom", "X") curve_env_lo = p_line.plot(pen=pg.mkPen((255, 165, 0), width=1, style=QtCore.Qt.DashLine))
curve_env_hi = p_line.plot(pen=pg.mkPen((255, 165, 0), width=1, style=QtCore.Qt.DashLine))
p_line.setLabel("bottom", "Частота, ГГц")
p_line.setLabel("left", "Y") p_line.setLabel("left", "Y")
p_line.setXRange(3.323, 14.323, padding=0)
p_line.enableAutoRange(axis="x", enable=False)
ch_text = pg.TextItem("", anchor=(1, 1)) ch_text = pg.TextItem("", anchor=(1, 1))
ch_text.setZValue(10) ch_text.setZValue(10)
p_line.addItem(ch_text) p_line.addItem(ch_text)
@ -130,7 +159,8 @@ def run_pyqtgraph(args):
p_img.getAxis("bottom").setStyle(showValues=False) p_img.getAxis("bottom").setStyle(showValues=False)
except Exception: except Exception:
pass pass
p_img.setLabel("left", "X (0 снизу)") p_img.setLabel("left", "Частота, ГГц")
p_img.enableAutoRange(enable=False)
img = pg.ImageItem() img = pg.ImageItem()
p_img.addItem(img) p_img.addItem(img)
@ -145,8 +175,8 @@ def run_pyqtgraph(args):
p_fft = win.addPlot(row=1, col=0, title="FFT") p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3) p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
p_fft.setLabel("bottom", "Бин") p_fft.setLabel("bottom", "Время, нс")
p_fft.setLabel("left", "Амплитуда, дБ") p_fft.setLabel("left", "Мощность, дБ")
# Водопад спектров (справа-снизу) # Водопад спектров (справа-снизу)
p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)")
@ -157,16 +187,76 @@ def run_pyqtgraph(args):
p_spec.getAxis("bottom").setStyle(showValues=False) p_spec.getAxis("bottom").setStyle(showValues=False)
except Exception: except Exception:
pass pass
p_spec.setLabel("left", "Бин (0 снизу)") p_spec.setLabel("left", "Время, нс")
img_fft = pg.ImageItem() img_fft = pg.ImageItem()
p_spec.addItem(img_fft) p_spec.addItem(img_fft)
# Чекбокс калибровки # Чекбоксы калибровки — в одном контейнере
calib_widget = QtWidgets.QWidget()
calib_layout = QtWidgets.QHBoxLayout(calib_widget)
calib_layout.setContentsMargins(2, 2, 2, 2)
calib_layout.setSpacing(8)
calib_cb = QtWidgets.QCheckBox("калибровка") calib_cb = QtWidgets.QCheckBox("калибровка")
cb_proxy = QtWidgets.QGraphicsProxyWidget() calib_file_cb = QtWidgets.QCheckBox("из файла")
cb_proxy.setWidget(calib_cb) calib_file_cb.setEnabled(False) # активируется только если файл существует
win.addItem(cb_proxy, row=2, col=1)
calib_cb.stateChanged.connect(lambda _v: state.set_calib_enabled(calib_cb.isChecked())) calib_layout.addWidget(calib_cb)
calib_layout.addWidget(calib_file_cb)
cb_container_proxy = QtWidgets.QGraphicsProxyWidget()
cb_container_proxy.setWidget(calib_widget)
win.addItem(cb_container_proxy, row=2, col=1)
def _check_file_cb_available():
import os
calib_file_cb.setEnabled(os.path.isfile(CALIB_ENVELOPE_PATH))
_check_file_cb_available()
def _on_calib_file_toggled(checked):
if checked:
ok = state.load_calib_envelope(CALIB_ENVELOPE_PATH)
if ok:
state.set_calib_mode("file")
else:
calib_file_cb.setChecked(False)
else:
state.set_calib_mode("live")
state.set_calib_enabled(calib_cb.isChecked())
def _on_calib_toggled(_v):
_check_file_cb_available()
state.set_calib_enabled(calib_cb.isChecked())
calib_cb.stateChanged.connect(_on_calib_toggled)
calib_file_cb.stateChanged.connect(lambda _v: _on_calib_file_toggled(calib_file_cb.isChecked()))
# Кнопка сохранения фона + чекбокс вычета фона
bg_widget = QtWidgets.QWidget()
bg_layout = QtWidgets.QHBoxLayout(bg_widget)
bg_layout.setContentsMargins(2, 2, 2, 2)
bg_layout.setSpacing(8)
save_bg_btn = QtWidgets.QPushButton("Сохр. фон")
bg_cb = QtWidgets.QCheckBox("вычет фона")
bg_cb.setEnabled(False)
bg_layout.addWidget(save_bg_btn)
bg_layout.addWidget(bg_cb)
bg_container_proxy = QtWidgets.QGraphicsProxyWidget()
bg_container_proxy.setWidget(bg_widget)
win.addItem(bg_container_proxy, row=2, col=0)
def _on_save_bg():
ok = state.save_background()
if ok:
state.load_background()
bg_cb.setEnabled(True)
save_bg_btn.clicked.connect(_on_save_bg)
bg_cb.stateChanged.connect(lambda _v: state.set_background_enabled(bg_cb.isChecked()))
# Статусная строка # Статусная строка
status = pg.LabelItem(justify="left") status = pg.LabelItem(justify="left")
@ -174,16 +264,23 @@ def run_pyqtgraph(args):
_imshow_initialized = [False] _imshow_initialized = [False]
FREQ_MIN = 3.323
FREQ_MAX = 14.323
def _init_imshow_extents(): def _init_imshow_extents():
w = ring.width
ms = ring.max_sweeps ms = ring.max_sweeps
fb = ring.fft_bins fb = ring.fft_bins
img.setImage(ring.ring.T, autoLevels=False) img.setImage(ring.ring.T, autoLevels=False)
p_img.setRange(xRange=(0, ms - 1), yRange=(0, max(1, w - 1)), padding=0) img.setRect(pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN))
p_line.setXRange(0, max(1, w - 1), padding=0) p_img.setRange(xRange=(0, ms - 1), yRange=(FREQ_MIN, FREQ_MAX), padding=0)
p_line.setXRange(FREQ_MIN, FREQ_MAX, padding=0)
img_fft.setImage(ring.ring_fft.T, autoLevels=False) img_fft.setImage(ring.ring_fft.T, autoLevels=False)
p_spec.setRange(xRange=(0, ms - 1), yRange=(0, max(1, fb - 1)), padding=0) img_fft.setRect(pg.QtCore.QRectF(0.0, 0.0, float(ms), _IFFT_T_MAX_NS))
p_fft.setXRange(0, max(1, fb - 1), padding=0) p_spec.setRange(xRange=(0, ms - 1), yRange=(0.0, _IFFT_T_MAX_NS), padding=0)
p_fft.setXRange(0.0, _IFFT_T_MAX_NS, padding=0)
def _img_rect(ms: int) -> "pg.QtCore.QRectF":
return pg.QtCore.QRectF(0.0, FREQ_MIN, float(ms), FREQ_MAX - FREQ_MIN)
def update(): def update():
changed = state.drain_queue(q, ring) > 0 changed = state.drain_queue(q, ring) > 0
@ -196,29 +293,43 @@ def run_pyqtgraph(args):
if state.current_sweep_raw is not None and ring.x_shared is not None: if state.current_sweep_raw is not None and ring.x_shared is not None:
raw = state.current_sweep_raw raw = state.current_sweep_raw
xs = ring.x_shared[: raw.size] if raw.size <= ring.x_shared.size else np.arange(raw.size) xs = ring.x_shared[: raw.size] if raw.size <= ring.x_shared.size else np.arange(raw.size)
curve.setData(xs, raw, autoDownsample=True) def _norm_to_max(data):
if state.last_calib_sweep is not None: m = float(np.nanmax(np.abs(data)))
curve_calib.setData(xs[: state.last_calib_sweep.size], state.last_calib_sweep, autoDownsample=True) return data / m if m > 0.0 else data
curve.setData(xs, _norm_to_max(raw), autoDownsample=True)
if state.calib_mode == "file" and state.calib_file_envelope is not None:
upper = state.calib_file_envelope
lower = -upper
m_env = float(np.nanmax(np.abs(upper)))
if m_env <= 0.0:
m_env = 1.0
curve_env_lo.setData(xs[: upper.size], lower / m_env, autoDownsample=True)
curve_env_hi.setData(xs[: upper.size], upper / m_env, autoDownsample=True)
elif state.last_calib_sweep is not None:
calib = state.last_calib_sweep
m_calib = float(np.nanmax(np.abs(calib)))
if m_calib <= 0.0:
m_calib = 1.0
lower, upper = build_calib_envelopes(calib)
curve_env_lo.setData(xs[: calib.size], lower / m_calib, autoDownsample=True)
curve_env_hi.setData(xs[: calib.size], upper / m_calib, autoDownsample=True)
else: else:
curve_calib.setData([], []) curve_env_lo.setData([], [])
curve_env_hi.setData([], [])
if state.current_sweep_norm is not None: if state.current_sweep_norm is not None:
curve_norm.setData(xs[: state.current_sweep_norm.size], state.current_sweep_norm, autoDownsample=True) curve_norm.setData(xs[: state.current_sweep_norm.size], _norm_to_max(state.current_sweep_norm), autoDownsample=True)
else: else:
curve_norm.setData([], []) curve_norm.setData([], [])
if fixed_ylim is None: if fixed_ylim is None:
y0 = float(np.nanmin(raw)) p_line.setYRange(-1.05, 1.05, padding=0)
y1 = float(np.nanmax(raw)) p_line.setLabel("left", "/ max")
if np.isfinite(y0) and np.isfinite(y1):
margin = 0.05 * max(1.0, (y1 - y0))
p_line.setYRange(y0 - margin, y1 + margin, padding=0)
# Спектр — используем уже вычисленный в ring FFT # Спектр — используем уже вычисленный в ring IFFT (временной профиль)
if ring.last_fft_vals is not None and ring.freq_shared is not None: if ring.last_fft_vals is not None and ring.fft_time_axis is not None:
fft_vals = ring.last_fft_vals fft_vals = ring.last_fft_vals
xs_fft = ring.freq_shared xs_fft = ring.fft_time_axis
if fft_vals.size > xs_fft.size: n = min(fft_vals.size, xs_fft.size)
fft_vals = fft_vals[: xs_fft.size] curve_fft.setData(xs_fft[:n], fft_vals[:n])
curve_fft.setData(xs_fft[: fft_vals.size], fft_vals)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
# Позиция подписи канала # Позиция подписи канала
@ -233,11 +344,12 @@ def run_pyqtgraph(args):
# Водопад сырых данных — новые данные справа (без реверса) # Водопад сырых данных — новые данные справа (без реверса)
if changed and ring.is_ready: if changed and ring.is_ready:
disp = ring.get_display_ring() # (width, time), новые справа disp = ring.get_display_ring() # (width, time), новые справа
levels = _visible_levels(disp, p_img) levels = _visible_levels(disp, p_img, FREQ_MIN, FREQ_MAX)
if levels is not None: if levels is not None:
img.setImage(disp, autoLevels=False, levels=levels) img.setImage(disp, autoLevels=False, levels=levels)
else: else:
img.setImage(disp, autoLevels=False) img.setImage(disp, autoLevels=False)
img.setRect(_img_rect(ring.max_sweeps))
# Статус и подпись канала # Статус и подпись канала
if changed and state.current_info: if changed and state.current_info:
@ -256,6 +368,7 @@ def run_pyqtgraph(args):
img_fft.setImage(disp_fft, autoLevels=False, levels=levels) img_fft.setImage(disp_fft, autoLevels=False, levels=levels)
else: else:
img_fft.setImage(disp_fft, autoLevels=False) img_fft.setImage(disp_fft, autoLevels=False)
img_fft.setRect(pg.QtCore.QRectF(0.0, 0.0, float(ring.max_sweeps), _IFFT_T_MAX_NS))
timer = pg.QtCore.QTimer() timer = pg.QtCore.QTimer()
timer.timeout.connect(update) timer.timeout.connect(update)

View File

@ -24,6 +24,7 @@ class SweepReader(threading.Thread):
out_queue: "Queue[SweepPacket]", out_queue: "Queue[SweepPacket]",
stop_event: threading.Event, stop_event: threading.Event,
fancy: bool = False, fancy: bool = False,
bin_mode: bool = False,
): ):
super().__init__(daemon=True) super().__init__(daemon=True)
self._port_path = port_path self._port_path = port_path
@ -32,11 +33,17 @@ class SweepReader(threading.Thread):
self._stop = stop_event self._stop = stop_event
self._src: Optional[SerialLineSource] = None self._src: Optional[SerialLineSource] = None
self._fancy = bool(fancy) self._fancy = bool(fancy)
self._bin_mode = bool(bin_mode)
self._max_width: int = 0 self._max_width: int = 0
self._sweep_idx: int = 0 self._sweep_idx: int = 0
self._last_sweep_ts: Optional[float] = None self._last_sweep_ts: Optional[float] = None
self._n_valid_hist = deque() self._n_valid_hist = deque()
@staticmethod
def _u32_to_i32(v: int) -> int:
"""Преобразование 32-bit слова в знаковое значение."""
return v - 0x1_0000_0000 if (v & 0x8000_0000) else v
def _finalize_current(self, xs, ys, channels: Optional[set]): def _finalize_current(self, xs, ys, channels: Optional[set]):
if not xs: if not xs:
return return
@ -135,11 +142,148 @@ class SweepReader(threading.Thread):
except Exception: except Exception:
pass pass
def run(self): def _run_ascii_stream(self, chunk_reader: SerialChunkReader):
xs: list = [] xs: list[int] = []
ys: list = [] ys: list[int] = []
cur_channel: Optional[int] = None cur_channel: Optional[int] = None
cur_channels: set = set() cur_channels: set[int] = set()
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
time.sleep(0.0005)
continue
while True:
nl = buf.find(b"\n")
if nl == -1:
break
line = bytes(buf[:nl])
del buf[: nl + 1]
if line.endswith(b"\r"):
line = line[:-1]
if not line:
continue
if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channel = None
cur_channels.clear()
continue
if len(line) >= 3:
parts = line.split()
if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")):
try:
if parts[0].lower() == b"s":
if len(parts) >= 4:
ch = int(parts[1], 10)
x = int(parts[2], 10)
y = int(parts[3], 10)
else:
ch = 0
x = int(parts[1], 10)
y = int(parts[2], 10)
else:
ch = int(parts[0][1:], 10)
x = int(parts[1], 10)
y = int(parts[2], 10)
except Exception:
continue
if cur_channel is None:
cur_channel = ch
cur_channels.add(ch)
xs.append(x)
ys.append(y)
if len(buf) > 1_000_000:
del buf[:-262144]
self._finalize_current(xs, ys, cur_channels)
def _run_binary_stream(self, chunk_reader: SerialChunkReader):
xs: list[int] = []
ys: list[int] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
words = deque()
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
time.sleep(0.0005)
continue
usable = len(buf) & ~1
if usable == 0:
continue
i = 0
while i < usable:
w = int(buf[i]) | (int(buf[i + 1]) << 8)
words.append(w)
i += 2
# Бинарный протокол:
# старт свипа (актуальный): 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
# старт свипа (legacy): 0xFFFF, 0xFFFF, channel, 0x0A0A
# точка: step, value_hi, value_lo, 0x000A
while len(words) >= 4:
w0 = int(words[0])
w1 = int(words[1])
w2 = int(words[2])
w3 = int(words[3])
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A:
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channels.clear()
cur_channel = (w3 >> 8) & 0x00FF
cur_channels.add(cur_channel)
for _ in range(4):
words.popleft()
continue
if w0 == 0xFFFF and w1 == 0xFFFF and w3 == 0x0A0A:
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channels.clear()
cur_channel = w2
cur_channels.add(cur_channel)
for _ in range(4):
words.popleft()
continue
if w3 == 0x000A:
if cur_channel is not None:
cur_channels.add(cur_channel)
xs.append(w0)
value_u32 = (w1 << 16) | w2
ys.append(self._u32_to_i32(value_u32))
for _ in range(4):
words.popleft()
continue
# Поток может начаться с середины пакета; сдвигаемся по слову до ресинхронизации.
words.popleft()
del buf[:usable]
if len(buf) > 1_000_000:
del buf[:-262144]
self._finalize_current(xs, ys, cur_channels)
def run(self):
try: try:
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
@ -150,66 +294,11 @@ class SweepReader(threading.Thread):
try: try:
chunk_reader = SerialChunkReader(self._src) chunk_reader = SerialChunkReader(self._src)
buf = bytearray() if self._bin_mode:
while not self._stop.is_set(): self._run_binary_stream(chunk_reader)
data = chunk_reader.read_available() else:
if data: self._run_ascii_stream(chunk_reader)
buf += data
else:
time.sleep(0.0005)
continue
while True:
nl = buf.find(b"\n")
if nl == -1:
break
line = bytes(buf[:nl])
del buf[: nl + 1]
if line.endswith(b"\r"):
line = line[:-1]
if not line:
continue
if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channel = None
cur_channels.clear()
continue
if len(line) >= 3:
parts = line.split()
if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")):
try:
if parts[0].lower() == b"s":
if len(parts) >= 4:
ch = int(parts[1], 10)
x = int(parts[2], 10)
y = int(parts[3], 10)
else:
ch = 0
x = int(parts[1], 10)
y = int(parts[2], 10)
else:
ch = int(parts[0][1:], 10)
x = int(parts[1], 10)
y = int(parts[2], 10)
except Exception:
continue
if cur_channel is None:
cur_channel = ch
cur_channels.add(ch)
xs.append(x)
ys.append(y)
if len(buf) > 1_000_000:
del buf[:-262144]
finally: finally:
try:
self._finalize_current(xs, ys, cur_channels)
except Exception:
pass
try: try:
if self._src is not None: if self._src is not None:
self._src.close() self._src.close()

9
rfg_adc_plotter/main.py Normal file → Executable file
View File

@ -77,6 +77,15 @@ def build_parser() -> argparse.ArgumentParser:
default="projector", default="projector",
help="Тип нормировки: projector (по огибающим в [-1000,+1000]) или simple (raw/calib)", help="Тип нормировки: projector (по огибающим в [-1000,+1000]) или simple (raw/calib)",
) )
parser.add_argument(
"--bin",
dest="bin_mode",
action="store_true",
help=(
"Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; "
"точки step,uint32(hi16,lo16),0x000A"
),
)
return parser return parser

View File

@ -18,7 +18,11 @@ def normalize_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Оценить нижнюю/верхнюю огибающие калибровочной кривой.""" """Оценить огибающую по модулю сигнала.
Возвращает (lower, upper) = (-envelope, +envelope), где envelope —
интерполяция через локальные максимумы |calib|.
"""
n = int(calib.size) n = int(calib.size)
if n <= 0: if n <= 0:
empty = np.zeros((0,), dtype=np.float32) empty = np.zeros((0,), dtype=np.float32)
@ -35,11 +39,14 @@ def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
y = y.copy() y = y.copy()
y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32) y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32)
if n < 3: a = np.abs(y)
return y.copy(), y.copy()
dy = np.diff(y) if n < 3:
s = np.sign(dy).astype(np.int8, copy=False) env = a.copy()
return -env, env
da = np.diff(a)
s = np.sign(da).astype(np.int8, copy=False)
if np.any(s == 0): if np.any(s == 0):
for i in range(1, s.size): for i in range(1, s.size):
@ -51,27 +58,16 @@ def build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
s[s == 0] = 1 s[s == 0] = 1
max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1 max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1
min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1
x = np.arange(n, dtype=np.float32) x = np.arange(n, dtype=np.float32)
def _interp_nodes(nodes: np.ndarray) -> np.ndarray: if max_idx.size == 0:
if nodes.size == 0: idx = np.array([0, n - 1], dtype=np.int64)
idx = np.array([0, n - 1], dtype=np.int64) else:
else: idx = np.unique(np.concatenate(([0], max_idx, [n - 1]))).astype(np.int64)
idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64) env = np.interp(x, idx.astype(np.float32), a[idx]).astype(np.float32)
return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32)
upper = _interp_nodes(max_idx) return -env, env
lower = _interp_nodes(min_idx)
swap = lower > upper
if np.any(swap):
tmp = upper[swap].copy()
upper[swap] = lower[swap]
lower[swap] = tmp
return lower, upper
def normalize_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def normalize_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
@ -113,3 +109,41 @@ def normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np
if nt == "simple": if nt == "simple":
return normalize_simple(raw, calib) return normalize_simple(raw, calib)
return normalize_projector(raw, calib) return normalize_projector(raw, calib)
def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray:
"""Нормировка свипа через проекцию на огибающую из файла.
Воспроизводит логику normalize_projector: проецирует raw в [-1000, +1000]
используя готовую верхнюю огибающую (upper = envelope, lower = -envelope).
"""
w = min(raw.size, envelope.size)
if w <= 0:
return raw
out = np.full_like(raw, np.nan, dtype=np.float32)
raw_seg = np.asarray(raw[:w], dtype=np.float32)
upper = np.asarray(envelope[:w], dtype=np.float32)
lower = -upper
span = upper - lower # = 2 * upper
finite_span = span[np.isfinite(span) & (span > 0)]
if finite_span.size > 0:
eps = max(float(np.median(finite_span)) * 1e-6, 1e-9)
else:
eps = 1e-9
valid = (
np.isfinite(raw_seg)
& np.isfinite(lower)
& np.isfinite(upper)
& (span > eps)
)
if np.any(valid):
proj = np.empty_like(raw_seg, dtype=np.float32)
proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0
proj[valid] = np.clip(proj[valid], -1000.0, 1000.0)
proj[~valid] = np.nan
out[:w] = proj
return out

View File

@ -1,14 +1,22 @@
"""Состояние приложения: текущие свипы и настройки калибровки/нормировки.""" """Состояние приложения: текущие свипы и настройки калибровки/нормировки."""
import os
from queue import Empty, Queue from queue import Empty, Queue
from typing import Any, Dict, Mapping, Optional from typing import Any, Dict, Mapping, Optional
import numpy as np import numpy as np
from rfg_adc_plotter.processing.normalizer import normalize_by_calib from rfg_adc_plotter.processing.normalizer import (
build_calib_envelopes,
normalize_by_calib,
normalize_by_envelope,
)
from rfg_adc_plotter.state.ring_buffer import RingBuffer from rfg_adc_plotter.state.ring_buffer import RingBuffer
from rfg_adc_plotter.types import SweepInfo, SweepPacket from rfg_adc_plotter.types import SweepInfo, SweepPacket
CALIB_ENVELOPE_PATH = "calib_envelope.npy"
BACKGROUND_PATH = "background.npy"
def format_status(data: Mapping[str, Any]) -> str: def format_status(data: Mapping[str, Any]) -> str:
"""Преобразовать словарь метрик в одну строку 'k:v'.""" """Преобразовать словарь метрик в одну строку 'k:v'."""
@ -44,21 +52,106 @@ class AppState:
self.current_info: Optional[SweepInfo] = None self.current_info: Optional[SweepInfo] = None
self.calib_enabled: bool = False self.calib_enabled: bool = False
self.norm_type: str = norm_type self.norm_type: str = norm_type
# "live" — нормировка по текущему ch0-свипу; "file" — по огибающей из файла
self.calib_mode: str = "live"
self.calib_file_envelope: Optional[np.ndarray] = None
# Вычет фона
self.background: Optional[np.ndarray] = None
self.background_enabled: bool = False
self._last_sweep_for_ring: Optional[np.ndarray] = None
def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
if self.calib_mode == "file" and self.calib_file_envelope is not None:
return normalize_by_envelope(raw, self.calib_file_envelope)
return normalize_by_calib(raw, calib, self.norm_type) return normalize_by_calib(raw, calib, self.norm_type)
def save_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool:
"""Вычислить огибающую из last_calib_sweep и сохранить в файл.
Возвращает True при успехе.
"""
if self.last_calib_sweep is None:
return False
try:
_lower, upper = build_calib_envelopes(self.last_calib_sweep)
np.save(path, upper)
return True
except Exception as exc:
import sys
sys.stderr.write(f"[warn] Не удалось сохранить огибающую: {exc}\n")
return False
def load_calib_envelope(self, path: str = CALIB_ENVELOPE_PATH) -> bool:
"""Загрузить огибающую из файла.
Возвращает True при успехе.
"""
if not os.path.isfile(path):
return False
try:
env = np.load(path)
self.calib_file_envelope = np.asarray(env, dtype=np.float32)
return True
except Exception as exc:
import sys
sys.stderr.write(f"[warn] Не удалось загрузить огибающую: {exc}\n")
return False
def set_calib_mode(self, mode: str):
"""Переключить режим калибровки: 'live' или 'file'."""
self.calib_mode = mode
def save_background(self, path: str = BACKGROUND_PATH) -> bool:
"""Сохранить текущий sweep_for_ring как фоновый спектр.
Сохраняет последний свип, который был записан в ринг-буфер
(нормированный, если калибровка включена, иначе сырой).
Возвращает True при успехе.
"""
if self._last_sweep_for_ring is None:
return False
try:
np.save(path, self._last_sweep_for_ring)
return True
except Exception as exc:
import sys
sys.stderr.write(f"[warn] Не удалось сохранить фон: {exc}\n")
return False
def load_background(self, path: str = BACKGROUND_PATH) -> bool:
"""Загрузить фоновый спектр из файла.
Возвращает True при успехе.
"""
if not os.path.isfile(path):
return False
try:
bg = np.load(path)
self.background = np.asarray(bg, dtype=np.float32)
return True
except Exception as exc:
import sys
sys.stderr.write(f"[warn] Не удалось загрузить фон: {exc}\n")
return False
def set_background_enabled(self, enabled: bool):
"""Включить/выключить вычет фона."""
self.background_enabled = enabled
def set_calib_enabled(self, enabled: bool): def set_calib_enabled(self, enabled: bool):
"""Включить/выключить режим калибровки, пересчитать norm-свип.""" """Включить/выключить режим калибровки, пересчитать norm-свип."""
self.calib_enabled = enabled self.calib_enabled = enabled
if ( if self.calib_enabled and self.current_sweep_raw is not None:
self.calib_enabled if self.calib_mode == "file" and self.calib_file_envelope is not None:
and self.current_sweep_raw is not None self.current_sweep_norm = normalize_by_envelope(
and self.last_calib_sweep is not None self.current_sweep_raw, self.calib_file_envelope
): )
self.current_sweep_norm = self._normalize( elif self.calib_mode == "live" and self.last_calib_sweep is not None:
self.current_sweep_raw, self.last_calib_sweep self.current_sweep_norm = self._normalize(
) self.current_sweep_raw, self.last_calib_sweep
)
else:
self.current_sweep_norm = None
else: else:
self.current_sweep_norm = None self.current_sweep_norm = None
@ -86,16 +179,31 @@ class AppState:
# Канал 0 — опорный (калибровочный) свип # Канал 0 — опорный (калибровочный) свип
if ch == 0: if ch == 0:
self.last_calib_sweep = s self.last_calib_sweep = s
self.save_calib_envelope()
self.current_sweep_norm = None self.current_sweep_norm = None
sweep_for_ring = s sweep_for_ring = s
self._last_sweep_for_ring = sweep_for_ring
else: else:
if self.calib_enabled and self.last_calib_sweep is not None: can_normalize = self.calib_enabled and (
self.current_sweep_norm = self._normalize(s, self.last_calib_sweep) (self.calib_mode == "file" and self.calib_file_envelope is not None)
or (self.calib_mode == "live" and self.last_calib_sweep is not None)
)
if can_normalize:
calib_ref = self.last_calib_sweep if self.last_calib_sweep is not None else s
self.current_sweep_norm = self._normalize(s, calib_ref)
sweep_for_ring = self.current_sweep_norm sweep_for_ring = self.current_sweep_norm
else: else:
self.current_sweep_norm = None self.current_sweep_norm = None
sweep_for_ring = s sweep_for_ring = s
# Вычет фона (в том же домене что и sweep_for_ring)
if self.background_enabled and self.background is not None and ch != 0:
w = min(sweep_for_ring.size, self.background.size)
sweep_for_ring = sweep_for_ring.copy()
sweep_for_ring[:w] -= self.background[:w]
self.current_sweep_norm = sweep_for_ring
self._last_sweep_for_ring = sweep_for_ring
ring.ensure_init(s.size) ring.ensure_init(s.size)
ring.push(sweep_for_ring) ring.push(sweep_for_ring)
return drained return drained

View File

@ -5,7 +5,15 @@ from typing import Optional, Tuple
import numpy as np import numpy as np
from rfg_adc_plotter.constants import FFT_LEN, WF_WIDTH from rfg_adc_plotter.constants import (
FFT_LEN,
FREQ_SPAN_GHZ,
IFFT_LEN,
SWEEP_LEN,
WF_WIDTH,
ZEROS_LOW,
ZEROS_MID,
)
class RingBuffer: class RingBuffer:
@ -17,7 +25,7 @@ class RingBuffer:
def __init__(self, max_sweeps: int): def __init__(self, max_sweeps: int):
self.max_sweeps = max_sweeps self.max_sweeps = max_sweeps
self.fft_bins = FFT_LEN // 2 + 1 self.fft_bins = IFFT_LEN # = 1953 (полная длина IFFT-результата)
# Инициализируются при первом свипе (ensure_init) # Инициализируются при первом свипе (ensure_init)
self.ring: Optional[np.ndarray] = None # (max_sweeps, WF_WIDTH) self.ring: Optional[np.ndarray] = None # (max_sweeps, WF_WIDTH)
@ -26,7 +34,7 @@ class RingBuffer:
self.head: int = 0 self.head: int = 0
self.width: Optional[int] = None self.width: Optional[int] = None
self.x_shared: Optional[np.ndarray] = None self.x_shared: Optional[np.ndarray] = None
self.freq_shared: Optional[np.ndarray] = None self.fft_time_axis: Optional[np.ndarray] = None # временная ось IFFT в нс
self.y_min_fft: Optional[float] = None self.y_min_fft: Optional[float] = None
self.y_max_fft: Optional[float] = None self.y_max_fft: Optional[float] = None
# FFT последнего свипа (для отображения без повторного вычисления) # FFT последнего свипа (для отображения без повторного вычисления)
@ -37,16 +45,20 @@ class RingBuffer:
return self.ring is not None return self.ring is not None
def ensure_init(self, sweep_width: int): def ensure_init(self, sweep_width: int):
"""Инициализировать буферы при первом свипе. Повторные вызовы — no-op.""" """Инициализировать буферы при первом свипе. Повторные вызовы — no-op (кроме x_shared)."""
if self.ring is not None: if self.ring is None:
return self.width = WF_WIDTH
self.width = WF_WIDTH self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32)
self.x_shared = np.arange(self.width, dtype=np.int32) self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64)
self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32) self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32)
self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64) # Временная ось IFFT: шаг dt = 1/(FREQ_SPAN_GHZ*1e9), переведём в нс
self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32) self.fft_time_axis = (
self.freq_shared = np.arange(self.fft_bins, dtype=np.int32) np.arange(IFFT_LEN, dtype=np.float64) / (FREQ_SPAN_GHZ * 1e9) * 1e9
self.head = 0 ).astype(np.float32)
self.head = 0
# Обновляем x_shared если пришёл свип большего размера
if self.x_shared is None or sweep_width > self.x_shared.size:
self.x_shared = np.linspace(3.323, 14.323, sweep_width, dtype=np.float32)
def push(self, s: np.ndarray): def push(self, s: np.ndarray):
"""Добавить строку свипа в кольцевой буфер, вычислить FFT-строку.""" """Добавить строку свипа в кольцевой буфер, вычислить FFT-строку."""
@ -63,20 +75,29 @@ class RingBuffer:
self._push_fft(s) self._push_fft(s)
def _push_fft(self, s: np.ndarray): def _push_fft(self, s: np.ndarray):
bins = self.ring_fft.shape[1] bins = self.ring_fft.shape[1] # = IFFT_LEN = 1953
take_fft = min(int(s.size), FFT_LEN) if s is None or s.size == 0:
if take_fft <= 0:
fft_row = np.full((bins,), np.nan, dtype=np.float32) fft_row = np.full((bins,), np.nan, dtype=np.float32)
else: else:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32) # 1. Взять первые SWEEP_LEN отсчётов (остаток — нули если свип короче)
seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False) sig = np.zeros(SWEEP_LEN, dtype=np.float32)
win = np.hanning(take_fft).astype(np.float32) take = min(int(s.size), SWEEP_LEN)
fft_in[:take_fft] = seg * win seg = np.nan_to_num(s[:take], nan=0.0).astype(np.float32, copy=False)
spec = np.fft.rfft(fft_in) sig[:take] = seg
mag = np.abs(spec).astype(np.float32)
# 2. Собрать двусторонний спектр:
# [ZEROS_LOW нулей | ZEROS_MID нулей | SWEEP_LEN данных]
# = [-14.3..-3.2 ГГц | -3.2..+3.2 ГГц | +3.2..+14.3 ГГц]
data = np.zeros(IFFT_LEN, dtype=np.complex64)
data[ZEROS_LOW + ZEROS_MID:] = sig
# 3. ifftshift + ifft → временной профиль
spec = np.fft.ifftshift(data)
result = np.fft.ifft(spec)
# 4. Амплитуда в дБ
mag = np.abs(result).astype(np.float32)
fft_row = (20.0 * np.log10(mag + 1e-9)).astype(np.float32) fft_row = (20.0 * np.log10(mag + 1e-9)).astype(np.float32)
if fft_row.shape[0] != bins:
fft_row = fft_row[:bins]
prev_head = (self.head - 1) % self.ring_fft.shape[0] prev_head = (self.head - 1) % self.ring_fft.shape[0]
self.ring_fft[prev_head, :] = fft_row self.ring_fft[prev_head, :] = fft_row

2
run_dataplotter Executable file
View File

@ -0,0 +1,2 @@
#!/usr/bin/bash
python3 -m rfg_adc_plotter.main --bin --backend mpl $@