5 Commits

View File

@ -4,7 +4,7 @@
Формат строк: Формат строк:
- "Sweep_start" — начало нового свипа (предыдущий считается завершённым) - "Sweep_start" — начало нового свипа (предыдущий считается завершённым)
- "s X Y" — точка (индекс X, значение Y), все целые со знаком - "s CH X Y" — точка (номер канала, индекс X, значение Y), все целые со знаком
Отрисовываются два графика: Отрисовываются два графика:
- Левый: последний полученный свип (Y vs X) - Левый: последний полученный свип (Y vs X)
@ -38,7 +38,7 @@ FFT_LEN = 1024 # длина БПФ для спектра/водопада сп
DATA_INVERSION_THRASHOLD = 10.0 DATA_INVERSION_THRASHOLD = 10.0
Number = Union[int, float] Number = Union[int, float]
SweepInfo = Dict[str, Number] SweepInfo = Dict[str, Any]
SweepPacket = Tuple[np.ndarray, SweepInfo] SweepPacket = Tuple[np.ndarray, SweepInfo]
@ -85,6 +85,116 @@ def _parse_spec_clip(spec: Optional[str]) -> Optional[Tuple[float, float]]:
return None return None
def _normalize_sweep_simple(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
"""Простая нормировка: поэлементное деление raw/calib."""
w = min(raw.size, calib.size)
if w <= 0:
return raw
out = np.full_like(raw, np.nan, dtype=np.float32)
with np.errstate(divide="ignore", invalid="ignore"):
out[:w] = raw[:w] / calib[:w]
out = np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
return out
def _build_calib_envelopes(calib: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Оценить нижнюю/верхнюю огибающие калибровочной кривой."""
n = int(calib.size)
if n <= 0:
empty = np.zeros((0,), dtype=np.float32)
return empty, empty
y = np.asarray(calib, dtype=np.float32)
finite = np.isfinite(y)
if not np.any(finite):
zeros = np.zeros_like(y, dtype=np.float32)
return zeros, zeros
if not np.all(finite):
x = np.arange(n, dtype=np.float32)
y = y.copy()
y[~finite] = np.interp(x[~finite], x[finite], y[finite]).astype(np.float32)
if n < 3:
return y.copy(), y.copy()
dy = np.diff(y)
s = np.sign(dy).astype(np.int8, copy=False)
if np.any(s == 0):
for i in range(1, s.size):
if s[i] == 0:
s[i] = s[i - 1]
for i in range(s.size - 2, -1, -1):
if s[i] == 0:
s[i] = s[i + 1]
s[s == 0] = 1
max_idx = np.where((s[:-1] > 0) & (s[1:] < 0))[0] + 1
min_idx = np.where((s[:-1] < 0) & (s[1:] > 0))[0] + 1
x = np.arange(n, dtype=np.float32)
def _interp_nodes(nodes: np.ndarray) -> np.ndarray:
if nodes.size == 0:
idx = np.array([0, n - 1], dtype=np.int64)
else:
idx = np.unique(np.concatenate(([0], nodes, [n - 1]))).astype(np.int64)
return np.interp(x, idx.astype(np.float32), y[idx]).astype(np.float32)
upper = _interp_nodes(max_idx)
lower = _interp_nodes(min_idx)
swap = lower > upper
if np.any(swap):
tmp = upper[swap].copy()
upper[swap] = lower[swap]
lower[swap] = tmp
return lower, upper
def _normalize_sweep_projector(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
"""Нормировка через проекцию между огибающими калибровки в диапазон [-1, +1]."""
w = min(raw.size, calib.size)
if w <= 0:
return raw
out = np.full_like(raw, np.nan, dtype=np.float32)
raw_seg = np.asarray(raw[:w], dtype=np.float32)
lower, upper = _build_calib_envelopes(np.asarray(calib[:w], dtype=np.float32))
span = upper - lower
finite_span = span[np.isfinite(span) & (span > 0)]
if finite_span.size > 0:
eps = max(float(np.median(finite_span)) * 1e-6, 1e-9)
else:
eps = 1e-9
valid = (
np.isfinite(raw_seg)
& np.isfinite(lower)
& np.isfinite(upper)
& (span > eps)
)
if np.any(valid):
proj = np.empty_like(raw_seg, dtype=np.float32)
proj[valid] = ((2.0 * (raw_seg[valid] - lower[valid]) / span[valid]) - 1.0) * 1000.0
proj[valid] = np.clip(proj[valid], -1000.0, 1000.0)
proj[~valid] = np.nan
out[:w] = proj
return out
def _normalize_by_calib(raw: np.ndarray, calib: np.ndarray, norm_type: str) -> np.ndarray:
"""Нормировка свипа по выбранному алгоритму."""
nt = str(norm_type).strip().lower()
if nt == "simple":
return _normalize_sweep_simple(raw, calib)
return _normalize_sweep_projector(raw, calib)
def try_open_pyserial(path: str, baud: int, timeout: float): def try_open_pyserial(path: str, baud: int, timeout: float):
try: try:
import serial # type: ignore import serial # type: ignore
@ -287,9 +397,11 @@ class SweepReader(threading.Thread):
self._last_sweep_ts: Optional[float] = None self._last_sweep_ts: Optional[float] = None
self._n_valid_hist = deque() self._n_valid_hist = deque()
def _finalize_current(self, xs, ys): def _finalize_current(self, xs, ys, channels: Optional[set[int]]):
if not xs: if not xs:
return return
ch_list = sorted(channels) if channels else [0]
ch_primary = ch_list[0] if ch_list else 0
max_x = max(xs) max_x = max(xs)
width = max_x + 1 width = max_x + 1
self._max_width = max(self._max_width, width) self._max_width = max(self._max_width, width)
@ -336,10 +448,14 @@ class SweepReader(threading.Thread):
sweep *= -1.0 sweep *= -1.0
except Exception: except Exception:
pass pass
sweep -= float(np.nanmean(sweep)) #sweep -= float(np.nanmean(sweep))
# Метрики для статусной строки (вид словаря: переменная -> значение) # Метрики для статусной строки (вид словаря: переменная -> значение)
self._sweep_idx += 1 self._sweep_idx += 1
if len(ch_list) > 1:
sys.stderr.write(
f"[warn] Sweep {self._sweep_idx}: изменялся номер канала: {ch_list}\n"
)
now = time.time() now = time.time()
if self._last_sweep_ts is None: if self._last_sweep_ts is None:
dt_ms = float("nan") dt_ms = float("nan")
@ -363,6 +479,8 @@ class SweepReader(threading.Thread):
vmin = vmax = mean = std = float("nan") vmin = vmax = mean = std = float("nan")
info: SweepInfo = { info: SweepInfo = {
"sweep": self._sweep_idx, "sweep": self._sweep_idx,
"ch": ch_primary,
"chs": ch_list,
"n_valid": n_valid, "n_valid": n_valid,
"min": vmin, "min": vmin,
"max": vmax, "max": vmax,
@ -388,6 +506,8 @@ class SweepReader(threading.Thread):
# Состояние текущего свипа # Состояние текущего свипа
xs: list[int] = [] xs: list[int] = []
ys: list[int] = [] ys: list[int] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
try: try:
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
@ -422,20 +542,37 @@ class SweepReader(threading.Thread):
continue continue
if line.startswith(b"Sweep_start"): if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys) self._finalize_current(xs, ys, cur_channels)
xs.clear() xs.clear()
ys.clear() ys.clear()
cur_channel = None
cur_channels.clear()
continue continue
# s X Y (оба целые со знаком). Разделяем по любым пробелам/табам. # sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам.
if len(line) >= 3: if len(line) >= 3:
parts = line.split() parts = line.split()
if len(parts) >= 3 and parts[0].lower() == b"s": if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")):
try: try:
x = int(parts[1], 10) if parts[0].lower() == b"s":
y = int(parts[2], 10) # поддержка знака: "+…" и "-…" if len(parts) >= 4:
ch = int(parts[1], 10)
x = int(parts[2], 10)
y = int(parts[3], 10) # поддержка знака: "+…" и "-…"
else:
ch = 0
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
else:
# формат вида "s0"
ch = int(parts[0][1:], 10)
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
except Exception: except Exception:
continue continue
if cur_channel is None:
cur_channel = ch
cur_channels.add(ch)
xs.append(x) xs.append(x)
ys.append(y) ys.append(y)
@ -445,7 +582,7 @@ class SweepReader(threading.Thread):
finally: finally:
try: try:
# Завершаем оставшийся свип # Завершаем оставшийся свип
self._finalize_current(xs, ys) self._finalize_current(xs, ys, cur_channels)
except Exception: except Exception:
pass pass
try: try:
@ -478,6 +615,15 @@ def main():
"Напр. 2,98. 'off' — отключить" "Напр. 2,98. 'off' — отключить"
), ),
) )
parser.add_argument(
"--spec-mean-sec",
type=float,
default=0.0,
help=(
"Вычитание среднего по каждой частоте за последние N секунд "
"в водопаде спектров (0 — отключить)"
),
)
parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна") parser.add_argument("--title", default="ADC Sweeps", help="Заголовок окна")
parser.add_argument( parser.add_argument(
"--fancy", "--fancy",
@ -496,6 +642,12 @@ def main():
default="auto", default="auto",
help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto", help="Графический бэкенд: pyqtgraph (pg) — быстрее; matplotlib (mpl) — совместимый. По умолчанию auto",
) )
parser.add_argument(
"--norm-type",
choices=["projector", "simple"],
default="projector",
help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)",
)
args = parser.parse_args() args = parser.parse_args()
@ -513,7 +665,7 @@ def main():
import matplotlib import matplotlib
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation from matplotlib.animation import FuncAnimation
from matplotlib.widgets import Slider from matplotlib.widgets import Slider, CheckButtons
except Exception as e: except Exception as e:
sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n") sys.stderr.write(f"[error] Нужны matplotlib и ее зависимости: {e}\n")
sys.exit(1) sys.exit(1)
@ -532,7 +684,9 @@ def main():
fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08) fig.subplots_adjust(wspace=0.25, hspace=0.35, left=0.07, right=0.90, top=0.92, bottom=0.08)
# Состояние для отображения # Состояние для отображения
current_sweep: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None
current_sweep_norm: Optional[np.ndarray] = None
last_calib_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None current_info: Optional[SweepInfo] = None
x_shared: Optional[np.ndarray] = None x_shared: Optional[np.ndarray] = None
width: Optional[int] = None width: Optional[int] = None
@ -548,10 +702,14 @@ def main():
freq_shared: Optional[np.ndarray] = None freq_shared: Optional[np.ndarray] = None
# Параметры контраста водопада спектров # Параметры контраста водопада спектров
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
# Ползунки управления Y для B-scan и контрастом # Ползунки управления Y для B-scan и контрастом
ymin_slider = None ymin_slider = None
ymax_slider = None ymax_slider = None
contrast_slider = None contrast_slider = None
calib_enabled = False
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
cb = None
# Статусная строка (внизу окна) # Статусная строка (внизу окна)
status_text = fig.text( status_text = fig.text(
@ -565,16 +723,28 @@ def main():
) )
# Линейный график последнего свипа # Линейный график последнего свипа
line_obj, = ax_line.plot([], [], lw=1) line_obj, = ax_line.plot([], [], lw=1, color="tab:blue")
line_calib_obj, = ax_line.plot([], [], lw=1, color="tab:red")
line_norm_obj, = ax_line.plot([], [], lw=1, color="tab:green")
ax_line.set_title("Сырые данные", pad=1) ax_line.set_title("Сырые данные", pad=1)
ax_line.set_xlabel("F") ax_line.set_xlabel("ГГц")
ax_line.set_ylabel("") ax_line.set_ylabel("")
channel_text = ax_line.text(
0.98,
0.98,
"",
transform=ax_line.transAxes,
ha="right",
va="top",
fontsize=9,
family="monospace",
)
# Линейный график спектра текущего свипа # Линейный график спектра текущего свипа
fft_line_obj, = ax_fft.plot([], [], lw=1) fft_line_obj, = ax_fft.plot([], [], lw=1)
ax_fft.set_title("FFT", pad=1) ax_fft.set_title("FFT", pad=1)
ax_fft.set_xlabel("X") ax_fft.set_xlabel("Время")
ax_fft.set_ylabel("Амплитуда, дБ") ax_fft.set_ylabel("дБ")
# Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения) # Диапазон по Y для последнего свипа: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None fixed_ylim: Optional[Tuple[float, float]] = None
@ -598,7 +768,7 @@ def main():
) )
ax_img.set_title("Сырые данные", pad=12) ax_img.set_title("Сырые данные", pad=12)
ax_img.set_xlabel("") ax_img.set_xlabel("")
ax_img.set_ylabel("частота") ax_img.set_ylabel("ГГц")
# Не показываем численные значения по времени на водопаде сырых данных # Не показываем численные значения по времени на водопаде сырых данных
try: try:
ax_img.tick_params(axis="x", labelbottom=False) ax_img.tick_params(axis="x", labelbottom=False)
@ -621,14 +791,31 @@ def main():
ax_spec.tick_params(axis="x", labelbottom=False) ax_spec.tick_params(axis="x", labelbottom=False)
except Exception: except Exception:
pass pass
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
return _normalize_by_calib(raw, calib, norm_type=norm_type)
def _set_calib_enabled():
nonlocal calib_enabled, current_sweep_norm
try:
calib_enabled = bool(cb.get_status()[0]) if cb is not None else False
except Exception:
calib_enabled = False
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
else:
current_sweep_norm = None
# Слайдеры для управления осью Y B-scan (мин/макс) и контрастом # Слайдеры для управления осью Y B-scan (мин/макс) и контрастом
try: try:
ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35]) ax_smin = fig.add_axes([0.92, 0.55, 0.02, 0.35])
ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35]) ax_smax = fig.add_axes([0.95, 0.55, 0.02, 0.35])
ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35]) ax_sctr = fig.add_axes([0.98, 0.55, 0.02, 0.35])
ax_cb = fig.add_axes([0.92, 0.45, 0.08, 0.08])
ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical") ymin_slider = Slider(ax_smin, "Y min", 0, max(1, fft_bins - 1), valinit=0, valstep=1, orientation="vertical")
ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical") ymax_slider = Slider(ax_smax, "Y max", 0, max(1, fft_bins - 1), valinit=max(1, fft_bins - 1), valstep=1, orientation="vertical")
contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical") contrast_slider = Slider(ax_sctr, "Int max", 0, 100, valinit=100, valstep=1, orientation="vertical")
cb = CheckButtons(ax_cb, ["калибровка"], [False])
def _on_ylim_change(_val): def _on_ylim_change(_val):
try: try:
@ -643,6 +830,7 @@ def main():
ymax_slider.on_changed(_on_ylim_change) ymax_slider.on_changed(_on_ylim_change)
# Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона) # Контраст влияет на верхнюю границу цветовой шкалы (процент от авто-диапазона)
contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle()) contrast_slider.on_changed(lambda _v: fig.canvas.draw_idle())
cb.on_clicked(lambda _v: _set_calib_enabled())
except Exception: except Exception:
pass pass
@ -651,20 +839,21 @@ def main():
interval_ms = int(1000.0 / max_fps) interval_ms = int(1000.0 / max_fps)
frames_since_ylim_update = 0 frames_since_ylim_update = 0
def ensure_buffer(_w: int): def ensure_buffer(_w: int):
nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time nonlocal ring, width, head, x_shared, ring_fft, freq_shared, ring_time
if ring is not None: if ring is not None:
return return
width = WF_WIDTH width = WF_WIDTH
x_shared = np.arange(width, dtype=np.int32) x_shared = np.linspace(3.3, 14.3, width, dtype=np.float32)
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64) ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64)
head = 0 head = 0
# Обновляем изображение под новые размеры: время по X (горизонталь), X по Y # Обновляем изображение под новые размеры: время по X (горизонталь), X по Y
img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32)) img_obj.set_data(np.zeros((width, max_sweeps), dtype=np.float32))
img_obj.set_extent((0, max_sweeps - 1, 0, width - 1 if width > 0 else 1)) img_obj.set_extent((0, max_sweeps - 1, 3.3, 14.3))
ax_img.set_xlim(0, max_sweeps - 1) ax_img.set_xlim(0, max_sweeps - 1)
ax_img.set_ylim(0, max(1, width - 1)) ax_img.set_ylim(3.3, 14.3)
# FFT буферы: время по X, бин по Y # FFT буферы: время по X, бин по Y
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32)) img_fft_obj.set_data(np.zeros((fft_bins, max_sweeps), dtype=np.float32))
@ -736,7 +925,7 @@ def main():
# Окно Хэннинга # Окно Хэннинга
win = np.hanning(take_fft).astype(np.float32) win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in) spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32) mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9) fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins: if fft_row.shape[0] != bins:
@ -753,7 +942,7 @@ def main():
y_max_fft = float(fr_max) y_max_fft = float(fr_max)
def drain_queue(): def drain_queue():
nonlocal current_sweep, current_info nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep
drained = 0 drained = 0
while True: while True:
try: try:
@ -761,10 +950,26 @@ def main():
except Empty: except Empty:
break break
drained += 1 drained += 1
current_sweep = s current_sweep_raw = s
current_info = info current_info = info
ch = 0
try:
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
except Exception:
ch = 0
if ch == 0:
last_calib_sweep = s
current_sweep_norm = None
sweep_for_proc = s
else:
if calib_enabled and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(s, last_calib_sweep)
sweep_for_proc = current_sweep_norm
else:
current_sweep_norm = None
sweep_for_proc = s
ensure_buffer(s.size) ensure_buffer(s.size)
push_sweep(s) push_sweep(sweep_for_proc)
return drained return drained
def make_display_ring(): def make_display_ring():
@ -780,6 +985,24 @@ def main():
base_t = ring_time if head == 0 else np.roll(ring_time, -head) base_t = ring_time if head == 0 else np.roll(ring_time, -head)
return base_t return base_t
def _subtract_recent_mean_fft(disp_fft: np.ndarray) -> np.ndarray:
"""Вычесть среднее по каждой частоте за последние spec_mean_sec секунд."""
if spec_mean_sec <= 0.0:
return disp_fft
disp_times = make_display_times()
if disp_times is None:
return disp_fft
now_t = time.time()
mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec))
if not np.any(mask):
return disp_fft
try:
mean_spec = np.nanmean(disp_fft[:, mask], axis=1)
except Exception:
return disp_fft
mean_spec = np.nan_to_num(mean_spec, nan=0.0)
return disp_fft - mean_spec[:, None]
def make_display_ring_fft(): def make_display_ring_fft():
if ring_fft is None: if ring_fft is None:
return np.zeros((1, 1), dtype=np.float32) return np.zeros((1, 1), dtype=np.float32)
@ -791,18 +1014,26 @@ def main():
changed = drain_queue() > 0 changed = drain_queue() > 0
# Обновление линии последнего свипа # Обновление линии последнего свипа
if current_sweep is not None: if current_sweep_raw is not None:
if x_shared is not None and current_sweep.size <= x_shared.size: if x_shared is not None and current_sweep_raw.size <= x_shared.size:
xs = x_shared[: current_sweep.size] xs = x_shared[: current_sweep_raw.size]
else: else:
xs = np.arange(current_sweep.size, dtype=np.int32) xs = np.arange(current_sweep_raw.size, dtype=np.int32)
line_obj.set_data(xs, current_sweep) line_obj.set_data(xs, current_sweep_raw)
# Лимиты по X постоянные под текущую ширину if last_calib_sweep is not None:
ax_line.set_xlim(0, max(1, current_sweep.size - 1)) line_calib_obj.set_data(xs[: last_calib_sweep.size], last_calib_sweep)
else:
line_calib_obj.set_data([], [])
if current_sweep_norm is not None:
line_norm_obj.set_data(xs[: current_sweep_norm.size], current_sweep_norm)
else:
line_norm_obj.set_data([], [])
# Лимиты по X: 3.3 ГГц .. 14.3 ГГц
ax_line.set_xlim(3.3, 14.3)
# Адаптивные Y-лимиты (если не задан --ylim) # Адаптивные Y-лимиты (если не задан --ylim)
if fixed_ylim is None: if fixed_ylim is None:
y0 = float(np.nanmin(current_sweep)) y0 = float(np.nanmin(current_sweep_raw))
y1 = float(np.nanmax(current_sweep)) y1 = float(np.nanmax(current_sweep_raw))
if np.isfinite(y0) and np.isfinite(y1): if np.isfinite(y0) and np.isfinite(y1):
if y0 == y1: if y0 == y1:
pad = max(1.0, abs(y0) * 0.05) pad = max(1.0, abs(y0) * 0.05)
@ -815,13 +1046,14 @@ def main():
ax_line.set_ylim(y0, y1) ax_line.set_ylim(y0, y1)
# Обновление спектра текущего свипа # Обновление спектра текущего свипа
take_fft = min(int(current_sweep.size), FFT_LEN) sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
take_fft = min(int(sweep_for_fft.size), FFT_LEN)
if take_fft > 0 and freq_shared is not None: if take_fft > 0 and freq_shared is not None:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32) fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False) seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32) win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in) spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32) mag = np.abs(spec).astype(np.float32)
fft_vals = 20.0 * np.log10(mag + 1e-9) fft_vals = 20.0 * np.log10(mag + 1e-9)
xs_fft = freq_shared xs_fft = freq_shared
@ -830,7 +1062,7 @@ def main():
fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals) fft_line_obj.set_data(xs_fft[: fft_vals.size], fft_vals)
# Авто-диапазон по Y для спектра # Авто-диапазон по Y для спектра
if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)): if np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)):
ax_fft.set_xlim(0, max(1, xs_fft.size - 1)) ax_fft.set_xlim(0, max(1, xs_fft.size - 1) * 1.5)
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals))) ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
# Обновление водопада # Обновление водопада
@ -847,6 +1079,7 @@ def main():
# Обновление водопада спектров # Обновление водопада спектров
if changed and ring_fft is not None: if changed and ring_fft is not None:
disp_fft = make_display_ring_fft() disp_fft = make_display_ring_fft()
disp_fft = _subtract_recent_mean_fft(disp_fft)
# Новые данные справа: без реверса # Новые данные справа: без реверса
img_fft_obj.set_data(disp_fft) img_fft_obj.set_data(disp_fft)
# Подписи времени не обновляем динамически (оставляем авто-тики) # Подписи времени не обновляем динамически (оставляем авто-тики)
@ -881,9 +1114,33 @@ def main():
if changed and current_info: if changed and current_info:
status_text.set_text(_format_status_kv(current_info)) status_text.set_text(_format_status_kv(current_info))
chs = current_info.get("chs") if isinstance(current_info, dict) else None
if chs is None:
chs = current_info.get("ch") if isinstance(current_info, dict) else None
if chs is None:
channel_text.set_text("")
else:
try:
if isinstance(chs, (list, tuple, set)):
ch_list = sorted(int(v) for v in chs)
ch_text_val = ", ".join(str(v) for v in ch_list)
else:
ch_text_val = str(int(chs))
channel_text.set_text(f"chs {ch_text_val}")
except Exception:
channel_text.set_text(f"chs {chs}")
# Возвращаем обновлённые артисты # Возвращаем обновлённые артисты
return (line_obj, img_obj, fft_line_obj, img_fft_obj, status_text) return (
line_obj,
line_calib_obj,
line_norm_obj,
img_obj,
fft_line_obj,
img_fft_obj,
status_text,
channel_text,
)
ani = FuncAnimation(fig, update, interval=interval_ms, blit=False) ani = FuncAnimation(fig, update, interval=interval_ms, blit=False)
@ -929,8 +1186,13 @@ def run_pyqtgraph(args):
p_line = win.addPlot(row=0, col=0, title="Сырые данные") p_line = win.addPlot(row=0, col=0, title="Сырые данные")
p_line.showGrid(x=True, y=True, alpha=0.3) p_line.showGrid(x=True, y=True, alpha=0.3)
curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1))
p_line.setLabel("bottom", "X") curve_calib = p_line.plot(pen=pg.mkPen((220, 60, 60), width=1))
curve_norm = p_line.plot(pen=pg.mkPen((60, 180, 90), width=1))
p_line.setLabel("bottom", "ГГц")
p_line.setLabel("left", "Y") p_line.setLabel("left", "Y")
ch_text = pg.TextItem("", anchor=(1, 1))
ch_text.setZValue(10)
p_line.addItem(ch_text)
# Водопад (справа-сверху) # Водопад (справа-сверху)
p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад") p_img = win.addPlot(row=0, col=1, title="Сырые данные водопад")
@ -941,7 +1203,7 @@ def run_pyqtgraph(args):
p_img.getAxis("bottom").setStyle(showValues=False) p_img.getAxis("bottom").setStyle(showValues=False)
except Exception: except Exception:
pass pass
p_img.setLabel("left", "X (0 снизу)") p_img.setLabel("left", "ГГц")
img = pg.ImageItem() img = pg.ImageItem()
p_img.addItem(img) p_img.addItem(img)
@ -949,8 +1211,8 @@ def run_pyqtgraph(args):
p_fft = win.addPlot(row=1, col=0, title="FFT") p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3) p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1)) curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
p_fft.setLabel("bottom", "Бин") p_fft.setLabel("bottom", "Время")
p_fft.setLabel("left", "Амплитуда, дБ") p_fft.setLabel("left", "дБ")
# Водопад спектров (справа-снизу) # Водопад спектров (справа-снизу)
p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)") p_spec = win.addPlot(row=1, col=1, title="B-scan (дБ)")
@ -965,16 +1227,25 @@ def run_pyqtgraph(args):
img_fft = pg.ImageItem() img_fft = pg.ImageItem()
p_spec.addItem(img_fft) p_spec.addItem(img_fft)
# Чекбокс калибровки
calib_cb = QtWidgets.QCheckBox("калибровка")
cb_proxy = QtWidgets.QGraphicsProxyWidget()
cb_proxy.setWidget(calib_cb)
win.addItem(cb_proxy, row=2, col=1)
# Статусная строка (внизу окна) # Статусная строка (внизу окна)
status = pg.LabelItem(justify="left") status = pg.LabelItem(justify="left")
win.addItem(status, row=2, col=0, colspan=2) win.addItem(status, row=3, col=0, colspan=2)
# Состояние # Состояние
ring: Optional[np.ndarray] = None ring: Optional[np.ndarray] = None
ring_time: Optional[np.ndarray] = None
head = 0 head = 0
width: Optional[int] = None width: Optional[int] = None
x_shared: Optional[np.ndarray] = None x_shared: Optional[np.ndarray] = None
current_sweep: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None
current_sweep_norm: Optional[np.ndarray] = None
last_calib_sweep: Optional[np.ndarray] = None
current_info: Optional[SweepInfo] = None current_info: Optional[SweepInfo] = None
# Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области. # Авто-уровни цветовой шкалы водопада сырых данных пересчитываются по видимой области.
# Для спектров # Для спектров
@ -984,6 +1255,9 @@ def run_pyqtgraph(args):
y_min_fft, y_max_fft = None, None y_min_fft, y_max_fft = None, None
# Параметры контраста водопада спектров (процентильная обрезка) # Параметры контраста водопада спектров (процентильная обрезка)
spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None)) spec_clip = _parse_spec_clip(getattr(args, "spec_clip", None))
spec_mean_sec = float(getattr(args, "spec_mean_sec", 0.0))
calib_enabled = False
norm_type = str(getattr(args, "norm_type", "projector")).strip().lower()
# Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения) # Диапазон по Y: авто по умолчанию (поддерживает отрицательные значения)
fixed_ylim: Optional[Tuple[float, float]] = None fixed_ylim: Optional[Tuple[float, float]] = None
if args.ylim: if args.ylim:
@ -995,18 +1269,39 @@ def run_pyqtgraph(args):
if fixed_ylim is not None: if fixed_ylim is not None:
p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0) p_line.setYRange(fixed_ylim[0], fixed_ylim[1], padding=0)
def _normalize_sweep(raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
return _normalize_by_calib(raw, calib, norm_type=norm_type)
def _set_calib_enabled():
nonlocal calib_enabled, current_sweep_norm
try:
calib_enabled = bool(calib_cb.isChecked())
except Exception:
calib_enabled = False
if calib_enabled and current_sweep_raw is not None and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(current_sweep_raw, last_calib_sweep)
else:
current_sweep_norm = None
try:
calib_cb.stateChanged.connect(lambda _v: _set_calib_enabled())
except Exception:
pass
def ensure_buffer(_w: int): def ensure_buffer(_w: int):
nonlocal ring, head, width, x_shared, ring_fft, freq_shared nonlocal ring, ring_time, head, width, x_shared, ring_fft, freq_shared
if ring is not None: if ring is not None:
return return
width = WF_WIDTH width = WF_WIDTH
x_shared = np.arange(width, dtype=np.int32) x_shared = np.linspace(3.3, 14.3, width, dtype=np.float32)
ring = np.full((max_sweeps, width), np.nan, dtype=np.float32) ring = np.full((max_sweeps, width), np.nan, dtype=np.float32)
ring_time = np.full((max_sweeps,), np.nan, dtype=np.float64)
head = 0 head = 0
# Водопад: время по оси X, X по оси Y # Водопад: время по оси X, X по оси Y (ось Y: 3.3..14.3 ГГц)
img.setImage(ring.T, autoLevels=False) img.setImage(ring.T, autoLevels=False)
p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(0, max(1, width - 1)), padding=0) img.setRect(0, 3.3, max_sweeps, 14.3 - 3.3)
p_line.setXRange(0, max(1, width - 1), padding=0) p_img.setRange(xRange=(0, max_sweeps - 1), yRange=(3.3, 14.3), padding=0)
p_line.setXRange(3.3, 14.3, padding=0)
# FFT: время по оси X, бин по оси Y # FFT: время по оси X, бин по оси Y
ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32) ring_fft = np.full((max_sweeps, fft_bins), np.nan, dtype=np.float32)
img_fft.setImage(ring_fft.T, autoLevels=False) img_fft.setImage(ring_fft.T, autoLevels=False)
@ -1046,7 +1341,7 @@ def run_pyqtgraph(args):
return (vmin, vmax) return (vmin, vmax)
def push_sweep(s: np.ndarray): def push_sweep(s: np.ndarray):
nonlocal ring, head, ring_fft, y_min_fft, y_max_fft nonlocal ring, ring_time, head, ring_fft, y_min_fft, y_max_fft
if s is None or s.size == 0 or ring is None: if s is None or s.size == 0 or ring is None:
return return
w = ring.shape[1] w = ring.shape[1]
@ -1054,6 +1349,8 @@ def run_pyqtgraph(args):
take = min(w, s.size) take = min(w, s.size)
row[:take] = s[:take] row[:take] = s[:take]
ring[head, :] = row ring[head, :] = row
if ring_time is not None:
ring_time[head] = time.time()
head = (head + 1) % ring.shape[0] head = (head + 1) % ring.shape[0]
# FFT строка (дБ) # FFT строка (дБ)
if ring_fft is not None: if ring_fft is not None:
@ -1064,7 +1361,7 @@ def run_pyqtgraph(args):
seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False) seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32) win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in) spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32) mag = np.abs(spec).astype(np.float32)
fft_row = 20.0 * np.log10(mag + 1e-9) fft_row = 20.0 * np.log10(mag + 1e-9)
if fft_row.shape[0] != bins: if fft_row.shape[0] != bins:
@ -1080,7 +1377,7 @@ def run_pyqtgraph(args):
y_max_fft = float(fr_max) y_max_fft = float(fr_max)
def drain_queue(): def drain_queue():
nonlocal current_sweep, current_info nonlocal current_sweep_raw, current_sweep_norm, current_info, last_calib_sweep
drained = 0 drained = 0
while True: while True:
try: try:
@ -1088,10 +1385,26 @@ def run_pyqtgraph(args):
except Empty: except Empty:
break break
drained += 1 drained += 1
current_sweep = s current_sweep_raw = s
current_info = info current_info = info
ch = 0
try:
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
except Exception:
ch = 0
if ch == 0:
last_calib_sweep = s
current_sweep_norm = None
sweep_for_proc = s
else:
if calib_enabled and last_calib_sweep is not None:
current_sweep_norm = _normalize_sweep(s, last_calib_sweep)
sweep_for_proc = current_sweep_norm
else:
current_sweep_norm = None
sweep_for_proc = s
ensure_buffer(s.size) ensure_buffer(s.size)
push_sweep(s) push_sweep(sweep_for_proc)
return drained return drained
# Попытка применить LUT из колормэпа (если доступен) # Попытка применить LUT из колормэпа (если доступен)
@ -1105,33 +1418,43 @@ def run_pyqtgraph(args):
def update(): def update():
changed = drain_queue() > 0 changed = drain_queue() > 0
if current_sweep is not None and x_shared is not None: if current_sweep_raw is not None and x_shared is not None:
if current_sweep.size <= x_shared.size: if current_sweep_raw.size <= x_shared.size:
xs = x_shared[: current_sweep.size] xs = x_shared[: current_sweep_raw.size]
else: else:
xs = np.arange(current_sweep.size) xs = np.arange(current_sweep_raw.size)
curve.setData(xs, current_sweep, autoDownsample=True) curve.setData(xs, current_sweep_raw, autoDownsample=True)
if last_calib_sweep is not None:
curve_calib.setData(xs[: last_calib_sweep.size], last_calib_sweep, autoDownsample=True)
else:
curve_calib.setData([], [])
if current_sweep_norm is not None:
curve_norm.setData(xs[: current_sweep_norm.size], current_sweep_norm, autoDownsample=True)
else:
curve_norm.setData([], [])
if fixed_ylim is None: if fixed_ylim is None:
y0 = float(np.nanmin(current_sweep)) y0 = float(np.nanmin(current_sweep_raw))
y1 = float(np.nanmax(current_sweep)) y1 = float(np.nanmax(current_sweep_raw))
if np.isfinite(y0) and np.isfinite(y1): if np.isfinite(y0) and np.isfinite(y1):
margin = 0.05 * max(1.0, (y1 - y0)) margin = 0.05 * max(1.0, (y1 - y0))
p_line.setYRange(y0 - margin, y1 + margin, padding=0) p_line.setYRange(y0 - margin, y1 + margin, padding=0)
# Обновим спектр # Обновим спектр
take_fft = min(int(current_sweep.size), FFT_LEN) sweep_for_fft = current_sweep_norm if current_sweep_norm is not None else current_sweep_raw
take_fft = min(int(sweep_for_fft.size), FFT_LEN)
if take_fft > 0 and freq_shared is not None: if take_fft > 0 and freq_shared is not None:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32) fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(current_sweep[:take_fft], nan=0.0).astype(np.float32, copy=False) seg = np.nan_to_num(sweep_for_fft[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32) win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in) spec = np.fft.ifft(fft_in)
mag = np.abs(spec).astype(np.float32) mag = np.abs(spec).astype(np.float32)
fft_vals = 20.0 * np.log10(mag + 1e-9) fft_vals = 20.0 * np.log10(mag + 1e-9)
xs_fft = freq_shared xs_fft = freq_shared
if fft_vals.size > xs_fft.size: if fft_vals.size > xs_fft.size:
fft_vals = fft_vals[: xs_fft.size] fft_vals = fft_vals[: xs_fft.size]
curve_fft.setData(xs_fft[: fft_vals.size], fft_vals) curve_fft.setData(xs_fft[: fft_vals.size], fft_vals)
p_fft.setXRange(0, max(1, xs_fft.size - 1) * 1.5, padding=0)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0) p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
if changed and ring is not None: if changed and ring is not None:
@ -1148,10 +1471,41 @@ def run_pyqtgraph(args):
status.setText(_format_status_kv(current_info)) status.setText(_format_status_kv(current_info))
except Exception: except Exception:
pass pass
try:
chs = current_info.get("chs") if isinstance(current_info, dict) else None
if chs is None:
chs = current_info.get("ch") if isinstance(current_info, dict) else None
if chs is None:
ch_text.setText("")
else:
if isinstance(chs, (list, tuple, set)):
ch_list = sorted(int(v) for v in chs)
ch_text_val = ", ".join(str(v) for v in ch_list)
else:
ch_text_val = str(int(chs))
ch_text.setText(f"chs {ch_text_val}")
(x0, x1), (y0, y1) = p_line.viewRange()
dx = 0.01 * max(1.0, float(x1 - x0))
dy = 0.01 * max(1.0, float(y1 - y0))
ch_text.setPos(float(x1 - dx), float(y1 - dy))
except Exception:
pass
if changed and ring_fft is not None: if changed and ring_fft is not None:
disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0) disp_fft = ring_fft if head == 0 else np.roll(ring_fft, -head, axis=0)
disp_fft = disp_fft.T[:, ::-1] disp_fft = disp_fft.T[:, ::-1]
if spec_mean_sec > 0.0 and ring_time is not None:
disp_times = ring_time if head == 0 else np.roll(ring_time, -head)
disp_times = disp_times[::-1]
now_t = time.time()
mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec))
if np.any(mask):
try:
mean_spec = np.nanmean(disp_fft[:, mask], axis=1)
mean_spec = np.nan_to_num(mean_spec, nan=0.0)
disp_fft = disp_fft - mean_spec[:, None]
except Exception:
pass
# Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии) # Автодиапазон по среднему спектру за видимый интервал (как в хорошей версии)
levels = None levels = None
try: try: