# File-viewer metadata: 189 lines, 8.3 KiB, Python
from __future__ import annotations
|
|
|
|
import os
|
|
import tempfile
|
|
import numpy as np
|
|
import unittest
|
|
|
|
from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ
|
|
from rfg_adc_plotter.gui.pyqtgraph_backend import apply_working_range
|
|
from rfg_adc_plotter.processing.calibration import (
|
|
build_calib_envelope,
|
|
calibrate_freqs,
|
|
load_calib_envelope,
|
|
recalculate_calibration_c,
|
|
save_calib_envelope,
|
|
)
|
|
from rfg_adc_plotter.processing.fft import (
|
|
build_positive_only_centered_ifft_spectrum,
|
|
build_symmetric_ifft_spectrum,
|
|
compute_distance_axis,
|
|
compute_fft_mag_row,
|
|
compute_fft_row,
|
|
)
|
|
from rfg_adc_plotter.processing.normalization import (
|
|
build_calib_envelopes,
|
|
normalize_by_calib,
|
|
normalize_by_envelope,
|
|
resample_envelope,
|
|
)
|
|
from rfg_adc_plotter.processing.peaks import find_peak_width_markers, find_top_peaks_over_ref, rolling_median_ref
|
|
|
|
|
|
class ProcessingTests(unittest.TestCase):
    """Checks for the calibration, normalization, FFT and peak helpers."""

    def test_recalculate_calibration_preserves_requested_edges(self):
        """The recalculated quadratic maps 3.3 and 14.3 onto themselves."""
        base_coeffs = np.asarray([0.0, 1.0, 0.025], dtype=np.float64)
        coeffs = recalculate_calibration_c(base_coeffs, 3.3, 14.3)

        def evaluate(freq):
            # Same term order as a plain c0 + c1*f + c2*f^2 evaluation.
            return coeffs[0] + coeffs[1] * freq + coeffs[2] * (freq ** 2)

        at_low_edge = evaluate(3.3)
        at_high_edge = evaluate(14.3)
        self.assertTrue(np.isclose(at_low_edge, 3.3))
        self.assertTrue(np.isclose(at_high_edge, 14.3))

    def test_calibrate_freqs_returns_monotonic_axis_and_same_shape(self):
        """Calibrated "F" and "I" keep their length; "F" is non-decreasing."""
        sweep = {
            "F": np.linspace(3.3, 14.3, 32),
            "I": np.linspace(-1.0, 1.0, 32),
        }
        calibrated = calibrate_freqs(sweep)

        for key in ("F", "I"):
            self.assertEqual(calibrated[key].shape, (32,))
        freq_steps = np.diff(calibrated["F"])
        self.assertTrue(np.all(freq_steps >= 0.0))

    def test_normalizers_and_envelopes_return_finite_ranges(self):
        """Envelopes bracket the data and both normalizers keep the shape."""

        def mean_abs_second_diff(values):
            # Roughness measure used to compare the envelope with its input.
            return float(np.mean(np.abs(np.diff(values, n=2))))

        phase = np.linspace(0.0, 4.0 * np.pi, 64)
        calib = (np.sin(phase) * 5.0).astype(np.float32)
        raw = calib * 0.75

        lower, upper = build_calib_envelopes(calib)
        self.assertEqual(lower.shape, calib.shape)
        self.assertEqual(upper.shape, calib.shape)
        self.assertTrue(np.all(lower <= upper))
        self.assertTrue(np.all(np.isfinite(upper)))
        # The upper envelope is expected to be smoother than the raw signal.
        self.assertLess(
            mean_abs_second_diff(upper),
            mean_abs_second_diff(calib),
        )

        simple = normalize_by_calib(raw, calib + 10.0, norm_type="simple")
        projector = normalize_by_calib(raw, calib, norm_type="projector")
        for normalized in (simple, projector):
            self.assertEqual(normalized.shape, raw.shape)
        for normalized in (simple, projector):
            self.assertTrue(np.any(np.isfinite(normalized)))

    def test_file_calibration_envelope_roundtrip_and_division(self):
        """An envelope can be built, divided by, resampled, saved and reloaded."""
        phase = np.linspace(0.0, 8.0 * np.pi, 128)
        raw = (np.sin(phase) * 50.0 + 100.0).astype(np.float32)
        envelope = build_calib_envelope(raw)
        normalized = normalize_by_envelope(raw, envelope)
        resampled = resample_envelope(envelope, 96)

        self.assertEqual(envelope.shape, raw.shape)
        self.assertEqual(normalized.shape, raw.shape)
        self.assertEqual(resampled.shape, (96,))
        self.assertTrue(np.any(np.isfinite(normalized)))
        self.assertTrue(np.all(np.isfinite(envelope)))

        with tempfile.TemporaryDirectory() as work_dir:
            # The path is given without an extension; the saved path is
            # asserted to end with ".npy".
            target = os.path.join(work_dir, "calibration_envelope")
            saved_path = save_calib_envelope(target, envelope)
            restored = load_calib_envelope(saved_path)
            self.assertTrue(saved_path.endswith(".npy"))
            self.assertTrue(np.allclose(restored, envelope))

    def test_normalize_by_envelope_adds_small_epsilon_to_zero_denominator(self):
        """A zero envelope sample yields a large finite value, not inf/nan."""
        raw = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)
        envelope = np.asarray([0.0, 1.0, -1.0], dtype=np.float32)
        normalized = normalize_by_envelope(raw, envelope)

        self.assertTrue(np.all(np.isfinite(normalized)))
        self.assertGreater(normalized[0], 1e8)
        # Non-zero denominators must behave like an ordinary division.
        for index, expected in ((1, 2.0), (2, -3.0)):
            self.assertAlmostEqual(float(normalized[index]), expected, places=5)

    def test_load_calib_envelope_rejects_empty_payload(self):
        """Loading a zero-length array raises ValueError."""
        with tempfile.TemporaryDirectory() as work_dir:
            empty_path = os.path.join(work_dir, "empty.npy")
            np.save(empty_path, np.zeros((0,), dtype=np.float32))
            with self.assertRaises(ValueError):
                load_calib_envelope(empty_path)

    def test_apply_working_range_crops_sweep_to_selected_band(self):
        """Cropping to 5.0..9.0 keeps only in-band points, paired with the sweep."""
        freqs = np.linspace(3.3, 14.3, 12, dtype=np.float64)
        sweep = np.arange(12, dtype=np.float32)
        band_freqs, band_sweep = apply_working_range(freqs, sweep, 5.0, 9.0)

        self.assertGreater(band_freqs.size, 0)
        self.assertEqual(band_freqs.shape, band_sweep.shape)
        lowest = float(np.min(band_freqs))
        highest = float(np.max(band_freqs))
        self.assertGreaterEqual(lowest, 5.0)
        self.assertLessEqual(highest, 9.0)

    def test_apply_working_range_returns_empty_when_no_points_match(self):
        """A band outside the sweep (20.0..21.0) yields two empty arrays."""
        freqs = np.linspace(3.3, 14.3, 12, dtype=np.float64)
        sweep = np.arange(12, dtype=np.float32)
        band_freqs, band_sweep = apply_working_range(freqs, sweep, 20.0, 21.0)

        for cropped in (band_freqs, band_sweep):
            self.assertEqual(cropped.shape, (0,))

    def test_fft_helpers_return_expected_shapes(self):
        """FFT magnitude, FFT row and distance axis all have the requested length."""
        phase = np.linspace(0.0, 4.0 * np.pi, 128)
        sweep = np.sin(phase).astype(np.float32)
        freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64)

        magnitude = compute_fft_mag_row(sweep, freqs, 513)
        fft_row = compute_fft_row(sweep, freqs, 513)
        distance = compute_distance_axis(freqs, 513)

        for result in (magnitude, fft_row, distance):
            self.assertEqual(result.shape, (513,))
        self.assertTrue(np.all(np.diff(distance) >= 0.0))

    def test_symmetric_ifft_spectrum_has_zero_gap_and_mirrored_band(self):
        """Symmetric spectrum is zero inside (-4, 4) and mirrored outside it."""
        sweep = np.linspace(1.0, 2.0, 128, dtype=np.float32)
        freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)
        spectrum = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)

        self.assertIsNotNone(spectrum)

        # Frequency grid the test assumes the spectrum is laid out on.
        grid = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
        negative_band = np.flatnonzero(grid <= (-4.0))
        positive_band = np.flatnonzero(grid >= 4.0)
        # Trim both bands to a common length so they can be compared 1:1.
        common_len = int(min(negative_band.size, positive_band.size))
        neg_idx = negative_band[:common_len]
        pos_idx = positive_band[-common_len:]
        gap_mask = (grid > (-4.0)) & (grid < 4.0)

        self.assertTrue(np.allclose(spectrum[gap_mask], 0.0))
        mirrored_positive = spectrum[pos_idx][::-1]
        self.assertTrue(np.allclose(spectrum[neg_idx], mirrored_positive))

    def test_positive_only_centered_spectrum_keeps_zeros_until_positive_min(self):
        """Positive-only spectrum is zero below 4.0 and non-zero at or above it."""
        sweep = np.linspace(1.0, 2.0, 128, dtype=np.float32)
        freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)
        spectrum = build_positive_only_centered_ifft_spectrum(
            sweep, freqs, fft_len=FFT_LEN
        )

        self.assertIsNotNone(spectrum)

        grid = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
        below_band = grid < 4.0
        in_band_idx = np.flatnonzero(grid >= 4.0)

        self.assertTrue(np.allclose(spectrum[below_band], 0.0))
        self.assertTrue(np.any(np.abs(spectrum[in_band_idx]) > 0.0))

    def test_symmetric_distance_axis_uses_windowed_frequency_bounds(self):
        """Symmetric-mode axis step matches c / (2 * FFT_LEN * df)."""
        freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)
        axis = compute_distance_axis(freqs, 513, mode="symmetric")

        # Bin spacing of a -10..10 grid with FFT_LEN points, scaled by 1e9.
        df_hz = (2.0 * 10.0 / max(1, FFT_LEN - 1)) * 1e9
        expected_step = 299_792_458.0 / (2.0 * FFT_LEN * df_hz)

        self.assertEqual(axis.shape, (513,))
        self.assertTrue(np.all(np.diff(axis) >= 0.0))
        first_step = float(axis[1] - axis[0])
        self.assertAlmostEqual(first_step, expected_step, places=15)

    def test_peak_helpers_find_reference_and_peak_boxes(self):
        """A single Gaussian bump at x=5 gives one peak and a positive width."""
        xs = np.linspace(0.0, 10.0, 200)
        # Gaussian of height 10 centred at x=5 on a constant baseline of 1.
        ys = np.exp(-((xs - 5.0) ** 2) / 0.4) * 10.0 + 1.0

        reference = rolling_median_ref(xs, ys, 2.0)
        peaks = find_top_peaks_over_ref(xs, ys, reference, top_n=3)
        width = find_peak_width_markers(xs, ys)

        self.assertEqual(reference.shape, ys.shape)
        self.assertEqual(len(peaks), 1)
        self.assertGreater(peaks[0]["x"], 4.0)
        self.assertLess(peaks[0]["x"], 6.0)
        self.assertIsNotNone(width)
        self.assertGreater(width["width"], 0.0)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the test suite when this module is executed directly as a script.
    unittest.main()
|