implemented calibration: last s0 sweep stored and used as calibration val. If checkbox []calibrate is active -- normalised val used for feature processing

This commit is contained in:
2026-02-09 20:55:09 +03:00
parent 869d5baebc
commit 3074859793

View File

@ -38,7 +38,7 @@ FFT_LEN = 1024 # длина БПФ для спектра/водопада сп
DATA_INVERSION_THRASHOLD = 10.0 DATA_INVERSION_THRASHOLD = 10.0
Number = Union[int, float] Number = Union[int, float]
SweepInfo = Dict[str, Number] SweepInfo = Dict[str, Any]
SweepPacket = Tuple[np.ndarray, SweepInfo] SweepPacket = Tuple[np.ndarray, SweepInfo]
@ -338,7 +338,7 @@ class SweepReader(threading.Thread):
sweep *= -1.0 sweep *= -1.0
except Exception: except Exception:
pass pass
sweep -= float(np.nanmean(sweep)) #sweep -= float(np.nanmean(sweep))
# Метрики для статусной строки (вид словаря: переменная -> значение) # Метрики для статусной строки (вид словаря: переменная -> значение)
self._sweep_idx += 1 self._sweep_idx += 1
@ -549,7 +549,7 @@ def main():
import matplotlib import matplotlib
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation from matplotlib.animation import FuncAnimation
from matplotlib.widgets import Slider from matplotlib.widgets import Slider, CheckButtons
except Exception as e: except Exception as e:
sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n")
sys.exit(1) sys.exit(1)
@ -568,7 +568,9 @@ def main():
fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08)
# Состояние для отображения # Состояние для отображения
current_sweep: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None
current_sweep_norm: Optional[np.ndarray] = None
last_calib_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None current_info: Optional[SweepInfo] = None
x_shared: Optional[np.ndarray] = None x_shared: Optional[np.ndarray] = None
width: Optional[int] = None width: Optional[int] = None
@ -589,6 +591,8 @@ def main():
ymin_slider = None ymin_slider = None
ymax_slider = None ymax_slider = None
contrast_slider = None contrast_slider = None
calib_enabled = False
cb = None
# Статусная строка (внизу окна) # Статусная строка (внизу окна)
status_text = fig.text( status_text = fig.text(
@ -602,7 +606,9 @@ def main():
) )
# Линейный график последнего свипа # Линейный график последнего свипа
line_obj, = ax_line.plot([], [], lw=1) line_obj, = ax_line.plot([], [], lw=1, color="tab:blue")
line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red")
line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green")
ax_line.set_title("Сырые данные", pad=1) ax_line.set_title("Сырые данные", pad=1)
ax_line.set_xlabel("F") ax_line.set_xlabel("F")
ax_line.set_ylabel("") ax_line.set_ylabel("")
@ -668,14 +674,37 @@ def main():
ax_spec.tick_params(axis="x", labelbottom=False) ax_spec.tick_params(axis="x", labelbottom=False)
except Exception: except Exception:
pass pass
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
w = min(raw.size, calib.size)
if w <= 0:
return raw
out = np.full_like(raw, np.nan, dtype=np.float32)
with np.errstate(divide="ignore", invalid="ignore"):
out[:w] = raw[:w] / calib[:w]
out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
return out
def _set_calib_enabled():
nonlocal calib_enabled, current_sweep_norm
try:
calib_enabled = bool(cb.get_status()[0]) if cb is not None else False
except Exception:
calib_enabled = False
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
else:
current_sweep_norm = None
# Слайдеры для управления осью Y B-scan (мин/макс) и контрастом # Слайдеры для управления осью Y B-scan (мин/макс) и контрастом
try: try:
ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35])
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08])
ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical")
ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical")
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
cb = CheckButtons(ax_cb, ["калибровка"], [False])
def _on_ylim_change(_val): def _on_ylim_change(_val):
try: try:
@ -690,6 +719,7 @@ def main():
ymax_slider.on_changed(_on_ylim_change) ymax_slider.on_changed(_on_ylim_change)
# Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона) # Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона)
contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle())
cb.on_clicked(lambda _v: _set_calib_enabled())
except Exception: except Exception:
pass pass
@ -698,6 +728,7 @@ def main():
interval_ms = int(1000.0 / max_fps) interval_ms = int(1000.0 / max_fps)
frames_since_ylim_update = 0 frames_since_ylim_update = 0
def ensure_buffer(_w: int): def ensure_buffer(_w: int):
nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time
if ring is not None: if ring is not None:
@ -800,7 +831,7 @@ def main():
y_max_fft = float(fr_max) y_max_fft = float(fr_max)
def drain_queue(): def drain_queue():
nonlocal current_sweep, current_info nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep
drained = 0 drained = 0
while True: while True:
try: try:
@ -808,10 +839,26 @@ def main():
except Empty: except Empty:
break break
drained += 1 drained += 1
current_sweep = s current_sweep_raw = s
current_info = info current_info = info
ch = 0
try:
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
except Exception:
ch = 0
if ch == 0:
last_calib_sweep = s
current_sweep_norm = None
sweep_for_proc = s
else:
if calib_enabled and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(s, last_calib_sweep)
sweep_for_proc = current_sweep_norm
else:
current_sweep_norm = None
sweep_for_proc = s
ensure_buffer(s.size) ensure_buffer(s.size)
push_sweep(s) push_sweep(sweep_for_proc)
return drained return drained
def make_display_ring(): def make_display_ring():
@ -856,18 +903,26 @@ def main():
changed = drain_queue() > 0 changed = drain_queue() > 0
# Обновление линии последнего свипа # Обновление линии последнего свипа
if current_sweep is not None: if current_sweep_raw is not None:
if x_shared is not None and current_sweep.size <= x_shared.size: if x_shared is not None and current_sweep_raw.size <= x_shared.size:
xs = x_shared[: current_sweep.size] xs = x_shared[: current_sweep_raw.size]
else: else:
xs = np.arange(current_sweep.size, dtype=np.int32) xs = np.arange(current_sweep_raw.size, dtype=np.int32)
line_obj.set_data(xs, current_sweep) line_obj.set_data(xs, current_sweep_raw)
if last_calib_sweep is not None:
line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep)
else:
line_calib_obj.set_data([], [])
if current_sweep_norm is not None:
line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm)
else:
line_norm_obj.set_data([], [])
# Лимиты по X постоянные под текущую ширину # Лимиты по X постоянные под текущую ширину
ax_line.set_xlim(0, max(1, current_sweep.size - 1)) ax_line.set_xlim(0, max(1, current_sweep_raw.size - 1))
# Адаптивные Y-лимиты (если не задан --ylim) # Адаптивные Y-лимиты (если не задан --ylim)
if fixed_ylim is None: if fixed_ylim is None:
y0 = float(np.nanmin(current_sweep)) y0 = float(np.nanmin(current_sweep_raw))
y1 = float(np.nanmax(current_sweep)) y1 = float(np.nanmax(current_sweep_raw))
if np.isfinite(y0) and np.isfinite(y1): if np.isfinite(y0) and np.isfinite(y1):
if y0 == y1: if y0 == y1:
pad = max(1.0, abs(y0) * 0.05) pad = max(1.0, abs(y0) * 0.05)
@ -880,10 +935,11 @@ def main():
ax_line.set_ylim(y0, y1) ax_line.set_ylim(y0, y1)
# Обновление спектра текущего свипа # Обновление спектра текущего свипа
take_fft = min(int(current_sweep.size), FFT_LEN) sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
take_fft = min(int(sweep_for_fft.size), FFT_LEN)
if take_fft > 0 and freq_shared is not None: if take_fft > 0 and freq_shared is not None:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32) fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False) seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32) win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in) spec = np.fft.rfft(fft_in)
@ -964,7 +1020,16 @@ def main():
channel_text.set_text(f"chs {chs}") channel_text.set_text(f"chs {chs}")
# Возвращаем обновлённые артисты # Возвращаем обновлённые артисты
return (line_obj, img_obj, fft_line_obj, img_fft_obj, status_text, channel_text) return (
line_obj,
line_calib_obj,
line_norm_obj,
img_obj,
fft_line_obj,
img_fft_obj,
status_text,
channel_text,
)
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
@ -1010,6 +1075,8 @@ def run_pyqtgraph(args):
p_line = win.addPlot(row=0, col=0, title="Сырые данные") p_line = win.addPlot(row=0, col=0, title="Сырые данные")
p_line.showGrid(x=True, y=True, alpha=0.3) p_line.showGrid(x=True, y=True, alpha=0.3)
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1))
curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1))
p_line.setLabel("bottom", "X") p_line.setLabel("bottom", "X")
p_line.setLabel("left", "Y") p_line.setLabel("left", "Y")
ch_text = pg.TextItem("", anchor=(1, 1)) ch_text = pg.TextItem("", anchor=(1, 1))
@ -1049,9 +1116,15 @@ def run_pyqtgraph(args):
img_fft = pg.ImageItem() img_fft = pg.ImageItem()
p_spec.addItem(img_fft) p_spec.addItem(img_fft)
# Чекбокс калибровки
calib_cb = QtWidgets.QCheckBox("калибровка")
cb_proxy = QtWidgets.QGraphicsProxyWidget()
cb_proxy.setWidget(calib_cb)
win.addItem(cb_proxy, row=2, col=1)
# Статусная строка (внизу окна) # Статусная строка (внизу окна)
status = pg.LabelItem(justify="left") status = pg.LabelItem(justify="left")
win.addItem(status, row=2, col=0, colspan=2) win.addItem(status, row=3, col=0, colspan=2)
# Состояние # Состояние
ring: Optional[np.ndarray] = None ring: Optional[np.ndarray] = None
@ -1059,7 +1132,9 @@ def run_pyqtgraph(args):
head = 0 head = 0
width: Optional[int] = None width: Optional[int] = None
x_shared: Optional[np.ndarray] = None x_shared: Optional[np.ndarray] = None
current_sweep: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None
current_sweep_norm: Optional[np.ndarray] = None
last_calib_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None current_info: Optional[SweepInfo] = None
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
# Для спектров # Для спектров
@ -1070,6 +1145,7 @@ def run_pyqtgraph(args):
# Параметры контраста водопада спектров (процентильная обрезка) # Параметры контраста водопада спектров (процентильная обрезка)
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
calib_enabled = False
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim: if args.ylim:
@ -1081,6 +1157,32 @@ def run_pyqtgraph(args):
if fixed_ylim is not None: if fixed_ylim is not None:
p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0)
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
w = min(raw.size, calib.size)
if w <= 0:
return raw
out = np.full_like(raw, np.nan, dtype=np.float32)
with np.errstate(divide="ignore", invalid="ignore"):
out[:w] = raw[:w] / calib[:w]
out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
return out
def _set_calib_enabled():
nonlocal calib_enabled, current_sweep_norm
try:
calib_enabled = bool(calib_cb.isChecked())
except Exception:
calib_enabled = False
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
else:
current_sweep_norm = None
try:
calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled())
except Exception:
pass
def ensure_buffer(_w: int): def ensure_buffer(_w: int):
nonlocal ring, ring_time, head, width, x_shared, ring_fft, freq_shared nonlocal ring, ring_time, head, width, x_shared, ring_fft, freq_shared
if ring is not None: if ring is not None:
@ -1169,7 +1271,7 @@ def run_pyqtgraph(args):
y_max_fft = float(fr_max) y_max_fft = float(fr_max)
def drain_queue(): def drain_queue():
nonlocal current_sweep, current_info nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep
drained = 0 drained = 0
while True: while True:
try: try:
@ -1177,10 +1279,26 @@ def run_pyqtgraph(args):
except Empty: except Empty:
break break
drained += 1 drained += 1
current_sweep = s current_sweep_raw = s
current_info = info current_info = info
ch = 0
try:
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
except Exception:
ch = 0
if ch == 0:
last_calib_sweep = s
current_sweep_norm = None
sweep_for_proc = s
else:
if calib_enabled and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(s, last_calib_sweep)
sweep_for_proc = current_sweep_norm
else:
current_sweep_norm = None
sweep_for_proc = s
ensure_buffer(s.size) ensure_buffer(s.size)
push_sweep(s) push_sweep(sweep_for_proc)
return drained return drained
# Попытка применить LUT из колормэпа (если доступен) # Попытка применить LUT из колормэпа (если доступен)
@ -1194,24 +1312,33 @@ def run_pyqtgraph(args):
def update(): def update():
changed = drain_queue() > 0 changed = drain_queue() > 0
if current_sweep is not None and x_shared is not None: if current_sweep_raw is not None and x_shared is not None:
if current_sweep.size <= x_shared.size: if current_sweep_raw.size <= x_shared.size:
xs = x_shared[: current_sweep.size] xs = x_shared[: current_sweep_raw.size]
else: else:
xs = np.arange(current_sweep.size) xs = np.arange(current_sweep_raw.size)
curve.setData(xs, current_sweep, autoDownsample=True) curve.setData(xs, current_sweep_raw, autoDownsample=True)
if last_calib_sweep is not None:
curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True)
else:
curve_calib.setData([], [])
if current_sweep_norm is not None:
curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True)
else:
curve_norm.setData([], [])
if fixed_ylim is None: if fixed_ylim is None:
y0 = float(np.nanmin(current_sweep)) y0 = float(np.nanmin(current_sweep_raw))
y1 = float(np.nanmax(current_sweep)) y1 = float(np.nanmax(current_sweep_raw))
if np.isfinite(y0) and np.isfinite(y1): if np.isfinite(y0) and np.isfinite(y1):
margin = 0.05 * max(1.0, (y1 - y0)) margin = 0.05 * max(1.0, (y1 - y0))
p_line.setYRange(y0 - margin, y1 + margin, padding=0) p_line.setYRange(y0 - margin, y1 + margin, padding=0)
# Обновим спектр # Обновим спектр
take_fft = min(int(current_sweep.size), FFT_LEN) sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
take_fft = min(int(sweep_for_fft.size), FFT_LEN)
if take_fft > 0 and freq_shared is not None: if take_fft > 0 and freq_shared is not None:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32) fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False) seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32) win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in) spec = np.fft.rfft(fft_in)