thinking fft
This commit is contained in:
@ -93,7 +93,88 @@ def _extract_positive_exact_band(
|
|||||||
return freq_band, sweep_band, f_max, df_ghz
|
return freq_band, sweep_band, f_max, df_ghz
|
||||||
|
|
||||||
|
|
||||||
def _resolve_positive_only_exact_geometry(freqs: Optional[np.ndarray]) -> Optional[Tuple[int, float]]:
|
def _positive_exact_shift_size(f_max: float, df_ghz: float) -> int:
|
||||||
|
if (not np.isfinite(f_max)) or (not np.isfinite(df_ghz)) or f_max <= 0.0 or df_ghz <= 0.0:
|
||||||
|
return 0
|
||||||
|
return int(np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64).size)
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_positive_exact_band_size(
|
||||||
|
f_min: float,
|
||||||
|
f_max: float,
|
||||||
|
n_band: int,
|
||||||
|
max_shift_len: Optional[int],
|
||||||
|
) -> int:
|
||||||
|
if n_band <= 2:
|
||||||
|
return max(2, int(n_band))
|
||||||
|
if max_shift_len is None:
|
||||||
|
return int(n_band)
|
||||||
|
|
||||||
|
limit = int(max_shift_len)
|
||||||
|
if limit <= 1:
|
||||||
|
return max(2, int(n_band))
|
||||||
|
|
||||||
|
span = float(f_max - f_min)
|
||||||
|
if (not np.isfinite(span)) or span <= 0.0:
|
||||||
|
return int(n_band)
|
||||||
|
|
||||||
|
df_current = float(span / max(1, int(n_band) - 1))
|
||||||
|
if _positive_exact_shift_size(f_max, df_current) <= limit:
|
||||||
|
return int(n_band)
|
||||||
|
|
||||||
|
denom = max(2.0 * f_max, 1e-12)
|
||||||
|
approx = int(np.floor(1.0 + ((float(limit - 1) * span) / denom)))
|
||||||
|
target = min(int(n_band), max(2, approx))
|
||||||
|
while target > 2:
|
||||||
|
df_try = float(span / max(1, target - 1))
|
||||||
|
if _positive_exact_shift_size(f_max, df_try) <= limit:
|
||||||
|
break
|
||||||
|
target -= 1
|
||||||
|
return max(2, target)
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_positive_exact_band(
|
||||||
|
freq_band: np.ndarray,
|
||||||
|
sweep_band: np.ndarray,
|
||||||
|
*,
|
||||||
|
max_shift_len: Optional[int] = None,
|
||||||
|
) -> Optional[Tuple[np.ndarray, np.ndarray, float, float]]:
|
||||||
|
freq_arr = np.asarray(freq_band, dtype=np.float64).reshape(-1)
|
||||||
|
sweep_arr = np.asarray(sweep_band).reshape(-1)
|
||||||
|
width = min(int(freq_arr.size), int(sweep_arr.size))
|
||||||
|
if width <= 1:
|
||||||
|
return None
|
||||||
|
|
||||||
|
freq_arr = freq_arr[:width]
|
||||||
|
sweep_arr = sweep_arr[:width]
|
||||||
|
f_min = float(freq_arr[0])
|
||||||
|
f_max = float(freq_arr[-1])
|
||||||
|
if (not np.isfinite(f_min)) or (not np.isfinite(f_max)) or f_max <= f_min:
|
||||||
|
return None
|
||||||
|
|
||||||
|
target_band = _resolve_positive_exact_band_size(f_min, f_max, int(freq_arr.size), max_shift_len)
|
||||||
|
if target_band < int(freq_arr.size):
|
||||||
|
target_freqs = np.linspace(f_min, f_max, target_band, dtype=np.float64)
|
||||||
|
target_sweep = _interp_signal(target_freqs, freq_arr, sweep_arr)
|
||||||
|
freq_arr = target_freqs
|
||||||
|
sweep_arr = np.asarray(target_sweep).reshape(-1)
|
||||||
|
|
||||||
|
n_band = int(freq_arr.size)
|
||||||
|
if n_band <= 1:
|
||||||
|
return None
|
||||||
|
|
||||||
|
df_ghz = float((f_max - f_min) / max(1, n_band - 1))
|
||||||
|
if (not np.isfinite(df_ghz)) or df_ghz <= 0.0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return freq_arr, sweep_arr, f_max, df_ghz
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_positive_only_exact_geometry(
|
||||||
|
freqs: Optional[np.ndarray],
|
||||||
|
*,
|
||||||
|
max_shift_len: Optional[int] = None,
|
||||||
|
) -> Optional[Tuple[int, float]]:
|
||||||
"""Return (N_shift, df_hz) for the exact centered positive-only mode."""
|
"""Return (N_shift, df_hz) for the exact centered positive-only mode."""
|
||||||
if freqs is None:
|
if freqs is None:
|
||||||
return None
|
return None
|
||||||
@ -110,14 +191,16 @@ def _resolve_positive_only_exact_geometry(freqs: Optional[np.ndarray]) -> Option
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
n_band = int(finite.size)
|
n_band = int(finite.size)
|
||||||
|
target_band = _resolve_positive_exact_band_size(f_min, f_max, n_band, max_shift_len)
|
||||||
|
n_band = max(2, min(n_band, target_band))
|
||||||
df_ghz = float((f_max - f_min) / max(1, n_band - 1))
|
df_ghz = float((f_max - f_min) / max(1, n_band - 1))
|
||||||
if (not np.isfinite(df_ghz)) or df_ghz <= 0.0:
|
if (not np.isfinite(df_ghz)) or df_ghz <= 0.0:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64)
|
n_shift = _positive_exact_shift_size(f_max, df_ghz)
|
||||||
if f_shift.size <= 1:
|
if n_shift <= 1:
|
||||||
return None
|
return None
|
||||||
return int(f_shift.size), float(df_ghz * 1e9)
|
return int(n_shift), float(df_ghz * 1e9)
|
||||||
|
|
||||||
|
|
||||||
def prepare_fft_segment(
|
def prepare_fft_segment(
|
||||||
@ -256,13 +339,24 @@ def build_positive_only_centered_ifft_spectrum(
|
|||||||
def build_positive_only_exact_centered_ifft_spectrum(
|
def build_positive_only_exact_centered_ifft_spectrum(
|
||||||
sweep: np.ndarray,
|
sweep: np.ndarray,
|
||||||
freqs: Optional[np.ndarray],
|
freqs: Optional[np.ndarray],
|
||||||
|
*,
|
||||||
|
max_shift_len: Optional[int] = None,
|
||||||
) -> Optional[np.ndarray]:
|
) -> Optional[np.ndarray]:
|
||||||
"""Build centered spectrum exactly as zeros[-f_max..+f_min) + measured positive band."""
|
"""Build centered spectrum exactly as zeros[-f_max..+f_min) + measured positive band."""
|
||||||
prepared = _extract_positive_exact_band(sweep, freqs)
|
prepared = _extract_positive_exact_band(sweep, freqs)
|
||||||
if prepared is None:
|
if prepared is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
freq_band, sweep_band, f_max, df_ghz = prepared
|
freq_band, sweep_band, _f_max, _df_ghz = prepared
|
||||||
|
normalized = _normalize_positive_exact_band(
|
||||||
|
freq_band,
|
||||||
|
sweep_band,
|
||||||
|
max_shift_len=max_shift_len,
|
||||||
|
)
|
||||||
|
if normalized is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
freq_band, sweep_band, f_max, df_ghz = normalized
|
||||||
f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64)
|
f_shift = np.arange(-f_max, f_max + (0.5 * df_ghz), df_ghz, dtype=np.float64)
|
||||||
if f_shift.size <= 1:
|
if f_shift.size <= 1:
|
||||||
return None
|
return None
|
||||||
@ -334,7 +428,11 @@ def compute_fft_complex_row(
|
|||||||
if fft_mode == "positive_only":
|
if fft_mode == "positive_only":
|
||||||
spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
spectrum_centered = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
||||||
elif fft_mode == "positive_only_exact":
|
elif fft_mode == "positive_only_exact":
|
||||||
spectrum_centered = build_positive_only_exact_centered_ifft_spectrum(sweep, freqs)
|
spectrum_centered = build_positive_only_exact_centered_ifft_spectrum(
|
||||||
|
sweep,
|
||||||
|
freqs,
|
||||||
|
max_shift_len=bins,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
spectrum_centered = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
||||||
if spectrum_centered is None:
|
if spectrum_centered is None:
|
||||||
@ -381,7 +479,7 @@ def compute_distance_axis(
|
|||||||
return np.zeros((0,), dtype=np.float64)
|
return np.zeros((0,), dtype=np.float64)
|
||||||
fft_mode = _normalize_fft_mode(mode, symmetric)
|
fft_mode = _normalize_fft_mode(mode, symmetric)
|
||||||
if fft_mode == "positive_only_exact":
|
if fft_mode == "positive_only_exact":
|
||||||
geometry = _resolve_positive_only_exact_geometry(freqs)
|
geometry = _resolve_positive_only_exact_geometry(freqs, max_shift_len=bins)
|
||||||
if geometry is None:
|
if geometry is None:
|
||||||
return np.arange(bins, dtype=np.float64)
|
return np.arange(bins, dtype=np.float64)
|
||||||
n_shift, df_hz = geometry
|
n_shift, df_hz = geometry
|
||||||
|
|||||||
@ -513,14 +513,45 @@ class ProcessingTests(unittest.TestCase):
|
|||||||
bins = 8
|
bins = 8
|
||||||
axis = compute_distance_axis(freqs, bins, mode="positive_only_exact")
|
axis = compute_distance_axis(freqs, bins, mode="positive_only_exact")
|
||||||
|
|
||||||
df_hz = 1e9
|
# With a small bins budget the exact-mode grid is downsampled so
|
||||||
n_shift = int(np.arange(-6.0, 6.0 + 0.5, 1.0, dtype=np.float64).size)
|
# internal IFFT length does not exceed visible bins.
|
||||||
|
df_hz = 2e9
|
||||||
|
n_shift = int(np.arange(-6.0, 6.0 + 1.0, 2.0, dtype=np.float64).size)
|
||||||
expected_step = C_M_S / (2.0 * n_shift * df_hz)
|
expected_step = C_M_S / (2.0 * n_shift * df_hz)
|
||||||
expected = np.arange(bins, dtype=np.float64) * expected_step
|
expected = np.arange(bins, dtype=np.float64) * expected_step
|
||||||
|
|
||||||
self.assertEqual(axis.shape, (bins,))
|
self.assertEqual(axis.shape, (bins,))
|
||||||
self.assertTrue(np.allclose(axis, expected))
|
self.assertTrue(np.allclose(axis, expected))
|
||||||
|
|
||||||
|
def test_positive_only_exact_mode_remains_stable_when_input_points_double(self):
|
||||||
|
bins = FFT_LEN // 2 + 1
|
||||||
|
tau_s = 45e-9
|
||||||
|
|
||||||
|
freqs_400 = np.linspace(3.3, 14.3, 400, dtype=np.float64)
|
||||||
|
freqs_800 = np.linspace(3.3, 14.3, 800, dtype=np.float64)
|
||||||
|
sweep_400 = np.exp(-1j * 2.0 * np.pi * freqs_400 * 1e9 * tau_s).astype(np.complex64)
|
||||||
|
sweep_800 = np.exp(-1j * 2.0 * np.pi * freqs_800 * 1e9 * tau_s).astype(np.complex64)
|
||||||
|
|
||||||
|
mag_400 = compute_fft_mag_row(sweep_400, freqs_400, bins, mode="positive_only_exact")
|
||||||
|
mag_800 = compute_fft_mag_row(sweep_800, freqs_800, bins, mode="positive_only_exact")
|
||||||
|
|
||||||
|
self.assertEqual(mag_400.shape, mag_800.shape)
|
||||||
|
finite = np.isfinite(mag_400) & np.isfinite(mag_800)
|
||||||
|
self.assertGreater(int(np.count_nonzero(finite)), int(0.95 * bins))
|
||||||
|
|
||||||
|
idx_400 = int(np.nanargmax(mag_400))
|
||||||
|
idx_800 = int(np.nanargmax(mag_800))
|
||||||
|
peak_400 = float(np.nanmax(mag_400))
|
||||||
|
peak_800 = float(np.nanmax(mag_800))
|
||||||
|
|
||||||
|
self.assertLess(abs(idx_400 - idx_800), 64)
|
||||||
|
self.assertGreater(idx_400, 8)
|
||||||
|
self.assertGreater(idx_800, 8)
|
||||||
|
self.assertLess(idx_400, bins - 8)
|
||||||
|
self.assertLess(idx_800, bins - 8)
|
||||||
|
self.assertGreater(peak_400, 0.05)
|
||||||
|
self.assertGreater(peak_800, 0.05)
|
||||||
|
|
||||||
def test_resolve_visible_fft_curves_handles_complex_mode(self):
|
def test_resolve_visible_fft_curves_handles_complex_mode(self):
|
||||||
complex_row = np.asarray([1.0 + 2.0j, -3.0 + 4.0j], dtype=np.complex64)
|
complex_row = np.asarray([1.0 + 2.0j, -3.0 + 4.0j], dtype=np.complex64)
|
||||||
mag = np.abs(complex_row).astype(np.float32)
|
mag = np.abs(complex_row).astype(np.float32)
|
||||||
|
|||||||
Reference in New Issue
Block a user