"""Unit tests for the rfg_adc_plotter processing and display helper functions."""

from __future__ import annotations

import os
import tempfile
import unittest

import numpy as np

from rfg_adc_plotter.constants import C_M_S, FFT_LEN, SWEEP_FREQ_MAX_GHZ, SWEEP_FREQ_MIN_GHZ
from rfg_adc_plotter.gui.pyqtgraph_backend import (
    apply_distance_cut_to_axis,
    apply_working_range,
    apply_working_range_to_aux_curves,
    build_logdet_voltage_fft_input,
    build_main_window_layout,
    coalesce_packets_for_ui,
    compute_background_subtracted_bscan_levels,
    compute_aux_phase_curve,
    convert_tty_i16_to_voltage,
    decimate_curve_for_display,
    resolve_axis_bounds,
    resolve_heavy_refresh_stride,
    resolve_initial_window_size,
    resolve_distance_cut_start,
    sanitize_curve_data_for_display,
    sanitize_image_for_display,
    set_image_rect_if_ready,
    resolve_visible_fft_curves,
    resolve_visible_aux_curves,
)
from rfg_adc_plotter.processing.calibration import (
    build_calib_envelope,
    build_complex_calibration_curve,
    calibrate_freqs,
    load_calib_envelope,
    load_complex_calibration,
    recalculate_calibration_c,
    save_calib_envelope,
    save_complex_calibration,
)
from rfg_adc_plotter.processing.background import (
    load_fft_background,
    save_fft_background,
    subtract_fft_background,
)
from rfg_adc_plotter.processing.fft import (
    build_positive_only_exact_centered_ifft_spectrum,
    build_positive_only_centered_ifft_spectrum,
    build_symmetric_ifft_spectrum,
    compute_distance_axis,
    compute_fft_complex_row,
    compute_fft_mag_row,
    compute_fft_row,
    fft_mag_to_db,
)
from rfg_adc_plotter.processing.normalization import (
    build_calib_envelopes,
    fit_complex_calibration_to_width,
    normalize_by_calib,
    normalize_by_complex_calibration,
    normalize_by_envelope,
    resample_envelope,
)
from rfg_adc_plotter.processing.peaks import (
    find_peak_width_markers,
    find_top_peaks_over_ref,
    rolling_median_ref,
)


class ProcessingTests(unittest.TestCase):
    """Checks for the conversion, calibration, FFT, peak and display helpers."""

    # ------------------------------------------------------------------
    # Shared helpers (not collected as tests: names do not start with "test").
    # ------------------------------------------------------------------

    def _assert_allclose(self, actual, expected, **tolerances):
        """Assert ``np.allclose(actual, expected, **tolerances)`` is true."""
        self.assertTrue(np.allclose(actual, expected, **tolerances))

    def _assert_all_finite(self, values):
        """Assert every element of *values* is finite."""
        self.assertTrue(np.all(np.isfinite(values)))

    def _assert_any_finite(self, values):
        """Assert at least one element of *values* is finite."""
        self.assertTrue(np.any(np.isfinite(values)))

    def _assert_non_decreasing(self, values):
        """Assert consecutive differences of *values* are all >= 0."""
        self.assertTrue(np.all(np.diff(values) >= 0.0))

    @staticmethod
    def _make_packet(index):
        """Build one ``(sweep, info, aux)`` packet tagged with *index*."""
        return (np.asarray([float(index)], dtype=np.float32), {"sweep": index}, None)

    @staticmethod
    def _mirrored_band_indices(freq_axis, band_start):
        """Return equally sized index sets for the negative and positive bands."""
        negative_all = np.flatnonzero(freq_axis <= (-band_start))
        positive_all = np.flatnonzero(freq_axis >= band_start)
        count = int(min(negative_all.size, positive_all.size))
        return negative_all[:count], positive_all[-count:]

    # ------------------------------------------------------------------
    # Code-to-voltage conversion
    # ------------------------------------------------------------------

    def test_convert_tty_i16_to_voltage_maps_and_clips_full_range(self):
        """Full-scale int16 codes map onto [-5 V, +5 V] and never leave it."""
        raw_codes = np.asarray([-32768.0, -16384.0, 0.0, 16384.0, 32767.0], dtype=np.float32)

        voltages = convert_tty_i16_to_voltage(raw_codes, 5.0)

        self.assertEqual(voltages.shape, raw_codes.shape)
        for position, expected_volts in ((0, -5.0), (2, 0.0), (-1, 5.0)):
            self.assertAlmostEqual(float(voltages[position]), expected_volts, places=6)
        self.assertTrue(np.all((voltages >= -5.0) & (voltages <= 5.0)))

    def test_build_logdet_voltage_fft_input_converts_codes_and_exponentiates(self):
        """The FFT input equals exp() of the converted voltages."""
        raw_codes = np.asarray([-32768.0, 0.0, 32767.0], dtype=np.float32)

        voltages, exp_input = build_logdet_voltage_fft_input(raw_codes, 5.0)

        self.assertEqual(voltages.shape, raw_codes.shape)
        self.assertEqual(exp_input.shape, raw_codes.shape)
        for position, expected_volts in enumerate((-5.0, 0.0, 5.0)):
            self.assertAlmostEqual(float(voltages[position]), expected_volts, places=6)
        self._assert_allclose(exp_input, np.exp(voltages.astype(np.float32)))

    def test_build_logdet_voltage_fft_input_clips_exp_argument_and_respects_range(self):
        """The exp() argument is limited while voltages follow the chosen range."""
        top_code = np.asarray([32767.0], dtype=np.float32)
        clipped_exp = float(np.exp(np.float32(2.0)))

        for full_scale_volts in (5.0, 10.0):
            voltages, exp_input = build_logdet_voltage_fft_input(
                top_code,
                full_scale_volts,
                exp_input_limit=2.0,
            )
            self.assertAlmostEqual(float(voltages[0]), full_scale_volts, places=6)
            self.assertAlmostEqual(float(exp_input[0]), clipped_exp, places=5)
            self.assertTrue(np.isfinite(exp_input[0]))

    # ------------------------------------------------------------------
    # Frequency calibration
    # ------------------------------------------------------------------

    def test_recalculate_calibration_preserves_requested_edges(self):
        """The recalculated polynomial maps both band edges onto themselves."""
        base_coeffs = np.asarray([0.0, 1.0, 0.025], dtype=np.float64)

        coeffs = recalculate_calibration_c(base_coeffs, 3.3, 14.3)

        for edge in (3.3, 14.3):
            mapped = coeffs[0] + coeffs[1] * edge + coeffs[2] * (edge ** 2)
            self.assertTrue(np.isclose(mapped, edge))

    def test_calibrate_freqs_returns_monotonic_axis_and_same_shape(self):
        """Calibration keeps the sample count and yields a non-decreasing axis."""
        real_sweep = {
            "F": np.linspace(3.3, 14.3, 32),
            "I": np.linspace(-1.0, 1.0, 32),
        }

        result = calibrate_freqs(real_sweep)

        for key in ("F", "I"):
            self.assertEqual(result[key].shape, (32,))
        self._assert_non_decreasing(result["F"])

    def test_calibrate_freqs_keeps_complex_payload(self):
        """A complex "I" payload stays complex and finite after calibration."""
        complex_sweep = {
            "F": np.linspace(3.3, 14.3, 32),
            "I": np.exp(1j * np.linspace(0.0, np.pi, 32)).astype(np.complex64),
        }

        result = calibrate_freqs(complex_sweep)

        for key in ("F", "I"):
            self.assertEqual(result[key].shape, (32,))
        self.assertTrue(np.iscomplexobj(result["I"]))
        self._assert_all_finite(result["I"])

    # ------------------------------------------------------------------
    # Normalisation and calibration envelopes
    # ------------------------------------------------------------------

    def test_normalizers_and_envelopes_return_finite_ranges(self):
        """Envelopes bracket the curve smoothly; both normalisers keep shape."""
        reference = (np.sin(np.linspace(0.0, 4.0 * np.pi, 64)) * 5.0).astype(np.float32)
        measured = reference * 0.75

        env_low, env_high = build_calib_envelopes(reference)

        self.assertEqual(env_low.shape, reference.shape)
        self.assertEqual(env_high.shape, reference.shape)
        self.assertTrue(np.all(env_low <= env_high))
        self._assert_all_finite(env_high)
        # The upper envelope must have less curvature than the raw reference.
        envelope_curvature = float(np.mean(np.abs(np.diff(env_high, n=2))))
        reference_curvature = float(np.mean(np.abs(np.diff(reference, n=2))))
        self.assertLess(envelope_curvature, reference_curvature)

        by_simple = normalize_by_calib(measured, reference + 10.0, norm_type="simple")
        by_projector = normalize_by_calib(measured, reference, norm_type="projector")

        for normalized in (by_simple, by_projector):
            self.assertEqual(normalized.shape, measured.shape)
        for normalized in (by_simple, by_projector):
            self._assert_any_finite(normalized)

    def test_file_calibration_envelope_roundtrip_and_division(self):
        """An envelope can be built, applied, resampled and stored on disk."""
        measured = (np.sin(np.linspace(0.0, 8.0 * np.pi, 128)) * 50.0 + 100.0).astype(np.float32)

        env = build_calib_envelope(measured)
        divided = normalize_by_envelope(measured, env)
        shorter = resample_envelope(env, 96)

        self.assertEqual(env.shape, measured.shape)
        self.assertEqual(divided.shape, measured.shape)
        self.assertEqual(shorter.shape, (96,))
        self._assert_any_finite(divided)
        self._assert_all_finite(env)

        with tempfile.TemporaryDirectory() as tmp_dir:
            target = os.path.join(tmp_dir, "calibration_envelope")
            written_path = save_calib_envelope(target, env)
            restored = load_calib_envelope(written_path)
            self.assertTrue(written_path.endswith(".npy"))
            self._assert_allclose(restored, env)

    def test_normalize_by_envelope_adds_small_epsilon_to_zero_denominator(self):
        """A zero envelope sample gives a huge but finite quotient."""
        numerator = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)
        denominator = np.asarray([0.0, 1.0, -1.0], dtype=np.float32)

        quotient = normalize_by_envelope(numerator, denominator)

        self._assert_all_finite(quotient)
        self.assertGreater(quotient[0], 1e8)
        self.assertAlmostEqual(float(quotient[1]), 2.0, places=5)
        self.assertAlmostEqual(float(quotient[2]), -3.0, places=5)

    def test_normalize_by_envelope_supports_complex_input(self):
        """Complex samples divided by a real envelope stay complex."""
        numerator = np.asarray([1.0 + 1.0j, 2.0 - 2.0j], dtype=np.complex64)
        denominator = np.asarray([1.0, 2.0], dtype=np.float32)

        quotient = normalize_by_envelope(numerator, denominator)

        self.assertTrue(np.iscomplexobj(quotient))
        self._assert_all_finite(quotient)
        self._assert_allclose(
            quotient,
            np.asarray([1.0 + 1.0j, 1.0 - 1.0j], dtype=np.complex64),
        )

    def test_load_calib_envelope_rejects_empty_payload(self):
        """Loading a zero-length envelope file raises ValueError."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            empty_file = os.path.join(tmp_dir, "empty.npy")
            np.save(empty_file, np.zeros((0,), dtype=np.float32))
            with self.assertRaises(ValueError):
                load_calib_envelope(empty_file)

    def test_complex_calibration_curve_roundtrip(self):
        """Two real channels combine to ch1 + 1j*ch2 and survive save/load."""
        real_part = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)
        imag_part = np.asarray([0.5, -1.0, 4.0], dtype=np.float32)
        wanted = np.asarray([1.0 + 0.5j, 2.0 - 1.0j, 3.0 + 4.0j], dtype=np.complex64)

        combined = build_complex_calibration_curve(real_part, imag_part)

        self.assertTrue(np.iscomplexobj(combined))
        self._assert_allclose(combined, wanted)

        with tempfile.TemporaryDirectory() as tmp_dir:
            target = os.path.join(tmp_dir, "complex_calibration")
            written_path = save_complex_calibration(target, combined)
            restored = load_complex_calibration(written_path)
            self.assertTrue(written_path.endswith(".npy"))
            self.assertEqual(restored.dtype, np.complex64)
            self._assert_allclose(restored, wanted)

    def test_fit_complex_calibration_to_width_pads_or_trims(self):
        """Short curves are padded with 1+0j; long curves are truncated."""
        two_points = np.asarray([1.0 + 1.0j, 2.0 + 2.0j], dtype=np.complex64)
        three_points = np.asarray([1.0 + 1.0j, 2.0 + 2.0j, 3.0 + 3.0j], dtype=np.complex64)

        widened = fit_complex_calibration_to_width(two_points, 4)
        narrowed = fit_complex_calibration_to_width(three_points, 2)

        self.assertEqual(widened.shape, (4,))
        self._assert_allclose(
            widened,
            np.asarray([1.0 + 1.0j, 2.0 + 2.0j, 1.0 + 0.0j, 1.0 + 0.0j], dtype=np.complex64),
        )
        self.assertEqual(narrowed.shape, (2,))
        self._assert_allclose(
            narrowed,
            np.asarray([1.0 + 1.0j, 2.0 + 2.0j], dtype=np.complex64),
        )

    def test_normalize_by_complex_calibration_handles_zero_and_length_mismatch(self):
        """Zero or missing calibration samples leave the signal untouched."""
        samples = np.asarray([2.0 + 2.0j, 4.0 + 0.0j, 3.0 + 3.0j], dtype=np.complex64)
        short_calib = np.asarray([1.0 + 1.0j, 0.0 + 0.0j], dtype=np.complex64)
        wanted = np.asarray([2.0 + 0.0j, 4.0 + 0.0j, 3.0 + 3.0j], dtype=np.complex64)

        result = normalize_by_complex_calibration(samples, short_calib)

        self.assertTrue(np.iscomplexobj(result))
        self._assert_all_finite(result)
        self._assert_allclose(result, wanted)

    # ------------------------------------------------------------------
    # FFT background
    # ------------------------------------------------------------------

    def test_fft_background_roundtrip_and_rejects_non_1d_payload(self):
        """A 1-D background round-trips; a 2-D payload raises ValueError."""
        profile = np.asarray([0.5, 1.5, 2.5], dtype=np.float32)

        with tempfile.TemporaryDirectory() as tmp_dir:
            target = os.path.join(tmp_dir, "fft_background")
            written_path = save_fft_background(target, profile)
            restored = load_fft_background(written_path)
            self.assertTrue(written_path.endswith(".npy"))
            self._assert_allclose(restored, profile)

            matrix_file = os.path.join(tmp_dir, "fft_background_invalid.npy")
            np.save(matrix_file, np.zeros((2, 2), dtype=np.float32))
            with self.assertRaises(ValueError):
                load_fft_background(matrix_file)

    def test_subtract_fft_background_clamps_negative_residuals_to_zero(self):
        """Residuals below zero are clamped; self-subtraction gives zeros."""
        spectrum = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)
        profile = np.asarray([1.0, 1.5, 5.0], dtype=np.float32)

        residual = subtract_fft_background(spectrum, profile)

        self._assert_allclose(residual, np.asarray([0.0, 0.5, 0.0], dtype=np.float32))
        self._assert_allclose(subtract_fft_background(spectrum, spectrum), 0.0)

    # ------------------------------------------------------------------
    # Working range and auxiliary curves
    # ------------------------------------------------------------------

    def test_apply_working_range_crops_sweep_to_selected_band(self):
        """Only points inside [5, 9] GHz survive the crop."""
        freq_axis = np.linspace(3.3, 14.3, 12, dtype=np.float64)
        samples = np.arange(12, dtype=np.float32)

        band_freqs, band_samples = apply_working_range(freq_axis, samples, 5.0, 9.0)

        self.assertGreater(band_freqs.size, 0)
        self.assertEqual(band_freqs.shape, band_samples.shape)
        self.assertGreaterEqual(float(np.min(band_freqs)), 5.0)
        self.assertLessEqual(float(np.max(band_freqs)), 9.0)

    def test_apply_working_range_returns_empty_when_no_points_match(self):
        """A band outside the sweep produces two empty arrays."""
        freq_axis = np.linspace(3.3, 14.3, 12, dtype=np.float64)
        samples = np.arange(12, dtype=np.float32)

        band_freqs, band_samples = apply_working_range(freq_axis, samples, 20.0, 21.0)

        self.assertEqual(band_freqs.shape, (0,))
        self.assertEqual(band_samples.shape, (0,))

    def test_apply_working_range_to_aux_curves_uses_same_mask_as_raw_sweep(self):
        """Aux curves are cropped with the mask used for the raw sweep."""
        freq_axis = np.linspace(3.3, 14.3, 6, dtype=np.float64)
        samples = np.asarray([0.0, 1.0, np.nan, 3.0, 4.0, 5.0], dtype=np.float32)
        aux_pair = (
            np.asarray([10.0, 11.0, 12.0, 13.0, 14.0, 15.0], dtype=np.float32),
            np.asarray([20.0, 21.0, 22.0, 23.0, 24.0, 25.0], dtype=np.float32),
        )

        band_freqs, band_samples = apply_working_range(freq_axis, samples, 4.0, 12.5)
        band_aux = apply_working_range_to_aux_curves(freq_axis, samples, aux_pair, 4.0, 12.5)

        self.assertIsNotNone(band_aux)
        self.assertEqual(band_aux[0].shape, band_freqs.shape)
        self.assertEqual(band_aux[1].shape, band_freqs.shape)
        self.assertEqual(band_aux[0].shape, band_samples.shape)
        self._assert_allclose(band_aux[0], np.asarray([11.0, 13.0, 14.0], dtype=np.float32))
        self._assert_allclose(band_aux[1], np.asarray([21.0, 23.0, 24.0], dtype=np.float32))

    def test_resolve_visible_aux_curves_obeys_checkbox_state(self):
        """Disabled gives None; enabled passes both curves through."""
        aux_pair = (
            np.asarray([1.0, 2.0], dtype=np.float32),
            np.asarray([3.0, 4.0], dtype=np.float32),
        )

        self.assertIsNone(resolve_visible_aux_curves(aux_pair, enabled=False))

        shown = resolve_visible_aux_curves(aux_pair, enabled=True)
        self.assertIsNotNone(shown)
        for channel in (0, 1):
            self._assert_allclose(shown[channel], aux_pair[channel])

    def test_compute_aux_phase_curve_returns_atan2_of_aux_channels(self):
        """The phase curve matches atan2(second channel, first channel)."""
        aux_pair = (
            np.asarray([1.0, 1.0, -1.0, 0.0], dtype=np.float32),
            np.asarray([0.0, 1.0, 1.0, 1.0], dtype=np.float32),
        )
        wanted = np.asarray(
            [0.0, np.pi / 4.0, 3.0 * np.pi / 4.0, np.pi / 2.0],
            dtype=np.float32,
        )

        phase_curve = compute_aux_phase_curve(aux_pair)

        self.assertIsNotNone(phase_curve)
        self.assertEqual(phase_curve.shape, wanted.shape)
        self._assert_allclose(phase_curve, wanted, atol=1e-6)

    # ------------------------------------------------------------------
    # Display decimation and packet coalescing
    # ------------------------------------------------------------------

    def test_decimate_curve_for_display_preserves_small_series(self):
        """A series below the point budget is returned unchanged."""
        x_full = np.linspace(3.3, 14.3, 64, dtype=np.float64)
        y_full = np.linspace(-1.0, 1.0, 64, dtype=np.float32)

        x_out, y_out = decimate_curve_for_display(x_full, y_full, max_points=128)

        self._assert_allclose(x_out, x_full)
        self._assert_allclose(y_out, y_full)

    def test_decimate_curve_for_display_limits_points_and_keeps_endpoints(self):
        """A long series is cut to the budget and keeps both end samples."""
        x_full = np.linspace(3.3, 14.3, 10000, dtype=np.float64)
        y_full = np.sin(np.linspace(0.0, 12.0 * np.pi, 10000)).astype(np.float32)

        x_out, y_out = decimate_curve_for_display(x_full, y_full, max_points=512)

        self.assertLessEqual(x_out.size, 512)
        self.assertEqual(x_out.shape, y_out.shape)
        for end in (0, -1):
            self.assertAlmostEqual(float(x_out[end]), float(x_full[end]), places=12)
        for end in (0, -1):
            self.assertAlmostEqual(float(y_out[end]), float(y_full[end]), places=6)

    def test_coalesce_packets_for_ui_keeps_newest_packets(self):
        """With six packets and a budget of two, the last two are kept."""
        queue = [self._make_packet(number) for number in range(6)]

        retained, dropped = coalesce_packets_for_ui(queue, max_packets=2)

        self.assertEqual(dropped, 4)
        self.assertEqual(len(retained), 2)
        self.assertEqual(int(retained[0][1]["sweep"]), 4)
        self.assertEqual(int(retained[1][1]["sweep"]), 5)

    def test_coalesce_packets_for_ui_never_returns_empty_for_non_empty_input(self):
        """A zero budget still returns the single queued packet."""
        queue = [
            self._make_packet(1),
        ]

        retained, dropped = coalesce_packets_for_ui(queue, max_packets=0)

        self.assertEqual(dropped, 0)
        self.assertEqual(len(retained), 1)
        self.assertEqual(int(retained[0][1]["sweep"]), 1)

    def test_coalesce_packets_for_ui_switches_to_latest_only_on_large_backlog(self):
        """A backlog of 40 with a budget of 8 keeps only the newest packet."""
        queue = [self._make_packet(number) for number in range(40)]

        retained, dropped = coalesce_packets_for_ui(queue, max_packets=8, backlog_packets=40)

        self.assertEqual(dropped, 39)
        self.assertEqual(len(retained), 1)
        self.assertEqual(int(retained[0][1]["sweep"]), 39)

    def test_resolve_heavy_refresh_stride_increases_with_backlog(self):
        """The heavy-refresh stride grows as the backlog grows."""
        for backlog, wanted_stride in ((0, 1), (20, 2), (40, 4)):
            self.assertEqual(resolve_heavy_refresh_stride(backlog, max_packets=8), wanted_stride)

    # ------------------------------------------------------------------
    # Display sanitising and geometry
    # ------------------------------------------------------------------

    def test_sanitize_curve_data_for_display_rejects_fully_nonfinite_series(self):
        """An all-NaN curve is reduced to two empty arrays."""
        nan_x = np.asarray([np.nan, np.nan], dtype=np.float64)
        nan_y = np.asarray([np.nan, np.nan], dtype=np.float32)

        clean_x, clean_y = sanitize_curve_data_for_display(nan_x, nan_y)

        self.assertEqual(clean_x.shape, (0,))
        self.assertEqual(clean_y.shape, (0,))

    def test_sanitize_image_for_display_rejects_fully_nonfinite_frame(self):
        """An all-NaN frame yields None."""
        nan_frame = np.full((4, 4), np.nan, dtype=np.float32)

        self.assertIsNone(sanitize_image_for_display(nan_frame))

    def test_set_image_rect_if_ready_skips_uninitialized_image(self):
        """setRect is not called while width()/height() return None."""

        class _DummyImageItem:
            """Image-item stand-in that reports no size and counts setRect calls."""

            def __init__(self):
                self.calls = 0

            def width(self):
                return None

            def height(self):
                return None

            def setRect(self, *_args):
                self.calls += 1

        stub = _DummyImageItem()

        was_applied = set_image_rect_if_ready(stub, 0.0, 0.0, 10.0, 1.0)

        self.assertFalse(was_applied)
        self.assertEqual(stub.calls, 0)

    def test_resolve_axis_bounds_rejects_nonfinite_ranges(self):
        """An axis holding only NaN/inf has no bounds."""
        broken_axis = np.asarray([np.nan, np.inf], dtype=np.float64)

        self.assertIsNone(resolve_axis_bounds(broken_axis))

    def test_resolve_distance_cut_start_interpolates_with_percent(self):
        """A 50 % cut on a 0..3 axis starts at 1.5."""
        distances = np.asarray([0.0, 1.0, 2.0, 3.0], dtype=np.float64)

        start = resolve_distance_cut_start(distances, 50.0)

        self.assertIsNotNone(start)
        self.assertAlmostEqual(float(start), 1.5, places=6)

    def test_apply_distance_cut_to_axis_keeps_farthest_point_for_extreme_cut(self):
        """A cut beyond the axis still keeps the last point."""
        distances = np.asarray([0.0, 1.0, 2.0, 3.0], dtype=np.float64)

        remaining, mask = apply_distance_cut_to_axis(distances, 10.0)

        self.assertEqual(remaining.shape, (1,))
        self.assertEqual(mask.shape, distances.shape)
        self.assertTrue(bool(mask[-1]))
        self.assertAlmostEqual(float(remaining[0]), 3.0, places=6)

    def test_resolve_initial_window_size_stays_within_small_screen(self):
        """On an 800x480 screen the window fits and is at least 640x420."""
        win_w, win_h = resolve_initial_window_size(800, 480)

        self.assertLessEqual(win_w, 800)
        self.assertLessEqual(win_h, 480)
        self.assertGreaterEqual(win_w, 640)
        self.assertGreaterEqual(win_h, 420)

    def test_build_main_window_layout_uses_splitter_and_scroll_area(self):
        """The layout has a QSplitter and a QScrollArea wrapping the settings."""
        os.environ.setdefault("QT_QPA_PLATFORM", "offscreen")
        try:
            from PyQt5 import QtCore, QtWidgets
        except Exception as exc:  # pragma: no cover - environment-dependent
            self.skipTest(f"Qt unavailable: {exc}")

        # Hold a reference so the application object outlives the widgets below.
        app = QtWidgets.QApplication.instance() or QtWidgets.QApplication([])
        window = QtWidgets.QWidget()
        try:
            built = build_main_window_layout(
                QtCore,
                QtWidgets,
                window,
            )
            _layout, splitter, _plot_layout, settings_widget, settings_layout, settings_scroll = built
            self.assertIsInstance(splitter, QtWidgets.QSplitter)
            self.assertIsInstance(settings_scroll, QtWidgets.QScrollArea)
            self.assertIs(settings_scroll.widget(), settings_widget)
            self.assertIsInstance(settings_layout, QtWidgets.QVBoxLayout)
        finally:
            window.close()

    # ------------------------------------------------------------------
    # B-scan colour levels
    # ------------------------------------------------------------------

    def test_background_subtracted_bscan_levels_ignore_zero_floor(self):
        """Levels are percentiles of the positive residuals only."""
        linear = np.zeros((4, 8), dtype=np.float32)
        linear[1, 2:6] = np.asarray([0.05, 0.1, 0.5, 2.0], dtype=np.float32)
        linear[2, 1:6] = np.asarray([0.08, 0.2, 0.7, 3.0, 9.0], dtype=np.float32)
        in_db = fft_mag_to_db(linear)

        levels = compute_background_subtracted_bscan_levels(linear, in_db)

        self.assertIsNotNone(levels)
        residual_db = in_db[linear > 0.0]
        for slot, percentile in ((0, 15.0), (1, 99.7)):
            self.assertAlmostEqual(
                levels[slot],
                float(np.nanpercentile(residual_db, percentile)),
                places=5,
            )
        floor_db = in_db[linear == 0.0]
        self.assertLess(float(np.nanmax(floor_db)), levels[0])

    def test_background_subtracted_bscan_levels_fallback_when_residuals_too_sparse(self):
        """A single positive residual is too little data: result is None."""
        linear = np.zeros((3, 4), dtype=np.float32)
        linear[1, 2] = 1.0
        in_db = fft_mag_to_db(linear)

        self.assertIsNone(compute_background_subtracted_bscan_levels(linear, in_db))

    # ------------------------------------------------------------------
    # FFT rows, IFFT spectra and distance axes
    # ------------------------------------------------------------------

    def test_fft_helpers_return_expected_shapes(self):
        """Magnitude row, dB row and distance axis all have `bins` entries."""
        samples = np.sin(np.linspace(0.0, 4.0 * np.pi, 128)).astype(np.float32)
        freq_axis = np.linspace(3.3, 14.3, 128, dtype=np.float64)

        magnitude = compute_fft_mag_row(samples, freq_axis, 513)
        display_row = compute_fft_row(samples, freq_axis, 513)
        distances = compute_distance_axis(freq_axis, 513)

        for produced in (magnitude, display_row, distances):
            self.assertEqual(produced.shape, (513,))
        self._assert_non_decreasing(distances)

    def test_symmetric_ifft_spectrum_has_zero_gap_and_mirrored_band(self):
        """Real input: zeros inside (-4, 4) GHz and a mirrored band outside."""
        samples = np.linspace(1.0, 2.0, 128, dtype=np.float32)
        freq_axis = np.linspace(4.0, 10.0, 128, dtype=np.float64)

        spectrum = build_symmetric_ifft_spectrum(samples, freq_axis, fft_len=FFT_LEN)

        self.assertIsNotNone(spectrum)
        grid = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
        negative_band, positive_band = self._mirrored_band_indices(grid, 4.0)
        gap = (grid > (-4.0)) & (grid < 4.0)
        self._assert_allclose(spectrum[gap], 0.0)
        self._assert_allclose(spectrum[negative_band], spectrum[positive_band][::-1])

    def test_positive_only_centered_spectrum_keeps_zeros_until_positive_min(self):
        """Everything below +4 GHz is zero; the positive band is populated."""
        samples = np.linspace(1.0, 2.0, 128, dtype=np.float32)
        freq_axis = np.linspace(4.0, 10.0, 128, dtype=np.float64)

        spectrum = build_positive_only_centered_ifft_spectrum(
            samples, freq_axis, fft_len=FFT_LEN
        )

        self.assertIsNotNone(spectrum)
        grid = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
        below_band = grid < 4.0
        positive_band = np.flatnonzero(grid >= 4.0)
        self._assert_allclose(spectrum[below_band], 0.0)
        self.assertTrue(np.any(np.abs(spectrum[positive_band]) > 0.0))

    def test_positive_only_exact_spectrum_uses_direct_index_insertion_without_window(self):
        """Exact mode writes the samples unmodified at their grid indices."""
        samples = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)
        freq_axis = np.asarray([4.0, 5.0, 6.0], dtype=np.float64)

        spectrum = build_positive_only_exact_centered_ifft_spectrum(samples, freq_axis)

        self.assertIsNotNone(spectrum)
        step = (6.0 - 4.0) / 2.0
        grid = np.arange(-6.0, 6.0 + (0.5 * step), step, dtype=np.float64)
        slots = np.round((freq_axis - grid[0]) / step).astype(np.int64)
        gap = (grid > -6.0) & (grid < 4.0)
        self.assertEqual(int(spectrum.size), int(grid.size))
        self._assert_allclose(spectrum[gap], 0.0)
        self._assert_allclose(spectrum[slots], samples)

    def test_complex_symmetric_ifft_spectrum_uses_conjugate_mirror(self):
        """Complex input: the negative band is the conjugate mirror image."""
        samples = np.exp(1j * np.linspace(0.0, np.pi, 128)).astype(np.complex64)
        freq_axis = np.linspace(4.0, 10.0, 128, dtype=np.float64)

        spectrum = build_symmetric_ifft_spectrum(samples, freq_axis, fft_len=FFT_LEN)

        self.assertIsNotNone(spectrum)
        grid = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
        negative_band, positive_band = self._mirrored_band_indices(grid, 4.0)
        self.assertTrue(np.iscomplexobj(spectrum))
        self._assert_allclose(
            spectrum[negative_band],
            np.conj(spectrum[positive_band][::-1]),
        )

    def test_compute_fft_helpers_accept_complex_input(self):
        """Complex sweeps work; the magnitude row is |complex row|."""
        samples = np.exp(1j * np.linspace(0.0, 2.0 * np.pi, 128)).astype(np.complex64)
        freq_axis = np.linspace(3.3, 14.3, 128, dtype=np.float64)

        as_complex = compute_fft_complex_row(samples, freq_axis, 513, mode="positive_only")
        magnitude = compute_fft_mag_row(samples, freq_axis, 513, mode="positive_only")
        display_row = compute_fft_row(samples, freq_axis, 513, mode="positive_only")

        self.assertEqual(as_complex.shape, (513,))
        self.assertTrue(np.iscomplexobj(as_complex))
        self.assertEqual(magnitude.shape, (513,))
        self.assertEqual(display_row.shape, (513,))
        self._assert_allclose(magnitude, np.abs(as_complex), equal_nan=True)
        self._assert_any_finite(magnitude)
        self._assert_any_finite(display_row)

    def test_compute_fft_complex_row_positive_only_exact_matches_manual_ifftshift_ifft(self):
        """Exact mode equals a hand-built ifft(ifftshift(...)) padded with NaN."""
        samples = np.asarray([1.0 + 1.0j, 2.0 + 0.0j, 3.0 - 1.0j], dtype=np.complex64)
        freq_axis = np.asarray([4.0, 5.0, 6.0], dtype=np.float64)
        bins = 16

        produced = compute_fft_complex_row(samples, freq_axis, bins, mode="positive_only_exact")

        step = (6.0 - 4.0) / 2.0
        grid = np.arange(-6.0, 6.0 + (0.5 * step), step, dtype=np.float64)
        centered = np.zeros((grid.size,), dtype=np.complex64)
        slots = np.round((freq_axis - grid[0]) / step).astype(np.int64)
        centered[slots] = samples
        reference_ifft = np.fft.ifft(np.fft.ifftshift(centered))
        wanted = np.full((bins,), np.nan + 0j, dtype=np.complex64)
        wanted[: reference_ifft.size] = np.asarray(reference_ifft, dtype=np.complex64)

        self.assertEqual(produced.shape, (bins,))
        self._assert_allclose(produced, wanted, equal_nan=True)

    def test_positive_only_exact_distance_axis_uses_exact_grid_geometry(self):
        """The exact-mode distance step follows c / (2 * N * df)."""
        freq_axis = np.asarray([4.0, 5.0, 6.0], dtype=np.float64)
        bins = 8

        distances = compute_distance_axis(freq_axis, bins, mode="positive_only_exact")

        # With a small bins budget the exact-mode grid is downsampled so
        # internal IFFT length does not exceed visible bins.
        step_hz = 2e9
        grid_points = int(np.arange(-6.0, 6.0 + 1.0, 2.0, dtype=np.float64).size)
        wanted_step = C_M_S / (2.0 * grid_points * step_hz)
        wanted = np.arange(bins, dtype=np.float64) * wanted_step

        self.assertEqual(distances.shape, (bins,))
        self._assert_allclose(distances, wanted)

    def test_positive_only_exact_mode_remains_stable_when_input_points_double(self):
        """Doubling the sweep density barely moves the delay peak."""
        bins = FFT_LEN // 2 + 1
        delay_s = 45e-9

        magnitude_by_points = {}
        for point_count in (400, 800):
            freq_axis = np.linspace(3.3, 14.3, point_count, dtype=np.float64)
            tone = np.exp(-1j * 2.0 * np.pi * freq_axis * 1e9 * delay_s).astype(np.complex64)
            magnitude_by_points[point_count] = compute_fft_mag_row(
                tone, freq_axis, bins, mode="positive_only_exact"
            )
        sparse, dense = magnitude_by_points[400], magnitude_by_points[800]

        self.assertEqual(sparse.shape, dense.shape)
        both_finite = np.isfinite(sparse) & np.isfinite(dense)
        self.assertGreater(int(np.count_nonzero(both_finite)), int(0.95 * bins))

        peak_bins = [int(np.nanargmax(row)) for row in (sparse, dense)]
        peak_heights = [float(np.nanmax(row)) for row in (sparse, dense)]
        self.assertLess(abs(peak_bins[0] - peak_bins[1]), 64)
        for peak_bin in peak_bins:
            self.assertGreater(peak_bin, 8)
        for peak_bin in peak_bins:
            self.assertLess(peak_bin, bins - 8)
        for peak_height in peak_heights:
            self.assertGreater(peak_height, 0.05)

    def test_resolve_visible_fft_curves_handles_complex_mode(self):
        """In complex mode only the ticked curves (abs, imag) are returned."""
        as_complex = np.asarray([1.0 + 2.0j, -3.0 + 4.0j], dtype=np.complex64)
        magnitude = np.abs(as_complex).astype(np.float32)

        shown_abs, shown_real, shown_imag = resolve_visible_fft_curves(
            as_complex,
            magnitude,
            complex_mode=True,
            show_abs=True,
            show_real=False,
            show_imag=True,
        )

        self._assert_allclose(shown_abs, magnitude)
        self.assertIsNone(shown_real)
        self._assert_allclose(shown_imag, np.asarray([2.0, 4.0], dtype=np.float32))

    def test_resolve_visible_fft_curves_preserves_legacy_abs_mode(self):
        """Outside complex mode only the magnitude curve is shown."""
        magnitude = np.asarray([1.0, 2.0, 3.0], dtype=np.float32)

        shown_abs, shown_real, shown_imag = resolve_visible_fft_curves(
            None,
            magnitude,
            complex_mode=False,
            show_abs=True,
            show_real=True,
            show_imag=True,
        )

        self._assert_allclose(shown_abs, magnitude)
        self.assertIsNone(shown_real)
        self.assertIsNone(shown_imag)

    def test_symmetric_distance_axis_uses_windowed_frequency_bounds(self):
        """The symmetric-mode step is derived from the +/-10 GHz FFT grid."""
        freq_axis = np.linspace(4.0, 10.0, 128, dtype=np.float64)

        distances = compute_distance_axis(freq_axis, 513, mode="symmetric")

        step_hz = (2.0 * 10.0 / max(1, FFT_LEN - 1)) * 1e9
        wanted_step = 299_792_458.0 / (2.0 * FFT_LEN * step_hz)
        self.assertEqual(distances.shape, (513,))
        self._assert_non_decreasing(distances)
        self.assertAlmostEqual(float(distances[1] - distances[0]), wanted_step, places=15)

    # ------------------------------------------------------------------
    # Peak detection
    # ------------------------------------------------------------------

    def test_peak_helpers_find_reference_and_peak_boxes(self):
        """One Gaussian bump at x=5 gives one peak and a positive width."""
        x_vals = np.linspace(0.0, 10.0, 200)
        y_vals = np.exp(-((x_vals - 5.0) ** 2) / 0.4) * 10.0 + 1.0

        baseline = rolling_median_ref(x_vals, y_vals, 2.0)
        found = find_top_peaks_over_ref(x_vals, y_vals, baseline, top_n=3)
        markers = find_peak_width_markers(x_vals, y_vals)

        self.assertEqual(baseline.shape, y_vals.shape)
        self.assertEqual(len(found), 1)
        self.assertGreater(found[0]["x"], 4.0)
        self.assertLess(found[0]["x"], 6.0)
        self.assertIsNotNone(markers)
        self.assertGreater(markers["width"], 0.0)


if __name__ == "__main__":
    unittest.main()