implemented func calibrate_freqs --it can warp frequency axis. Also movide from abstract bins and counts to freqs and distances

This commit is contained in:
2026-03-04 13:35:05 +03:00
parent ce11c38b44
commit 283631c52e

View File

@ -37,12 +37,14 @@ LOG_BASE = 10.0
LOG_SCALER = 0.001 # int32 значения приходят в fixed-point лог-шкале с шагом 1e-3 LOG_SCALER = 0.001 # int32 значения приходят в fixed-point лог-шкале с шагом 1e-3
LOG_POSTSCALER = 1000 LOG_POSTSCALER = 1000
LOG_EXP_LIMIT = 300.0 # запас до переполнения float64 при возведении LOG_BASE в степень LOG_EXP_LIMIT = 300.0 # запас до переполнения float64 при возведении LOG_BASE в степень
C_M_S = 299_792_458.0
# Порог для инверсии сырых данных: если среднее значение свипа ниже порога — # Порог для инверсии сырых данных: если среднее значение свипа ниже порога —
# считаем, что сигнал «меньше нуля» и домножаем свип на -1 # считаем, что сигнал «меньше нуля» и домножаем свип на -1
DATA_INVERSION_THRASHOLD = 10.0 DATA_INVERSION_THRASHOLD = 10.0
Number = Union[int, float] Number = Union[int, float]
SweepInfo = Dict[str, Any] SweepInfo = Dict[str, Any]
SweepData = Dict[str, np.ndarray]
SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]] SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]]
SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves] SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves]
@ -128,6 +130,120 @@ def _compute_auto_ylim(*series_list: Optional[np.ndarray]) -> Optional[Tuple[flo
return (y_min - pad, y_max + pad) return (y_min - pad, y_max + pad)
def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData:
"""Вернуть копию свипа для будущей калибровки частотной оси."""
F = np.asarray(sweep["F"], dtype=np.float64).copy()
I = np.asarray(sweep["I"], dtype=np.float64).copy()
tmp = []
C = [0,1,0]
for f in F:
val = C[0] + (f**1) * C[1] + (f**2) * C[2]
tmp.append(val)
F = np.asanyarray(tmp, dtype=np.float64)
if F.size >= 2:
F_cal = np.linspace(float(F[0]), float(F[-1]), F.size, dtype=np.float64)
I_cal = np.interp(F_cal, F, I).astype(np.float64)
else:
F_cal = F.copy()
I_cal = I.copy()
return {
"F": F_cal,
"I": I_cal,
}
def _prepare_fft_segment(
sweep: np.ndarray,
freqs: Optional[np.ndarray],
fft_len: int = FFT_LEN,
) -> Optional[Tuple[np.ndarray, int]]:
"""Подготовить свип к FFT, пересэмплируя его на равномерную сетку по частоте."""
take_fft = min(int(sweep.size), int(fft_len))
if take_fft <= 0:
return None
sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32)
fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(np.float32, copy=False)
if freqs is None:
return fallback, take_fft
freq_arr = np.asarray(freqs)
if freq_arr.size < take_fft:
return fallback, take_fft
freq_seg = np.asarray(freq_arr[:take_fft], dtype=np.float64)
valid = np.isfinite(sweep_seg) & np.isfinite(freq_seg)
if int(np.count_nonzero(valid)) < 2:
return fallback, take_fft
x_valid = freq_seg[valid]
y_valid = sweep_seg[valid]
order = np.argsort(x_valid, kind="mergesort")
x_valid = x_valid[order]
y_valid = y_valid[order]
x_unique, unique_idx = np.unique(x_valid, return_index=True)
y_unique = y_valid[unique_idx]
if x_unique.size < 2 or x_unique[-1] <= x_unique[0]:
return fallback, take_fft
x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64)
resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32)
return resampled, take_fft
def _compute_fft_row(
sweep: np.ndarray,
freqs: Optional[np.ndarray],
bins: int,
) -> np.ndarray:
"""Посчитать FFT-строку, используя калиброванную частотную ось при подготовке входа."""
if bins <= 0:
return np.zeros((0,), dtype=np.float32)
prepared = _prepare_fft_segment(sweep, freqs, fft_len=FFT_LEN)
if prepared is None:
return np.full((bins,), np.nan, dtype=np.float32)
fft_seg, take_fft = prepared
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = fft_seg * win
spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins:
fft_row = fft_row[:bins]
return fft_row
def _compute_distance_axis(freqs: Optional[np.ndarray], bins: int) -> np.ndarray:
"""Рассчитать ось расстояния для IFFT по равномерной частотной сетке."""
if bins <= 0:
return np.zeros((0,), dtype=np.float64)
if freqs is None:
return np.arange(bins, dtype=np.float64)
freq_arr = np.asarray(freqs, dtype=np.float64)
finite = freq_arr[np.isfinite(freq_arr)]
if finite.size < 2:
return np.arange(bins, dtype=np.float64)
df_ghz = float((finite[-1] - finite[0]) / max(1, finite.size - 1))
df_hz = abs(df_ghz) * 1e9
if not np.isfinite(df_hz) or df_hz <= 0.0:
return np.arange(bins, dtype=np.float64)
step_m = C_M_S / (2.0 * FFT_LEN * df_hz)
return np.arange(bins, dtype=np.float64) * step_m
def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray: def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
"""Простая нормировка: поэлементное деление raw/calib.""" """Простая нормировка: поэлементное деление raw/calib."""
w = min(raw.size, calib.size) w = min(raw.size, calib.size)
@ -956,6 +1072,8 @@ def main():
fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08)
# Состояние для отображения # Состояние для отображения
current_freqs: Optional[np.ndarray] = None
current_distances: Optional[np.ndarray] = None
current_sweep_raw: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None
current_aux_curves: SweepAuxCurves = None current_aux_curves: SweepAuxCurves = None
current_sweep_norm: Optional[np.ndarray] = None current_sweep_norm: Optional[np.ndarray] = None
@ -972,7 +1090,7 @@ def main():
fft_bins = FFT_LEN // 2 + 1 fft_bins = FFT_LEN // 2 + 1
ring_fft = None # type: Optional[np.ndarray] ring_fft = None # type: Optional[np.ndarray]
y_min_fft, y_max_fft = None, None y_min_fft, y_max_fft = None, None
freq_shared: Optional[np.ndarray] = None distance_shared: Optional[np.ndarray] = None
# Параметры контраста водопада спектров # Параметры контраста водопада спектров
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0)) spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
@ -1018,7 +1136,7 @@ def main():
# Линейный график спектра текущего свипа # Линейный график спектра текущего свипа
fft_line_obj, = ax_fft.plot([], [], lw=1) fft_line_obj, = ax_fft.plot([], [], lw=1)
ax_fft.set_title("FFT", pad=1) ax_fft.set_title("FFT", pad=1)
ax_fft.set_xlabel("Время") ax_fft.set_xlabel("Расстояние, м")
ax_fft.set_ylabel("дБ") ax_fft.set_ylabel("дБ")
# Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения) # Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения)
@ -1060,7 +1178,7 @@ def main():
) )
ax_spec.set_title("B-scan (дБ)", pad=12) ax_spec.set_title("B-scan (дБ)", pad=12)
ax_spec.set_xlabel("") ax_spec.set_xlabel("")
ax_spec.set_ylabel("расстояние") ax_spec.set_ylabel("Расстояние, м")
# Не показываем численные значения по времени на B-scan # Не показываем численные значения по времени на B-scan
try: try:
ax_spec.tick_params(axis="x", labelbottom=False) ax_spec.tick_params(axis="x", labelbottom=False)
@ -1087,15 +1205,15 @@ def main():
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08]) ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08])
ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymin_slider = Slider(ax_smin, "R min", 0.0, 1.0, valinit=0.0, orientation="vertical")
ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "R max", 0.0, 1.0, valinit=1.0, orientation="vertical")
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
cb = CheckButtons(ax_cb, ["калибровка"], [False]) cb = CheckButtons(ax_cb, ["калибровка"], [False])
def _on_ylim_change(_val): def _on_ylim_change(_val):
try: try:
y0 = int(min(ymin_slider.val, ymax_slider.val)) y0 = float(min(ymin_slider.val, ymax_slider.val))
y1 = int(max(ymin_slider.val, ymax_slider.val)) y1 = float(max(ymin_slider.val, ymax_slider.val))
ax_spec.set_ylim(y0, y1) ax_spec.set_ylim(y0, y1)
fig.canvas.draw_idle() fig.canvas.draw_idle()
except Exception: except Exception:
@ -1113,10 +1231,11 @@ def main():
max_fps = max(1.0, float(args.max_fps)) max_fps = max(1.0, float(args.max_fps))
interval_ms = int(1000.0 / max_fps) interval_ms = int(1000.0 / max_fps)
frames_since_ylim_update = 0 frames_since_ylim_update = 0
spec_slider_initialized = False
def ensure_buffer(_w: int): def ensure_buffer(_w: int):
nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time nonlocal ring, width, head, x_shared, ring_fft, distance_shared, ring_time
if ring is not None: if ring is not None:
return return
width = WF_WIDTH width = WF_WIDTH
@ -1132,10 +1251,46 @@ def main():
# FFT буферы: время по X, бин по Y # FFT буферы: время по X, бин по Y
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32)) img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32))
img_fft_obj.set_extent((0, max_sweeps - 1, 0, fft_bins - 1)) img_fft_obj.set_extent((0, max_sweeps - 1, 0.0, 1.0))
ax_spec.set_xlim(0, max_sweeps - 1) ax_spec.set_xlim(0, max_sweeps - 1)
ax_spec.set_ylim(0, max(1, fft_bins - 1)) ax_spec.set_ylim(0.0, 1.0)
freq_shared = np.arange(fft_bins, dtype=np.int32) distance_shared = _compute_distance_axis(current_freqs, fft_bins)
def _update_physical_axes():
nonlocal distance_shared, spec_slider_initialized
if current_freqs is not None and current_freqs.size > 0:
finite_f = current_freqs[np.isfinite(current_freqs)]
if finite_f.size > 0:
f_min = float(np.min(finite_f))
f_max = float(np.max(finite_f))
if f_max <= f_min:
f_max = f_min + 1.0
img_obj.set_extent((0, max_sweeps - 1, f_min, f_max))
ax_img.set_ylim(f_min, f_max)
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
if distance_shared.size > 0:
d_min = float(distance_shared[0])
d_max = float(distance_shared[-1]) if distance_shared.size > 1 else float(distance_shared[0] + 1.0)
if d_max <= d_min:
d_max = d_min + 1.0
img_fft_obj.set_extent((0, max_sweeps - 1, d_min, d_max))
ax_spec.set_ylim(d_min, d_max)
if ymin_slider is not None and ymax_slider is not None:
try:
ymin_slider.valmin = d_min
ymin_slider.valmax = d_max
ymax_slider.valmin = d_min
ymax_slider.valmax = d_max
ymin_slider.ax.set_ylim(d_min, d_max)
ymax_slider.ax.set_ylim(d_min, d_max)
if (not spec_slider_initialized) or (not (d_min <= ymin_slider.val <= d_max)):
ymin_slider.set_val(d_min)
if (not spec_slider_initialized) or (not (d_min <= ymax_slider.val <= d_max)):
ymax_slider.set_val(d_max)
spec_slider_initialized = True
except Exception:
pass
def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по центральным 90% значений в видимой области imshow.""" """(vmin, vmax) по центральным 90% значений в видимой области imshow."""
@ -1169,7 +1324,7 @@ def main():
return None return None
return (vmin, vmax) return (vmin, vmax)
def push_sweep(s: np.ndarray): def push_sweep(s: np.ndarray, freqs: Optional[np.ndarray] = None):
nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time nonlocal ring, head, ring_fft, y_min_fft, y_max_fft, ring_time
if s is None or s.size == 0 or ring is None: if s is None or s.size == 0 or ring is None:
return return
@ -1185,27 +1340,7 @@ def main():
# FFT строка (дБ) # FFT строка (дБ)
if ring_fft is not None: if ring_fft is not None:
bins = ring_fft.shape[1] bins = ring_fft.shape[1]
# Подготовка входа FFT_LEN, замена NaN на 0 fft_row = _compute_fft_row(s, freqs, bins)
take_fft = min(int(s.size), FFT_LEN)
if take_fft <= 0:
fft_row = np.full((bins,), np.nan, dtype=np.float32)
else:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = s[:take_fft]
if isinstance(seg, np.ndarray):
seg = np.nan_to_num(seg, nan=0.0).astype(np.float32, copy=False)
else:
seg = np.asarray(seg, dtype=np.float32)
seg = np.nan_to_num(seg, nan=0.0)
# Окно Хэннинга
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins:
# rfft длиной FFT_LEN даёт bins == FFT_LEN//2+1
fft_row = fft_row[:bins]
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row
# Экстремумы для цветовой шкалы # Экстремумы для цветовой шкалы
fr_min = np.nanmin(fft_row) fr_min = np.nanmin(fft_row)
@ -1217,7 +1352,7 @@ def main():
y_max_fft = float(fr_max) y_max_fft = float(fr_max)
def drain_queue(): def drain_queue():
nonlocal current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep nonlocal current_freqs, current_distances, current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep
drained = 0 drained = 0
while True: while True:
try: try:
@ -1225,6 +1360,15 @@ def main():
except Empty: except Empty:
break break
drained += 1 drained += 1
calibrated = calibrate_freqs(
{
"F": np.linspace(3.3, 14.3, s.size, dtype=np.float64),
"I": s,
}
)
current_freqs = calibrated["F"]
current_distances = _compute_distance_axis(current_freqs, fft_bins)
s = calibrated["I"]
current_sweep_raw = s current_sweep_raw = s
current_aux_curves = aux_curves current_aux_curves = aux_curves
current_info = info current_info = info
@ -1245,7 +1389,8 @@ def main():
current_sweep_norm = None current_sweep_norm = None
sweep_for_proc = s sweep_for_proc = s
ensure_buffer(s.size) ensure_buffer(s.size)
push_sweep(sweep_for_proc) _update_physical_axes()
push_sweep(sweep_for_proc, current_freqs)
return drained return drained
def make_display_ring(): def make_display_ring():
@ -1291,7 +1436,9 @@ def main():
# Обновление линии последнего свипа # Обновление линии последнего свипа
if current_sweep_raw is not None: if current_sweep_raw is not None:
if x_shared is not None and current_sweep_raw.size <= x_shared.size: if current_freqs is not None and current_freqs.size == current_sweep_raw.size:
xs = current_freqs
elif x_shared is not None and current_sweep_raw.size <= x_shared.size:
xs = x_shared[: current_sweep_raw.size] xs = x_shared[: current_sweep_raw.size]
else: else:
xs = np.arange(current_sweep_raw.size, dtype=np.int32) xs = np.arange(current_sweep_raw.size, dtype=np.int32)
@ -1312,7 +1459,10 @@ def main():
else: else:
line_norm_obj.set_data([], []) line_norm_obj.set_data([], [])
# Лимиты по X: 3.3 ГГц .. 14.3 ГГц # Лимиты по X: 3.3 ГГц .. 14.3 ГГц
ax_line.set_xlim(3.3, 14.3) if isinstance(xs, np.ndarray) and xs.size > 0 and np.isfinite(xs[0]) and np.isfinite(xs[-1]):
ax_line.set_xlim(float(np.nanmin(xs)), float(np.nanmax(xs)))
else:
ax_line.set_xlim(3.3, 14.3)
# Адаптивные Y-лимиты (если не задан --ylim) # Адаптивные Y-лимиты (если не задан --ylim)
if fixed_ylim is None: if fixed_ylim is None:
y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm] y_series = [current_sweep_raw, last_calib_sweep, current_sweep_norm]
@ -1324,22 +1474,17 @@ def main():
# Обновление спектра текущего свипа # Обновление спектра текущего свипа
sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
take_fft = min(int(sweep_for_fft.size), FFT_LEN) if sweep_for_fft.size > 0 and distance_shared is not None:
if take_fft > 0 and freq_shared is not None: fft_vals = _compute_fft_row(sweep_for_fft, current_freqs, distance_shared.size)
fft_in = np.zeros((FFT_LEN,), dtype=np.float32) xs_fft = current_distances if current_distances is not None else distance_shared
seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_vals = 20.0 * np.log10(mag + 1e-9)
xs_fft = freq_shared
if fft_vals.size > xs_fft.size: if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size] fft_vals = fft_vals[: xs_fft.size]
fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals)
# Авто-диапазон по Y для спектра # Авто-диапазон по Y для спектра
if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)):
ax_fft.set_xlim(0, max(1, xs_fft.size - 1) * 1.5) finite_x = xs_fft[: fft_vals.size][np.isfinite(xs_fft[: fft_vals.size])]
if finite_x.size > 0:
ax_fft.set_xlim(float(np.min(finite_x)), float(np.max(finite_x)))
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
# Обновление водопада # Обновление водопада
@ -1500,7 +1645,7 @@ def run_pyqtgraph(args):
p_fft = win.addPlot(row=1, col=0, title="FFT") p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3) p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
p_fft.setLabel("bottom", "Время") p_fft.setLabel("bottom", "Расстояние, м")
p_fft.setLabel("left", "дБ") p_fft.setLabel("left", "дБ")
# Водопад спектров (справа-снизу) # Водопад спектров (справа-снизу)
@ -1512,7 +1657,7 @@ def run_pyqtgraph(args):
p_spec.getAxis("bottom").setStyle(showValues=False) p_spec.getAxis("bottom").setStyle(showValues=False)
except Exception: except Exception:
pass pass
p_spec.setLabel("left", "Бин (0 снизу)") p_spec.setLabel("left", "Расстояние, м")
img_fft = pg.ImageItem() img_fft = pg.ImageItem()
p_spec.addItem(img_fft) p_spec.addItem(img_fft)
@ -1532,6 +1677,8 @@ def run_pyqtgraph(args):
head = 0 head = 0
width: Optional[int] = None width: Optional[int] = None
x_shared: Optional[np.ndarray] = None x_shared: Optional[np.ndarray] = None
current_freqs: Optional[np.ndarray] = None
current_distances: Optional[np.ndarray] = None
current_sweep_raw: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None
current_aux_curves: SweepAuxCurves = None current_aux_curves: SweepAuxCurves = None
current_sweep_norm: Optional[np.ndarray] = None current_sweep_norm: Optional[np.ndarray] = None
@ -1541,7 +1688,7 @@ def run_pyqtgraph(args):
# Для спектров # Для спектров
fft_bins = FFT_LEN // 2 + 1 fft_bins = FFT_LEN // 2 + 1
ring_fft: Optional[np.ndarray] = None ring_fft: Optional[np.ndarray] = None
freq_shared: Optional[np.ndarray] = None distance_shared: Optional[np.ndarray] = None
y_min_fft, y_max_fft = None, None y_min_fft, y_max_fft = None, None
# Параметры контраста водопада спектров (процентильная обрезка) # Параметры контраста водопада спектров (процентильная обрезка)
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
@ -1579,7 +1726,7 @@ def run_pyqtgraph(args):
pass pass
def ensure_buffer(_w: int): def ensure_buffer(_w: int):
nonlocal ring, ring_time, head, width, x_shared, ring_fft, freq_shared nonlocal ring, ring_time, head, width, x_shared, ring_fft, distance_shared
if ring is not None: if ring is not None:
return return
width = WF_WIDTH width = WF_WIDTH
@ -1595,9 +1742,31 @@ def run_pyqtgraph(args):
# FFT: время по оси X, бин по оси Y # FFT: время по оси X, бин по оси Y
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft.setImage(ring_fft.T, autoLevels=False) img_fft.setImage(ring_fft.T, autoLevels=False)
p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, fft_bins - 1)), padding=0) img_fft.setRect(0, 0.0, max_sweeps, 1.0)
p_fft.setXRange(0, max(1, fft_bins - 1), padding=0) p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(0.0, 1.0), padding=0)
freq_shared = np.arange(fft_bins, dtype=np.int32) p_fft.setXRange(0.0, 1.0, padding=0)
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
def _update_physical_axes():
nonlocal distance_shared
if current_freqs is not None and current_freqs.size > 0:
finite_f = current_freqs[np.isfinite(current_freqs)]
if finite_f.size > 0:
f_min = float(np.min(finite_f))
f_max = float(np.max(finite_f))
if f_max <= f_min:
f_max = f_min + 1.0
img.setRect(0, f_min, max_sweeps, f_max - f_min)
p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(f_min, f_max), padding=0)
distance_shared = _compute_distance_axis(current_freqs, fft_bins)
if distance_shared.size > 0:
d_min = float(distance_shared[0])
d_max = float(distance_shared[-1]) if distance_shared.size > 1 else float(distance_shared[0] + 1.0)
if d_max <= d_min:
d_max = d_min + 1.0
img_fft.setRect(0, d_min, max_sweeps, d_max - d_min)
p_spec.setRange(xRange=(0, max_sweeps - 1), yRange=(d_min, d_max), padding=0)
def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]: def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по центральным 90% значений в видимой области ImageItem.""" """(vmin, vmax) по центральным 90% значений в видимой области ImageItem."""
@ -1630,7 +1799,7 @@ def run_pyqtgraph(args):
return None return None
return (vmin, vmax) return (vmin, vmax)
def push_sweep(s: np.ndarray): def push_sweep(s: np.ndarray, freqs: Optional[np.ndarray] = None):
nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft
if s is None or s.size == 0 or ring is None: if s is None or s.size == 0 or ring is None:
return return
@ -1645,19 +1814,7 @@ def run_pyqtgraph(args):
# FFT строка (дБ) # FFT строка (дБ)
if ring_fft is not None: if ring_fft is not None:
bins = ring_fft.shape[1] bins = ring_fft.shape[1]
take_fft = min(int(s.size), FFT_LEN) fft_row = _compute_fft_row(s, freqs, bins)
if take_fft > 0:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins:
fft_row = fft_row[:bins]
else:
fft_row = np.full((bins,), np.nan, dtype=np.float32)
ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row ring_fft[(head - 1) % ring_fft.shape[0], :] = fft_row
fr_min = np.nanmin(fft_row) fr_min = np.nanmin(fft_row)
fr_max = np.nanmax(fft_row) fr_max = np.nanmax(fft_row)
@ -1667,7 +1824,7 @@ def run_pyqtgraph(args):
y_max_fft = float(fr_max) y_max_fft = float(fr_max)
def drain_queue(): def drain_queue():
nonlocal current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep nonlocal current_freqs, current_distances, current_sweep_raw, current_aux_curves, current_sweep_norm, current_info, last_calib_sweep
drained = 0 drained = 0
while True: while True:
try: try:
@ -1675,6 +1832,15 @@ def run_pyqtgraph(args):
except Empty: except Empty:
break break
drained += 1 drained += 1
calibrated = calibrate_freqs(
{
"F": np.linspace(3.3, 14.3, s.size, dtype=np.float64),
"I": s,
}
)
current_freqs = calibrated["F"]
current_distances = _compute_distance_axis(current_freqs, fft_bins)
s = calibrated["I"]
current_sweep_raw = s current_sweep_raw = s
current_aux_curves = aux_curves current_aux_curves = aux_curves
current_info = info current_info = info
@ -1695,7 +1861,8 @@ def run_pyqtgraph(args):
current_sweep_norm = None current_sweep_norm = None
sweep_for_proc = s sweep_for_proc = s
ensure_buffer(s.size) ensure_buffer(s.size)
push_sweep(sweep_for_proc) _update_physical_axes()
push_sweep(sweep_for_proc, current_freqs)
return drained return drained
# Попытка применить LUT из колормэпа (если доступен) # Попытка применить LUT из колормэпа (если доступен)
@ -1710,7 +1877,9 @@ def run_pyqtgraph(args):
def update(): def update():
changed = drain_queue() > 0 changed = drain_queue() > 0
if current_sweep_raw is not None and x_shared is not None: if current_sweep_raw is not None and x_shared is not None:
if current_sweep_raw.size <= x_shared.size: if current_freqs is not None and current_freqs.size == current_sweep_raw.size:
xs = current_freqs
elif current_sweep_raw.size <= x_shared.size:
xs = x_shared[: current_sweep_raw.size] xs = x_shared[: current_sweep_raw.size]
else: else:
xs = np.arange(current_sweep_raw.size) xs = np.arange(current_sweep_raw.size)
@ -1737,23 +1906,22 @@ def run_pyqtgraph(args):
y_limits = _compute_auto_ylim(*y_series) y_limits = _compute_auto_ylim(*y_series)
if y_limits is not None: if y_limits is not None:
p_line.setYRange(y_limits[0], y_limits[1], padding=0) p_line.setYRange(y_limits[0], y_limits[1], padding=0)
if isinstance(xs, np.ndarray) and xs.size > 0:
finite_x = xs[np.isfinite(xs)]
if finite_x.size > 0:
p_line.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
# Обновим спектр # Обновим спектр
sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
take_fft = min(int(sweep_for_fft.size), FFT_LEN) if sweep_for_fft.size > 0 and distance_shared is not None:
if take_fft > 0 and freq_shared is not None: fft_vals = _compute_fft_row(sweep_for_fft, current_freqs, distance_shared.size)
fft_in = np.zeros((FFT_LEN,), dtype=np.float32) xs_fft = current_distances if current_distances is not None else distance_shared
seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_vals = 20.0 * np.log10(mag + 1e-9)
xs_fft = freq_shared
if fft_vals.size > xs_fft.size: if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size] fft_vals = fft_vals[: xs_fft.size]
curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) curve_fft.setData(xs_fft[: fft_vals.size], fft_vals)
p_fft.setXRange(0, max(1, xs_fft.size - 1) * 1.5, padding=0) finite_x = xs_fft[: fft_vals.size][np.isfinite(xs_fft[: fft_vals.size])]
if finite_x.size > 0:
p_fft.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
if changed and ring is not None: if changed and ring is not None: