new background remove algoritm
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
"""Runtime state helpers."""
|
||||
|
||||
from rfg_adc_plotter.state.background_buffer import BackgroundMedianBuffer
|
||||
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
||||
from rfg_adc_plotter.state.runtime_state import RuntimeState
|
||||
|
||||
__all__ = ["RingBuffer", "RuntimeState"]
|
||||
__all__ = ["BackgroundMedianBuffer", "RingBuffer", "RuntimeState"]
|
||||
|
||||
49
rfg_adc_plotter/state/background_buffer.py
Normal file
49
rfg_adc_plotter/state/background_buffer.py
Normal file
@ -0,0 +1,49 @@
|
||||
"""Rolling median buffer for persisted FFT background capture."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
class BackgroundMedianBuffer:
|
||||
"""Store recent FFT rows and expose their median profile."""
|
||||
|
||||
def __init__(self, max_rows: int):
|
||||
self.max_rows = max(1, int(max_rows))
|
||||
self.width = 0
|
||||
self.head = 0
|
||||
self.count = 0
|
||||
self.rows: Optional[np.ndarray] = None
|
||||
|
||||
def reset(self) -> None:
|
||||
self.width = 0
|
||||
self.head = 0
|
||||
self.count = 0
|
||||
self.rows = None
|
||||
|
||||
def push(self, fft_mag: np.ndarray) -> None:
|
||||
values = np.asarray(fft_mag, dtype=np.float32).reshape(-1)
|
||||
if values.size == 0:
|
||||
return
|
||||
if self.rows is None or self.width != values.size:
|
||||
self.width = values.size
|
||||
self.rows = np.full((self.max_rows, self.width), np.nan, dtype=np.float32)
|
||||
self.head = 0
|
||||
self.count = 0
|
||||
self.rows[self.head, :] = values
|
||||
self.head = (self.head + 1) % self.max_rows
|
||||
self.count = min(self.count + 1, self.max_rows)
|
||||
|
||||
def median(self) -> Optional[np.ndarray]:
|
||||
if self.rows is None or self.count <= 0:
|
||||
return None
|
||||
rows = self.rows[: self.count] if self.count < self.max_rows else self.rows
|
||||
valid_rows = np.any(np.isfinite(rows), axis=1)
|
||||
if not np.any(valid_rows):
|
||||
return None
|
||||
median = np.nanmedian(rows[valid_rows], axis=0).astype(np.float32, copy=False)
|
||||
if not np.any(np.isfinite(median)):
|
||||
return None
|
||||
return np.nan_to_num(median, nan=0.0).astype(np.float32, copy=False)
|
||||
@ -25,6 +25,7 @@ class RingBuffer:
|
||||
self.ring_fft: Optional[np.ndarray] = None
|
||||
self.x_shared: Optional[np.ndarray] = None
|
||||
self.distance_axis: Optional[np.ndarray] = None
|
||||
self.last_fft_mag: Optional[np.ndarray] = None
|
||||
self.last_fft_db: Optional[np.ndarray] = None
|
||||
self.last_freqs: Optional[np.ndarray] = None
|
||||
self.y_min_fft: Optional[float] = None
|
||||
@ -47,6 +48,7 @@ class RingBuffer:
|
||||
self.ring_fft = None
|
||||
self.x_shared = None
|
||||
self.distance_axis = None
|
||||
self.last_fft_mag = None
|
||||
self.last_fft_db = None
|
||||
self.last_freqs = None
|
||||
self.y_min_fft = None
|
||||
@ -125,6 +127,7 @@ class RingBuffer:
|
||||
last_idx = (self.head - 1) % self.max_sweeps
|
||||
if self.ring_fft.shape[0] > 0:
|
||||
last_fft = self.ring_fft[last_idx]
|
||||
self.last_fft_mag = np.asarray(last_fft, dtype=np.float32).copy()
|
||||
self.last_fft_db = fft_mag_to_db(last_fft)
|
||||
finite = self.ring_fft[np.isfinite(self.ring_fft)]
|
||||
if finite.size > 0:
|
||||
@ -155,6 +158,7 @@ class RingBuffer:
|
||||
|
||||
fft_mag = compute_fft_mag_row(sweep, freqs, self.fft_bins, mode=self.fft_mode)
|
||||
self.ring_fft[self.head, :] = fft_mag
|
||||
self.last_fft_mag = np.asarray(fft_mag, dtype=np.float32).copy()
|
||||
self.last_fft_db = fft_mag_to_db(fft_mag)
|
||||
|
||||
if self.last_fft_db.size > 0:
|
||||
@ -178,6 +182,11 @@ class RingBuffer:
|
||||
base = self.ring_fft if self.head == 0 else np.roll(self.ring_fft, -self.head, axis=0)
|
||||
return base.T
|
||||
|
||||
def get_last_fft_linear(self) -> Optional[np.ndarray]:
|
||||
if self.last_fft_mag is None:
|
||||
return None
|
||||
return np.asarray(self.last_fft_mag, dtype=np.float32).copy()
|
||||
|
||||
def get_display_times(self) -> Optional[np.ndarray]:
|
||||
if self.ring_time is None:
|
||||
return None
|
||||
|
||||
@ -7,6 +7,8 @@ from typing import Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
from rfg_adc_plotter.constants import BACKGROUND_MEDIAN_SWEEPS
|
||||
from rfg_adc_plotter.state.background_buffer import BackgroundMedianBuffer
|
||||
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
||||
from rfg_adc_plotter.types import SweepAuxCurves, SweepInfo
|
||||
|
||||
@ -23,12 +25,17 @@ class RuntimeState:
|
||||
current_sweep_raw: Optional[np.ndarray] = None
|
||||
current_aux_curves: SweepAuxCurves = None
|
||||
current_sweep_norm: Optional[np.ndarray] = None
|
||||
current_fft_mag: Optional[np.ndarray] = None
|
||||
current_fft_db: Optional[np.ndarray] = None
|
||||
last_calib_sweep: Optional[np.ndarray] = None
|
||||
calib_envelope: Optional[np.ndarray] = None
|
||||
calib_file_path: Optional[str] = None
|
||||
background_buffer: BackgroundMedianBuffer = field(
|
||||
default_factory=lambda: BackgroundMedianBuffer(BACKGROUND_MEDIAN_SWEEPS)
|
||||
)
|
||||
background_profile: Optional[np.ndarray] = None
|
||||
background_file_path: Optional[str] = None
|
||||
current_info: Optional[SweepInfo] = None
|
||||
bg_spec_cache: Optional[np.ndarray] = None
|
||||
current_peak_width: Optional[float] = None
|
||||
current_peak_amplitude: Optional[float] = None
|
||||
peak_candidates: List[Dict[str, float]] = field(default_factory=list)
|
||||
|
||||
Reference in New Issue
Block a user