3 Commits

Author SHA1 Message Date
awe
26c3dd7ad5 something working new format 2026-03-05 17:56:27 +03:00
awe
1e05b1f3fd 3 freq diversion 2026-03-02 15:43:41 +03:00
awe
8cc21316e7 try normalization after grad 2026-03-02 13:25:12 +03:00
12 changed files with 336 additions and 74 deletions

BIN
().npy

Binary file not shown.

Binary file not shown.

View File

@ -146,10 +146,11 @@ def run_matplotlib(args):
ax_line.set_ylim(fixed_ylim)
# График спектра
fft_line_obj, = ax_fft.plot([], [], lw=1)
fft_line_obj, = ax_fft.plot([], [], lw=1, color="tab:blue", label="full band")
ax_fft.set_title("FFT", pad=1)
ax_fft.set_xlabel("Глубина, м")
ax_fft.set_ylabel("Амплитуда")
ax_fft.legend(loc="upper right", fontsize=8)
# Водопад сырых данных
img_obj = ax_img.imshow(
@ -435,6 +436,7 @@ def run_matplotlib(args):
ring.set_fft_complex_mode(str(label))
except Exception:
pass
fft_line_obj.set_data([], [])
_refresh_status_texts()
try:
fig.canvas.draw_idle()
@ -584,18 +586,20 @@ def run_matplotlib(args):
ax_line.autoscale_view(scalex=False, scaley=True)
ax_line.set_ylabel("Y")
# Профиль по глубине — используем уже вычисленный в ring IFFT.
if ring.last_fft_vals is not None and ring.fft_depth_axis_m is not None:
fft_vals = ring.last_fft_vals
xs_fft = ring.fft_depth_axis_m
n = min(fft_vals.size, xs_fft.size)
if n > 0:
fft_line_obj.set_data(xs_fft[:n], fft_vals[:n])
else:
axis_fft = ring.fft_depth_axis_m
vals_fft = ring.last_fft_vals
if axis_fft is None or vals_fft is None:
fft_line_obj.set_data([], [])
if n > 0 and np.isfinite(np.nanmin(fft_vals)) and np.isfinite(np.nanmax(fft_vals)):
ax_fft.set_xlim(0, float(xs_fft[n - 1]))
ax_fft.set_ylim(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)))
else:
n_fft = min(int(axis_fft.size), int(vals_fft.size))
if n_fft <= 0:
fft_line_obj.set_data([], [])
else:
x_fft = axis_fft[:n_fft]
y_fft = vals_fft[:n_fft]
fft_line_obj.set_data(x_fft, y_fft)
ax_fft.set_xlim(0, float(x_fft[n_fft - 1]))
ax_fft.set_ylim(float(np.nanmin(y_fft)), float(np.nanmax(y_fft)))
# Водопад сырых данных
if changed and ring.is_ready:

View File

@ -202,7 +202,7 @@ def run_pyqtgraph(args):
# FFT (слева-снизу)
p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft = p_fft.plot(pen=pg.mkPen((255, 120, 80), width=1))
curve_fft = p_fft.plot(pen=pg.mkPen((80, 120, 255), width=1))
p_fft.setLabel("bottom", "Глубина, м")
p_fft.setLabel("left", "Амплитуда")
@ -626,15 +626,20 @@ def run_pyqtgraph(args):
p_line.enableAutoRange(axis="y", enable=True)
p_line.setLabel("left", "Y")
# Профиль по глубине — используем уже вычисленный в ring IFFT.
if ring.last_fft_vals is not None and ring.fft_depth_axis_m is not None:
fft_vals = ring.last_fft_vals
xs_fft = ring.fft_depth_axis_m
n = min(fft_vals.size, xs_fft.size)
if n > 0:
curve_fft.setData(xs_fft[:n], fft_vals[:n])
p_fft.setXRange(0.0, float(xs_fft[n - 1]), padding=0)
p_fft.setYRange(float(np.nanmin(fft_vals)), float(np.nanmax(fft_vals)), padding=0)
axis_fft = ring.fft_depth_axis_m
vals_fft = ring.last_fft_vals
if axis_fft is None or vals_fft is None:
curve_fft.setData([], [])
else:
n_fft = min(int(axis_fft.size), int(vals_fft.size))
if n_fft <= 0:
curve_fft.setData([], [])
else:
x_fft = axis_fft[:n_fft]
y_fft = vals_fft[:n_fft]
curve_fft.setData(x_fft, y_fft)
p_fft.setXRange(0.0, float(x_fft[n_fft - 1]), padding=0)
p_fft.setYRange(float(np.nanmin(y_fft)), float(np.nanmax(y_fft)), padding=0)
# Позиция подписи канала
try:

View File

@ -46,31 +46,23 @@ def detect_reference_file_format(path: str) -> Optional[str]:
size = os.path.getsize(p)
except Exception:
return None
if size <= 0 or (size % 8) != 0:
if size <= 0:
return None
try:
with open(p, "rb") as f:
sample = f.read(min(size, 8 * 2048))
sample = f.read(min(size, 256 * 1024))
except Exception:
return None
if len(sample) < 8:
return None
# Быстрый sniff aligned-записей: в валидных записях байт 6 == 0x0A.
recs = len(sample) // 8
if recs <= 0:
return None
marker_hits = 0
start_hits = 0
for i in range(0, recs * 8, 8):
b = sample[i : i + 8]
if b[6] == 0x0A:
marker_hits += 1
if b[:6] == b"\xff\xff\xff\xff\xff\xff":
start_hits += 1
if marker_hits >= max(4, int(recs * 0.8)) and start_hits >= 1:
# Универсальный sniff: прогоняем тем же потоковым парсером,
# который используется в realtime/capture-import.
parser = BinaryRecordStreamParser()
_ = parser.feed(sample)
if parser.start_count >= 1 and parser.point_count >= 16:
return "bin_capture"
return None

View File

@ -2,9 +2,10 @@
from __future__ import annotations
import math
from collections import deque
import time
from typing import Iterable, List, Optional, Sequence, Set, Tuple
from typing import List, Optional, Sequence, Set, Tuple
import numpy as np
@ -14,7 +15,13 @@ from rfg_adc_plotter.types import SweepInfo, SweepPacket
# Binary parser events:
# ("start", ch)
# ("point", ch, x, y)
BinaryEvent = Tuple[str, int] | Tuple[str, int, int, int]
BinaryEvent = Tuple[str, int] | Tuple[str, int, int, float]
# Параметры преобразования пары log-detector значений в линейную амплитуду.
_LOG_DETECTOR_BASE = 10.0
_LOG_DETECTOR_SCALER = 0.001
_LOG_DETECTOR_POSTSCALE = 1000.0
_LOG_DETECTOR_EXP_LIMIT = 300.0
def u32_to_i32(v: int) -> int:
@ -22,8 +29,44 @@ def u32_to_i32(v: int) -> int:
return v - 0x1_0000_0000 if (v & 0x8000_0000) else v
def u_bits_to_i(v: int, bits: int) -> int:
"""Преобразование беззнакового целого fixed-width в знаковое (two's complement)."""
if bits <= 0:
return 0
sign = 1 << (bits - 1)
full = 1 << bits
return v - full if (v & sign) else v
def words_be_to_i(words: Sequence[int]) -> int:
"""Собрать big-endian набор 16-bit слов в знаковое число."""
acc = 0
for w in words:
acc = (acc << 16) | (int(w) & 0xFFFF)
return u_bits_to_i(acc, 16 * int(len(words)))
def _log_pair_to_linear(avg_1: int, avg_2: int) -> float:
"""Разность двух логарифмических усреднений в линейной шкале."""
exp1 = max(-_LOG_DETECTOR_EXP_LIMIT, min(_LOG_DETECTOR_EXP_LIMIT, float(avg_1) * _LOG_DETECTOR_SCALER))
exp2 = max(-_LOG_DETECTOR_EXP_LIMIT, min(_LOG_DETECTOR_EXP_LIMIT, float(avg_2) * _LOG_DETECTOR_SCALER))
return (math.pow(_LOG_DETECTOR_BASE, exp1) - math.pow(_LOG_DETECTOR_BASE, exp2)) * _LOG_DETECTOR_POSTSCALE
class BinaryRecordStreamParser:
"""Инкрементальный парсер бинарных записей протокола (по 8 байт)."""
"""Инкрементальный парсер бинарных записей нескольких wire-форматов.
Поддерживаемые форматы:
1) legacy 8-byte:
старт: 0xFFFF,0xFFFF,0xFFFF,(ch<<8)|0x0A
точка: step,value_hi16,value_lo16,(ch<<8)|0x0A
2) log-detector:
старт: 0xFFFF x5, (ch<<8)|0x0A
точка: step, avg1, avg2, (ch<<8)|0x0A,
где avg1/avg2 кодируются фиксированной шириной в 16-bit словах:
- 2 слова (int32) или
- 8 слов (int128).
"""
def __init__(self):
self._buf = bytearray()
@ -31,6 +74,49 @@ class BinaryRecordStreamParser:
self.start_count: int = 0
self.point_count: int = 0
self.desync_count: int = 0
self._log_pair_words: Optional[int] = None
@staticmethod
def _u16_at(buf: bytearray, offset: int) -> int:
return int(buf[offset]) | (int(buf[offset + 1]) << 8)
def _try_parse_log_start(self, buf: bytearray) -> Optional[Tuple[int, int]]:
rec_bytes = 12 # 6 слов: FFFF x5 + terminator
if len(buf) < rec_bytes:
return None
for wi in range(5):
if self._u16_at(buf, wi * 2) != 0xFFFF:
return None
term = self._u16_at(buf, 10)
if (term & 0x00FF) != 0x000A:
return None
ch = int((term >> 8) & 0x00FF)
return ch, rec_bytes
def _try_parse_log_point(self, buf: bytearray, pair_words: int) -> Optional[Tuple[int, int, float, int]]:
if pair_words <= 0:
return None
rec_words = 2 + 2 * int(pair_words)
rec_bytes = 2 * rec_words
if len(buf) < rec_bytes:
return None
step = self._u16_at(buf, 0)
if step == 0xFFFF:
return None
term_off = rec_bytes - 2
term = self._u16_at(buf, term_off)
if (term & 0x00FF) != 0x000A:
return None
a1_words = [self._u16_at(buf, 2 + 2 * i) for i in range(pair_words)]
a2_words = [self._u16_at(buf, 2 + 2 * (pair_words + i)) for i in range(pair_words)]
avg_1 = words_be_to_i(a1_words)
avg_2 = words_be_to_i(a2_words)
y_val = _log_pair_to_linear(avg_1, avg_2)
ch = int((term >> 8) & 0x00FF)
return ch, int(step), float(y_val), rec_bytes
def feed(self, data: bytes) -> List[BinaryEvent]:
if data:
@ -39,22 +125,57 @@ class BinaryRecordStreamParser:
buf = self._buf
while len(buf) >= 8:
w0 = int(buf[0]) | (int(buf[1]) << 8)
w1 = int(buf[2]) | (int(buf[3]) << 8)
w2 = int(buf[4]) | (int(buf[5]) << 8)
# 1) log-detector start (12-byte): FFFF x5 + (ch<<8)|0x0A
parsed_log_start = self._try_parse_log_start(buf)
if parsed_log_start is not None:
ch, consumed = parsed_log_start
events.append(("start", ch))
del buf[:consumed]
self.bytes_consumed += consumed
self.start_count += 1
# Ширину пары (32/128) определим на ближайшей точке.
self._log_pair_words = None
continue
# 2) log-detector point:
# сперва в уже известной ширине пары, иначе авто-детект 128/32.
# В авто-режиме сначала пробуем 32-bit пару (наиболее частый формат),
# затем 128-bit. Это снижает риск ложного совпадения 128-bit длины на 32-bit потоке.
pair_candidates = [self._log_pair_words] if self._log_pair_words in (2, 8) else [2, 8]
parsed_log_point: Optional[Tuple[int, int, float, int]] = None
for pair_words in pair_candidates:
if pair_words is None:
continue
parsed_log_point = self._try_parse_log_point(buf, int(pair_words))
if parsed_log_point is not None:
self._log_pair_words = int(pair_words)
break
if parsed_log_point is not None:
ch, step, y_val, consumed = parsed_log_point
events.append(("point", ch, step, y_val))
del buf[:consumed]
self.bytes_consumed += consumed
self.point_count += 1
continue
# 3) legacy 8-byte start / point.
w0 = self._u16_at(buf, 0)
w1 = self._u16_at(buf, 2)
w2 = self._u16_at(buf, 4)
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and buf[6] == 0x0A:
ch = int(buf[7])
events.append(("start", ch))
del buf[:8]
self.bytes_consumed += 8
self.start_count += 1
# legacy не использует пару avg1/avg2.
self._log_pair_words = None
continue
if buf[6] == 0x0A:
ch = int(buf[7])
value_u32 = (w1 << 16) | w2
events.append(("point", ch, int(w0), u32_to_i32(value_u32)))
events.append(("point", ch, int(w0), float(u32_to_i32(value_u32))))
del buf[:8]
self.bytes_consumed += 8
self.point_count += 1
@ -88,7 +209,7 @@ class SweepAssembler:
self._n_valid_hist = deque()
self._xs: list[int] = []
self._ys: list[int] = []
self._ys: list[float] = []
self._cur_channel: Optional[int] = None
self._cur_channels: set[int] = set()
@ -98,12 +219,12 @@ class SweepAssembler:
self._cur_channel = None
self._cur_channels.clear()
def add_point(self, ch: int, x: int, y: int):
def add_point(self, ch: int, x: int, y: float):
if self._cur_channel is None:
self._cur_channel = int(ch)
self._cur_channels.add(int(ch))
self._xs.append(int(x))
self._ys.append(int(y))
self._ys.append(float(y))
def start_new_sweep(self, ch: int, now_ts: Optional[float] = None) -> Optional[SweepPacket]:
packet = self.finalize_current(now_ts=now_ts)
@ -122,13 +243,13 @@ class SweepAssembler:
return out
# point
_tag, ch, x, y = event # type: ignore[misc]
self.add_point(int(ch), int(x), int(y))
self.add_point(int(ch), int(x), float(y))
return out
def finalize_arrays(
self,
xs: Sequence[int],
ys: Sequence[int],
ys: Sequence[float],
channels: Optional[Set[int]],
now_ts: Optional[float] = None,
) -> Optional[SweepPacket]:

View File

@ -151,20 +151,17 @@ class SweepReader(threading.Thread):
def _run_binary_stream(self, chunk_reader: SerialChunkReader):
xs: list[int] = []
ys: list[int] = []
ys: list[float] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
parser = BinaryRecordStreamParser()
# Бинарный протокол (4 слова LE u16 = 8 байт на запись):
# старт свипа: 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
# Байты на проводе: ff ff ff ff ff ff 0a [ch]
# ch=0 → последнее слово=0x000A; ch=1 → 0x010A; и т.д.
# точка данных: step_u16, value_hi_u16, value_lo_u16, (ch<<8)|0x0A
# Байты на проводе: [step_lo step_hi] [hi_lo hi_hi] [lo_lo lo_hi] 0a [ch]
# value_i32 = sign_extend((value_hi<<16)|value_lo)
# Признак записи: байт 6 == 0x0A, байт 7 — номер канала.
# При десинхронизации сдвигаемся на 1 БАЙТ (не слово) для самосинхронизации.
# Поддерживаются оба wire-формата:
# 1) legacy: 8-byte записи (start/point с одним int32 значением).
# 2) log-detector: start = FFFF x5 + (ch<<8)|0x0A,
# point = step + (avg1, avg2), где avg1/avg2 имеют ширину 32-bit или 128-bit.
# Для point парсер сразу преобразует (avg1, avg2) в линейную амплитуду y.
# В обоих режимах при десинхронизации parser.feed() сдвигается на 1 байт.
_dbg_byte_count = 0
_dbg_desync_count = 0
@ -196,13 +193,13 @@ class SweepReader(threading.Thread):
_tag, ch_from_term, step, value_i32 = ev # type: ignore[misc]
if cur_channel is None:
cur_channel = int(ch_from_term)
cur_channels.add(int(cur_channel))
cur_channels.add(int(ch_from_term))
xs.append(int(step))
ys.append(int(value_i32))
ys.append(float(value_i32))
_dbg_point_count += 1
if self._debug and _dbg_point_count <= 3:
sys.stderr.write(
f"[debug] BIN точка: step={int(step)} ch={int(ch_from_term)} → value={int(value_i32)}\n"
f"[debug] BIN точка: step={int(step)} ch={int(ch_from_term)} → value={float(value_i32):.3f}\n"
)
_dbg_byte_count = parser.bytes_consumed

View File

@ -156,14 +156,18 @@ def reconstruct_complex_spectrum_diff(sweep: np.ndarray) -> np.ndarray:
d = np.gradient(cos_phi)
sin_est = normalize_trace_unit_range(d)
sin_est = np.clip(sin_est, -1.0, 1.0)
sin_est = normalize_trace_unit_range(d)
# mag = np.abs(sin_est)
# mask = mag > _EPS
# if np.any(mask):
# sin_est[mask] = sin_est[mask] / mag[mask]
z = cos_phi.astype(np.complex128, copy=False) + 1j * sin_est.astype(np.complex128, copy=False)
mag = np.abs(z)
z_unit = np.ones_like(z, dtype=np.complex128)
mask = mag > _EPS
if np.any(mask):
z_unit[mask] = z[mask] / mag[mask]
return mag
return z_unit
def reconstruct_complex_spectrum_from_real_trace(
@ -284,7 +288,7 @@ def compute_ifft_profile_from_sweep(
n = min(depth_m.size, y.size)
if n <= 0:
return _fallback_depth_response(s.size, s)
return depth_m[:n].astype(np.float32, copy=False), y[:n].astype(np.float32, copy=False) # log10 для лучшей визуализации в водопаде
return depth_m[:n].astype(np.float32, copy=False), np.maximum(y[:n], 1e-12).astype(np.float32, copy=False) # log10 для лучшей визуализации в водопаде
except Exception as exc: # noqa: BLE001
logger.error("compute_ifft_profile_from_sweep failed: %r", exc)
return _fallback_depth_response(np.asarray(sweep).size if sweep is not None else 1, sweep)
@ -294,4 +298,3 @@ def compute_ifft_db_profile(sweep: Optional[np.ndarray]) -> np.ndarray:
"""Legacy wrapper (deprecated name): возвращает линейный |IFFT| профиль."""
_depth_m, y = compute_ifft_profile_from_sweep(sweep, complex_mode="arccos")
return y

View File

@ -10,7 +10,9 @@ from rfg_adc_plotter.constants import (
FREQ_MIN_GHZ,
WF_WIDTH,
)
from rfg_adc_plotter.processing.fourier import compute_ifft_profile_from_sweep
from rfg_adc_plotter.processing.fourier import (
compute_ifft_profile_from_sweep,
)
class RingBuffer:
@ -103,19 +105,13 @@ class RingBuffer:
n = min(int(fft_row.size), int(depth_axis_m.size))
if n <= 0:
self.last_fft_vals = None
return
if n != fft_row.size:
fft_row = fft_row[:n]
if n != depth_axis_m.size:
depth_axis_m = depth_axis_m[:n]
# Для отображения храним только первую половину IFFT-профиля:
# вторая половина для текущей схемы симметрична и визуально избыточна.
n_keep = max(1, (n + 1) // 2)
fft_row = fft_row[:n_keep]
depth_axis_m = depth_axis_m[:n_keep]
n = n_keep
needs_reset = (
self.ring_fft is None
or self.fft_depth_axis_m is None

View File

@ -13,11 +13,13 @@ from rfg_adc_plotter.processing.pipeline import SweepPreprocessor
ROOT = Path(__file__).resolve().parents[1]
SAMPLE_BG = ROOT / "sample_data" / "empty"
SAMPLE_CALIB = ROOT / "sample_data" / "no_antennas_35dB_attenuators"
SAMPLE_NEW_FMT = ROOT / "sample_data" / "new_format" / "attenuators_50dB"
def test_detect_reference_file_format_for_sample_capture():
assert detect_reference_file_format(str(SAMPLE_BG)) == "bin_capture"
assert detect_reference_file_format(str(SAMPLE_CALIB)) == "bin_capture"
assert detect_reference_file_format(str(SAMPLE_NEW_FMT)) == "bin_capture"
def test_load_capture_sweeps_parses_binary_capture():
@ -33,6 +35,22 @@ def test_load_capture_sweeps_parses_binary_capture():
assert channels == {0}
def test_load_capture_sweeps_parses_new_format_logdetector_capture():
sweeps = load_capture_sweeps(str(SAMPLE_NEW_FMT), fancy=False, logscale=False)
assert len(sweeps) > 900
widths = [int(s.size) for s, _info in sweeps]
dominant_width = max(set(widths), key=widths.count)
# Должно совпадать с ожидаемой шириной свипа из штатных capture.
assert dominant_width in (758, 759)
channels = set()
for _s, info in sweeps:
chs = info.get("chs", [info.get("ch", 0)])
channels.update(int(v) for v in chs)
assert channels == {0}
def test_aggregate_capture_reference_filters_incomplete_sweeps():
sweeps = load_capture_sweeps(str(SAMPLE_BG), fancy=False, logscale=False)
vector, summary = aggregate_capture_reference(sweeps, channel=0, method="median", path=str(SAMPLE_BG))

View File

@ -1,5 +1,6 @@
import numpy as np
from rfg_adc_plotter.processing.fourier import compute_ifft_profile_from_sweep
from rfg_adc_plotter.state.ring_buffer import RingBuffer
@ -8,6 +9,7 @@ def test_ring_buffer_allocates_fft_buffers_from_first_push():
ring.ensure_init(64)
sweep = np.linspace(-1.0, 1.0, 64, dtype=np.float32)
depth_expected, vals_expected = compute_ifft_profile_from_sweep(sweep, complex_mode="arccos")
ring.push(sweep)
assert ring.ring_fft is not None
@ -16,6 +18,7 @@ def test_ring_buffer_allocates_fft_buffers_from_first_push():
assert ring.fft_bins == ring.ring_fft.shape[1]
assert ring.fft_bins == ring.fft_depth_axis_m.size
assert ring.fft_bins == ring.last_fft_vals.size
assert ring.fft_bins == min(depth_expected.size, vals_expected.size)
# Legacy alias kept for compatibility with existing GUI code paths.
assert ring.fft_time_axis is ring.fft_depth_axis_m
@ -63,3 +66,16 @@ def test_ring_buffer_mode_switch_resets_fft_buffers_only():
assert ring.ring_fft is not None
assert ring.fft_depth_axis_m is not None
assert ring.last_fft_vals is not None
def test_ring_buffer_short_sweeps_keep_fft_profile_well_formed():
for n in (1, 2, 3):
ring = RingBuffer(max_sweeps=4)
ring.ensure_init(n)
ring.push(np.linspace(-1.0, 1.0, n, dtype=np.float32))
assert ring.fft_depth_axis_m is not None
assert ring.last_fft_vals is not None
assert ring.fft_depth_axis_m.dtype == np.float32
assert ring.last_fft_vals.dtype == np.float32
assert ring.fft_depth_axis_m.size == ring.last_fft_vals.size

View File

@ -0,0 +1,110 @@
import math
from rfg_adc_plotter.io.sweep_parser_core import BinaryRecordStreamParser
def _u16le(word: int) -> bytes:
w = int(word) & 0xFFFF
return bytes((w & 0xFF, (w >> 8) & 0xFF))
def _pack_signed_words_be(value: int, words: int) -> list[int]:
bits = 16 * int(words)
v = int(value)
if v < 0:
v = (1 << bits) + v
out: list[int] = []
for i in range(words):
shift = (words - 1 - i) * 16
out.append((v >> shift) & 0xFFFF)
return out
def _pack_legacy_start(ch: int) -> bytes:
return b"\xff\xff" * 3 + bytes((0x0A, int(ch) & 0xFF))
def _pack_legacy_point(ch: int, step: int, value_i32: int) -> bytes:
v = int(value_i32) & 0xFFFF_FFFF
return b"".join(
[
_u16le(step),
_u16le((v >> 16) & 0xFFFF),
_u16le(v & 0xFFFF),
bytes((0x0A, int(ch) & 0xFF)),
]
)
def _pack_log_start(ch: int) -> bytes:
return b"\xff\xff" * 5 + bytes((0x0A, int(ch) & 0xFF))
def _pack_log_point(step: int, avg1: int, avg2: int, pair_words: int, ch: int = 0) -> bytes:
words = [int(step) & 0xFFFF]
words.extend(_pack_signed_words_be(avg1, pair_words))
words.extend(_pack_signed_words_be(avg2, pair_words))
words.append(((int(ch) & 0xFF) << 8) | 0x000A)
return b"".join(_u16le(w) for w in words)
def _log_pair_to_linear(avg1: int, avg2: int) -> float:
exp1 = max(-300.0, min(300.0, float(avg1) * 0.001))
exp2 = max(-300.0, min(300.0, float(avg2) * 0.001))
return (math.pow(10.0, exp1) - math.pow(10.0, exp2)) * 1000.0
def test_binary_parser_parses_legacy_8_byte_records():
parser = BinaryRecordStreamParser()
stream = b"".join(
[
_pack_legacy_start(3),
_pack_legacy_point(3, 1, -2),
_pack_legacy_point(3, 2, 123456),
]
)
events = []
events.extend(parser.feed(stream[:5]))
events.extend(parser.feed(stream[5:17]))
events.extend(parser.feed(stream[17:]))
assert events[0] == ("start", 3)
assert events[1] == ("point", 3, 1, -2.0)
assert events[2] == ("point", 3, 2, 123456.0)
def test_binary_parser_parses_logdetector_32bit_pair_records():
parser = BinaryRecordStreamParser()
stream = b"".join(
[
_pack_log_start(0),
_pack_log_point(1, 1500, 700, pair_words=2, ch=0),
_pack_log_point(2, 1510, 710, pair_words=2, ch=0),
]
)
events = parser.feed(stream)
assert events[0] == ("start", 0)
assert events[1][0:3] == ("point", 0, 1)
assert events[2][0:3] == ("point", 0, 2)
assert abs(float(events[1][3]) - _log_pair_to_linear(1500, 700)) < 1e-6
assert abs(float(events[2][3]) - _log_pair_to_linear(1510, 710)) < 1e-6
def test_binary_parser_parses_logdetector_128bit_pair_records():
parser = BinaryRecordStreamParser()
stream = b"".join(
[
_pack_log_start(5),
_pack_log_point(7, 1600, 800, pair_words=8, ch=5),
_pack_log_point(8, 1610, 810, pair_words=8, ch=5),
]
)
events = parser.feed(stream)
assert events[0] == ("start", 5)
assert events[1][0:3] == ("point", 5, 7)
assert events[2][0:3] == ("point", 5, 8)
assert abs(float(events[1][3]) - _log_pair_to_linear(1600, 800)) < 1e-6
assert abs(float(events[2][3]) - _log_pair_to_linear(1610, 810)) < 1e-6