new project structure

This commit is contained in:
awe
2026-02-11 16:32:21 +03:00
parent 0eaa07c03a
commit c3acd0c193
16 changed files with 1576 additions and 0 deletions

View File

View File

@ -0,0 +1,119 @@
"""Состояние приложения: текущие свипы и настройки калибровки/нормировки."""
from queue import Empty, Queue
from typing import Any, Dict, Mapping, Optional
import numpy as np
from rfg_adc_plotter.processing.normalizer import normalize_by_calib
from rfg_adc_plotter.state.ring_buffer import RingBuffer
from rfg_adc_plotter.types import SweepInfo, SweepPacket
def format_status(data: Mapping[str, Any]) -> str:
"""Преобразовать словарь метрик в одну строку 'k:v'."""
def _fmt(v: Any) -> str:
if v is None:
return "NA"
try:
fv = float(v)
except Exception:
return str(v)
if not np.isfinite(fv):
return "nan"
if abs(fv) >= 1000 or (0 < abs(fv) < 0.01):
return f"{fv:.3g}"
return f"{fv:.3f}".rstrip("0").rstrip(".")
parts = [f"{k}:{_fmt(v)}" for k, v in data.items()]
return " ".join(parts)
class AppState:
"""Весь изменяемый GUI-state: текущие данные, калибровка, настройки.
Методы drain_queue и set_calib_enabled заменяют одноимённые closures
с nonlocal из оригинального кода.
"""
def __init__(self, norm_type: str = "projector"):
self.current_sweep_raw: Optional[np.ndarray] = None
self.current_sweep_norm: Optional[np.ndarray] = None
self.last_calib_sweep: Optional[np.ndarray] = None
self.current_info: Optional[SweepInfo] = None
self.calib_enabled: bool = False
self.norm_type: str = norm_type
def _normalize(self, raw: np.ndarray, calib: np.ndarray) -> np.ndarray:
return normalize_by_calib(raw, calib, self.norm_type)
def set_calib_enabled(self, enabled: bool):
"""Включить/выключить режим калибровки, пересчитать norm-свип."""
self.calib_enabled = enabled
if (
self.calib_enabled
and self.current_sweep_raw is not None
and self.last_calib_sweep is not None
):
self.current_sweep_norm = self._normalize(
self.current_sweep_raw, self.last_calib_sweep
)
else:
self.current_sweep_norm = None
def drain_queue(self, q: "Queue[SweepPacket]", ring: RingBuffer) -> int:
"""Вытащить все ожидающие свипы из очереди, обновить state и ring.
Возвращает количество обработанных свипов.
"""
drained = 0
while True:
try:
s, info = q.get_nowait()
except Empty:
break
drained += 1
self.current_sweep_raw = s
self.current_info = info
ch = 0
try:
ch = int(info.get("ch", 0)) if isinstance(info, dict) else 0
except Exception:
ch = 0
# Канал 0 — опорный (калибровочный) свип
if ch == 0:
self.last_calib_sweep = s
self.current_sweep_norm = None
sweep_for_ring = s
else:
if self.calib_enabled and self.last_calib_sweep is not None:
self.current_sweep_norm = self._normalize(s, self.last_calib_sweep)
sweep_for_ring = self.current_sweep_norm
else:
self.current_sweep_norm = None
sweep_for_ring = s
ring.ensure_init(s.size)
ring.push(sweep_for_ring)
return drained
def format_channel_label(self) -> str:
"""Строка с номерами каналов для подписи на графике."""
if self.current_info is None:
return ""
info = self.current_info
chs = info.get("chs") if isinstance(info, dict) else None
if chs is None:
chs = info.get("ch") if isinstance(info, dict) else None
if chs is None:
return ""
try:
if isinstance(chs, (list, tuple, set)):
ch_list = sorted(int(v) for v in chs)
return "chs " + ", ".join(str(v) for v in ch_list)
return f"chs {int(chs)}"
except Exception:
return f"chs {chs}"

View File

@ -0,0 +1,166 @@
"""Кольцевой буфер свипов и FFT-строк для водопадного отображения."""
import time
from typing import Optional, Tuple
import numpy as np
from rfg_adc_plotter.constants import FFT_LEN, WF_WIDTH
class RingBuffer:
"""Хранит последние N свипов и соответствующие FFT-строки.
Все мутабельные поля водопада инкапсулированы здесь,
что устраняет необходимость nonlocal в GUI-коде.
"""
def __init__(self, max_sweeps: int):
self.max_sweeps = max_sweeps
self.fft_bins = FFT_LEN // 2 + 1
# Инициализируются при первом свипе (ensure_init)
self.ring: Optional[np.ndarray] = None # (max_sweeps, WF_WIDTH)
self.ring_fft: Optional[np.ndarray] = None # (max_sweeps, fft_bins)
self.ring_time: Optional[np.ndarray] = None # (max_sweeps,)
self.head: int = 0
self.width: Optional[int] = None
self.x_shared: Optional[np.ndarray] = None
self.freq_shared: Optional[np.ndarray] = None
self.y_min_fft: Optional[float] = None
self.y_max_fft: Optional[float] = None
# FFT последнего свипа (для отображения без повторного вычисления)
self.last_fft_vals: Optional[np.ndarray] = None
@property
def is_ready(self) -> bool:
return self.ring is not None
def ensure_init(self, sweep_width: int):
"""Инициализировать буферы при первом свипе. Повторные вызовы — no-op."""
if self.ring is not None:
return
self.width = WF_WIDTH
self.x_shared = np.arange(self.width, dtype=np.int32)
self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32)
self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64)
self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32)
self.freq_shared = np.arange(self.fft_bins, dtype=np.int32)
self.head = 0
def push(self, s: np.ndarray):
"""Добавить строку свипа в кольцевой буфер, вычислить FFT-строку."""
if s is None or s.size == 0 or self.ring is None:
return
w = self.ring.shape[1]
row = np.full((w,), np.nan, dtype=np.float32)
take = min(w, s.size)
row[:take] = s[:take]
self.ring[self.head, :] = row
self.ring_time[self.head] = time.time()
self.head = (self.head + 1) % self.ring.shape[0]
self._push_fft(s)
def _push_fft(self, s: np.ndarray):
bins = self.ring_fft.shape[1]
take_fft = min(int(s.size), FFT_LEN)
if take_fft <= 0:
fft_row = np.full((bins,), np.nan, dtype=np.float32)
else:
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
seg = np.nan_to_num(s[:take_fft], nan=0.0).astype(np.float32, copy=False)
win = np.hanning(take_fft).astype(np.float32)
fft_in[:take_fft] = seg * win
spec = np.fft.rfft(fft_in)
mag = np.abs(spec).astype(np.float32)
fft_row = (20.0 * np.log10(mag + 1e-9)).astype(np.float32)
if fft_row.shape[0] != bins:
fft_row = fft_row[:bins]
prev_head = (self.head - 1) % self.ring_fft.shape[0]
self.ring_fft[prev_head, :] = fft_row
self.last_fft_vals = fft_row
fr_min = np.nanmin(fft_row)
fr_max = float(np.nanpercentile(fft_row, 90))
if self.y_min_fft is None or (not np.isnan(fr_min) and fr_min < self.y_min_fft):
self.y_min_fft = float(fr_min)
if self.y_max_fft is None or (not np.isnan(fr_max) and fr_max > self.y_max_fft):
self.y_max_fft = float(fr_max)
def get_display_ring(self) -> np.ndarray:
"""Кольцо в порядке от старого к новому, ось времени по X. Форма: (width, time)."""
if self.ring is None:
return np.zeros((1, 1), dtype=np.float32)
base = self.ring if self.head == 0 else np.roll(self.ring, -self.head, axis=0)
return base.T # (width, time)
def get_display_ring_fft(self) -> np.ndarray:
"""FFT-кольцо в порядке от старого к новому. Форма: (bins, time)."""
if self.ring_fft is None:
return np.zeros((1, 1), dtype=np.float32)
base = self.ring_fft if self.head == 0 else np.roll(self.ring_fft, -self.head, axis=0)
return base.T # (bins, time)
def get_display_times(self) -> Optional[np.ndarray]:
"""Временные метки строк в порядке от старого к новому."""
if self.ring_time is None:
return None
return self.ring_time if self.head == 0 else np.roll(self.ring_time, -self.head)
def subtract_recent_mean_fft(
self, disp_fft: np.ndarray, spec_mean_sec: float
) -> np.ndarray:
"""Вычесть среднее по каждой частоте за последние spec_mean_sec секунд."""
if spec_mean_sec <= 0.0:
return disp_fft
disp_times = self.get_display_times()
if disp_times is None:
return disp_fft
now_t = time.time()
mask = np.isfinite(disp_times) & (disp_times >= (now_t - spec_mean_sec))
if not np.any(mask):
return disp_fft
try:
mean_spec = np.nanmean(disp_fft[:, mask], axis=1)
except Exception:
return disp_fft
mean_spec = np.nan_to_num(mean_spec, nan=0.0)
return disp_fft - mean_spec[:, None]
def compute_fft_levels(
self, disp_fft: np.ndarray, spec_clip: Optional[Tuple[float, float]]
) -> Optional[Tuple[float, float]]:
"""Вычислить (vmin, vmax) для отображения водопада спектров."""
# 1. По среднему спектру за видимое время
try:
mean_spec = np.nanmean(disp_fft, axis=1)
vmin_v = float(np.nanmin(mean_spec))
vmax_v = float(np.nanmax(mean_spec))
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
return (vmin_v, vmax_v)
except Exception:
pass
# 2. Процентильная обрезка
if spec_clip is not None:
try:
vmin_v = float(np.nanpercentile(disp_fft, spec_clip[0]))
vmax_v = float(np.nanpercentile(disp_fft, spec_clip[1]))
if np.isfinite(vmin_v) and np.isfinite(vmax_v) and vmin_v != vmax_v:
return (vmin_v, vmax_v)
except Exception:
pass
# 3. Глобальные накопленные мин/макс
if (
self.y_min_fft is not None
and self.y_max_fft is not None
and np.isfinite(self.y_min_fft)
and np.isfinite(self.y_max_fft)
and self.y_min_fft != self.y_max_fft
):
return (self.y_min_fft, self.y_max_fft)
return None