From 08dc6b3a1f6b88fe040495d044b1800a851ce440 Mon Sep 17 00:00:00 2001 From: awe Date: Fri, 29 May 2026 17:15:32 +0300 Subject: [PATCH] ch1 ch2 new --- rfg_adc_plotter/gui/pyqtgraph_backend.py | 59 ++++++++ rfg_adc_plotter/io/sweep_parser_core.py | 181 +++++++++++++++++++---- rfg_adc_plotter/state/runtime_state.py | 4 + rfg_adc_plotter/types.py | 2 + tests/test_sweep_parser_core.py | 109 ++++++++++++++ 5 files changed, 328 insertions(+), 27 deletions(-) diff --git a/rfg_adc_plotter/gui/pyqtgraph_backend.py b/rfg_adc_plotter/gui/pyqtgraph_backend.py index 7c9fa64..423e10d 100644 --- a/rfg_adc_plotter/gui/pyqtgraph_backend.py +++ b/rfg_adc_plotter/gui/pyqtgraph_backend.py @@ -74,6 +74,7 @@ TTY_RANGE_MIN_V = 1e-6 TTY_RANGE_MAX_V = 1_000_000.0 LOGDET_EXP_INPUT_LIMIT = 80.0 DO1_TAGGED_INFO_KEY = "_do1_tagged_payload" +SECONDARY_INFO_KEY = "_secondary_payload" DISPLAY_DISTANCE_ZERO_M = 9.0 @@ -944,6 +945,8 @@ def run_pyqtgraph(args) -> None: curve = p_line.plot(pen=pg.mkPen((80, 120, 255), width=1)) curve_raw_low = p_line.plot(pen=pg.mkPen((255, 90, 90), width=1)) curve_raw_high = p_line.plot(pen=pg.mkPen((90, 220, 255), width=1)) + curve_secondary_ch1 = p_line.plot(pen=pg.mkPen((0, 200, 100), width=1)) + curve_secondary_ch2 = p_line.plot(pen=pg.mkPen((200, 100, 200), width=1)) p_line_aux_vb = None if bin_iq_power_mode: p_line_aux_vb = pg.ViewBox() @@ -1889,6 +1892,8 @@ def run_pyqtgraph(args) -> None: runtime.current_do1_tagged_raw_high = None runtime.current_do1_tagged_aux_low = None runtime.current_do1_tagged_aux_high = None + runtime.current_secondary_ch1 = None + runtime.current_secondary_ch2 = None runtime.current_sweep_norm = None runtime.current_fft_mag = None runtime.current_fft_db = None @@ -1952,6 +1957,27 @@ def run_pyqtgraph(args) -> None: runtime.current_do1_tagged_aux_low = None runtime.current_do1_tagged_aux_high = None + if runtime.full_secondary_ch1 is not None: + runtime.current_secondary_ch1 = apply_working_range_to_signal( + runtime.full_current_freqs, + runtime.full_current_sweep_raw, + runtime.full_secondary_ch1, + runtime.range_min_ghz, + runtime.range_max_ghz, + ) + else: + runtime.current_secondary_ch1 = None + if runtime.full_secondary_ch2 is not None: + runtime.current_secondary_ch2 = apply_working_range_to_signal( + runtime.full_current_freqs, + runtime.full_current_sweep_raw, + runtime.full_secondary_ch2, + runtime.range_min_ghz, + runtime.range_max_ghz, + ) + else: + runtime.current_secondary_ch2 = None + if runtime.current_sweep_raw.size == 0: if push_to_ring: reset_ring_buffers() @@ -1965,6 +1991,8 @@ def run_pyqtgraph(args) -> None: runtime.current_do1_tagged_raw_high = None runtime.current_do1_tagged_aux_low = None runtime.current_do1_tagged_aux_high = None + runtime.current_secondary_ch1 = None + runtime.current_secondary_ch2 = None runtime.current_sweep_norm = None runtime.current_fft_mag = None runtime.current_fft_db = None @@ -2722,6 +2750,8 @@ def run_pyqtgraph(args) -> None: runtime.full_do1_tagged_aux_high = None runtime.full_do1_tagged_aux_low_codes = None runtime.full_do1_tagged_aux_high_codes = None + runtime.full_secondary_ch1 = None + runtime.full_secondary_ch2 = None signal_kind = get_signal_kind(info) if signal_kind == "bin_iq_do1_tagged": calibrated = calibrate_freqs( @@ -2845,6 +2875,20 @@ def run_pyqtgraph(args) -> None: runtime.full_current_aux_curves = None runtime.full_current_aux_curves_codes = None runtime.full_current_sweep_codes = None + secondary_payload = info.get(SECONDARY_INFO_KEY) if isinstance(info, dict) else None + if isinstance(secondary_payload, dict): + sec_ch1 = secondary_payload.get("ch1") + sec_ch2 = secondary_payload.get("ch2") + if sec_ch1 is not None: + runtime.full_secondary_ch1 = np.asarray( + calibrate_freqs({"F": base_freqs, "I": sec_ch1})["I"], + dtype=np.float32, + ) + if sec_ch2 is not None: + runtime.full_secondary_ch2 = np.asarray( + calibrate_freqs({"F": base_freqs, "I": sec_ch2})["I"], + dtype=np.float32, + ) refresh_current_window(push_to_ring=True) processed_frames += 1 last_packet_processed_at = time.time() @@ -2999,6 +3043,21 @@ def run_pyqtgraph(args) -> None: clear_curve_if_needed("raw_low", curve_raw_low) clear_curve_if_needed("raw_high", curve_raw_high) + if runtime.current_secondary_ch1 is not None: + sec_width = min(xs.size, runtime.current_secondary_ch1.size) + sec_x1, sec_y1 = decimate_curve_for_display(xs[:sec_width], runtime.current_secondary_ch1[:sec_width]) + sec_x1, sec_y1 = sanitize_curve_data_for_display(sec_x1, sec_y1) + set_curve_data("secondary_ch1", curve_secondary_ch1, sec_x1, sec_y1, autoDownsample=False) + else: + clear_curve_if_needed("secondary_ch1", curve_secondary_ch1) + if runtime.current_secondary_ch2 is not None: + sec_width2 = min(xs.size, runtime.current_secondary_ch2.size) + sec_x2, sec_y2 = decimate_curve_for_display(xs[:sec_width2], runtime.current_secondary_ch2[:sec_width2]) + sec_x2, sec_y2 = sanitize_curve_data_for_display(sec_x2, sec_y2) + set_curve_data("secondary_ch2", curve_secondary_ch2, sec_x2, sec_y2, autoDownsample=False) + else: + clear_curve_if_needed("secondary_ch2", curve_secondary_ch2) + if active_do1_tagged: if displayed_tagged_aux_low is not None: aux_low_1, aux_low_2 = displayed_tagged_aux_low diff --git a/rfg_adc_plotter/io/sweep_parser_core.py b/rfg_adc_plotter/io/sweep_parser_core.py index 15364e9..28a195b 100644 --- a/rfg_adc_plotter/io/sweep_parser_core.py +++ b/rfg_adc_plotter/io/sweep_parser_core.py @@ -286,6 +286,28 @@ class LegacyBinaryParser: ) ) + def _emit_secondary_point( + self, + events: List[ParserEvent], + step: int, + ch_1_word: int, + ch_2_word: int, + ) -> None: + self._mode = "bin" + self._current_signal_kind = self._current_signal_kind or "bin_iq" + ch_1 = u16_to_i16(int(ch_1_word)) + ch_2 = u16_to_i16(int(ch_2_word)) + events.append( + PointEvent( + ch=0, + x=int(step), + y=0.0, + aux=(float(ch_1), float(ch_2)), + signal_kind="bin_iq", + is_secondary=True, + ) + ) + def _emit_logdet_point(self, events: List[ParserEvent], step: int, value_word: int) -> None: self._prepare_bin_point(events, step=int(step), signal_kind="bin_logdet") value = u16_to_i16(int(value_word)) @@ -310,8 +332,9 @@ class LegacyBinaryParser: w0 = words[:, 0] w1 = words[:, 1] is_tty_point = (w0 == 0x000A) & (w1 != 0xFFFF) + is_sec_point = (w0 == 0x00A8) & (w1 != 0xFFFF) is_legacy_point = (raw[:, 6] == 0x0A) & (w0 != 0xFFFF) - valid = is_tty_point + valid = is_tty_point | is_sec_point if require_not_legacy: valid = valid & (~is_legacy_point) if valid.size <= 0 or not bool(valid[0]): @@ -321,7 +344,15 @@ class LegacyBinaryParser: if valid_count <= 0: return False - steps = words[:valid_count, 1].astype(np.int64, copy=True) + primary_mask = is_tty_point[:valid_count] + secondary_mask = is_sec_point[:valid_count] + + primary_indices = np.nonzero(primary_mask)[0] + if primary_indices.size <= 0: + # No primary records in this block — cannot batch + return False + + primary_steps = words[:valid_count, 1][primary_mask].astype(np.int64, copy=True) if self._current_signal_kind != "bin_iq": if self._seen_points: events.append(StartEvent(ch=0, signal_kind="bin_iq")) @@ -330,44 +361,72 @@ class LegacyBinaryParser: self._current_signal_kind = "bin_iq" self._reset_tagged_steps() - if self._seen_points and self._last_step is not None and steps[0] <= int(self._last_step): + if self._seen_points and self._last_step is not None and primary_steps[0] <= int(self._last_step): events.append(StartEvent(ch=0, signal_kind="bin_iq")) self._last_step = None self._seen_points = False self._reset_tagged_steps() - reset_idx = np.nonzero(np.diff(steps) <= 0)[0] - take_count = int(reset_idx[0] + 1) if reset_idx.size > 0 else int(steps.size) + reset_idx = np.nonzero(np.diff(primary_steps) <= 0)[0] + if reset_idx.size > 0: + reset_primary_pos = int(reset_idx[0] + 1) + if reset_primary_pos < primary_indices.size: + take_count = int(primary_indices[reset_primary_pos]) + else: + take_count = valid_count + else: + take_count = valid_count if take_count <= 0: return False + primary_mask_take = is_tty_point[:take_count] + secondary_mask_take = is_sec_point[:take_count] batch_words = words[:take_count].copy() - xs = batch_words[:, 1].astype(np.int64, copy=False) - ch_1 = batch_words[:, 2].astype(np.uint16, copy=False).view(np.int16) - ch_2 = batch_words[:, 3].astype(np.uint16, copy=False).view(np.int16) del raw del words del w0 del w1 del self._buf[: take_count * 8] - ch_1_i64 = ch_1.astype(np.int64) - ch_2_i64 = ch_2.astype(np.int64) - ys = ((ch_1_i64 * ch_1_i64) + (ch_2_i64 * ch_2_i64)).astype(np.float32) - self._mode = "bin" - self._seen_points = True - self._last_step = int(xs[-1]) - self._current_signal_kind = "bin_iq" - self._reset_tagged_steps() - events.append( - BatchPointEvent( - ch=0, - xs=xs, - ys=ys, - aux=(ch_1.astype(np.float32), ch_2.astype(np.float32)), - signal_kind="bin_iq", + if np.any(primary_mask_take): + p_words = batch_words[primary_mask_take] + p_xs = p_words[:, 1].astype(np.int64, copy=False) + p_ch1 = p_words[:, 2].astype(np.uint16, copy=False).view(np.int16) + p_ch2 = p_words[:, 3].astype(np.uint16, copy=False).view(np.int16) + p_ch1_i64 = p_ch1.astype(np.int64) + p_ch2_i64 = p_ch2.astype(np.int64) + p_ys = ((p_ch1_i64 * p_ch1_i64) + (p_ch2_i64 * p_ch2_i64)).astype(np.float32) + self._mode = "bin" + self._seen_points = True + self._last_step = int(p_xs[-1]) + self._current_signal_kind = "bin_iq" + self._reset_tagged_steps() + events.append( + BatchPointEvent( + ch=0, + xs=p_xs, + ys=p_ys, + aux=(p_ch1.astype(np.float32), p_ch2.astype(np.float32)), + signal_kind="bin_iq", + ) ) - ) + + if np.any(secondary_mask_take): + s_words = batch_words[secondary_mask_take] + s_xs = s_words[:, 1].astype(np.int64, copy=False) + s_ch1 = s_words[:, 2].astype(np.uint16, copy=False).view(np.int16) + s_ch2 = s_words[:, 3].astype(np.uint16, copy=False).view(np.int16) + events.append( + BatchPointEvent( + ch=0, + xs=s_xs, + ys=np.zeros(s_xs.size, dtype=np.float32), + aux=(s_ch1.astype(np.float32), s_ch2.astype(np.float32)), + signal_kind="bin_iq", + is_secondary=True, + ) + ) + return True def feed(self, data: bytes) -> List[ParserEvent]: @@ -387,6 +446,7 @@ class LegacyBinaryParser: is_tty_tagged_low_point = (w0 == 0x00A3 and w1 != 0xFFFF) is_tty_tagged_high_point = (w0 == 0x00A4 and w1 != 0xFFFF) is_logdet_point = (w0 == 0x001A and w3 == 0x0000) + is_secondary_point = (w0 == 0x00A8 and w1 != 0xFFFF) if is_legacy_start: self._emit_legacy_start(events, ch=int(self._buf[7])) @@ -404,7 +464,7 @@ class LegacyBinaryParser: continue if self._mode == "legacy": - if is_tty_point and (not is_legacy_point) and self._try_emit_tty_batch(events, require_not_legacy=True): + if (is_tty_point or is_secondary_point) and (not is_legacy_point) and self._try_emit_tty_batch(events, require_not_legacy=True): continue if is_legacy_point: self._emit_legacy_point( @@ -440,11 +500,15 @@ class LegacyBinaryParser: ) del self._buf[:8] continue + if is_secondary_point and (not is_legacy_point): + self._emit_secondary_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) + del self._buf[:8] + continue del self._buf[:1] continue if self._mode == "bin": - if is_tty_point and self._try_emit_tty_batch(events, require_not_legacy=False): + if (is_tty_point or is_secondary_point) and self._try_emit_tty_batch(events, require_not_legacy=False): continue if is_tty_point: self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) @@ -470,6 +534,10 @@ class LegacyBinaryParser: ) del self._buf[:8] continue + if is_secondary_point: + self._emit_secondary_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) + del self._buf[:8] + continue if is_legacy_point and (not is_tty_point): self._emit_legacy_point( events, @@ -485,9 +553,13 @@ class LegacyBinaryParser: # Mode is still unknown. Accept only unambiguous point shapes to avoid # jumping between tty and legacy interpretations on coincidental bytes. - if is_tty_point and (not is_legacy_point): + if (is_tty_point or is_secondary_point) and (not is_legacy_point): if self._try_emit_tty_batch(events, require_not_legacy=True): continue + if is_secondary_point: + self._emit_secondary_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) + del self._buf[:8] + continue self._emit_tty_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) del self._buf[:8] continue @@ -514,6 +586,11 @@ class LegacyBinaryParser: del self._buf[:8] continue + if is_secondary_point and (not is_legacy_point): + self._emit_secondary_point(events, step=int(w1), ch_1_word=int(w2), ch_2_word=int(w3)) + del self._buf[:8] + continue + if is_legacy_point and (not is_tty_point): self._emit_legacy_point( events, @@ -737,10 +814,18 @@ class SweepAssembler: self._tagged_high_ys: list[float] = [] self._tagged_high_aux_1: list[float] = [] self._tagged_high_aux_2: list[float] = [] + self._secondary_xs: list[int] = [] + self._secondary_aux_1: list[float] = [] + self._secondary_aux_2: list[float] = [] self._cur_channel: Optional[int] = None self._cur_signal_kind: Optional[SignalKind] = None self._cur_channels: set[int] = set() + def _reset_secondary_current(self) -> None: + self._secondary_xs.clear() + self._secondary_aux_1.clear() + self._secondary_aux_2.clear() + def _reset_tagged_current(self) -> None: self._tagged_low_xs.clear() self._tagged_low_ys.clear() @@ -757,6 +842,7 @@ class SweepAssembler: self._aux_1.clear() self._aux_2.clear() self._reset_tagged_current() + self._reset_secondary_current() self._cur_channel = None self._cur_signal_kind = None self._cur_channels.clear() @@ -870,6 +956,30 @@ class SweepAssembler: self._aux_2.extend(aux_2_arr[:aux_width].tolist()) return packet + def _consume_secondary_point(self, event: PointEvent) -> None: + self._secondary_xs.append(int(event.x)) + if event.aux is not None: + self._secondary_aux_1.append(float(event.aux[0])) + self._secondary_aux_2.append(float(event.aux[1])) + + def _consume_secondary_batch(self, event: BatchPointEvent) -> None: + xs_arr = np.asarray(event.xs, dtype=np.int64).reshape(-1) + width = xs_arr.size + if width <= 0: + return + self._secondary_xs.extend(xs_arr.tolist()) + if event.aux is not None: + try: + aux_1, aux_2 = event.aux + aux_1_arr = np.asarray(aux_1, dtype=np.float32).reshape(-1) + aux_2_arr = np.asarray(aux_2, dtype=np.float32).reshape(-1) + aux_width = min(width, aux_1_arr.size, aux_2_arr.size) + except Exception: + aux_width = 0 + if aux_width > 0: + self._secondary_aux_1.extend(aux_1_arr[:aux_width].tolist()) + self._secondary_aux_2.extend(aux_2_arr[:aux_width].tolist()) + def consume(self, event: ParserEvent) -> Optional[SweepPacket]: if isinstance(event, StartEvent): packet = self.finalize_current() @@ -879,8 +989,15 @@ class SweepAssembler: self._cur_signal_kind = event.signal_kind return packet if isinstance(event, BatchPointEvent): + if event.is_secondary: + self._consume_secondary_batch(event) + return None return self._consume_batch(event) + if isinstance(event, PointEvent) and event.is_secondary: + self._consume_secondary_point(event) + return None + point_ch = int(event.ch) point_signal_kind = event.signal_kind packet: Optional[SweepPacket] = None @@ -1015,4 +1132,14 @@ class SweepAssembler: } if do1_tagged_payload is not None: info["_do1_tagged_payload"] = do1_tagged_payload + if ( + self._secondary_xs + and self._secondary_aux_1 + and self._secondary_aux_2 + and len(self._secondary_aux_1) == len(self._secondary_xs) + ): + info["_secondary_payload"] = { + "ch1": self._scatter(self._secondary_xs, self._secondary_aux_1, target_width), + "ch2": self._scatter(self._secondary_xs, self._secondary_aux_2, target_width), + } return (sweep, info, aux_curves) diff --git a/rfg_adc_plotter/state/runtime_state.py b/rfg_adc_plotter/state/runtime_state.py index dc8f7af..d38d4f6 100644 --- a/rfg_adc_plotter/state/runtime_state.py +++ b/rfg_adc_plotter/state/runtime_state.py @@ -30,6 +30,8 @@ class RuntimeState: full_do1_tagged_aux_high: SweepAuxCurves = None full_do1_tagged_aux_low_codes: SweepAuxCurves = None full_do1_tagged_aux_high_codes: SweepAuxCurves = None + full_secondary_ch1: Optional[np.ndarray] = None + full_secondary_ch2: Optional[np.ndarray] = None current_freqs: Optional[np.ndarray] = None current_distances: Optional[np.ndarray] = None current_sweep_raw: Optional[np.ndarray] = None @@ -41,6 +43,8 @@ class RuntimeState: current_do1_tagged_raw_high: Optional[np.ndarray] = None current_do1_tagged_aux_low: SweepAuxCurves = None current_do1_tagged_aux_high: SweepAuxCurves = None + current_secondary_ch1: Optional[np.ndarray] = None + current_secondary_ch2: Optional[np.ndarray] = None current_sweep_norm: Optional[np.ndarray] = None current_fft_mag: Optional[np.ndarray] = None current_fft_db: Optional[np.ndarray] = None diff --git a/rfg_adc_plotter/types.py b/rfg_adc_plotter/types.py index 8e239d5..5328e86 100644 --- a/rfg_adc_plotter/types.py +++ b/rfg_adc_plotter/types.py @@ -31,6 +31,7 @@ class PointEvent: aux: Optional[Tuple[float, float]] = None signal_kind: Optional[SignalKind] = None do1_level: Optional[Do1Level] = None + is_secondary: bool = False @dataclass(frozen=True) @@ -41,6 +42,7 @@ class BatchPointEvent: aux: Optional[Tuple[np.ndarray, np.ndarray]] = None signal_kind: Optional[SignalKind] = None do1_level: Optional[Do1Level] = None + is_secondary: bool = False ParserEvent: TypeAlias = Union[StartEvent, PointEvent, BatchPointEvent] diff --git a/tests/test_sweep_parser_core.py b/tests/test_sweep_parser_core.py index 6d9c806..212c7d0 100644 --- a/tests/test_sweep_parser_core.py +++ b/tests/test_sweep_parser_core.py @@ -588,5 +588,114 @@ class SweepParserCoreTests(unittest.TestCase): self.assertAlmostEqual(float(sweep_2[1]), 20.0, places=6) + def test_legacy_binary_parser_accepts_secondary_0xa8_stream(self): + parser = LegacyBinaryParser() + stream = b"".join( + [ + _pack_tty_start(), + _pack_tty_point(1, 5, 0), + _pack_tty_tagged_point(0x00A8, 1, 0xFFCD, 0xFFDC), + _pack_tty_point(2, 0xFFF8, 1), + _pack_tty_tagged_point(0x00A8, 2, 0xFFCE, 0xFFE9), + ] + ) + events = parser.feed(stream) + + start_events = [e for e in events if isinstance(e, StartEvent)] + self.assertEqual(len(start_events), 1) + + point_events = [e for e in events if isinstance(e, PointEvent)] + primary = [e for e in point_events if not e.is_secondary] + secondary = [e for e in point_events if e.is_secondary] + + self.assertEqual(len(primary), 2) + self.assertEqual(len(secondary), 2) + self.assertEqual(secondary[0].x, 1) + self.assertEqual(secondary[0].aux, (-51.0, -36.0)) + self.assertTrue(secondary[0].is_secondary) + self.assertEqual(secondary[1].x, 2) + self.assertEqual(secondary[1].aux, (-50.0, -23.0)) + + def test_secondary_0xa8_does_not_trigger_sweep_reset(self): + parser = LegacyBinaryParser() + stream = b"".join( + [ + _pack_tty_start(), + _pack_tty_point(1, 5, 0), + _pack_tty_tagged_point(0x00A8, 1, 100, 200), + _pack_tty_point(2, 6, 0), + _pack_tty_tagged_point(0x00A8, 2, 110, 210), + ] + ) + events = parser.feed(stream) + start_events = [e for e in events if isinstance(e, StartEvent)] + self.assertEqual(len(start_events), 1) + + def test_legacy_binary_parser_batch_handles_interleaved_secondary(self): + parser = LegacyBinaryParser(batch_events=True) + stream = b"".join( + [ + _pack_tty_start(), + _pack_tty_point(1, 5, 0), + _pack_tty_tagged_point(0x00A8, 1, 100, 200), + _pack_tty_point(2, 6, 0), + _pack_tty_tagged_point(0x00A8, 2, 110, 210), + ] + ) + events = parser.feed(stream) + + batch_events = [e for e in events if isinstance(e, BatchPointEvent)] + primary_batches = [e for e in batch_events if not e.is_secondary] + secondary_batches = [e for e in batch_events if e.is_secondary] + + self.assertTrue(len(primary_batches) >= 1) + self.assertTrue(len(secondary_batches) >= 1) + + pb = primary_batches[0] + self.assertTrue(np.array_equal(pb.xs, np.array([1, 2], dtype=np.int64))) + self.assertFalse(pb.is_secondary) + + sb = secondary_batches[0] + self.assertTrue(np.array_equal(sb.xs, np.array([1, 2], dtype=np.int64))) + self.assertTrue(sb.is_secondary) + + def test_sweep_assembler_packages_secondary_payload(self): + assembler = SweepAssembler(fancy=False, apply_inversion=False) + assembler.consume(StartEvent(ch=0, signal_kind="bin_iq")) + assembler.consume(PointEvent(ch=0, x=1, y=25.0, aux=(5.0, 0.0), signal_kind="bin_iq")) + assembler.consume( + PointEvent(ch=0, x=1, y=0.0, aux=(-51.0, -36.0), signal_kind="bin_iq", is_secondary=True) + ) + assembler.consume(PointEvent(ch=0, x=2, y=65.0, aux=(-8.0, 1.0), signal_kind="bin_iq")) + assembler.consume( + PointEvent(ch=0, x=2, y=0.0, aux=(-50.0, -23.0), signal_kind="bin_iq", is_secondary=True) + ) + + sweep, info, aux = assembler.finalize_current() + + self.assertEqual(info["signal_kind"], "bin_iq") + self.assertAlmostEqual(float(sweep[1]), 25.0, places=6) + self.assertAlmostEqual(float(sweep[2]), 65.0, places=6) + + payload = info.get("_secondary_payload") + self.assertIsNotNone(payload) + self.assertIn("ch1", payload) + self.assertIn("ch2", payload) + self.assertAlmostEqual(float(payload["ch1"][1]), -51.0, places=6) + self.assertAlmostEqual(float(payload["ch2"][1]), -36.0, places=6) + self.assertAlmostEqual(float(payload["ch1"][2]), -50.0, places=6) + self.assertAlmostEqual(float(payload["ch2"][2]), -23.0, places=6) + + def test_sweep_assembler_secondary_absent_when_no_0xa8_data(self): + assembler = SweepAssembler(fancy=False, apply_inversion=False) + assembler.consume(StartEvent(ch=0, signal_kind="bin_iq")) + assembler.consume(PointEvent(ch=0, x=1, y=25.0, aux=(5.0, 0.0), signal_kind="bin_iq")) + assembler.consume(PointEvent(ch=0, x=2, y=65.0, aux=(-8.0, 1.0), signal_kind="bin_iq")) + + sweep, info, aux = assembler.finalize_current() + + self.assertNotIn("_secondary_payload", info) + + if __name__ == "__main__": unittest.main()