From c40df97085d8f01e5545672aa51a49b69cd2eca7 Mon Sep 17 00:00:00 2001 From: awe Date: Wed, 15 Apr 2026 19:09:11 +0300 Subject: [PATCH] ampl parser --- README.md | 9 +- rfg_adc_plotter/cli.py | 9 +- rfg_adc_plotter/gui/pyqtgraph_backend.py | 121 +++++++++++++++++++++-- rfg_adc_plotter/io/sweep_parser_core.py | 102 +++++++++++++++++-- rfg_adc_plotter/io/sweep_reader.py | 12 +++ rfg_adc_plotter/state/runtime_state.py | 1 + rfg_adc_plotter/types.py | 5 +- tests/test_processing.py | 24 +++++ tests/test_sweep_parser_core.py | 84 +++++++++++++++- tests/test_sweep_reader.py | 31 ++++++ 10 files changed, 371 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index c1a0a3f..439a166 100644 --- a/README.md +++ b/README.md @@ -109,8 +109,13 @@ Legacy binary: .venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --bin ``` -`--bin` также понимает `tty`-поток CH1/CH2 из `kamil_adc` (`tty:/tmp/ttyADC_data`) в 8-байтном формате `0x000A,step,ch1_i16,ch2_i16`. -В этом режиме сырая кривая строится как `ch1^2 + ch2^2`, а FFT рассчитывается от комплексного сигнала `ch1 + i*ch2`. +`--bin` понимает mixed 8-байтный поток: +- `0x000A,step,ch1_i16,ch2_i16` для CH1/CH2 из `kamil_adc` (`tty:/tmp/ttyADC_data`) +- `0x001A,step,data_i16,0x0000` для логарифмического детектора + +Для `0x000A` сырая кривая строится как `ch1^2 + ch2^2`, а FFT рассчитывается от комплексного сигнала `ch1 + i*ch2`. +Для `0x001A` signed `data_i16` сначала переводится в В, затем raw отображается как `V`, а FFT рассчитывается от `exp(V)`. +Параметр `--tty-range-v` применяется к обоим типам `--bin`-данных. Logscale binary с парой `int32`: diff --git a/rfg_adc_plotter/cli.py b/rfg_adc_plotter/cli.py index 0964735..362ff74 100644 --- a/rfg_adc_plotter/cli.py +++ b/rfg_adc_plotter/cli.py @@ -73,9 +73,10 @@ def build_parser() -> argparse.ArgumentParser: help=( "8-байтный бинарный протокол: либо legacy старт " "0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A и точки step,uint32(hi16,lo16),0x000A, " - "либо tty CH1/CH2 поток из kamil_adc в формате 0x000A,step,ch1_i16,ch2_i16. " - "Для tty CH1/CH2: после парсинга int16 переводятся в В, " - "сырая кривая = ch1^2+ch2^2 (В^2), FFT вход = ch1+i*ch2 (В)" + "либо mixed поток 0x000A,step,ch1_i16,ch2_i16 и 0x001A,step,data_i16,0x0000. " + "Для 0x000A: после парсинга int16 переводятся в В, " + "сырая кривая = ch1^2+ch2^2 (В^2), FFT вход = ch1+i*ch2 (В). " + "Для 0x001A: code_i16 переводится в В, raw = V, FFT вход = exp(V)" ), ) parser.add_argument( @@ -84,7 +85,7 @@ def build_parser() -> argparse.ArgumentParser: default=5.0, help=( "Полный диапазон для пересчета tty int16 в напряжение ±V " - "(только для --bin CH1/CH2, по умолчанию 5.0)" + "(для --bin 0x000A CH1/CH2 и 0x001A log-detector, по умолчанию 5.0)" ), ) parser.add_argument( diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 2e3480d..38027a3 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -65,6 +65,7 @@ TTY_CODE_SCALE_DENOM = 32767.0 TTY_RANGE_DEFAULT_V = 5.0 TTY_RANGE_MIN_V = 1e-6 TTY_RANGE_MAX_V = 1_000_000.0 +LOGDET_EXP_INPUT_LIMIT = 80.0 def sanitize_curve_data_for_display( @@ -515,6 +516,30 @@ def convert_tty_i16_to_voltage(codes: np.ndarray, range_v: float) -> np.ndarray: return np.clip(volt, -range_abs_v, range_abs_v).astype(np.float32, copy=False) +def build_logdet_voltage_fft_input( + codes: np.ndarray, + range_v: float, + *, + exp_input_limit: float = LOGDET_EXP_INPUT_LIMIT, +) -> Tuple[np.ndarray, np.ndarray]: + """Convert 1a00 log-detector codes to raw volts and a real FFT input ``exp(V)``.""" + volts = convert_tty_i16_to_voltage(codes, range_v) + if volts.size <= 0: + empty = np.zeros((0,), dtype=np.float32) + return empty, empty + + try: + limit_v = abs(float(exp_input_limit)) + except Exception: + limit_v = float(LOGDET_EXP_INPUT_LIMIT) + if (not np.isfinite(limit_v)) or limit_v <= 0.0: + limit_v = float(LOGDET_EXP_INPUT_LIMIT) + + clipped_v = np.clip(volts, -limit_v, limit_v).astype(np.float32, copy=False) + fft_input = np.exp(clipped_v).astype(np.float32, copy=False) + return volts, fft_input + + def decimate_curve_for_display( xs: Optional[np.ndarray], ys: Optional[np.ndarray], @@ -1211,6 +1236,50 @@ def run_pyqtgraph(args) -> None: path = "" return path or "fft_background.npy" + def get_signal_kind(info: Optional[SweepInfo] = None) -> Optional[str]: + payload = runtime.current_info if info is None else info + if not isinstance(payload, dict): + return None + signal_kind = payload.get("signal_kind") + if signal_kind in {"bin_iq", "bin_logdet"}: + return str(signal_kind) + return None + + def current_packet_is_complex() -> bool: + return bool(complex_sweep_mode) and get_signal_kind() != "bin_logdet" + + def refresh_signal_mode_labels() -> None: + signal_kind = get_signal_kind() + active_complex = current_packet_is_complex() + is_logdet = signal_kind == "bin_logdet" + is_bin_iq = signal_kind == "bin_iq" + + try: + if is_logdet: + p_line.setTitle("Лог-детектор (В)") + p_line.setLabel("left", "В") + p_fft.setTitle("FFT: exp(V)") + parsed_data_cb.setText("Сырые log-detector (В)") + elif is_bin_iq: + p_line.setTitle("Сырые CH1/CH2 (В) и CH1^2 + CH2^2 (В^2)") + p_line.setLabel("left", "CH1^2 + CH2^2, В^2") + p_fft.setTitle("FFT: CH1 + i*CH2") + parsed_data_cb.setText("Сырые CH1/CH2 (В)") + elif complex_sweep_mode: + p_line.setTitle("Сырые данные до FFT") + p_line.setLabel("left", "Y") + p_fft.setTitle("FFT: Re / Im / Abs") + parsed_data_cb.setText("Сырые Re/Im") + else: + p_line.setTitle("Сырые данные") + p_line.setLabel("left", "Y") + p_fft.setTitle("FFT") + parsed_data_cb.setText("данные после парсинга") + p_fft.setLabel("left", "Амплитуда" if active_complex else "дБ") + p_complex_calib.setVisible(bool(active_complex)) + except Exception: + pass + def rebuild_tty_voltage_curves_from_codes() -> bool: if (not bin_iq_power_mode) or runtime.full_current_aux_curves_codes is None: return False @@ -1230,8 +1299,28 @@ def run_pyqtgraph(args) -> None: ch_1_v_f64 = ch_1_v.astype(np.float64, copy=False) ch_2_v_f64 = ch_2_v.astype(np.float64, copy=False) runtime.full_current_sweep_raw = np.asarray((ch_1_v_f64 * ch_1_v_f64) + (ch_2_v_f64 * ch_2_v_f64), dtype=np.float32) + runtime.full_current_sweep_codes = None return True + def rebuild_logdet_voltage_curve_from_codes() -> bool: + if (not bin_iq_power_mode) or runtime.full_current_sweep_codes is None: + return False + code_arr = np.asarray(runtime.full_current_sweep_codes, dtype=np.float32).reshape(-1) + if code_arr.size <= 0: + return False + sweep_raw_v, fft_input = build_logdet_voltage_fft_input(code_arr, tty_range_v) + runtime.full_current_aux_curves = None + runtime.full_current_aux_curves_codes = None + runtime.full_current_sweep_raw = sweep_raw_v + runtime.full_current_fft_source = fft_input + return True + + def rebuild_bin_scaled_curves_from_codes() -> bool: + signal_kind = get_signal_kind() + if signal_kind == "bin_logdet": + return rebuild_logdet_voltage_curve_from_codes() + return rebuild_tty_voltage_curves_from_codes() + def reset_background_state(*, clear_profile: bool = True) -> None: runtime.background_buffer.reset() if clear_profile: @@ -1522,7 +1611,7 @@ def run_pyqtgraph(args) -> None: tty_range_spin.setValue(tty_range_v) finally: tty_range_change_in_progress = False - if rebuild_tty_voltage_curves_from_codes(): + if rebuild_bin_scaled_curves_from_codes(): reset_background_state(clear_profile=True) refresh_current_window(push_to_ring=True, reset_ring=True) set_status_note(f"tty диапазон: ±{tty_range_v:.6g} В") @@ -1790,6 +1879,7 @@ def run_pyqtgraph(args) -> None: set_fft_curve_visibility() set_fft_mode() set_fft_low_cut_percent() + refresh_signal_mode_labels() try: range_min_spin.valueChanged.connect(lambda _v: set_working_range()) @@ -2031,10 +2121,23 @@ def run_pyqtgraph(args) -> None: f"ui short sweep width:{int(sweep.size)} expected:{int(runtime.ring.width)}", ) base_freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, sweep.size, dtype=np.float64) + runtime.current_info = info runtime.full_current_aux_curves = None runtime.full_current_aux_curves_codes = None + runtime.full_current_sweep_codes = None runtime.full_current_fft_source = None - if complex_sweep_mode and aux_curves is not None: + signal_kind = get_signal_kind(info) + if signal_kind == "bin_logdet": + calibrated = calibrate_freqs( + { + "F": base_freqs, + "I": sweep, + } + ) + runtime.full_current_freqs = np.asarray(calibrated["F"], dtype=np.float64) + runtime.full_current_sweep_codes = np.asarray(calibrated["I"], dtype=np.float32) + rebuild_logdet_voltage_curve_from_codes() + elif complex_sweep_mode and aux_curves is not None: try: aux_1, aux_2 = aux_curves calibrated_aux_1_payload = calibrate_freqs({"F": base_freqs, "I": aux_1}) @@ -2083,7 +2186,8 @@ def run_pyqtgraph(args) -> None: except Exception: runtime.full_current_aux_curves = None runtime.full_current_aux_curves_codes = None - runtime.current_info = info + runtime.full_current_sweep_codes = None + refresh_signal_mode_labels() refresh_current_window(push_to_ring=True) processed_frames += 1 last_packet_processed_at = time.time() @@ -2139,6 +2243,9 @@ def run_pyqtgraph(args) -> None: ) if redraw_needed: + refresh_signal_mode_labels() + active_signal_kind = get_signal_kind() + active_complex_mode = current_packet_is_complex() xs = resolve_curve_xs( runtime.current_sweep_raw.size if runtime.current_sweep_raw is not None @@ -2202,7 +2309,7 @@ def run_pyqtgraph(args) -> None: curve_norm.setData([], []) if fixed_ylim is None: - if bin_iq_power_mode: + if active_signal_kind == "bin_iq": y_series = [ runtime.current_sweep_raw, displayed_calib, @@ -2237,7 +2344,7 @@ def run_pyqtgraph(args) -> None: complex_calib_plot_signal: Optional[np.ndarray] = None if ( - complex_sweep_mode + active_complex_mode and complex_calib_enabled and runtime.current_fft_input is not None and np.iscomplexobj(runtime.current_fft_input) @@ -2274,7 +2381,7 @@ def run_pyqtgraph(args) -> None: or runtime.current_fft_mag.size != distance_axis.size or runtime.plot_dirty or ( - complex_sweep_mode + active_complex_mode and ( runtime.current_fft_complex is None or runtime.current_fft_complex.size != distance_axis.size @@ -2317,7 +2424,7 @@ def run_pyqtgraph(args) -> None: fft_vals_db = fft_mag_to_db(fft_mag_plot) ref_curve_for_range = None - if complex_sweep_mode: + if active_complex_mode: visible_abs, visible_real, visible_imag = resolve_visible_fft_curves( fft_complex_plot, fft_mag_plot, diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py index 89bb71f..a36a3f9 100644 --- a/rfg_adc_plotter/io/sweep_parser_core.py +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -10,7 +10,15 @@ from typing import List, Optional, Sequence, Set import numpy as np from rfg_adc_plotter.constants import DATA_INVERSION_THRESHOLD, LOG_BASE, LOG_EXP_LIMIT, LOG_POSTSCALER, LOG_SCALER -from rfg_adc_plotter.types import ParserEvent, PointEvent, StartEvent, SweepAuxCurves, SweepInfo, SweepPacket +from rfg_adc_plotter.types import ( + ParserEvent, + PointEvent, + SignalKind, + StartEvent, + SweepAuxCurves, + SweepInfo, + SweepPacket, +) def u32_to_i32(value: int) -> int: @@ -153,6 +161,7 @@ class LegacyBinaryParser: self._last_step: Optional[int] = None self._seen_points = False self._mode: Optional[str] = None + self._current_signal_kind: Optional[SignalKind] = None @staticmethod def _u16_at(buf: bytearray, offset: int) -> int: @@ -162,16 +171,22 @@ class LegacyBinaryParser: self._mode = "legacy" self._last_step = None self._seen_points = False + self._current_signal_kind = None events.append(StartEvent(ch=int(ch))) - def _emit_tty_start(self, events: List[ParserEvent]) -> None: - self._mode = "tty" + def _emit_bin_start(self, events: List[ParserEvent], signal_kind: SignalKind) -> None: + self._mode = "bin" self._last_step = None self._seen_points = False - events.append(StartEvent(ch=0)) + self._current_signal_kind = signal_kind + events.append(StartEvent(ch=0, signal_kind=signal_kind)) + + def _emit_tty_start(self, events: List[ParserEvent]) -> None: + self._emit_bin_start(events, signal_kind="bin_iq") def _emit_legacy_point(self, events: List[ParserEvent], step: int, value_word_hi: int, value_word_lo: int, ch: int) -> None: self._mode = "legacy" + self._current_signal_kind = None if self._seen_points and self._last_step is not None and step <= self._last_step: events.append(StartEvent(ch=int(ch))) self._seen_points = True @@ -179,12 +194,23 @@ class LegacyBinaryParser: value = u32_to_i32((int(value_word_hi) << 16) | int(value_word_lo)) events.append(PointEvent(ch=int(ch), x=int(step), y=float(value))) - def _emit_tty_point(self, events: List[ParserEvent], step: int, ch_1_word: int, ch_2_word: int) -> None: - self._mode = "tty" + def _prepare_bin_point(self, events: List[ParserEvent], step: int, signal_kind: SignalKind) -> None: + self._mode = "bin" + if self._current_signal_kind != signal_kind: + if self._seen_points: + events.append(StartEvent(ch=0, signal_kind=signal_kind)) + self._last_step = None + self._seen_points = False + self._current_signal_kind = signal_kind if self._seen_points and self._last_step is not None and step <= self._last_step: - events.append(StartEvent(ch=0)) + events.append(StartEvent(ch=0, signal_kind=signal_kind)) + self._last_step = None + self._seen_points = False self._seen_points = True self._last_step = int(step) + + def _emit_tty_point(self, events: List[ParserEvent], step: int, ch_1_word: int, ch_2_word: int) -> None: + self._prepare_bin_point(events, step=int(step), signal_kind="bin_iq") ch_1 = u16_to_i16(int(ch_1_word)) ch_2 = u16_to_i16(int(ch_2_word)) events.append( @@ -193,6 +219,19 @@ class LegacyBinaryParser: x=int(step), y=tty_ch_pair_to_sweep(ch_1, ch_2), aux=(float(ch_1), float(ch_2)), + signal_kind="bin_iq", + ) + ) + + def _emit_logdet_point(self, events: List[ParserEvent], step: int, value_word: int) -> None: + self._prepare_bin_point(events, step=int(step), signal_kind="bin_logdet") + value = u16_to_i16(int(value_word)) + events.append( + PointEvent( + ch=0, + x=int(step), + y=float(value), + signal_kind="bin_logdet", ) ) @@ -210,6 +249,7 @@ class LegacyBinaryParser: is_tty_start = (w0 == 0x000A and w1 == 0xFFFF and w2 == 0xFFFF and w3 == 0xFFFF) is_legacy_point = (self._buf[6] == 0x0A and w0 != 0xFFFF) is_tty_point = (w0 == 0x000A and w1 != 0xFFFF) + is_logdet_point = (w0 == 0x001A and w3 == 0x0000) if is_legacy_start: self._emit_legacy_start(events, ch=int(self._buf[7])) @@ -221,19 +261,44 @@ class LegacyBinaryParser: del self._buf[:8] continue + if is_logdet_point: + self._emit_logdet_point(events, step=int(w1), value_word=int(w2)) + del self._buf[:8] + continue + if self._mode == "legacy": if is_legacy_point: - self._emit_legacy_point(events, step=int(w0), value_word_hi=int(w1), value_word_lo=int(w2), ch=int(self._buf[7])) + self._emit_legacy_point( + events, + step=int(w0), + value_word_hi=int(w1), + value_word_lo=int(w2), + ch=int(self._buf[7]), + ) + del self._buf[:8] + continue + if is_tty_point and (not is_legacy_point): + self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) del self._buf[:8] continue del self._buf[:1] continue - if self._mode == "tty": + if self._mode == "bin": if is_tty_point: self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) del self._buf[:8] continue + if is_legacy_point and (not is_tty_point): + self._emit_legacy_point( + events, + step=int(w0), + value_word_hi=int(w1), + value_word_lo=int(w2), + ch=int(self._buf[7]), + ) + del self._buf[:8] + continue del self._buf[:1] continue @@ -245,7 +310,13 @@ class LegacyBinaryParser: continue if is_legacy_point and (not is_tty_point): - self._emit_legacy_point(events, step=int(w0), value_word_hi=int(w1), value_word_lo=int(w2), ch=int(self._buf[7])) + self._emit_legacy_point( + events, + step=int(w0), + value_word_hi=int(w1), + value_word_lo=int(w2), + ch=int(self._buf[7]), + ) del self._buf[:8] continue @@ -454,6 +525,7 @@ class SweepAssembler: self._aux_1: list[float] = [] self._aux_2: list[float] = [] self._cur_channel: Optional[int] = None + self._cur_signal_kind: Optional[SignalKind] = None self._cur_channels: set[int] = set() def _reset_current(self) -> None: @@ -462,6 +534,7 @@ class SweepAssembler: self._aux_1.clear() self._aux_2.clear() self._cur_channel = None + self._cur_signal_kind = None self._cur_channels.clear() def _scatter(self, xs: Sequence[int], values: Sequence[float], width: int) -> np.ndarray: @@ -500,9 +573,11 @@ class SweepAssembler: self._reset_current() if event.ch is not None: self._cur_channel = int(event.ch) + self._cur_signal_kind = event.signal_kind return packet point_ch = int(event.ch) + point_signal_kind = event.signal_kind packet: Optional[SweepPacket] = None if self._cur_channel is None: self._cur_channel = point_ch @@ -513,6 +588,12 @@ class SweepAssembler: packet = self.finalize_current() self._reset_current() self._cur_channel = point_ch + if self._cur_signal_kind != point_signal_kind: + if self._xs: + packet = self.finalize_current() + self._reset_current() + self._cur_channel = point_ch + self._cur_signal_kind = point_signal_kind self._cur_channels.add(point_ch) self._xs.append(int(event.x)) @@ -581,6 +662,7 @@ class SweepAssembler: "sweep": self._sweep_idx, "ch": ch_primary, "chs": ch_list, + "signal_kind": self._cur_signal_kind, "n_valid": n_valid, "min": vmin, "max": vmax, diff --git a/rfg_adc_plotter/io/sweep_reader.py b/rfg_adc_plotter/io/sweep_reader.py index a26171a..f8e2c11 100644 --- a/rfg_adc_plotter/io/sweep_reader.py +++ b/rfg_adc_plotter/io/sweep_reader.py @@ -43,13 +43,17 @@ def _looks_like_legacy_8byte_stream(data: bytes) -> bool: min_matches = max(_LEGACY_STREAM_MIN_RECORDS, int(blocks * _LEGACY_STREAM_MIN_MATCH_RATIO)) matched_steps_legacy: list[int] = [] matched_steps_tty: list[int] = [] + matched_steps_logdet: list[int] = [] for block_idx in range(blocks): base = offset + (block_idx * 8) if (_u16le_at(buf, base + 6) & 0x00FF) != 0x000A: w0 = _u16le_at(buf, base) w1 = _u16le_at(buf, base + 2) + w3 = _u16le_at(buf, base + 6) if w0 == 0x000A and w1 != 0xFFFF: matched_steps_tty.append(w1) + elif w0 == 0x001A and w3 == 0x0000: + matched_steps_logdet.append(w1) continue matched_steps_legacy.append(_u16le_at(buf, base)) @@ -70,6 +74,14 @@ def _looks_like_legacy_8byte_stream(data: bytes) -> bool: if monotonic_or_reset >= max(4, len(matched_steps_tty) - 4): return True + if len(matched_steps_logdet) >= tty_min_matches: + monotonic_or_reset = 0 + for prev_step, next_step in zip(matched_steps_logdet, matched_steps_logdet[1:]): + if next_step == (prev_step + 1) or next_step <= 2: + monotonic_or_reset += 1 + if monotonic_or_reset >= max(4, len(matched_steps_logdet) - 4): + return True + return False diff --git a/rfg_adc_plotter/state/runtime_state.py b/rfg_adc_plotter/state/runtime_state.py index 84de51d..7731739 100644 --- a/rfg_adc_plotter/state/runtime_state.py +++ b/rfg_adc_plotter/state/runtime_state.py @@ -20,6 +20,7 @@ class RuntimeState: range_max_ghz: float = 0.0 full_current_freqs: Optional[np.ndarray] = None full_current_sweep_raw: Optional[np.ndarray] = None + full_current_sweep_codes: Optional[np.ndarray] = None full_current_fft_source: Optional[np.ndarray] = None full_current_aux_curves: SweepAuxCurves = None full_current_aux_curves_codes: SweepAuxCurves = None diff --git a/rfg_adc_plotter/types.py b/rfg_adc_plotter/types.py index 660b5d5..5bcc77c 100644 --- a/rfg_adc_plotter/types.py +++ b/rfg_adc_plotter/types.py @@ -3,12 +3,13 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Any, Dict, Optional, Tuple, TypeAlias, Union +from typing import Any, Dict, Literal, Optional, Tuple, TypeAlias, Union import numpy as np Number = Union[int, float] +SignalKind = Literal["bin_iq", "bin_logdet"] SweepInfo = Dict[str, Any] SweepData = Dict[str, np.ndarray] SweepAuxCurves = Optional[Tuple[np.ndarray, np.ndarray]] @@ -18,6 +19,7 @@ SweepPacket = Tuple[np.ndarray, SweepInfo, SweepAuxCurves] @dataclass(frozen=True) class StartEvent: ch: Optional[int] = None + signal_kind: Optional[SignalKind] = None @dataclass(frozen=True) @@ -26,6 +28,7 @@ class PointEvent: x: int y: float aux: Optional[Tuple[float, float]] = None + signal_kind: Optional[SignalKind] = None ParserEvent: TypeAlias = Union[StartEvent, PointEvent] diff --git a/tests/test_processing.py b/tests/test_processing.py index a3038dc..0491074 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -10,6 +10,7 @@ from rfg_adc_plotter.gui.pyqtgraph_backend import ( apply_distance_cut_to_axis, apply_working_range, apply_working_range_to_aux_curves, + build_logdet_voltage_fft_input, build_main_window_layout, coalesce_packets_for_ui, compute_background_subtracted_bscan_levels, @@ -74,6 +75,29 @@ class ProcessingTests(unittest.TestCase): self.assertTrue(np.all(volts >= -5.0)) self.assertTrue(np.all(volts <= 5.0)) + def test_build_logdet_voltage_fft_input_converts_codes_and_exponentiates(self): + codes = np.asarray([-32768.0, 0.0, 32767.0], dtype=np.float32) + volts, fft_input = build_logdet_voltage_fft_input(codes, 5.0) + + self.assertEqual(volts.shape, codes.shape) + self.assertEqual(fft_input.shape, codes.shape) + self.assertAlmostEqual(float(volts[0]), -5.0, places=6) + self.assertAlmostEqual(float(volts[1]), 0.0, places=6) + self.assertAlmostEqual(float(volts[2]), 5.0, places=6) + self.assertTrue(np.allclose(fft_input, np.exp(volts.astype(np.float32)))) + + def test_build_logdet_voltage_fft_input_clips_exp_argument_and_respects_range(self): + codes = np.asarray([32767.0], dtype=np.float32) + volts_5, fft_5 = build_logdet_voltage_fft_input(codes, 5.0, exp_input_limit=2.0) + volts_10, fft_10 = build_logdet_voltage_fft_input(codes, 10.0, exp_input_limit=2.0) + + self.assertAlmostEqual(float(volts_5[0]), 5.0, places=6) + self.assertAlmostEqual(float(volts_10[0]), 10.0, places=6) + self.assertAlmostEqual(float(fft_5[0]), float(np.exp(np.float32(2.0))), places=5) + self.assertAlmostEqual(float(fft_10[0]), float(np.exp(np.float32(2.0))), places=5) + self.assertTrue(np.isfinite(fft_5[0])) + self.assertTrue(np.isfinite(fft_10[0])) + def test_recalculate_calibration_preserves_requested_edges(self): coeffs = recalculate_calibration_c(np.asarray([0.0, 1.0, 0.025], dtype=np.float64), 3.3, 14.3) y0 = coeffs[0] + coeffs[1] * 3.3 + coeffs[2] * (3.3 ** 2) diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py index fb50cc7..6a920fe 100644 --- a/tests/test_sweep_parser_core.py +++ b/tests/test_sweep_parser_core.py @@ -87,6 +87,17 @@ def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes: ) +def _pack_logdet_point(step: int, value: int) -> bytes: + return b"".join( + [ + _u16le(0x001A), + _u16le(step), + _u16le(value), + _u16le(0x0000), + ] + ) + + class SweepParserCoreTests(unittest.TestCase): def test_ascii_parser_emits_start_and_points(self): parser = AsciiSweepParser() @@ -148,10 +159,12 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(events[1].x, 1) self.assertEqual(events[1].y, 18100.0) self.assertEqual(events[1].aux, (100.0, 90.0)) + self.assertEqual(events[1].signal_kind, "bin_iq") self.assertIsInstance(events[2], PointEvent) self.assertEqual(events[2].x, 2) self.assertEqual(events[2].y, 23425.0) self.assertEqual(events[2].aux, (120.0, 95.0)) + self.assertEqual(events[2].signal_kind, "bin_iq") def test_legacy_binary_parser_detects_new_tty_sweep_on_step_reset(self): parser = LegacyBinaryParser() @@ -174,6 +187,7 @@ class SweepParserCoreTests(unittest.TestCase): self.assertIsInstance(events[4], PointEvent) self.assertEqual(events[4].x, 1) self.assertEqual(events[4].aux, (120.0, 80.0)) + self.assertEqual(events[4].signal_kind, "bin_iq") def test_legacy_binary_parser_tty_mode_does_not_flip_to_legacy_on_ch2_low_byte_0x0a(self): parser = LegacyBinaryParser() @@ -203,6 +217,53 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(events[2].aux, (120.0, 1040.0)) self.assertEqual(events[2].y, 1096000.0) + def test_legacy_binary_parser_accepts_logdet_stream(self): + parser = LegacyBinaryParser() + stream = b"".join( + [ + _pack_logdet_point(1, 0x0F77), + _pack_logdet_point(2, 0xF234), + ] + ) + + events = parser.feed(stream) + + self.assertEqual(len(events), 2) + self.assertIsInstance(events[0], PointEvent) + self.assertEqual(events[0].x, 1) + self.assertEqual(events[0].y, 3959.0) + self.assertIsNone(events[0].aux) + self.assertEqual(events[0].signal_kind, "bin_logdet") + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].x, 2) + self.assertEqual(events[1].y, -3532.0) + self.assertEqual(events[1].signal_kind, "bin_logdet") + + def test_legacy_binary_parser_splits_packet_on_bin_signal_kind_change(self): + parser = LegacyBinaryParser() + stream = b"".join( + [ + _pack_tty_start(), + _pack_tty_point(1, 100, 90), + _pack_tty_point(2, 110, 95), + _pack_logdet_point(3, 0x0F77), + ] + ) + + events = parser.feed(stream) + + self.assertIsInstance(events[0], StartEvent) + self.assertEqual(events[0].signal_kind, "bin_iq") + self.assertIsInstance(events[1], PointEvent) + self.assertEqual(events[1].signal_kind, "bin_iq") + self.assertIsInstance(events[2], PointEvent) + self.assertEqual(events[2].signal_kind, "bin_iq") + self.assertIsInstance(events[3], StartEvent) + self.assertEqual(events[3].signal_kind, "bin_logdet") + self.assertIsInstance(events[4], PointEvent) + self.assertEqual(events[4].x, 3) + self.assertEqual(events[4].signal_kind, "bin_logdet") + def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self): parser = ComplexAsciiSweepParser() events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n") @@ -305,12 +366,13 @@ class SweepParserCoreTests(unittest.TestCase): def test_sweep_assembler_builds_aux_curves_without_inversion(self): assembler = SweepAssembler(fancy=False, apply_inversion=False) - self.assertIsNone(assembler.consume(StartEvent(ch=1))) - assembler.consume(PointEvent(ch=1, x=1, y=10.0, aux=(100.0, 90.0))) - assembler.consume(PointEvent(ch=1, x=2, y=20.0, aux=(110.0, 95.0))) + self.assertIsNone(assembler.consume(StartEvent(ch=1, signal_kind="bin_iq"))) + assembler.consume(PointEvent(ch=1, x=1, y=10.0, aux=(100.0, 90.0), signal_kind="bin_iq")) + assembler.consume(PointEvent(ch=1, x=2, y=20.0, aux=(110.0, 95.0), signal_kind="bin_iq")) sweep, info, aux = assembler.finalize_current() self.assertEqual(sweep.shape[0], 3) self.assertEqual(info["ch"], 1) + self.assertEqual(info["signal_kind"], "bin_iq") self.assertIsNotNone(aux) self.assertEqual(aux[0][1], 100.0) self.assertEqual(aux[1][2], 95.0) @@ -333,6 +395,22 @@ class SweepParserCoreTests(unittest.TestCase): self.assertEqual(info_2["chs"], [2]) self.assertAlmostEqual(float(sweep_2[1]), 20.0, places=6) + def test_sweep_assembler_splits_packet_on_signal_kind_switch(self): + assembler = SweepAssembler(fancy=False, apply_inversion=False) + self.assertIsNone(assembler.consume(PointEvent(ch=0, x=1, y=10.0, signal_kind="bin_iq"))) + packet = assembler.consume(PointEvent(ch=0, x=1, y=20.0, signal_kind="bin_logdet")) + self.assertIsNotNone(packet) + + sweep_1, info_1, aux_1 = packet + self.assertIsNone(aux_1) + self.assertEqual(info_1["signal_kind"], "bin_iq") + self.assertAlmostEqual(float(sweep_1[1]), 10.0, places=6) + + sweep_2, info_2, aux_2 = assembler.finalize_current() + self.assertIsNone(aux_2) + self.assertEqual(info_2["signal_kind"], "bin_logdet") + self.assertAlmostEqual(float(sweep_2[1]), 20.0, places=6) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_sweep_reader.py b/tests/test_sweep_reader.py index a3ecfc8..7e1eaea 100644 --- a/tests/test_sweep_reader.py +++ b/tests/test_sweep_reader.py @@ -66,6 +66,17 @@ def _pack_tty_point(step: int, ch1: int, ch2: int) -> bytes: ) +def _pack_logdet_point(step: int, value: int) -> bytes: + return b"".join( + [ + _u16le(0x001A), + _u16le(step), + _u16le(value), + _u16le(0x0000), + ] + ) + + def _chunk_bytes(data: bytes, size: int = 4096) -> list[bytes]: return [data[idx : idx + size] for idx in range(0, len(data), size)] @@ -178,6 +189,26 @@ class SweepReaderTests(unittest.TestCase): reader.join(timeout=1.0) stack.close() + def test_parser_16_bit_x2_falls_back_to_logdet_1a00_stream(self): + payload = bytearray() + while len(payload) < (_PARSER_16_BIT_X2_PROBE_BYTES + 24): + payload += _pack_logdet_point(1, 0x0F77) + payload += _pack_logdet_point(2, 0x0FCB) + payload += _pack_logdet_point(1, 0x0F88) + + stack, reader, queue, stop_event, stderr = self._start_reader(bytes(payload), parser_16_bit_x2=True) + try: + sweep, info, aux = queue.get(timeout=2.0) + self.assertEqual(info["signal_kind"], "bin_logdet") + self.assertIsNone(aux) + self.assertGreaterEqual(sweep.shape[0], 3) + self.assertAlmostEqual(float(sweep[1]), 3959.0, places=6) + self.assertIn("fallback -> legacy", stderr.getvalue()) + finally: + stop_event.set() + reader.join(timeout=1.0) + stack.close() + def test_parser_16_bit_x2_probe_inconclusive_logs_hint(self): payload = b"\x00" * (_PARSER_16_BIT_X2_PROBE_BYTES + 128)