new complex for --bin

This commit is contained in:
awe
2026-04-10 20:20:16 +03:00
parent eee1039099
commit fac0add45d
6 changed files with 36 additions and 15 deletions

View File

@ -110,6 +110,7 @@ Legacy binary:
```
`--bin` также понимает `tty`-поток CH1/CH2 из `kamil_adc` (`tty:/tmp/ttyADC_data`) в 8-байтном формате `0x000A,step,ch1_i16,ch2_i16`.
В этом режиме сырая кривая строится как `ch1^2 + ch2^2`, а FFT рассчитывается от комплексного сигнала `ch1 + i*ch2`.
Logscale binary с парой `int32`:

View File

@ -73,7 +73,8 @@ def build_parser() -> argparse.ArgumentParser:
help=(
"8-байтный бинарный протокол: либо legacy старт "
"0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A и точки step,uint32(hi16,lo16),0x000A, "
"либо tty CH1/CH2 поток из kamil_adc в формате 0x000A,step,ch1_i16,ch2_i16"
"либо tty CH1/CH2 поток из kamil_adc в формате 0x000A,step,ch1_i16,ch2_i16. "
"Для tty CH1/CH2: сырая кривая = ch1^2+ch2^2, FFT вход = ch1+i*ch2"
),
)
parser.add_argument(

View File

@ -528,12 +528,15 @@ def run_pyqtgraph(args) -> None:
"""Start the PyQtGraph GUI."""
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
peak_search_enabled = bool(getattr(args, "peak_search", False))
bin_mode = bool(getattr(args, "bin_mode", False))
complex_ascii_mode = bool(getattr(args, "parser_complex_ascii", False))
complex_sweep_mode = bool(
complex_ascii_mode
bin_mode
or complex_ascii_mode
or getattr(args, "parser_16_bit_x2", False)
or getattr(args, "parser_test", False)
)
bin_iq_power_mode = bool(bin_mode)
if not sys.platform.startswith("win"):
display_name = os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY")
if not display_name:
@ -556,7 +559,7 @@ def run_pyqtgraph(args) -> None:
queue,
stop_event,
fancy=bool(args.fancy),
bin_mode=bool(args.bin_mode),
bin_mode=bin_mode,
logscale=bool(args.logscale),
parser_16_bit_x2=bool(args.parser_16_bit_x2),
parser_test=bool(args.parser_test),
@ -657,8 +660,12 @@ def run_pyqtgraph(args) -> None:
p_fft.setLabel("left", "Амплитуда" if complex_sweep_mode else "дБ")
if complex_sweep_mode:
try:
p_fft.setTitle("FFT: Re / Im / Abs")
p_line.setTitle("Сырые данные до FFT")
if bin_iq_power_mode:
p_fft.setTitle("FFT: CH1 + i*CH2")
p_line.setTitle("Сырые CH1/CH2 и CH1^2 + CH2^2")
else:
p_fft.setTitle("FFT: Re / Im / Abs")
p_line.setTitle("Сырые данные до FFT")
except Exception:
pass
@ -759,7 +766,9 @@ def run_pyqtgraph(args) -> None:
parsed_data_cb = QtWidgets.QCheckBox("данные после парсинга")
if complex_sweep_mode:
try:
parsed_data_cb.setText("Сырые Re/Im")
parsed_data_cb.setText("Сырые CH1/CH2" if bin_iq_power_mode else "Сырые Re/Im")
if bin_iq_power_mode:
parsed_data_cb.setChecked(True)
except Exception:
pass
fft_curve_group = QtWidgets.QGroupBox("FFT кривые")
@ -1568,7 +1577,15 @@ def run_pyqtgraph(args) -> None:
runtime.full_current_fft_source = (
calibrated_aux_1.astype(np.complex64) + (1j * calibrated_aux_2.astype(np.complex64))
)
runtime.full_current_sweep_raw = np.abs(runtime.full_current_fft_source).astype(np.float32)
if bin_iq_power_mode:
aux_1_f64 = calibrated_aux_1.astype(np.float64, copy=False)
aux_2_f64 = calibrated_aux_2.astype(np.float64, copy=False)
runtime.full_current_sweep_raw = np.asarray(
(aux_1_f64 * aux_1_f64) + (aux_2_f64 * aux_2_f64),
dtype=np.float32,
)
else:
runtime.full_current_sweep_raw = np.abs(runtime.full_current_fft_source).astype(np.float32)
except Exception:
runtime.full_current_aux_curves = None
runtime.full_current_fft_source = None

View File

@ -33,8 +33,10 @@ def log_pair_to_sweep(avg_1: int, avg_2: int) -> float:
def tty_ch_pair_to_sweep(ch_1: int, ch_2: int) -> float:
"""Reduce a raw CH1/CH2 TTY point to a single sweep value."""
return float(abs(int(ch_1) - int(ch_2)))
"""Reduce a raw CH1/CH2 TTY point to power-like scalar ``ch1^2 + ch2^2``."""
ch_1_i = int(ch_1)
ch_2_i = int(ch_2)
return float((ch_1_i * ch_1_i) + (ch_2_i * ch_2_i))
class AsciiSweepParser:

View File

@ -146,11 +146,11 @@ class SweepParserCoreTests(unittest.TestCase):
self.assertEqual(events[0].ch, 0)
self.assertIsInstance(events[1], PointEvent)
self.assertEqual(events[1].x, 1)
self.assertEqual(events[1].y, 10.0)
self.assertEqual(events[1].y, 18100.0)
self.assertEqual(events[1].aux, (100.0, 90.0))
self.assertIsInstance(events[2], PointEvent)
self.assertEqual(events[2].x, 2)
self.assertEqual(events[2].y, 25.0)
self.assertEqual(events[2].y, 23425.0)
self.assertEqual(events[2].aux, (120.0, 95.0))
def test_legacy_binary_parser_detects_new_tty_sweep_on_step_reset(self):
@ -195,13 +195,13 @@ class SweepParserCoreTests(unittest.TestCase):
self.assertEqual(events[1].ch, 0)
self.assertEqual(events[1].x, 1)
self.assertEqual(events[1].aux, (100.0, 1034.0))
self.assertEqual(events[1].y, 934.0)
self.assertEqual(events[1].y, 1079156.0)
self.assertIsInstance(events[2], PointEvent)
self.assertEqual(events[2].ch, 0)
self.assertEqual(events[2].x, 2)
self.assertEqual(events[2].aux, (120.0, 1040.0))
self.assertEqual(events[2].y, 920.0)
self.assertEqual(events[2].y, 1096000.0)
def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self):
parser = ComplexAsciiSweepParser()

View File

@ -147,8 +147,8 @@ class SweepReaderTests(unittest.TestCase):
self.assertEqual(info["ch"], 0)
self.assertIsNotNone(aux)
self.assertGreaterEqual(sweep.shape[0], 3)
self.assertAlmostEqual(float(sweep[1]), 10.0, places=6)
self.assertAlmostEqual(float(sweep[2]), 25.0, places=6)
self.assertAlmostEqual(float(sweep[1]), 18100.0, places=6)
self.assertAlmostEqual(float(sweep[2]), 23425.0, places=6)
self.assertIn("fallback -> legacy", stderr.getvalue())
finally:
stop_event.set()