from __future__ import annotations

import os
import tempfile
import unittest

import numpy as np

from rfg_adc_plotter.constants import FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ
from rfg_adc_plotter.gui.pyqtgraph_backend import (
    apply_working_range,
    apply_working_range_to_aux_curves,
    build_main_window_layout,
    coalesce_packets_for_ui,
    compute_background_subtracted_bscan_levels,
    decimate_curve_for_display,
    resolve_axis_bounds,
    resolve_heavy_refresh_stride,
    resolve_initial_window_size,
    sanitize_curve_data_for_display,
    sanitize_image_for_display,
    set_image_rect_if_ready,
    resolve_visible_fft_curves,
    resolve_visible_aux_curves,
)
from rfg_adc_plotter.processing.calibration import (
    build_calib_envelope,
    calibrate_freqs,
    load_calib_envelope,
    recalculate_calibration_c,
    save_calib_envelope,
)
from rfg_adc_plotter.processing.background import (
    load_fft_background,
    save_fft_background,
    subtract_fft_background,
)
from rfg_adc_plotter.processing.fft import (
    build_positive_only_centered_ifft_spectrum,
    build_symmetric_ifft_spectrum,
    compute_distance_axis,
    compute_fft_complex_row,
    compute_fft_mag_row,
    compute_fft_row,
    fft_mag_to_db,
)
from rfg_adc_plotter.processing.normalization import (
    build_calib_envelopes,
    normalize_by_calib,
    normalize_by_envelope,
    resample_envelope,
)
from rfg_adc_plotter.processing.peaks import (
    find_peak_width_markers,
    find_top_peaks_over_ref,
    rolling_median_ref,
)


class ProcessingTests(unittest.TestCase):
    """Checks for the calibration, normalization, background, FFT, peak and
    pyqtgraph-backend helpers imported at the top of this module."""

    # ------------------------------------------------------------------
    # Calibration
    # ------------------------------------------------------------------

    def test_recalculate_calibration_preserves_requested_edges(self):
        """The recalculated quadratic maps both requested edges onto themselves."""
        low_edge = 3.3
        high_edge = 14.3
        base_coeffs = np.asarray([0.0, 1.0, 0.025], dtype=np.float64)

        coeffs = recalculate_calibration_c(base_coeffs, low_edge, high_edge)

        at_low = coeffs[0] + coeffs[1] * low_edge + coeffs[2] * (low_edge ** 2)
        at_high = coeffs[0] + coeffs[1] * high_edge + coeffs[2] * (high_edge ** 2)
        self.assertTrue(np.isclose(at_low, low_edge))
        self.assertTrue(np.isclose(at_high, high_edge))

    def test_calibrate_freqs_returns_monotonic_axis_and_same_shape(self):
        """Calibrating a real sweep keeps 32 points and a non-decreasing "F" axis."""
        sweep = {
            "F": np.linspace(3.3, 14.3, 32),
            "I": np.linspace(-1.0, 1.0, 32),
        }

        result = calibrate_freqs(sweep)

        self.assertEqual(result["F"].shape, (32,))
        self.assertEqual(result["I"].shape, (32,))
        self.assertTrue(np.all(np.diff(result["F"]) >= 0.0))

    def test_calibrate_freqs_keeps_complex_payload(self):
        """Calibrating a complex sweep yields a finite, still-complex "I" payload."""
        phases = np.linspace(0.0, np.pi, 32)
        sweep = {
            "F": np.linspace(3.3, 14.3, 32),
            "I": np.exp(1j * phases).astype(np.complex64),
        }

        result = calibrate_freqs(sweep)

        self.assertEqual(result["F"].shape, (32,))
        self.assertEqual(result["I"].shape, (32,))
        self.assertTrue(np.iscomplexobj(result["I"]))
        self.assertTrue(np.all(np.isfinite(result["I"])))

    # ------------------------------------------------------------------
    # Normalization and calibration envelopes
    # ------------------------------------------------------------------

    def test_normalizers_and_envelopes_return_finite_ranges(self):
        """Envelopes are ordered, finite and smoother than the input; both
        normalization types return same-shaped output with finite samples."""
        calib = (np.sin(np.linspace(0.0, 4.0 * np.pi, 64)) * 5.0).astype(np.float32)
        raw = calib * 0.75

        lower, upper = build_calib_envelopes(calib)

        self.assertEqual(lower.shape, calib.shape)
        self.assertEqual(upper.shape, calib.shape)
        self.assertTrue(np.all(lower <= upper))
        self.assertTrue(np.all(np.isfinite(upper)))

        # Mean absolute second difference is used as the roughness measure.
        upper_roughness = float(np.mean(np.abs(np.diff(upper, n=2))))
        calib_roughness = float(np.mean(np.abs(np.diff(calib, n=2))))
        self.assertLess(upper_roughness, calib_roughness)

        simple = normalize_by_calib(raw, calib + 10.0, norm_type="simple")
        projector = normalize_by_calib(raw, calib, norm_type="projector")

        self.assertEqual(simple.shape, raw.shape)
        self.assertEqual(projector.shape, raw.shape)
        self.assertTrue(np.any(np.isfinite(simple)))
        self.assertTrue(np.any(np.isfinite(projector)))

    def test_file_calibration_envelope_roundtrip_and_division(self):
        """An envelope can be built, divided by, resampled, saved and reloaded."""
        raw = (np.sin(np.linspace(0.0, 8.0 * np.pi, 128)) * 50.0 + 100.0).astype(np.float32)

        envelope = build_calib_envelope(raw)
        normalized = normalize_by_envelope(raw, envelope)
        resampled = resample_envelope(envelope, 96)

        self.assertEqual(envelope.shape, raw.shape)
        self.assertEqual(normalized.shape, raw.shape)
        self.assertEqual(resampled.shape, (96,))
        self.assertTrue(np.any(np.isfinite(normalized)))
        self.assertTrue(np.all(np.isfinite(envelope)))

        with tempfile.TemporaryDirectory() as tmp_dir:
            # The path is passed without a suffix; the returned path must end in ".npy".
            target = os.path.join(tmp_dir, "calibration_envelope")
            saved_path = save_calib_envelope(target, envelope)
            restored = load_calib_envelope(saved_path)

            self.assertTrue(saved_path.endswith(".npy"))
            self.assertTrue(np.allclose(restored, envelope))

    def test_normalize_by_envelope_adds_small_epsilon_to_zero_denominator(self):
        """A zero envelope sample yields a large finite value instead of inf/nan."""
        raw = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)
        envelope = np.asarray([0.0, 1.0, -1.0], dtype=np.float32)

        result = normalize_by_envelope(raw, envelope)

        self.assertTrue(np.all(np.isfinite(result)))
        self.assertGreater(result[0], 1e8)
        self.assertAlmostEqual(float(result[1]), 2.0, places=5)
        self.assertAlmostEqual(float(result[2]), -3.0, places=5)

    def test_normalize_by_envelope_supports_complex_input(self):
        """Complex samples divided by a real envelope stay complex and finite."""
        raw = np.asarray([1.0 + 1.0j, 2.0 - 2.0j], dtype=np.complex64)
        envelope = np.asarray([1.0, 2.0], dtype=np.float32)
        expected = np.asarray([1.0 + 1.0j, 1.0 - 1.0j], dtype=np.complex64)

        result = normalize_by_envelope(raw, envelope)

        self.assertTrue(np.iscomplexobj(result))
        self.assertTrue(np.all(np.isfinite(result)))
        self.assertTrue(np.allclose(result, expected))

    def test_load_calib_envelope_rejects_empty_payload(self):
        """Loading a zero-length envelope file raises ValueError."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            empty_path = os.path.join(tmp_dir, "empty.npy")
            np.save(empty_path, np.zeros((0,), dtype=np.float32))

            with self.assertRaises(ValueError):
                load_calib_envelope(empty_path)

    # ------------------------------------------------------------------
    # FFT background
    # ------------------------------------------------------------------

    def test_fft_background_roundtrip_and_rejects_non_1d_payload(self):
        """A 1-D background survives save/load; a 2-D file raises ValueError."""
        background = np.asarray([0.5, 1.5, 2.5], dtype=np.float32)

        with tempfile.TemporaryDirectory() as tmp_dir:
            target = os.path.join(tmp_dir, "fft_background")
            saved_path = save_fft_background(target, background)
            restored = load_fft_background(saved_path)

            self.assertTrue(saved_path.endswith(".npy"))
            self.assertTrue(np.allclose(restored, background))

            invalid_path = os.path.join(tmp_dir, "fft_background_invalid.npy")
            np.save(invalid_path, np.zeros((2, 2), dtype=np.float32))

            with self.assertRaises(ValueError):
                load_fft_background(invalid_path)

    def test_subtract_fft_background_clamps_negative_residuals_to_zero(self):
        """Residuals that would be negative come back as zero."""
        signal = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)
        background = np.asarray([1.0, 1.5, 5.0], dtype=np.float32)
        expected = np.asarray([0.0, 0.5, 0.0], dtype=np.float32)

        residual = subtract_fft_background(signal, background)

        self.assertTrue(np.allclose(residual, expected))
        self.assertTrue(np.allclose(subtract_fft_background(signal, signal), 0.0))

    # ------------------------------------------------------------------
    # Working range and auxiliary curves
    # ------------------------------------------------------------------

    def test_apply_working_range_crops_sweep_to_selected_band(self):
        """Cropping to 5..9 keeps a non-empty, shape-matched slice inside the band."""
        freqs = np.linspace(3.3, 14.3, 12, dtype=np.float64)
        sweep = np.arange(12, dtype=np.float32)

        band_freqs, band_sweep = apply_working_range(freqs, sweep, 5.0, 9.0)

        self.assertGreater(band_freqs.size, 0)
        self.assertEqual(band_freqs.shape, band_sweep.shape)
        self.assertGreaterEqual(float(np.min(band_freqs)), 5.0)
        self.assertLessEqual(float(np.max(band_freqs)), 9.0)

    def test_apply_working_range_returns_empty_when_no_points_match(self):
        """A band outside the frequency axis yields two empty 1-D arrays."""
        freqs = np.linspace(3.3, 14.3, 12, dtype=np.float64)
        sweep = np.arange(12, dtype=np.float32)

        band_freqs, band_sweep = apply_working_range(freqs, sweep, 20.0, 21.0)

        self.assertEqual(band_freqs.shape, (0,))
        self.assertEqual(band_sweep.shape, (0,))

    def test_apply_working_range_to_aux_curves_uses_same_mask_as_raw_sweep(self):
        """Aux curves are cropped to the same points as the raw sweep,
        including the point dropped where the sweep is NaN."""
        freqs = np.linspace(3.3, 14.3, 6, dtype=np.float64)
        sweep = np.asarray([0.0, 1.0, np.nan, 3.0, 4.0, 5.0], dtype=np.float32)
        aux = (
            np.asarray([10.0, 11.0, 12.0, 13.0, 14.0, 15.0], dtype=np.float32),
            np.asarray([20.0, 21.0, 22.0, 23.0, 24.0, 25.0], dtype=np.float32),
        )

        band_freqs, band_sweep = apply_working_range(freqs, sweep, 4.0, 12.5)
        band_aux = apply_working_range_to_aux_curves(freqs, sweep, aux, 4.0, 12.5)

        self.assertIsNotNone(band_aux)
        self.assertEqual(band_aux[0].shape, band_freqs.shape)
        self.assertEqual(band_aux[1].shape, band_freqs.shape)
        self.assertEqual(band_aux[0].shape, band_sweep.shape)

        expected_first = np.asarray([11.0, 13.0, 14.0], dtype=np.float32)
        expected_second = np.asarray([21.0, 23.0, 24.0], dtype=np.float32)
        self.assertTrue(np.allclose(band_aux[0], expected_first))
        self.assertTrue(np.allclose(band_aux[1], expected_second))

    def test_resolve_visible_aux_curves_obeys_checkbox_state(self):
        """Disabled returns None; enabled returns both curves unchanged."""
        aux = (
            np.asarray([1.0, 2.0], dtype=np.float32),
            np.asarray([3.0, 4.0], dtype=np.float32),
        )

        self.assertIsNone(resolve_visible_aux_curves(aux, enabled=False))

        shown = resolve_visible_aux_curves(aux, enabled=True)

        self.assertIsNotNone(shown)
        self.assertTrue(np.allclose(shown[0], aux[0]))
        self.assertTrue(np.allclose(shown[1], aux[1]))

    # ------------------------------------------------------------------
    # Display decimation and packet coalescing
    # ------------------------------------------------------------------

    def test_decimate_curve_for_display_preserves_small_series(self):
        """A 64-point curve under a 128-point limit is returned unchanged."""
        xs = np.linspace(3.3, 14.3, 64, dtype=np.float64)
        ys = np.linspace(-1.0, 1.0, 64, dtype=np.float32)

        out_x, out_y = decimate_curve_for_display(xs, ys, max_points=128)

        self.assertTrue(np.allclose(out_x, xs))
        self.assertTrue(np.allclose(out_y, ys))

    def test_decimate_curve_for_display_limits_points_and_keeps_endpoints(self):
        """A 10000-point curve is cut to at most 512 points with both ends kept."""
        xs = np.linspace(3.3, 14.3, 10000, dtype=np.float64)
        ys = np.sin(np.linspace(0.0, 12.0 * np.pi, 10000)).astype(np.float32)

        out_x, out_y = decimate_curve_for_display(xs, ys, max_points=512)

        self.assertLessEqual(out_x.size, 512)
        self.assertEqual(out_x.shape, out_y.shape)
        self.assertAlmostEqual(float(out_x[0]), float(xs[0]), places=12)
        self.assertAlmostEqual(float(out_x[-1]), float(xs[-1]), places=12)
        self.assertAlmostEqual(float(out_y[0]), float(ys[0]), places=6)
        self.assertAlmostEqual(float(out_y[-1]), float(ys[-1]), places=6)

    def test_coalesce_packets_for_ui_keeps_newest_packets(self):
        """With six packets and a limit of two, the last two are kept."""
        packets = [
            (np.asarray([float(idx)], dtype=np.float32), {"sweep": idx}, None)
            for idx in range(6)
        ]

        kept, skipped = coalesce_packets_for_ui(packets, max_packets=2)

        self.assertEqual(skipped, 4)
        self.assertEqual(len(kept), 2)
        self.assertEqual(int(kept[0][1]["sweep"]), 4)
        self.assertEqual(int(kept[1][1]["sweep"]), 5)

    def test_coalesce_packets_for_ui_never_returns_empty_for_non_empty_input(self):
        """Even with max_packets=0 a single queued packet is still delivered."""
        packets = [
            (np.asarray([1.0], dtype=np.float32), {"sweep": 1}, None),
        ]

        kept, skipped = coalesce_packets_for_ui(packets, max_packets=0)

        self.assertEqual(skipped, 0)
        self.assertEqual(len(kept), 1)
        self.assertEqual(int(kept[0][1]["sweep"]), 1)

    def test_coalesce_packets_for_ui_switches_to_latest_only_on_large_backlog(self):
        """With a reported backlog of 40, only the newest packet is kept."""
        packets = [
            (np.asarray([float(idx)], dtype=np.float32), {"sweep": idx}, None)
            for idx in range(40)
        ]

        kept, skipped = coalesce_packets_for_ui(packets, max_packets=8, backlog_packets=40)

        self.assertEqual(skipped, 39)
        self.assertEqual(len(kept), 1)
        self.assertEqual(int(kept[0][1]["sweep"]), 39)

    def test_resolve_heavy_refresh_stride_increases_with_backlog(self):
        """Backlogs of 0, 20 and 40 (max_packets=8) give strides 1, 2 and 4."""
        self.assertEqual(resolve_heavy_refresh_stride(0, max_packets=8), 1)
        self.assertEqual(resolve_heavy_refresh_stride(20, max_packets=8), 2)
        self.assertEqual(resolve_heavy_refresh_stride(40, max_packets=8), 4)

    # ------------------------------------------------------------------
    # Display sanitizing and window layout
    # ------------------------------------------------------------------

    def test_sanitize_curve_data_for_display_rejects_fully_nonfinite_series(self):
        """An all-NaN curve is reduced to two empty arrays."""
        nan_x = np.asarray([np.nan, np.nan], dtype=np.float64)
        nan_y = np.asarray([np.nan, np.nan], dtype=np.float32)

        xs, ys = sanitize_curve_data_for_display(nan_x, nan_y)

        self.assertEqual(xs.shape, (0,))
        self.assertEqual(ys.shape, (0,))

    def test_sanitize_image_for_display_rejects_fully_nonfinite_frame(self):
        """An all-NaN frame is reported as None."""
        nan_frame = np.full((4, 4), np.nan, dtype=np.float32)

        data = sanitize_image_for_display(nan_frame)

        self.assertIsNone(data)

    def test_set_image_rect_if_ready_skips_uninitialized_image(self):
        """No setRect call is made when width() and height() return None."""

        class _DummyImageItem:
            """Stand-in image item that reports no size and counts setRect calls."""

            def __init__(self):
                self.calls = 0

            def width(self):
                return None

            def height(self):
                return None

            def setRect(self, *_args):
                self.calls += 1

        image_item = _DummyImageItem()

        applied = set_image_rect_if_ready(image_item, 0.0, 0.0, 10.0, 1.0)

        self.assertFalse(applied)
        self.assertEqual(image_item.calls, 0)

    def test_resolve_axis_bounds_rejects_nonfinite_ranges(self):
        """An axis holding only NaN/inf has no bounds (None)."""
        nonfinite_axis = np.asarray([np.nan, np.inf], dtype=np.float64)

        bounds = resolve_axis_bounds(nonfinite_axis)

        self.assertIsNone(bounds)

    def test_resolve_initial_window_size_stays_within_small_screen(self):
        """On an 800x480 screen the window fits and is at least 640x420."""
        width, height = resolve_initial_window_size(800, 480)

        self.assertLessEqual(width, 800)
        self.assertLessEqual(height, 480)
        self.assertGreaterEqual(width, 640)
        self.assertGreaterEqual(height, 420)

    def test_build_main_window_layout_uses_splitter_and_scroll_area(self):
        """The layout exposes a QSplitter and a QScrollArea wrapping the
        settings widget; the test is skipped when PyQt5 cannot be imported."""
        # Only set when the caller has not already chosen a Qt platform.
        os.environ.setdefault("QT_QPA_PLATFORM", "offscreen")
        try:
            from PyQt5 import QtCore, QtWidgets
        except Exception as exc:  # pragma: no cover - environment-dependent
            self.skipTest(f"Qt unavailable: {exc}")

        # Reuse an existing application instance if there is one.
        app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
        main_window = QtWidgets.QWidget()
        try:
            (
                _layout,
                splitter,
                _plot_layout,
                settings_widget,
                settings_layout,
                settings_scroll,
            ) = build_main_window_layout(
                QtCore,
                QtWidgets,
                main_window,
            )
            self.assertIsInstance(splitter, QtWidgets.QSplitter)
            self.assertIsInstance(settings_scroll, QtWidgets.QScrollArea)
            self.assertIs(settings_scroll.widget(), settings_widget)
            self.assertIsInstance(settings_layout, QtWidgets.QVBoxLayout)
        finally:
            main_window.close()

    # ------------------------------------------------------------------
    # B-scan levels
    # ------------------------------------------------------------------

    def test_background_subtracted_bscan_levels_ignore_zero_floor(self):
        """Levels equal the 15th / 99.7th percentiles of the strictly positive
        residuals, and the zero floor lies below the lower level."""
        disp_fft_lin = np.zeros((4, 8), dtype=np.float32)
        disp_fft_lin[1, 2:6] = np.asarray([0.05, 0.1, 0.5, 2.0], dtype=np.float32)
        disp_fft_lin[2, 1:6] = np.asarray([0.08, 0.2, 0.7, 3.0, 9.0], dtype=np.float32)
        disp_fft = fft_mag_to_db(disp_fft_lin)

        levels = compute_background_subtracted_bscan_levels(disp_fft_lin, disp_fft)

        self.assertIsNotNone(levels)

        positive_vals = disp_fft[disp_fft_lin > 0.0]
        expected_low = float(np.nanpercentile(positive_vals, 15.0))
        self.assertAlmostEqual(levels[0], expected_low, places=5)
        expected_high = float(np.nanpercentile(positive_vals, 99.7))
        self.assertAlmostEqual(levels[1], expected_high, places=5)

        zero_floor = disp_fft[disp_fft_lin == 0.0]
        self.assertLess(float(np.nanmax(zero_floor)), levels[0])

    def test_background_subtracted_bscan_levels_fallback_when_residuals_too_sparse(self):
        """A single positive residual gives no levels (None)."""
        disp_fft_lin = np.zeros((3, 4), dtype=np.float32)
        disp_fft_lin[1, 2] = 1.0
        disp_fft = fft_mag_to_db(disp_fft_lin)

        levels = compute_background_subtracted_bscan_levels(disp_fft_lin, disp_fft)

        self.assertIsNone(levels)

    # ------------------------------------------------------------------
    # FFT helpers
    # ------------------------------------------------------------------

    def test_fft_helpers_return_expected_shapes(self):
        """Magnitude row, row and distance axis all have 513 entries, and the
        distance axis is non-decreasing."""
        sweep = np.sin(np.linspace(0.0, 4.0 * np.pi, 128)).astype(np.float32)
        freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64)

        mag = compute_fft_mag_row(sweep, freqs, 513)
        row = compute_fft_row(sweep, freqs, 513)
        axis = compute_distance_axis(freqs, 513)

        self.assertEqual(mag.shape, (513,))
        self.assertEqual(row.shape, (513,))
        self.assertEqual(axis.shape, (513,))
        self.assertTrue(np.all(np.diff(axis) >= 0.0))

    def test_symmetric_ifft_spectrum_has_zero_gap_and_mirrored_band(self):
        """The spectrum is zero strictly inside (-4, 4) and the negative band
        equals the reversed positive band."""
        sweep = np.linspace(1.0, 2.0, 128, dtype=np.float32)
        freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)

        spectrum = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)

        self.assertIsNotNone(spectrum)

        freq_axis = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
        neg_idx_all = np.flatnonzero(freq_axis <= (-4.0))
        pos_idx_all = np.flatnonzero(freq_axis >= 4.0)
        # Compare equally long bands taken from the outer edges of the axis.
        band_len = int(min(neg_idx_all.size, pos_idx_all.size))
        neg_idx = neg_idx_all[:band_len]
        pos_idx = pos_idx_all[-band_len:]
        gap_mask = (freq_axis > (-4.0)) & (freq_axis < 4.0)

        self.assertTrue(np.allclose(spectrum[gap_mask], 0.0))
        self.assertTrue(np.allclose(spectrum[neg_idx], spectrum[pos_idx][::-1]))

    def test_positive_only_centered_spectrum_keeps_zeros_until_positive_min(self):
        """Everything below +4 is zero and the band from +4 upward is not all zero."""
        sweep = np.linspace(1.0, 2.0, 128, dtype=np.float32)
        freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)

        spectrum = build_positive_only_centered_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)

        self.assertIsNotNone(spectrum)

        freq_axis = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
        below_band = freq_axis < 4.0
        pos_idx = np.flatnonzero(freq_axis >= 4.0)

        self.assertTrue(np.allclose(spectrum[below_band], 0.0))
        self.assertTrue(np.any(np.abs(spectrum[pos_idx]) > 0.0))

    def test_complex_symmetric_ifft_spectrum_uses_conjugate_mirror(self):
        """For complex input the negative band is the conjugate of the
        reversed positive band."""
        sweep = np.exp(1j * np.linspace(0.0, np.pi, 128)).astype(np.complex64)
        freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)

        spectrum = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)

        self.assertIsNotNone(spectrum)

        freq_axis = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
        neg_idx_all = np.flatnonzero(freq_axis <= (-4.0))
        pos_idx_all = np.flatnonzero(freq_axis >= 4.0)
        band_len = int(min(neg_idx_all.size, pos_idx_all.size))
        neg_idx = neg_idx_all[:band_len]
        pos_idx = pos_idx_all[-band_len:]

        self.assertTrue(np.iscomplexobj(spectrum))
        mirrored = np.conj(spectrum[pos_idx][::-1])
        self.assertTrue(np.allclose(spectrum[neg_idx], mirrored))

    def test_compute_fft_helpers_accept_complex_input(self):
        """In "positive_only" mode the helpers accept a complex sweep, and the
        magnitude row equals the absolute value of the complex row."""
        sweep = np.exp(1j * np.linspace(0.0, 2.0 * np.pi, 128)).astype(np.complex64)
        freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64)

        complex_row = compute_fft_complex_row(sweep, freqs, 513, mode="positive_only")
        mag = compute_fft_mag_row(sweep, freqs, 513, mode="positive_only")
        row = compute_fft_row(sweep, freqs, 513, mode="positive_only")

        self.assertEqual(complex_row.shape, (513,))
        self.assertTrue(np.iscomplexobj(complex_row))
        self.assertEqual(mag.shape, (513,))
        self.assertEqual(row.shape, (513,))
        self.assertTrue(np.allclose(mag, np.abs(complex_row), equal_nan=True))
        self.assertTrue(np.any(np.isfinite(mag)))
        self.assertTrue(np.any(np.isfinite(row)))

    def test_resolve_visible_fft_curves_handles_complex_mode(self):
        """In complex mode only the requested curves (abs and imag here) are returned."""
        complex_row = np.asarray([1.0 + 2.0j, -3.0 + 4.0j], dtype=np.complex64)
        mag = np.abs(complex_row).astype(np.float32)

        abs_curve, real_curve, imag_curve = resolve_visible_fft_curves(
            complex_row,
            mag,
            complex_mode=True,
            show_abs=True,
            show_real=False,
            show_imag=True,
        )

        self.assertTrue(np.allclose(abs_curve, mag))
        self.assertIsNone(real_curve)
        expected_imag = np.asarray([2.0, 4.0], dtype=np.float32)
        self.assertTrue(np.allclose(imag_curve, expected_imag))

    def test_resolve_visible_fft_curves_preserves_legacy_abs_mode(self):
        """With complex_mode=False only the abs curve is returned, even when
        the real and imag flags are set."""
        mag = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)

        abs_curve, real_curve, imag_curve = resolve_visible_fft_curves(
            None,
            mag,
            complex_mode=False,
            show_abs=True,
            show_real=True,
            show_imag=True,
        )

        self.assertTrue(np.allclose(abs_curve, mag))
        self.assertIsNone(real_curve)
        self.assertIsNone(imag_curve)

    def test_symmetric_distance_axis_uses_windowed_frequency_bounds(self):
        """The symmetric-mode distance step matches the value derived from a
        -10..10 axis sampled at FFT_LEN points."""
        freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)

        axis = compute_distance_axis(freqs, 513, mode="symmetric")

        df_hz = (2.0 * 10.0 / max(1, FFT_LEN - 1)) * 1e9
        expected_step = 299_792_458.0 / (2.0 * FFT_LEN * df_hz)

        self.assertEqual(axis.shape, (513,))
        self.assertTrue(np.all(np.diff(axis) >= 0.0))
        actual_step = float(axis[1] - axis[0])
        self.assertAlmostEqual(actual_step, expected_step, places=15)

    # ------------------------------------------------------------------
    # Peak helpers
    # ------------------------------------------------------------------

    def test_peak_helpers_find_reference_and_peak_boxes(self):
        """A single bump centred at x=5 yields one peak between 4 and 6 and a
        positive width marker."""
        xs = np.linspace(0.0, 10.0, 200)
        ys = np.exp(-((xs - 5.0) ** 2) / 0.4) * 10.0 + 1.0

        ref = rolling_median_ref(xs, ys, 2.0)
        peaks = find_top_peaks_over_ref(xs, ys, ref, top_n=3)
        width = find_peak_width_markers(xs, ys)

        self.assertEqual(ref.shape, ys.shape)
        self.assertEqual(len(peaks), 1)
        self.assertGreater(peaks[0]["x"], 4.0)
        self.assertLess(peaks[0]["x"], 6.0)
        self.assertIsNotNone(width)
        self.assertGreater(width["width"], 0.0)


if __name__ == "__main__":
    unittest.main()