From 64e66933e484b3df3defa6c34185d3d7c5b78932 Mon Sep 17 00:00:00 2001 From: awe Date: Wed, 25 Mar 2026 18:54:59 +0300 Subject: [PATCH] new adc --- README.md | 6 + rfg_adc_plotter/cli.py | 8 ++ rfg_adc_plotter/gui/pyqtgraph_backend.py | 126 +++++++++++++++++--- rfg_adc_plotter/io/sweep_parser_core.py | 56 +++++++++ rfg_adc_plotter/io/sweep_reader.py | 5 + rfg_adc_plotter/processing/calibration.py | 13 +- rfg_adc_plotter/processing/fft.py | 48 ++++++-- rfg_adc_plotter/processing/normalization.py | 10 +- rfg_adc_plotter/state/ring_buffer.py | 39 ++++-- rfg_adc_plotter/state/runtime_state.py | 3 + tests/test_cli.py | 1 + tests/test_processing.py | 48 ++++++++ tests/test_ring_buffer.py | 14 +++ tests/test_sweep_parser_core.py | 15 +++ 14 files changed, 350 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 8e7e028..0bdef69 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,12 @@ Logscale binary `16-bit x2`: .venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --parser_test ``` +Комплексный ASCII-поток `step real imag`: + +```bash +.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --parser_complex_ascii +``` + ## Локальная проверка через replay_pty Если есть лог-файл захвата, его можно воспроизвести как виртуальный последовательный порт. diff --git a/rfg_adc_plotter/cli.py b/rfg_adc_plotter/cli.py index 6172d28..96f0761 100644 --- a/rfg_adc_plotter/cli.py +++ b/rfg_adc_plotter/cli.py @@ -95,6 +95,14 @@ def build_parser() -> argparse.ArgumentParser: "одиночный 0xFFFF завершает точку, серия 0xFFFF начинает новый свип" ), ) + parser.add_argument( + "--parser_complex_ascii", + action="store_true", + help=( + "ASCII-поток из трех чисел на строку: step real imag. " + "Новый свип определяется по сбросу/повтору step, FFT строится по комплексным данным" + ), + ) parser.add_argument( "--calibrate", action="store_true", diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 13cdf9c..9b10486 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -155,6 +155,41 @@ def apply_working_range_to_aux_curves( ) +def apply_working_range_to_signal( + freqs: Optional[np.ndarray], + sweep: Optional[np.ndarray], + signal_values: Optional[np.ndarray], + range_min_ghz: float, + range_max_ghz: float, +) -> Optional[np.ndarray]: + """Crop an arbitrary signal with the same mask used for the raw sweep.""" + if freqs is None or sweep is None or signal_values is None: + return None + + freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1) + sweep_arr = np.asarray(sweep, dtype=np.float32).reshape(-1) + signal_arr = np.asarray(signal_values).reshape(-1) + width = min(freq_arr.size, sweep_arr.size, signal_arr.size) + if width <= 0: + return None + + freq_arr = freq_arr[:width] + sweep_arr = sweep_arr[:width] + signal_arr = signal_arr[:width] + valid = ( + np.isfinite(freq_arr) + & np.isfinite(sweep_arr) + & (freq_arr >= float(range_min_ghz)) + & (freq_arr <= float(range_max_ghz)) + ) + if not np.any(valid): + return None + + if np.iscomplexobj(signal_arr): + return np.asarray(signal_arr[valid], dtype=np.complex64) + return np.asarray(signal_arr[valid], dtype=np.float32) + + def resolve_visible_aux_curves(aux_curves: SweepAuxCurves, enabled: bool) -> SweepAuxCurves: """Return auxiliary curves only when their display is enabled.""" if (not enabled) or aux_curves is None: @@ -198,6 +233,7 @@ def run_pyqtgraph(args) -> None: """Start the PyQtGraph GUI.""" peak_calibrate_mode = bool(getattr(args, "calibrate", False)) peak_search_enabled = bool(getattr(args, "peak_search", False)) + complex_sweep_mode = bool(getattr(args, "parser_complex_ascii", False)) try: import pyqtgraph as pg from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore @@ -218,6 +254,7 @@ def run_pyqtgraph(args) -> None: logscale=bool(args.logscale), parser_16_bit_x2=bool(args.parser_16_bit_x2), parser_test=bool(args.parser_test), + parser_complex_ascii=complex_sweep_mode, ) reader.start() @@ -413,6 +450,12 @@ def run_pyqtgraph(args) -> None: background_buttons_row.addWidget(background_load_btn) background_group_layout.addLayout(background_buttons_row) parsed_data_cb = QtWidgets.QCheckBox("данные после парсинга") + if complex_sweep_mode: + try: + parsed_data_cb.setText("Re/Im после парсинга") + p_line.setTitle("Модуль комплексного сигнала") + except Exception: + pass try: settings_layout.addWidget(QtWidgets.QLabel("Настройки")) except Exception: @@ -576,6 +619,8 @@ def run_pyqtgraph(args) -> None: if runtime.full_current_freqs is None or runtime.full_current_sweep_raw is None: runtime.current_freqs = None runtime.current_sweep_raw = None + runtime.current_fft_source = None + runtime.current_fft_input = None runtime.current_aux_curves = None runtime.current_sweep_norm = None runtime.current_fft_mag = None @@ -591,6 +636,13 @@ def run_pyqtgraph(args) -> None: ) runtime.current_freqs = current_freqs runtime.current_sweep_raw = current_sweep + runtime.current_fft_source = apply_working_range_to_signal( + runtime.full_current_freqs, + runtime.full_current_sweep_raw, + runtime.full_current_fft_source, + runtime.range_min_ghz, + runtime.range_max_ghz, + ) runtime.current_aux_curves = apply_working_range_to_aux_curves( runtime.full_current_freqs, runtime.full_current_sweep_raw, @@ -604,6 +656,8 @@ def run_pyqtgraph(args) -> None: reset_ring_buffers() runtime.current_freqs = None runtime.current_sweep_raw = None + runtime.current_fft_source = None + runtime.current_fft_input = None runtime.current_aux_curves = None runtime.current_sweep_norm = None runtime.current_fft_mag = None @@ -615,6 +669,10 @@ def run_pyqtgraph(args) -> None: recompute_current_processed_sweep(push_to_ring=push_to_ring) def recompute_current_processed_sweep(push_to_ring: bool = False) -> None: + fft_source = runtime.current_fft_source + if fft_source is None and runtime.current_sweep_raw is not None: + fft_source = np.asarray(runtime.current_sweep_raw, dtype=np.float32) + if ( runtime.current_sweep_raw is not None and runtime.current_sweep_raw.size > 0 @@ -625,6 +683,16 @@ def run_pyqtgraph(args) -> None: else: runtime.current_sweep_norm = None + if fft_source is None or np.asarray(fft_source).size == 0: + runtime.current_fft_input = None + elif calib_enabled and runtime.calib_envelope is not None: + runtime.current_fft_input = normalize_by_envelope(fft_source, runtime.calib_envelope) + else: + runtime.current_fft_input = np.asarray( + fft_source, + dtype=np.complex64 if np.iscomplexobj(fft_source) else np.float32, + ).copy() + runtime.current_fft_mag = None runtime.current_fft_db = None if ( @@ -635,8 +703,9 @@ def run_pyqtgraph(args) -> None: return sweep_for_processing = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw + fft_input_for_processing = runtime.current_fft_input if runtime.current_fft_input is not None else sweep_for_processing ensure_buffer(runtime.current_sweep_raw.size) - runtime.ring.push(sweep_for_processing, runtime.current_freqs) + runtime.ring.push(sweep_for_processing, runtime.current_freqs, fft_input=fft_input_for_processing) runtime.current_distances = runtime.ring.distance_axis runtime.current_fft_mag = runtime.ring.get_last_fft_linear() runtime.current_fft_db = runtime.ring.last_fft_db @@ -860,7 +929,7 @@ def run_pyqtgraph(args) -> None: runtime.mark_dirty() try: - fft_mode_combo.setCurrentIndex(1) + fft_mode_combo.setCurrentIndex(2 if complex_sweep_mode else 1) except Exception: pass restore_range_controls() @@ -1060,27 +1129,46 @@ def run_pyqtgraph(args) -> None: break drained += 1 base_freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, sweep.size, dtype=np.float64) - calibrated = calibrate_freqs( - { - "F": base_freqs, - "I": sweep, - } - ) - runtime.full_current_freqs = np.asarray(calibrated["F"], dtype=np.float64) - runtime.full_current_sweep_raw = np.asarray(calibrated["I"], dtype=np.float32) - if aux_curves is None: - runtime.full_current_aux_curves = None - else: + runtime.full_current_aux_curves = None + runtime.full_current_fft_source = None + if complex_sweep_mode and aux_curves is not None: try: aux_1, aux_2 = aux_curves - calibrated_aux_1 = calibrate_freqs({"F": base_freqs, "I": aux_1})["I"] + calibrated_aux_1_payload = calibrate_freqs({"F": base_freqs, "I": aux_1}) calibrated_aux_2 = calibrate_freqs({"F": base_freqs, "I": aux_2})["I"] - runtime.full_current_aux_curves = ( - np.asarray(calibrated_aux_1, dtype=np.float32), - np.asarray(calibrated_aux_2, dtype=np.float32), + runtime.full_current_freqs = np.asarray(calibrated_aux_1_payload["F"], dtype=np.float64) + calibrated_aux_1 = np.asarray(calibrated_aux_1_payload["I"], dtype=np.float32) + calibrated_aux_2 = np.asarray(calibrated_aux_2, dtype=np.float32) + runtime.full_current_aux_curves = (calibrated_aux_1, calibrated_aux_2) + runtime.full_current_fft_source = ( + calibrated_aux_1.astype(np.complex64) + (1j * calibrated_aux_2.astype(np.complex64)) ) + runtime.full_current_sweep_raw = np.abs(runtime.full_current_fft_source).astype(np.float32) except Exception: runtime.full_current_aux_curves = None + runtime.full_current_fft_source = None + + if runtime.full_current_fft_source is None: + calibrated = calibrate_freqs( + { + "F": base_freqs, + "I": sweep, + } + ) + runtime.full_current_freqs = np.asarray(calibrated["F"], dtype=np.float64) + runtime.full_current_sweep_raw = np.asarray(calibrated["I"], dtype=np.float32) + runtime.full_current_fft_source = np.asarray(runtime.full_current_sweep_raw, dtype=np.float32).copy() + if aux_curves is not None: + try: + aux_1, aux_2 = aux_curves + calibrated_aux_1 = calibrate_freqs({"F": base_freqs, "I": aux_1})["I"] + calibrated_aux_2 = calibrate_freqs({"F": base_freqs, "I": aux_2})["I"] + runtime.full_current_aux_curves = ( + np.asarray(calibrated_aux_1, dtype=np.float32), + np.asarray(calibrated_aux_2, dtype=np.float32), + ) + except Exception: + runtime.full_current_aux_curves = None runtime.current_info = info refresh_current_window(push_to_ring=True) if drained > 0: @@ -1165,7 +1253,9 @@ def run_pyqtgraph(args) -> None: if finite_x.size > 0: p_line.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0) - sweep_for_fft = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw + sweep_for_fft = runtime.current_fft_input + if sweep_for_fft is None: + sweep_for_fft = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw distance_axis = runtime.current_distances if runtime.current_distances is not None else runtime.ring.distance_axis if sweep_for_fft is not None and sweep_for_fft.size > 0 and distance_axis is not None: if runtime.current_fft_mag is None or runtime.current_fft_mag.size != distance_axis.size or runtime.plot_dirty: diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py index 2772709..e5fa2c3 100644 --- a/rfg_adc_plotter/io/sweep_parser_core.py +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -82,6 +82,62 @@ class AsciiSweepParser: return events +class ComplexAsciiSweepParser: + """Incremental parser for ASCII ``step real imag`` streams.""" + + def __init__(self): + self._buf = bytearray() + self._last_step: Optional[int] = None + self._seen_points = False + + def feed(self, data: bytes) -> List[ParserEvent]: + if data: + self._buf += data + events: List[ParserEvent] = [] + while True: + nl = self._buf.find(b"\n") + if nl == -1: + break + line = bytes(self._buf[:nl]) + del self._buf[: nl + 1] + if line.endswith(b"\r"): + line = line[:-1] + if not line: + continue + + if line.lower().startswith(b"sweep_start"): + self._last_step = None + self._seen_points = False + events.append(StartEvent()) + continue + + parts = line.split() + if len(parts) < 3: + continue + try: + step = int(parts[0], 10) + real = float(parts[1]) + imag = float(parts[2]) + except Exception: + continue + if step < 0 or (not math.isfinite(real)) or (not math.isfinite(imag)): + continue + + if self._seen_points and self._last_step is not None and step <= self._last_step: + events.append(StartEvent()) + self._seen_points = True + self._last_step = step + events.append( + PointEvent( + ch=0, + x=step, + y=float(abs(complex(real, imag))), + aux=(float(real), float(imag)), + ) + ) + return events + + class LegacyBinaryParser: """Byte-resynchronizing parser for legacy 8-byte binary records.""" diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py index 3c3639f..2cb6730 100644 --- a/rfg_adc_plotter/io/sweep_reader.py +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -10,6 +10,7 @@ from queue import Full, Queue from rfg_adc_plotter.io.serial_source import SerialChunkReader, SerialLineSource from rfg_adc_plotter.io.sweep_parser_core import ( AsciiSweepParser, + ComplexAsciiSweepParser, LegacyBinaryParser, LogScale16BitX2BinaryParser, LogScaleBinaryParser32, @@ -33,6 +34,7 @@ class SweepReader(threading.Thread): logscale: bool = False, parser_16_bit_x2: bool = False, parser_test: bool = False, + parser_complex_ascii: bool = False, ): super().__init__(daemon=True) self._port_path = port_path @@ -44,9 +46,12 @@ class SweepReader(threading.Thread): self._logscale = bool(logscale) self._parser_16_bit_x2 = bool(parser_16_bit_x2) self._parser_test = bool(parser_test) + self._parser_complex_ascii = bool(parser_complex_ascii) self._src: SerialLineSource | None = None def _build_parser(self): + if self._parser_complex_ascii: + return ComplexAsciiSweepParser(), SweepAssembler(fancy=self._fancy, apply_inversion=False) if self._parser_test: return ParserTestStreamParser(), SweepAssembler(fancy=self._fancy, apply_inversion=False) if self._parser_16_bit_x2: diff --git a/rfg_adc_plotter/processing/calibration.py b/rfg_adc_plotter/processing/calibration.py index 0a23dc2..ff41fa1 100644 --- a/rfg_adc_plotter/processing/calibration.py +++ b/rfg_adc_plotter/processing/calibration.py @@ -65,14 +65,23 @@ def set_calibration_base_value(index: int, value: float) -> np.ndarray: def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData: """Return a sweep copy with calibrated and resampled frequency axis.""" freqs = np.asarray(sweep["F"], dtype=np.float64).copy() - values = np.asarray(sweep["I"], dtype=np.float64).copy() + values_in = np.asarray(sweep["I"]).reshape(-1) + values = np.asarray( + values_in, + dtype=np.complex128 if np.iscomplexobj(values_in) else np.float64, + ).copy() coeffs = np.asarray(CALIBRATION_C, dtype=np.float64) if freqs.size > 0: freqs = coeffs[0] + coeffs[1] * freqs + coeffs[2] * (freqs * freqs) if freqs.size >= 2: freqs_cal = np.linspace(float(freqs[0]), float(freqs[-1]), freqs.size, dtype=np.float64) - values_cal = np.interp(freqs_cal, freqs, values).astype(np.float64) + if np.iscomplexobj(values): + values_real = np.interp(freqs_cal, freqs, values.real.astype(np.float64, copy=False)) + values_imag = np.interp(freqs_cal, freqs, values.imag.astype(np.float64, copy=False)) + values_cal = (values_real + (1j * values_imag)).astype(np.complex64) + else: + values_cal = np.interp(freqs_cal, freqs, values).astype(np.float64) else: freqs_cal = freqs.copy() values_cal = values.copy() diff --git a/rfg_adc_plotter/processing/fft.py b/rfg_adc_plotter/processing/fft.py index d91e31c..8688337 100644 --- a/rfg_adc_plotter/processing/fft.py +++ b/rfg_adc_plotter/processing/fft.py @@ -24,6 +24,21 @@ def _finite_freq_bounds(freqs: Optional[np.ndarray]) -> Optional[Tuple[float, fl return f_min, f_max +def _coerce_sweep_array(sweep: np.ndarray) -> np.ndarray: + values = np.asarray(sweep).reshape(-1) + if np.iscomplexobj(values): + return np.asarray(values, dtype=np.complex64) + return np.asarray(values, dtype=np.float32) + + +def _interp_signal(x_uniform: np.ndarray, x_known: np.ndarray, y_known: np.ndarray) -> np.ndarray: + if np.iscomplexobj(y_known): + real = np.interp(x_uniform, x_known, np.asarray(y_known.real, dtype=np.float64)) + imag = np.interp(x_uniform, x_known, np.asarray(y_known.imag, dtype=np.float64)) + return (real + (1j * imag)).astype(np.complex64) + return np.interp(x_uniform, x_known, np.asarray(y_known, dtype=np.float64)).astype(np.float32) + + def prepare_fft_segment( sweep: np.ndarray, freqs: Optional[np.ndarray], @@ -34,8 +49,10 @@ def prepare_fft_segment( if take_fft <= 0: return None - sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32) - fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(np.float32, copy=False) + sweep_arr = _coerce_sweep_array(sweep) + sweep_seg = sweep_arr[:take_fft] + fallback_dtype = np.complex64 if np.iscomplexobj(sweep_seg) else np.float32 + fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(fallback_dtype, copy=False) if freqs is None: return fallback, take_fft @@ -59,7 +76,7 @@ def prepare_fft_segment( return fallback, take_fft x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64) - resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32) + resampled = _interp_signal(x_uniform, x_unique, y_unique) return resampled, take_fft @@ -94,18 +111,20 @@ def build_symmetric_ifft_spectrum( fft_seg, take_fft = prepared if take_fft != band_len: - fft_seg = np.asarray(fft_seg[:band_len], dtype=np.float32) + fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 + fft_seg = np.asarray(fft_seg[:band_len], dtype=fft_dtype) if fft_seg.size < band_len: - padded = np.zeros((band_len,), dtype=np.float32) + padded = np.zeros((band_len,), dtype=fft_dtype) padded[: fft_seg.size] = fft_seg fft_seg = padded window = np.hanning(band_len).astype(np.float32) - band = np.nan_to_num(fft_seg, nan=0.0).astype(np.float32, copy=False) * window + band_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 + band = np.nan_to_num(fft_seg, nan=0.0).astype(band_dtype, copy=False) * window - spectrum = np.zeros((int(fft_len),), dtype=np.float32) + spectrum = np.zeros((int(fft_len),), dtype=band_dtype) spectrum[pos_idx] = band - spectrum[neg_idx] = band[::-1] + spectrum[neg_idx] = np.conj(band[::-1]) if np.iscomplexobj(band) else band[::-1] return spectrum @@ -137,16 +156,18 @@ def build_positive_only_centered_ifft_spectrum( fft_seg, take_fft = prepared if take_fft != band_len: - fft_seg = np.asarray(fft_seg[:band_len], dtype=np.float32) + fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 + fft_seg = np.asarray(fft_seg[:band_len], dtype=fft_dtype) if fft_seg.size < band_len: - padded = np.zeros((band_len,), dtype=np.float32) + padded = np.zeros((band_len,), dtype=fft_dtype) padded[: fft_seg.size] = fft_seg fft_seg = padded window = np.hanning(band_len).astype(np.float32) - band = np.nan_to_num(fft_seg, nan=0.0).astype(np.float32, copy=False) * window + band_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 + band = np.nan_to_num(fft_seg, nan=0.0).astype(band_dtype, copy=False) * window - spectrum = np.zeros((int(fft_len),), dtype=np.float32) + spectrum = np.zeros((int(fft_len),), dtype=band_dtype) spectrum[pos_idx] = band return spectrum @@ -168,7 +189,8 @@ def _compute_fft_mag_row_direct( return np.full((bins,), np.nan, dtype=np.float32) fft_seg, take_fft = prepared - fft_in = np.zeros((FFT_LEN,), dtype=np.float32) + fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32 + fft_in = np.zeros((FFT_LEN,), dtype=fft_dtype) window = np.hanning(take_fft).astype(np.float32) fft_in[:take_fft] = fft_seg * window spec = np.fft.ifft(fft_in) diff --git a/rfg_adc_plotter/processing/normalization.py b/rfg_adc_plotter/processing/normalization.py index ec3bd39..0a6e41c 100644 --- a/rfg_adc_plotter/processing/normalization.py +++ b/rfg_adc_plotter/processing/normalization.py @@ -150,18 +150,24 @@ def resample_envelope(envelope: np.ndarray, width: int) -> np.ndarray: def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray: """Normalize a sweep by an envelope with safe resampling and zero protection.""" - raw_arr = np.asarray(raw, dtype=np.float32).reshape(-1) + raw_in = np.asarray(raw).reshape(-1) + raw_dtype = np.complex64 if np.iscomplexobj(raw_in) else np.float32 + raw_arr = np.asarray(raw_in, dtype=raw_dtype).reshape(-1) if raw_arr.size == 0: return raw_arr.copy() env = resample_envelope(envelope, raw_arr.size) - out = np.full_like(raw_arr, np.nan, dtype=np.float32) + out = np.full(raw_arr.shape, np.nan + 0j if np.iscomplexobj(raw_arr) else np.nan, dtype=raw_dtype) den_eps = np.float32(1e-9) valid = np.isfinite(raw_arr) & np.isfinite(env) if np.any(valid): with np.errstate(divide="ignore", invalid="ignore"): denom = env[valid] + np.where(env[valid] >= 0.0, den_eps, -den_eps) out[valid] = raw_arr[valid] / denom + if np.iscomplexobj(out): + out_real = np.nan_to_num(out.real, nan=np.nan, posinf=np.nan, neginf=np.nan) + out_imag = np.nan_to_num(out.imag, nan=np.nan, posinf=np.nan, neginf=np.nan) + return (out_real + (1j * out_imag)).astype(np.complex64, copy=False) return np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan) diff --git a/rfg_adc_plotter/state/ring_buffer.py b/rfg_adc_plotter/state/ring_buffer.py index 124c451..86430d9 100644 --- a/rfg_adc_plotter/state/ring_buffer.py +++ b/rfg_adc_plotter/state/ring_buffer.py @@ -23,6 +23,7 @@ class RingBuffer: self.ring: Optional[np.ndarray] = None self.ring_time: Optional[np.ndarray] = None self.ring_fft: Optional[np.ndarray] = None + self.ring_fft_input: Optional[np.ndarray] = None self.x_shared: Optional[np.ndarray] = None self.distance_axis: Optional[np.ndarray] = None self.last_fft_mag: Optional[np.ndarray] = None @@ -46,6 +47,7 @@ class RingBuffer: self.ring = None self.ring_time = None self.ring_fft = None + self.ring_fft_input = None self.x_shared = None self.distance_axis = None self.last_fft_mag = None @@ -63,13 +65,18 @@ class RingBuffer: self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32) self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64) self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32) + self.ring_fft_input = np.full((self.max_sweeps, self.width), np.nan + 0j, dtype=np.complex64) self.head = 0 changed = True elif target_width != self.width: new_ring = np.full((self.max_sweeps, target_width), np.nan, dtype=np.float32) + new_fft_input = np.full((self.max_sweeps, target_width), np.nan + 0j, dtype=np.complex64) take = min(self.width, target_width) new_ring[:, :take] = self.ring[:, :take] + if self.ring_fft_input is not None: + new_fft_input[:, :take] = self.ring_fft_input[:, :take] self.ring = new_ring + self.ring_fft_input = new_fft_input self.width = target_width changed = True @@ -106,12 +113,18 @@ class RingBuffer: self.ring_fft.fill(np.nan) for row_idx in range(self.ring.shape[0]): - sweep_row = self.ring[row_idx] - if not np.any(np.isfinite(sweep_row)): + fft_source_row = self.ring_fft_input[row_idx] if self.ring_fft_input is not None else self.ring[row_idx] + if not np.any(np.isfinite(fft_source_row)): continue + finite_idx = np.flatnonzero(np.isfinite(fft_source_row)) + if finite_idx.size <= 0: + continue + row_width = int(finite_idx[-1]) + 1 + fft_source = fft_source_row[:row_width] + freqs = self.last_freqs[:row_width] if self.last_freqs is not None and self.last_freqs.size >= row_width else self.last_freqs fft_mag = compute_fft_mag_row( - sweep_row, - self.last_freqs, + fft_source, + freqs, self.fft_bins, mode=self.fft_mode, ) @@ -140,12 +153,18 @@ class RingBuffer: """Backward-compatible wrapper for the old two-state FFT switch.""" return self.set_fft_mode("symmetric" if enabled else "direct") - def push(self, sweep: np.ndarray, freqs: Optional[np.ndarray] = None) -> None: + def push( + self, + sweep: np.ndarray, + freqs: Optional[np.ndarray] = None, + *, + fft_input: Optional[np.ndarray] = None, + ) -> None: """Push a processed sweep and refresh raw/FFT buffers.""" if sweep is None or sweep.size == 0: return self.ensure_init(int(sweep.size)) - if self.ring is None or self.ring_time is None or self.ring_fft is None: + if self.ring is None or self.ring_time is None or self.ring_fft is None or self.ring_fft_input is None: return row = np.full((self.width,), np.nan, dtype=np.float32) @@ -156,7 +175,13 @@ class RingBuffer: if freqs is not None: self.last_freqs = np.asarray(freqs, dtype=np.float64).copy() - fft_mag = compute_fft_mag_row(sweep, freqs, self.fft_bins, mode=self.fft_mode) + fft_source = np.asarray(fft_input if fft_input is not None else sweep).reshape(-1) + fft_row = np.full((self.width,), np.nan + 0j, dtype=np.complex64) + fft_take = min(self.width, int(fft_source.size)) + fft_row[:fft_take] = np.asarray(fft_source[:fft_take], dtype=np.complex64) + self.ring_fft_input[self.head, :] = fft_row + + fft_mag = compute_fft_mag_row(fft_source, freqs, self.fft_bins, mode=self.fft_mode) self.ring_fft[self.head, :] = fft_mag self.last_fft_mag = np.asarray(fft_mag, dtype=np.float32).copy() self.last_fft_db = fft_mag_to_db(fft_mag) diff --git a/rfg_adc_plotter/state/runtime_state.py b/rfg_adc_plotter/state/runtime_state.py index 80f3e83..d08efa5 100644 --- a/rfg_adc_plotter/state/runtime_state.py +++ b/rfg_adc_plotter/state/runtime_state.py @@ -20,10 +20,13 @@ class RuntimeState: range_max_ghz: float = 0.0 full_current_freqs: Optional[np.ndarray] = None full_current_sweep_raw: Optional[np.ndarray] = None + full_current_fft_source: Optional[np.ndarray] = None full_current_aux_curves: SweepAuxCurves = None current_freqs: Optional[np.ndarray] = None current_distances: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None + current_fft_source: Optional[np.ndarray] = None + current_fft_input: Optional[np.ndarray] = None current_aux_curves: SweepAuxCurves = None current_sweep_norm: Optional[np.ndarray] = None current_fft_mag: Optional[np.ndarray] = None diff --git a/tests/test_cli.py b/tests/test_cli.py index a67b802..41e4f83 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -31,6 +31,7 @@ class CliTests(unittest.TestCase): self.assertEqual(proc.returncode, 0) self.assertIn("usage:", proc.stdout) self.assertIn("--parser_16_bit_x2", proc.stdout) + self.assertIn("--parser_complex_ascii", proc.stdout) def test_backend_mpl_reports_removal(self): proc = _run("-m", "rfg_adc_plotter.main", "/dev/null", "--backend", "mpl") diff --git a/tests/test_processing.py b/tests/test_processing.py index 5802ca0..69503ce 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -56,6 +56,18 @@ class ProcessingTests(unittest.TestCase): self.assertEqual(calibrated["I"].shape, (32,)) self.assertTrue(np.all(np.diff(calibrated["F"]) >= 0.0)) + def test_calibrate_freqs_keeps_complex_payload(self): + sweep = { + "F": np.linspace(3.3, 14.3, 32), + "I": np.exp(1j * np.linspace(0.0, np.pi, 32)).astype(np.complex64), + } + calibrated = calibrate_freqs(sweep) + + self.assertEqual(calibrated["F"].shape, (32,)) + self.assertEqual(calibrated["I"].shape, (32,)) + self.assertTrue(np.iscomplexobj(calibrated["I"])) + self.assertTrue(np.all(np.isfinite(calibrated["I"]))) + def test_normalizers_and_envelopes_return_finite_ranges(self): calib = (np.sin(np.linspace(0.0, 4.0 * np.pi, 64)) * 5.0).astype(np.float32) raw = calib * 0.75 @@ -105,6 +117,15 @@ class ProcessingTests(unittest.TestCase): self.assertAlmostEqual(float(normalized[1]), 2.0, places=5) self.assertAlmostEqual(float(normalized[2]), -3.0, places=5) + def test_normalize_by_envelope_supports_complex_input(self): + raw = np.asarray([1.0 + 1.0j, 2.0 - 2.0j], dtype=np.complex64) + envelope = np.asarray([1.0, 2.0], dtype=np.float32) + normalized = normalize_by_envelope(raw, envelope) + + self.assertTrue(np.iscomplexobj(normalized)) + self.assertTrue(np.all(np.isfinite(normalized))) + self.assertTrue(np.allclose(normalized, np.asarray([1.0 + 1.0j, 1.0 - 1.0j], dtype=np.complex64))) + def test_load_calib_envelope_rejects_empty_payload(self): with tempfile.TemporaryDirectory() as tmp_dir: path = os.path.join(tmp_dir, "empty.npy") @@ -247,6 +268,33 @@ class ProcessingTests(unittest.TestCase): self.assertTrue(np.allclose(spectrum[zero_mask], 0.0)) self.assertTrue(np.any(np.abs(spectrum[pos_idx]) > 0.0)) + def test_complex_symmetric_ifft_spectrum_uses_conjugate_mirror(self): + sweep = np.exp(1j * np.linspace(0.0, np.pi, 128)).astype(np.complex64) + freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64) + spectrum = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN) + + self.assertIsNotNone(spectrum) + freq_axis = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64) + neg_idx_all = np.flatnonzero(freq_axis <= (-4.0)) + pos_idx_all = np.flatnonzero(freq_axis >= 4.0) + band_len = int(min(neg_idx_all.size, pos_idx_all.size)) + neg_idx = neg_idx_all[:band_len] + pos_idx = pos_idx_all[-band_len:] + + self.assertTrue(np.iscomplexobj(spectrum)) + self.assertTrue(np.allclose(spectrum[neg_idx], np.conj(spectrum[pos_idx][::-1]))) + + def test_compute_fft_helpers_accept_complex_input(self): + sweep = np.exp(1j * np.linspace(0.0, 2.0 * np.pi, 128)).astype(np.complex64) + freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64) + mag = compute_fft_mag_row(sweep, freqs, 513, mode="positive_only") + row = compute_fft_row(sweep, freqs, 513, mode="positive_only") + + self.assertEqual(mag.shape, (513,)) + self.assertEqual(row.shape, (513,)) + self.assertTrue(np.any(np.isfinite(mag))) + self.assertTrue(np.any(np.isfinite(row))) + def test_symmetric_distance_axis_uses_windowed_frequency_bounds(self): freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64) axis = compute_distance_axis(freqs, 513, mode="symmetric") diff --git a/tests/test_ring_buffer.py b/tests/test_ring_buffer.py index 7cf2c4f..26fd692 100644 --- a/tests/test_ring_buffer.py +++ b/tests/test_ring_buffer.py @@ -3,6 +3,7 @@ from __future__ import annotations import numpy as np import unittest +from rfg_adc_plotter.processing.fft import compute_fft_mag_row from rfg_adc_plotter.state.ring_buffer import RingBuffer @@ -72,6 +73,19 @@ class RingBufferTests(unittest.TestCase): self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,)) self.assertIsNotNone(ring.distance_axis) + def test_ring_buffer_rebuilds_fft_from_complex_input(self): + ring = RingBuffer(max_sweeps=2) + freqs = np.linspace(3.3, 14.3, 64, dtype=np.float64) + complex_input = np.exp(1j * np.linspace(0.0, 2.0 * np.pi, 64)).astype(np.complex64) + display_sweep = np.abs(complex_input).astype(np.float32) + ring.push(display_sweep, freqs, fft_input=complex_input) + + ring.set_fft_mode("direct") + + expected = compute_fft_mag_row(complex_input, freqs, ring.fft_bins, mode="direct") + self.assertTrue(np.allclose(ring.get_last_fft_linear(), expected)) + self.assertTrue(np.allclose(ring.get_display_raw()[: display_sweep.size, -1], display_sweep)) + def test_ring_buffer_reset_clears_cached_history(self): ring = RingBuffer(max_sweeps=2) ring.push(np.linspace(0.0, 1.0, 64, dtype=np.float32), np.linspace(4.0, 10.0, 64)) diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py index 04c2f42..aa8ccc8 100644 --- a/tests/test_sweep_parser_core.py +++ b/tests/test_sweep_parser_core.py @@ -5,6 +5,7 @@ import unittest from rfg_adc_plotter.io.sweep_parser_core import ( AsciiSweepParser, + ComplexAsciiSweepParser, LegacyBinaryParser, LogScale16BitX2BinaryParser, LogScaleBinaryParser32, @@ -96,6 +97,20 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(events[1].x, 1) self.assertEqual(events[1].y, -2.0) + def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self): + parser = ComplexAsciiSweepParser() + events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n") + + self.assertIsInstance(events[0], PointEvent) + self.assertEqual(events[0].x, 0) + self.assertEqual(events[0].y, 5.0) + self.assertEqual(events[0].aux, (3.0, 4.0)) + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].y, 13.0) + self.assertIsInstance(events[2], StartEvent) + self.assertIsInstance(events[3], PointEvent) + self.assertEqual(events[3].aux, (8.0, 15.0)) + def test_logscale_32_parser_keeps_channel_and_aux_values(self): parser = LogScaleBinaryParser32() stream = _pack_log_start(5) + _pack_log_point(7, 1500, 700, ch=5)