Compare commits
1 Commits
1e05b1f3fd
...
head-fixer
| Author | SHA1 | Date | |
|---|---|---|---|
| 26c3dd7ad5 |
Binary file not shown.
@ -146,9 +146,7 @@ def run_matplotlib(args):
|
|||||||
ax_line.set_ylim(fixed_ylim)
|
ax_line.set_ylim(fixed_ylim)
|
||||||
|
|
||||||
# График спектра
|
# График спектра
|
||||||
fft_line_t1, = ax_fft.plot([], [], lw=1, color="tab:blue", label="1/3 (low f)")
|
fft_line_obj, = ax_fft.plot([], [], lw=1, color="tab:blue", label="full band")
|
||||||
fft_line_t2, = ax_fft.plot([], [], lw=1, color="tab:orange", label="2/3 (mid f)")
|
|
||||||
fft_line_t3, = ax_fft.plot([], [], lw=1, color="tab:green", label="3/3 (high f)")
|
|
||||||
ax_fft.set_title("FFT", pad=1)
|
ax_fft.set_title("FFT", pad=1)
|
||||||
ax_fft.set_xlabel("Глубина, м")
|
ax_fft.set_xlabel("Глубина, м")
|
||||||
ax_fft.set_ylabel("Амплитуда")
|
ax_fft.set_ylabel("Амплитуда")
|
||||||
@ -438,9 +436,7 @@ def run_matplotlib(args):
|
|||||||
ring.set_fft_complex_mode(str(label))
|
ring.set_fft_complex_mode(str(label))
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
fft_line_t1.set_data([], [])
|
fft_line_obj.set_data([], [])
|
||||||
fft_line_t2.set_data([], [])
|
|
||||||
fft_line_t3.set_data([], [])
|
|
||||||
_refresh_status_texts()
|
_refresh_status_texts()
|
||||||
try:
|
try:
|
||||||
fig.canvas.draw_idle()
|
fig.canvas.draw_idle()
|
||||||
@ -590,31 +586,20 @@ def run_matplotlib(args):
|
|||||||
ax_line.autoscale_view(scalex=False, scaley=True)
|
ax_line.autoscale_view(scalex=False, scaley=True)
|
||||||
ax_line.set_ylabel("Y")
|
ax_line.set_ylabel("Y")
|
||||||
|
|
||||||
# Профиль по глубине: три линии для 1/3, 2/3, 3/3 частотного диапазона.
|
axis_fft = ring.fft_depth_axis_m
|
||||||
third_axes = ring.last_fft_third_axes_m
|
vals_fft = ring.last_fft_vals
|
||||||
third_vals = ring.last_fft_third_vals
|
if axis_fft is None or vals_fft is None:
|
||||||
lines = (fft_line_t1, fft_line_t2, fft_line_t3)
|
fft_line_obj.set_data([], [])
|
||||||
xs_max = []
|
else:
|
||||||
ys_min = []
|
n_fft = min(int(axis_fft.size), int(vals_fft.size))
|
||||||
ys_max = []
|
if n_fft <= 0:
|
||||||
for line_fft, xs_fft, fft_vals in zip(lines, third_axes, third_vals):
|
fft_line_obj.set_data([], [])
|
||||||
if xs_fft is None or fft_vals is None:
|
else:
|
||||||
line_fft.set_data([], [])
|
x_fft = axis_fft[:n_fft]
|
||||||
continue
|
y_fft = vals_fft[:n_fft]
|
||||||
n = min(int(xs_fft.size), int(fft_vals.size))
|
fft_line_obj.set_data(x_fft, y_fft)
|
||||||
if n <= 0:
|
ax_fft.set_xlim(0, float(x_fft[n_fft - 1]))
|
||||||
line_fft.set_data([], [])
|
ax_fft.set_ylim(float(np.nanmin(y_fft)), float(np.nanmax(y_fft)))
|
||||||
continue
|
|
||||||
x_seg = xs_fft[:n]
|
|
||||||
y_seg = fft_vals[:n]
|
|
||||||
line_fft.set_data(x_seg, y_seg)
|
|
||||||
xs_max.append(float(x_seg[n - 1]))
|
|
||||||
ys_min.append(float(np.nanmin(y_seg)))
|
|
||||||
ys_max.append(float(np.nanmax(y_seg)))
|
|
||||||
|
|
||||||
if xs_max and ys_min and ys_max:
|
|
||||||
ax_fft.set_xlim(0, float(max(xs_max)))
|
|
||||||
ax_fft.set_ylim(float(min(ys_min)), float(max(ys_max)))
|
|
||||||
|
|
||||||
# Водопад сырых данных
|
# Водопад сырых данных
|
||||||
if changed and ring.is_ready:
|
if changed and ring.is_ready:
|
||||||
@ -664,9 +649,7 @@ def run_matplotlib(args):
|
|||||||
line_env_lo,
|
line_env_lo,
|
||||||
line_env_hi,
|
line_env_hi,
|
||||||
img_obj,
|
img_obj,
|
||||||
fft_line_t1,
|
fft_line_obj,
|
||||||
fft_line_t2,
|
|
||||||
fft_line_t3,
|
|
||||||
img_fft_obj,
|
img_fft_obj,
|
||||||
status_text,
|
status_text,
|
||||||
pipeline_text,
|
pipeline_text,
|
||||||
|
|||||||
@ -202,9 +202,7 @@ def run_pyqtgraph(args):
|
|||||||
# FFT (слева-снизу)
|
# FFT (слева-снизу)
|
||||||
p_fft = win.addPlot(row=1, col=0, title="FFT")
|
p_fft = win.addPlot(row=1, col=0, title="FFT")
|
||||||
p_fft.showGrid(x=True, y=True, alpha=0.3)
|
p_fft.showGrid(x=True, y=True, alpha=0.3)
|
||||||
curve_fft_t1 = p_fft.plot(pen=pg.mkPen((80, 120, 255), width=1))
|
curve_fft = p_fft.plot(pen=pg.mkPen((80, 120, 255), width=1))
|
||||||
curve_fft_t2 = p_fft.plot(pen=pg.mkPen((255, 140, 70), width=1))
|
|
||||||
curve_fft_t3 = p_fft.plot(pen=pg.mkPen((60, 180, 90), width=1))
|
|
||||||
p_fft.setLabel("bottom", "Глубина, м")
|
p_fft.setLabel("bottom", "Глубина, м")
|
||||||
p_fft.setLabel("left", "Амплитуда")
|
p_fft.setLabel("left", "Амплитуда")
|
||||||
|
|
||||||
@ -494,9 +492,7 @@ def run_pyqtgraph(args):
|
|||||||
changed = False
|
changed = False
|
||||||
if changed:
|
if changed:
|
||||||
try:
|
try:
|
||||||
curve_fft_t1.setData([], [])
|
curve_fft.setData([], [])
|
||||||
curve_fft_t2.setData([], [])
|
|
||||||
curve_fft_t3.setData([], [])
|
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
_refresh_pipeline_label()
|
_refresh_pipeline_label()
|
||||||
@ -630,31 +626,20 @@ def run_pyqtgraph(args):
|
|||||||
p_line.enableAutoRange(axis="y", enable=True)
|
p_line.enableAutoRange(axis="y", enable=True)
|
||||||
p_line.setLabel("left", "Y")
|
p_line.setLabel("left", "Y")
|
||||||
|
|
||||||
# Профиль по глубине: три линии для 1/3, 2/3, 3/3 частотного диапазона.
|
axis_fft = ring.fft_depth_axis_m
|
||||||
third_axes = ring.last_fft_third_axes_m
|
vals_fft = ring.last_fft_vals
|
||||||
third_vals = ring.last_fft_third_vals
|
if axis_fft is None or vals_fft is None:
|
||||||
curves = (curve_fft_t1, curve_fft_t2, curve_fft_t3)
|
|
||||||
xs_max = []
|
|
||||||
ys_min = []
|
|
||||||
ys_max = []
|
|
||||||
for curve_fft, xs_fft, fft_vals in zip(curves, third_axes, third_vals):
|
|
||||||
if xs_fft is None or fft_vals is None:
|
|
||||||
curve_fft.setData([], [])
|
curve_fft.setData([], [])
|
||||||
continue
|
else:
|
||||||
n = min(int(xs_fft.size), int(fft_vals.size))
|
n_fft = min(int(axis_fft.size), int(vals_fft.size))
|
||||||
if n <= 0:
|
if n_fft <= 0:
|
||||||
curve_fft.setData([], [])
|
curve_fft.setData([], [])
|
||||||
continue
|
else:
|
||||||
x_seg = xs_fft[:n]
|
x_fft = axis_fft[:n_fft]
|
||||||
y_seg = fft_vals[:n]
|
y_fft = vals_fft[:n_fft]
|
||||||
curve_fft.setData(x_seg, y_seg)
|
curve_fft.setData(x_fft, y_fft)
|
||||||
xs_max.append(float(x_seg[n - 1]))
|
p_fft.setXRange(0.0, float(x_fft[n_fft - 1]), padding=0)
|
||||||
ys_min.append(float(np.nanmin(y_seg)))
|
p_fft.setYRange(float(np.nanmin(y_fft)), float(np.nanmax(y_fft)), padding=0)
|
||||||
ys_max.append(float(np.nanmax(y_seg)))
|
|
||||||
|
|
||||||
if xs_max and ys_min and ys_max:
|
|
||||||
p_fft.setXRange(0.0, float(max(xs_max)), padding=0)
|
|
||||||
p_fft.setYRange(float(min(ys_min)), float(max(ys_max)), padding=0)
|
|
||||||
|
|
||||||
# Позиция подписи канала
|
# Позиция подписи канала
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -46,31 +46,23 @@ def detect_reference_file_format(path: str) -> Optional[str]:
|
|||||||
size = os.path.getsize(p)
|
size = os.path.getsize(p)
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
if size <= 0 or (size % 8) != 0:
|
if size <= 0:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(p, "rb") as f:
|
with open(p, "rb") as f:
|
||||||
sample = f.read(min(size, 8 * 2048))
|
sample = f.read(min(size, 256 * 1024))
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if len(sample) < 8:
|
if len(sample) < 8:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Быстрый sniff aligned-записей: в валидных записях байт 6 == 0x0A.
|
# Универсальный sniff: прогоняем тем же потоковым парсером,
|
||||||
recs = len(sample) // 8
|
# который используется в realtime/capture-import.
|
||||||
if recs <= 0:
|
parser = BinaryRecordStreamParser()
|
||||||
return None
|
_ = parser.feed(sample)
|
||||||
marker_hits = 0
|
if parser.start_count >= 1 and parser.point_count >= 16:
|
||||||
start_hits = 0
|
|
||||||
for i in range(0, recs * 8, 8):
|
|
||||||
b = sample[i : i + 8]
|
|
||||||
if b[6] == 0x0A:
|
|
||||||
marker_hits += 1
|
|
||||||
if b[:6] == b"\xff\xff\xff\xff\xff\xff":
|
|
||||||
start_hits += 1
|
|
||||||
if marker_hits >= max(4, int(recs * 0.8)) and start_hits >= 1:
|
|
||||||
return "bin_capture"
|
return "bin_capture"
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|||||||
@ -2,9 +2,10 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
from collections import deque
|
from collections import deque
|
||||||
import time
|
import time
|
||||||
from typing import Iterable, List, Optional, Sequence, Set, Tuple
|
from typing import List, Optional, Sequence, Set, Tuple
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
@ -14,7 +15,13 @@ from rfg_adc_plotter.types import SweepInfo, SweepPacket
|
|||||||
# Binary parser events:
|
# Binary parser events:
|
||||||
# ("start", ch)
|
# ("start", ch)
|
||||||
# ("point", ch, x, y)
|
# ("point", ch, x, y)
|
||||||
BinaryEvent = Tuple[str, int] | Tuple[str, int, int, int]
|
BinaryEvent = Tuple[str, int] | Tuple[str, int, int, float]
|
||||||
|
|
||||||
|
# Параметры преобразования пары log-detector значений в линейную амплитуду.
|
||||||
|
_LOG_DETECTOR_BASE = 10.0
|
||||||
|
_LOG_DETECTOR_SCALER = 0.001
|
||||||
|
_LOG_DETECTOR_POSTSCALE = 1000.0
|
||||||
|
_LOG_DETECTOR_EXP_LIMIT = 300.0
|
||||||
|
|
||||||
|
|
||||||
def u32_to_i32(v: int) -> int:
|
def u32_to_i32(v: int) -> int:
|
||||||
@ -22,8 +29,44 @@ def u32_to_i32(v: int) -> int:
|
|||||||
return v - 0x1_0000_0000 if (v & 0x8000_0000) else v
|
return v - 0x1_0000_0000 if (v & 0x8000_0000) else v
|
||||||
|
|
||||||
|
|
||||||
|
def u_bits_to_i(v: int, bits: int) -> int:
|
||||||
|
"""Преобразование беззнакового целого fixed-width в знаковое (two's complement)."""
|
||||||
|
if bits <= 0:
|
||||||
|
return 0
|
||||||
|
sign = 1 << (bits - 1)
|
||||||
|
full = 1 << bits
|
||||||
|
return v - full if (v & sign) else v
|
||||||
|
|
||||||
|
|
||||||
|
def words_be_to_i(words: Sequence[int]) -> int:
|
||||||
|
"""Собрать big-endian набор 16-bit слов в знаковое число."""
|
||||||
|
acc = 0
|
||||||
|
for w in words:
|
||||||
|
acc = (acc << 16) | (int(w) & 0xFFFF)
|
||||||
|
return u_bits_to_i(acc, 16 * int(len(words)))
|
||||||
|
|
||||||
|
|
||||||
|
def _log_pair_to_linear(avg_1: int, avg_2: int) -> float:
|
||||||
|
"""Разность двух логарифмических усреднений в линейной шкале."""
|
||||||
|
exp1 = max(-_LOG_DETECTOR_EXP_LIMIT, min(_LOG_DETECTOR_EXP_LIMIT, float(avg_1) * _LOG_DETECTOR_SCALER))
|
||||||
|
exp2 = max(-_LOG_DETECTOR_EXP_LIMIT, min(_LOG_DETECTOR_EXP_LIMIT, float(avg_2) * _LOG_DETECTOR_SCALER))
|
||||||
|
return (math.pow(_LOG_DETECTOR_BASE, exp1) - math.pow(_LOG_DETECTOR_BASE, exp2)) * _LOG_DETECTOR_POSTSCALE
|
||||||
|
|
||||||
|
|
||||||
class BinaryRecordStreamParser:
|
class BinaryRecordStreamParser:
|
||||||
"""Инкрементальный парсер бинарных записей протокола (по 8 байт)."""
|
"""Инкрементальный парсер бинарных записей нескольких wire-форматов.
|
||||||
|
|
||||||
|
Поддерживаемые форматы:
|
||||||
|
1) legacy 8-byte:
|
||||||
|
старт: 0xFFFF,0xFFFF,0xFFFF,(ch<<8)|0x0A
|
||||||
|
точка: step,value_hi16,value_lo16,(ch<<8)|0x0A
|
||||||
|
2) log-detector:
|
||||||
|
старт: 0xFFFF x5, (ch<<8)|0x0A
|
||||||
|
точка: step, avg1, avg2, (ch<<8)|0x0A,
|
||||||
|
где avg1/avg2 кодируются фиксированной шириной в 16-bit словах:
|
||||||
|
- 2 слова (int32) или
|
||||||
|
- 8 слов (int128).
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._buf = bytearray()
|
self._buf = bytearray()
|
||||||
@ -31,6 +74,49 @@ class BinaryRecordStreamParser:
|
|||||||
self.start_count: int = 0
|
self.start_count: int = 0
|
||||||
self.point_count: int = 0
|
self.point_count: int = 0
|
||||||
self.desync_count: int = 0
|
self.desync_count: int = 0
|
||||||
|
self._log_pair_words: Optional[int] = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _u16_at(buf: bytearray, offset: int) -> int:
|
||||||
|
return int(buf[offset]) | (int(buf[offset + 1]) << 8)
|
||||||
|
|
||||||
|
def _try_parse_log_start(self, buf: bytearray) -> Optional[Tuple[int, int]]:
|
||||||
|
rec_bytes = 12 # 6 слов: FFFF x5 + terminator
|
||||||
|
if len(buf) < rec_bytes:
|
||||||
|
return None
|
||||||
|
for wi in range(5):
|
||||||
|
if self._u16_at(buf, wi * 2) != 0xFFFF:
|
||||||
|
return None
|
||||||
|
term = self._u16_at(buf, 10)
|
||||||
|
if (term & 0x00FF) != 0x000A:
|
||||||
|
return None
|
||||||
|
ch = int((term >> 8) & 0x00FF)
|
||||||
|
return ch, rec_bytes
|
||||||
|
|
||||||
|
def _try_parse_log_point(self, buf: bytearray, pair_words: int) -> Optional[Tuple[int, int, float, int]]:
|
||||||
|
if pair_words <= 0:
|
||||||
|
return None
|
||||||
|
rec_words = 2 + 2 * int(pair_words)
|
||||||
|
rec_bytes = 2 * rec_words
|
||||||
|
if len(buf) < rec_bytes:
|
||||||
|
return None
|
||||||
|
|
||||||
|
step = self._u16_at(buf, 0)
|
||||||
|
if step == 0xFFFF:
|
||||||
|
return None
|
||||||
|
|
||||||
|
term_off = rec_bytes - 2
|
||||||
|
term = self._u16_at(buf, term_off)
|
||||||
|
if (term & 0x00FF) != 0x000A:
|
||||||
|
return None
|
||||||
|
|
||||||
|
a1_words = [self._u16_at(buf, 2 + 2 * i) for i in range(pair_words)]
|
||||||
|
a2_words = [self._u16_at(buf, 2 + 2 * (pair_words + i)) for i in range(pair_words)]
|
||||||
|
avg_1 = words_be_to_i(a1_words)
|
||||||
|
avg_2 = words_be_to_i(a2_words)
|
||||||
|
y_val = _log_pair_to_linear(avg_1, avg_2)
|
||||||
|
ch = int((term >> 8) & 0x00FF)
|
||||||
|
return ch, int(step), float(y_val), rec_bytes
|
||||||
|
|
||||||
def feed(self, data: bytes) -> List[BinaryEvent]:
|
def feed(self, data: bytes) -> List[BinaryEvent]:
|
||||||
if data:
|
if data:
|
||||||
@ -39,22 +125,57 @@ class BinaryRecordStreamParser:
|
|||||||
buf = self._buf
|
buf = self._buf
|
||||||
|
|
||||||
while len(buf) >= 8:
|
while len(buf) >= 8:
|
||||||
w0 = int(buf[0]) | (int(buf[1]) << 8)
|
# 1) log-detector start (12-byte): FFFF x5 + (ch<<8)|0x0A
|
||||||
w1 = int(buf[2]) | (int(buf[3]) << 8)
|
parsed_log_start = self._try_parse_log_start(buf)
|
||||||
w2 = int(buf[4]) | (int(buf[5]) << 8)
|
if parsed_log_start is not None:
|
||||||
|
ch, consumed = parsed_log_start
|
||||||
|
events.append(("start", ch))
|
||||||
|
del buf[:consumed]
|
||||||
|
self.bytes_consumed += consumed
|
||||||
|
self.start_count += 1
|
||||||
|
# Ширину пары (32/128) определим на ближайшей точке.
|
||||||
|
self._log_pair_words = None
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 2) log-detector point:
|
||||||
|
# сперва в уже известной ширине пары, иначе авто-детект 128/32.
|
||||||
|
# В авто-режиме сначала пробуем 32-bit пару (наиболее частый формат),
|
||||||
|
# затем 128-bit. Это снижает риск ложного совпадения 128-bit длины на 32-bit потоке.
|
||||||
|
pair_candidates = [self._log_pair_words] if self._log_pair_words in (2, 8) else [2, 8]
|
||||||
|
parsed_log_point: Optional[Tuple[int, int, float, int]] = None
|
||||||
|
for pair_words in pair_candidates:
|
||||||
|
if pair_words is None:
|
||||||
|
continue
|
||||||
|
parsed_log_point = self._try_parse_log_point(buf, int(pair_words))
|
||||||
|
if parsed_log_point is not None:
|
||||||
|
self._log_pair_words = int(pair_words)
|
||||||
|
break
|
||||||
|
if parsed_log_point is not None:
|
||||||
|
ch, step, y_val, consumed = parsed_log_point
|
||||||
|
events.append(("point", ch, step, y_val))
|
||||||
|
del buf[:consumed]
|
||||||
|
self.bytes_consumed += consumed
|
||||||
|
self.point_count += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 3) legacy 8-byte start / point.
|
||||||
|
w0 = self._u16_at(buf, 0)
|
||||||
|
w1 = self._u16_at(buf, 2)
|
||||||
|
w2 = self._u16_at(buf, 4)
|
||||||
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and buf[6] == 0x0A:
|
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and buf[6] == 0x0A:
|
||||||
ch = int(buf[7])
|
ch = int(buf[7])
|
||||||
events.append(("start", ch))
|
events.append(("start", ch))
|
||||||
del buf[:8]
|
del buf[:8]
|
||||||
self.bytes_consumed += 8
|
self.bytes_consumed += 8
|
||||||
self.start_count += 1
|
self.start_count += 1
|
||||||
|
# legacy не использует пару avg1/avg2.
|
||||||
|
self._log_pair_words = None
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if buf[6] == 0x0A:
|
if buf[6] == 0x0A:
|
||||||
ch = int(buf[7])
|
ch = int(buf[7])
|
||||||
value_u32 = (w1 << 16) | w2
|
value_u32 = (w1 << 16) | w2
|
||||||
events.append(("point", ch, int(w0), u32_to_i32(value_u32)))
|
events.append(("point", ch, int(w0), float(u32_to_i32(value_u32))))
|
||||||
del buf[:8]
|
del buf[:8]
|
||||||
self.bytes_consumed += 8
|
self.bytes_consumed += 8
|
||||||
self.point_count += 1
|
self.point_count += 1
|
||||||
@ -88,7 +209,7 @@ class SweepAssembler:
|
|||||||
self._n_valid_hist = deque()
|
self._n_valid_hist = deque()
|
||||||
|
|
||||||
self._xs: list[int] = []
|
self._xs: list[int] = []
|
||||||
self._ys: list[int] = []
|
self._ys: list[float] = []
|
||||||
self._cur_channel: Optional[int] = None
|
self._cur_channel: Optional[int] = None
|
||||||
self._cur_channels: set[int] = set()
|
self._cur_channels: set[int] = set()
|
||||||
|
|
||||||
@ -98,12 +219,12 @@ class SweepAssembler:
|
|||||||
self._cur_channel = None
|
self._cur_channel = None
|
||||||
self._cur_channels.clear()
|
self._cur_channels.clear()
|
||||||
|
|
||||||
def add_point(self, ch: int, x: int, y: int):
|
def add_point(self, ch: int, x: int, y: float):
|
||||||
if self._cur_channel is None:
|
if self._cur_channel is None:
|
||||||
self._cur_channel = int(ch)
|
self._cur_channel = int(ch)
|
||||||
self._cur_channels.add(int(ch))
|
self._cur_channels.add(int(ch))
|
||||||
self._xs.append(int(x))
|
self._xs.append(int(x))
|
||||||
self._ys.append(int(y))
|
self._ys.append(float(y))
|
||||||
|
|
||||||
def start_new_sweep(self, ch: int, now_ts: Optional[float] = None) -> Optional[SweepPacket]:
|
def start_new_sweep(self, ch: int, now_ts: Optional[float] = None) -> Optional[SweepPacket]:
|
||||||
packet = self.finalize_current(now_ts=now_ts)
|
packet = self.finalize_current(now_ts=now_ts)
|
||||||
@ -122,13 +243,13 @@ class SweepAssembler:
|
|||||||
return out
|
return out
|
||||||
# point
|
# point
|
||||||
_tag, ch, x, y = event # type: ignore[misc]
|
_tag, ch, x, y = event # type: ignore[misc]
|
||||||
self.add_point(int(ch), int(x), int(y))
|
self.add_point(int(ch), int(x), float(y))
|
||||||
return out
|
return out
|
||||||
|
|
||||||
def finalize_arrays(
|
def finalize_arrays(
|
||||||
self,
|
self,
|
||||||
xs: Sequence[int],
|
xs: Sequence[int],
|
||||||
ys: Sequence[int],
|
ys: Sequence[float],
|
||||||
channels: Optional[Set[int]],
|
channels: Optional[Set[int]],
|
||||||
now_ts: Optional[float] = None,
|
now_ts: Optional[float] = None,
|
||||||
) -> Optional[SweepPacket]:
|
) -> Optional[SweepPacket]:
|
||||||
|
|||||||
@ -151,20 +151,17 @@ class SweepReader(threading.Thread):
|
|||||||
|
|
||||||
def _run_binary_stream(self, chunk_reader: SerialChunkReader):
|
def _run_binary_stream(self, chunk_reader: SerialChunkReader):
|
||||||
xs: list[int] = []
|
xs: list[int] = []
|
||||||
ys: list[int] = []
|
ys: list[float] = []
|
||||||
cur_channel: Optional[int] = None
|
cur_channel: Optional[int] = None
|
||||||
cur_channels: set[int] = set()
|
cur_channels: set[int] = set()
|
||||||
parser = BinaryRecordStreamParser()
|
parser = BinaryRecordStreamParser()
|
||||||
|
|
||||||
# Бинарный протокол (4 слова LE u16 = 8 байт на запись):
|
# Поддерживаются оба wire-формата:
|
||||||
# старт свипа: 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
|
# 1) legacy: 8-byte записи (start/point с одним int32 значением).
|
||||||
# Байты на проводе: ff ff ff ff ff ff 0a [ch]
|
# 2) log-detector: start = FFFF x5 + (ch<<8)|0x0A,
|
||||||
# ch=0 → последнее слово=0x000A; ch=1 → 0x010A; и т.д.
|
# point = step + (avg1, avg2), где avg1/avg2 имеют ширину 32-bit или 128-bit.
|
||||||
# точка данных: step_u16, value_hi_u16, value_lo_u16, (ch<<8)|0x0A
|
# Для point парсер сразу преобразует (avg1, avg2) в линейную амплитуду y.
|
||||||
# Байты на проводе: [step_lo step_hi] [hi_lo hi_hi] [lo_lo lo_hi] 0a [ch]
|
# В обоих режимах при десинхронизации parser.feed() сдвигается на 1 байт.
|
||||||
# value_i32 = sign_extend((value_hi<<16)|value_lo)
|
|
||||||
# Признак записи: байт 6 == 0x0A, байт 7 — номер канала.
|
|
||||||
# При десинхронизации сдвигаемся на 1 БАЙТ (не слово) для самосинхронизации.
|
|
||||||
|
|
||||||
_dbg_byte_count = 0
|
_dbg_byte_count = 0
|
||||||
_dbg_desync_count = 0
|
_dbg_desync_count = 0
|
||||||
@ -196,13 +193,13 @@ class SweepReader(threading.Thread):
|
|||||||
_tag, ch_from_term, step, value_i32 = ev # type: ignore[misc]
|
_tag, ch_from_term, step, value_i32 = ev # type: ignore[misc]
|
||||||
if cur_channel is None:
|
if cur_channel is None:
|
||||||
cur_channel = int(ch_from_term)
|
cur_channel = int(ch_from_term)
|
||||||
cur_channels.add(int(cur_channel))
|
cur_channels.add(int(ch_from_term))
|
||||||
xs.append(int(step))
|
xs.append(int(step))
|
||||||
ys.append(int(value_i32))
|
ys.append(float(value_i32))
|
||||||
_dbg_point_count += 1
|
_dbg_point_count += 1
|
||||||
if self._debug and _dbg_point_count <= 3:
|
if self._debug and _dbg_point_count <= 3:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
f"[debug] BIN точка: step={int(step)} ch={int(ch_from_term)} → value={int(value_i32)}\n"
|
f"[debug] BIN точка: step={int(step)} ch={int(ch_from_term)} → value={float(value_i32):.3f}\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
_dbg_byte_count = parser.bytes_consumed
|
_dbg_byte_count = parser.bytes_consumed
|
||||||
|
|||||||
@ -11,10 +11,7 @@ from rfg_adc_plotter.constants import (
|
|||||||
WF_WIDTH,
|
WF_WIDTH,
|
||||||
)
|
)
|
||||||
from rfg_adc_plotter.processing.fourier import (
|
from rfg_adc_plotter.processing.fourier import (
|
||||||
build_frequency_axis_hz,
|
|
||||||
compute_ifft_profile_from_sweep,
|
compute_ifft_profile_from_sweep,
|
||||||
perform_ifft_depth_response,
|
|
||||||
reconstruct_complex_spectrum_from_real_trace,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -43,17 +40,6 @@ class RingBuffer:
|
|||||||
self.y_max_fft: Optional[float] = None
|
self.y_max_fft: Optional[float] = None
|
||||||
# FFT последнего свипа (для отображения без повторного вычисления)
|
# FFT последнего свипа (для отображения без повторного вычисления)
|
||||||
self.last_fft_vals: Optional[np.ndarray] = None
|
self.last_fft_vals: Optional[np.ndarray] = None
|
||||||
# FFT-профили по третям входного частотного диапазона (для line-plot).
|
|
||||||
self.last_fft_third_axes_m: tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray]] = (
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
self.last_fft_third_vals: tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray]] = (
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_ready(self) -> bool:
|
def is_ready(self) -> bool:
|
||||||
@ -80,8 +66,6 @@ class RingBuffer:
|
|||||||
self.fft_depth_axis_m = None
|
self.fft_depth_axis_m = None
|
||||||
self.fft_bins = 0
|
self.fft_bins = 0
|
||||||
self.last_fft_vals = None
|
self.last_fft_vals = None
|
||||||
self.last_fft_third_axes_m = (None, None, None)
|
|
||||||
self.last_fft_third_vals = (None, None, None)
|
|
||||||
self.y_min_fft = None
|
self.y_min_fft = None
|
||||||
self.y_max_fft = None
|
self.y_max_fft = None
|
||||||
return True
|
return True
|
||||||
@ -112,11 +96,6 @@ class RingBuffer:
|
|||||||
self._push_fft(s)
|
self._push_fft(s)
|
||||||
|
|
||||||
def _push_fft(self, s: np.ndarray):
|
def _push_fft(self, s: np.ndarray):
|
||||||
empty_thirds = (
|
|
||||||
np.zeros((0,), dtype=np.float32),
|
|
||||||
np.zeros((0,), dtype=np.float32),
|
|
||||||
np.zeros((0,), dtype=np.float32),
|
|
||||||
)
|
|
||||||
depth_axis_m, fft_row = compute_ifft_profile_from_sweep(
|
depth_axis_m, fft_row = compute_ifft_profile_from_sweep(
|
||||||
s,
|
s,
|
||||||
complex_mode=self.fft_complex_mode,
|
complex_mode=self.fft_complex_mode,
|
||||||
@ -126,21 +105,13 @@ class RingBuffer:
|
|||||||
|
|
||||||
n = min(int(fft_row.size), int(depth_axis_m.size))
|
n = min(int(fft_row.size), int(depth_axis_m.size))
|
||||||
if n <= 0:
|
if n <= 0:
|
||||||
self.last_fft_third_axes_m = empty_thirds
|
self.last_fft_vals = None
|
||||||
self.last_fft_third_vals = empty_thirds
|
|
||||||
return
|
return
|
||||||
if n != fft_row.size:
|
if n != fft_row.size:
|
||||||
fft_row = fft_row[:n]
|
fft_row = fft_row[:n]
|
||||||
if n != depth_axis_m.size:
|
if n != depth_axis_m.size:
|
||||||
depth_axis_m = depth_axis_m[:n]
|
depth_axis_m = depth_axis_m[:n]
|
||||||
|
|
||||||
# Для отображения храним только первую половину IFFT-профиля:
|
|
||||||
# вторая половина для текущей схемы симметрична и визуально избыточна.
|
|
||||||
n_keep = max(1, (n + 1) // 2)
|
|
||||||
fft_row = fft_row[:n_keep]
|
|
||||||
depth_axis_m = depth_axis_m[:n_keep]
|
|
||||||
n = n_keep
|
|
||||||
|
|
||||||
needs_reset = (
|
needs_reset = (
|
||||||
self.ring_fft is None
|
self.ring_fft is None
|
||||||
or self.fft_depth_axis_m is None
|
or self.fft_depth_axis_m is None
|
||||||
@ -169,7 +140,6 @@ class RingBuffer:
|
|||||||
prev_head = (self.head - 1) % self.ring_fft.shape[0]
|
prev_head = (self.head - 1) % self.ring_fft.shape[0]
|
||||||
self.ring_fft[prev_head, :] = fft_row
|
self.ring_fft[prev_head, :] = fft_row
|
||||||
self.last_fft_vals = fft_row
|
self.last_fft_vals = fft_row
|
||||||
self.last_fft_third_axes_m, self.last_fft_third_vals = self._compute_fft_thirds(s)
|
|
||||||
|
|
||||||
fr_min = np.nanmin(fft_row)
|
fr_min = np.nanmin(fft_row)
|
||||||
fr_max = float(np.nanpercentile(fft_row, 90))
|
fr_max = float(np.nanpercentile(fft_row, 90))
|
||||||
@ -178,65 +148,6 @@ class RingBuffer:
|
|||||||
if self.y_max_fft is None or (not np.isnan(fr_max) and fr_max > self.y_max_fft):
|
if self.y_max_fft is None or (not np.isnan(fr_max) and fr_max > self.y_max_fft):
|
||||||
self.y_max_fft = float(fr_max)
|
self.y_max_fft = float(fr_max)
|
||||||
|
|
||||||
def _compute_fft_thirds(
|
|
||||||
self, s: np.ndarray
|
|
||||||
) -> tuple[tuple[np.ndarray, np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray, np.ndarray]]:
|
|
||||||
sweep = np.asarray(s, dtype=np.float64).ravel()
|
|
||||||
total = int(sweep.size)
|
|
||||||
|
|
||||||
def _empty() -> np.ndarray:
|
|
||||||
return np.zeros((0,), dtype=np.float32)
|
|
||||||
|
|
||||||
if total <= 0:
|
|
||||||
return (_empty(), _empty(), _empty()), (_empty(), _empty(), _empty())
|
|
||||||
|
|
||||||
freq_hz = build_frequency_axis_hz(total)
|
|
||||||
edges = np.linspace(0, total, 4, dtype=np.int64)
|
|
||||||
|
|
||||||
axes: list[np.ndarray] = []
|
|
||||||
vals: list[np.ndarray] = []
|
|
||||||
|
|
||||||
for idx in range(3):
|
|
||||||
i0 = int(edges[idx])
|
|
||||||
i1 = int(edges[idx + 1])
|
|
||||||
if i1 - i0 < 2:
|
|
||||||
axes.append(_empty())
|
|
||||||
vals.append(_empty())
|
|
||||||
continue
|
|
||||||
|
|
||||||
seg = sweep[i0:i1]
|
|
||||||
seg_freq = freq_hz[i0:i1]
|
|
||||||
seg_complex = reconstruct_complex_spectrum_from_real_trace(
|
|
||||||
seg,
|
|
||||||
complex_mode=self.fft_complex_mode,
|
|
||||||
)
|
|
||||||
depth_m, seg_fft = perform_ifft_depth_response(seg_complex, seg_freq, axis="abs")
|
|
||||||
|
|
||||||
depth_m = np.asarray(depth_m, dtype=np.float32).ravel()
|
|
||||||
seg_fft = np.asarray(seg_fft, dtype=np.float32).ravel()
|
|
||||||
n = min(int(depth_m.size), int(seg_fft.size))
|
|
||||||
if n <= 0:
|
|
||||||
axes.append(_empty())
|
|
||||||
vals.append(_empty())
|
|
||||||
continue
|
|
||||||
|
|
||||||
depth_m = depth_m[:n]
|
|
||||||
seg_fft = seg_fft[:n]
|
|
||||||
|
|
||||||
n_keep = max(1, (n + 1) // 2)
|
|
||||||
axes.append(depth_m[:n_keep])
|
|
||||||
vals.append(seg_fft[:n_keep])
|
|
||||||
|
|
||||||
return (
|
|
||||||
axes[0],
|
|
||||||
axes[1],
|
|
||||||
axes[2],
|
|
||||||
), (
|
|
||||||
vals[0],
|
|
||||||
vals[1],
|
|
||||||
vals[2],
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_display_ring(self) -> np.ndarray:
|
def get_display_ring(self) -> np.ndarray:
|
||||||
"""Кольцо в порядке от старого к новому, ось времени по X. Форма: (width, time)."""
|
"""Кольцо в порядке от старого к новому, ось времени по X. Форма: (width, time)."""
|
||||||
if self.ring is None:
|
if self.ring is None:
|
||||||
|
|||||||
@ -13,11 +13,13 @@ from rfg_adc_plotter.processing.pipeline import SweepPreprocessor
|
|||||||
ROOT = Path(__file__).resolve().parents[1]
|
ROOT = Path(__file__).resolve().parents[1]
|
||||||
SAMPLE_BG = ROOT / "sample_data" / "empty"
|
SAMPLE_BG = ROOT / "sample_data" / "empty"
|
||||||
SAMPLE_CALIB = ROOT / "sample_data" / "no_antennas_35dB_attenuators"
|
SAMPLE_CALIB = ROOT / "sample_data" / "no_antennas_35dB_attenuators"
|
||||||
|
SAMPLE_NEW_FMT = ROOT / "sample_data" / "new_format" / "attenuators_50dB"
|
||||||
|
|
||||||
|
|
||||||
def test_detect_reference_file_format_for_sample_capture():
|
def test_detect_reference_file_format_for_sample_capture():
|
||||||
assert detect_reference_file_format(str(SAMPLE_BG)) == "bin_capture"
|
assert detect_reference_file_format(str(SAMPLE_BG)) == "bin_capture"
|
||||||
assert detect_reference_file_format(str(SAMPLE_CALIB)) == "bin_capture"
|
assert detect_reference_file_format(str(SAMPLE_CALIB)) == "bin_capture"
|
||||||
|
assert detect_reference_file_format(str(SAMPLE_NEW_FMT)) == "bin_capture"
|
||||||
|
|
||||||
|
|
||||||
def test_load_capture_sweeps_parses_binary_capture():
|
def test_load_capture_sweeps_parses_binary_capture():
|
||||||
@ -33,6 +35,22 @@ def test_load_capture_sweeps_parses_binary_capture():
|
|||||||
assert channels == {0}
|
assert channels == {0}
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_capture_sweeps_parses_new_format_logdetector_capture():
|
||||||
|
sweeps = load_capture_sweeps(str(SAMPLE_NEW_FMT), fancy=False, logscale=False)
|
||||||
|
assert len(sweeps) > 900
|
||||||
|
|
||||||
|
widths = [int(s.size) for s, _info in sweeps]
|
||||||
|
dominant_width = max(set(widths), key=widths.count)
|
||||||
|
# Должно совпадать с ожидаемой шириной свипа из штатных capture.
|
||||||
|
assert dominant_width in (758, 759)
|
||||||
|
|
||||||
|
channels = set()
|
||||||
|
for _s, info in sweeps:
|
||||||
|
chs = info.get("chs", [info.get("ch", 0)])
|
||||||
|
channels.update(int(v) for v in chs)
|
||||||
|
assert channels == {0}
|
||||||
|
|
||||||
|
|
||||||
def test_aggregate_capture_reference_filters_incomplete_sweeps():
|
def test_aggregate_capture_reference_filters_incomplete_sweeps():
|
||||||
sweeps = load_capture_sweeps(str(SAMPLE_BG), fancy=False, logscale=False)
|
sweeps = load_capture_sweeps(str(SAMPLE_BG), fancy=False, logscale=False)
|
||||||
vector, summary = aggregate_capture_reference(sweeps, channel=0, method="median", path=str(SAMPLE_BG))
|
vector, summary = aggregate_capture_reference(sweeps, channel=0, method="median", path=str(SAMPLE_BG))
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from rfg_adc_plotter.processing.fourier import compute_ifft_profile_from_sweep
|
||||||
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
||||||
|
|
||||||
|
|
||||||
@ -8,6 +9,7 @@ def test_ring_buffer_allocates_fft_buffers_from_first_push():
|
|||||||
ring.ensure_init(64)
|
ring.ensure_init(64)
|
||||||
|
|
||||||
sweep = np.linspace(-1.0, 1.0, 64, dtype=np.float32)
|
sweep = np.linspace(-1.0, 1.0, 64, dtype=np.float32)
|
||||||
|
depth_expected, vals_expected = compute_ifft_profile_from_sweep(sweep, complex_mode="arccos")
|
||||||
ring.push(sweep)
|
ring.push(sweep)
|
||||||
|
|
||||||
assert ring.ring_fft is not None
|
assert ring.ring_fft is not None
|
||||||
@ -16,14 +18,7 @@ def test_ring_buffer_allocates_fft_buffers_from_first_push():
|
|||||||
assert ring.fft_bins == ring.ring_fft.shape[1]
|
assert ring.fft_bins == ring.ring_fft.shape[1]
|
||||||
assert ring.fft_bins == ring.fft_depth_axis_m.size
|
assert ring.fft_bins == ring.fft_depth_axis_m.size
|
||||||
assert ring.fft_bins == ring.last_fft_vals.size
|
assert ring.fft_bins == ring.last_fft_vals.size
|
||||||
assert ring.last_fft_third_axes_m != (None, None, None)
|
assert ring.fft_bins == min(depth_expected.size, vals_expected.size)
|
||||||
assert ring.last_fft_third_vals != (None, None, None)
|
|
||||||
for axis, vals in zip(ring.last_fft_third_axes_m, ring.last_fft_third_vals):
|
|
||||||
assert axis is not None
|
|
||||||
assert vals is not None
|
|
||||||
assert axis.dtype == np.float32
|
|
||||||
assert vals.dtype == np.float32
|
|
||||||
assert axis.size == vals.size
|
|
||||||
# Legacy alias kept for compatibility with existing GUI code paths.
|
# Legacy alias kept for compatibility with existing GUI code paths.
|
||||||
assert ring.fft_time_axis is ring.fft_depth_axis_m
|
assert ring.fft_time_axis is ring.fft_depth_axis_m
|
||||||
|
|
||||||
@ -56,8 +51,6 @@ def test_ring_buffer_mode_switch_resets_fft_buffers_only():
|
|||||||
assert ring.ring is not None
|
assert ring.ring is not None
|
||||||
assert ring.ring_fft is not None
|
assert ring.ring_fft is not None
|
||||||
raw_before = ring.ring.copy()
|
raw_before = ring.ring.copy()
|
||||||
assert ring.last_fft_third_axes_m != (None, None, None)
|
|
||||||
assert ring.last_fft_third_vals != (None, None, None)
|
|
||||||
|
|
||||||
changed = ring.set_fft_complex_mode("diff")
|
changed = ring.set_fft_complex_mode("diff")
|
||||||
assert changed is True
|
assert changed is True
|
||||||
@ -67,35 +60,22 @@ def test_ring_buffer_mode_switch_resets_fft_buffers_only():
|
|||||||
assert ring.ring_fft is None
|
assert ring.ring_fft is None
|
||||||
assert ring.fft_depth_axis_m is None
|
assert ring.fft_depth_axis_m is None
|
||||||
assert ring.last_fft_vals is None
|
assert ring.last_fft_vals is None
|
||||||
assert ring.last_fft_third_axes_m == (None, None, None)
|
|
||||||
assert ring.last_fft_third_vals == (None, None, None)
|
|
||||||
assert ring.fft_bins == 0
|
assert ring.fft_bins == 0
|
||||||
|
|
||||||
ring.push(np.linspace(-1.0, 1.0, 128, dtype=np.float32))
|
ring.push(np.linspace(-1.0, 1.0, 128, dtype=np.float32))
|
||||||
assert ring.ring_fft is not None
|
assert ring.ring_fft is not None
|
||||||
assert ring.fft_depth_axis_m is not None
|
assert ring.fft_depth_axis_m is not None
|
||||||
assert ring.last_fft_vals is not None
|
assert ring.last_fft_vals is not None
|
||||||
assert ring.last_fft_third_axes_m != (None, None, None)
|
|
||||||
assert ring.last_fft_third_vals != (None, None, None)
|
|
||||||
for axis, vals in zip(ring.last_fft_third_axes_m, ring.last_fft_third_vals):
|
|
||||||
assert axis is not None
|
|
||||||
assert vals is not None
|
|
||||||
assert axis.dtype == np.float32
|
|
||||||
assert vals.dtype == np.float32
|
|
||||||
assert axis.size == vals.size
|
|
||||||
|
|
||||||
|
|
||||||
def test_ring_buffer_short_sweeps_keep_third_profiles_well_formed():
|
def test_ring_buffer_short_sweeps_keep_fft_profile_well_formed():
|
||||||
for n in (1, 2, 3):
|
for n in (1, 2, 3):
|
||||||
ring = RingBuffer(max_sweeps=4)
|
ring = RingBuffer(max_sweeps=4)
|
||||||
ring.ensure_init(n)
|
ring.ensure_init(n)
|
||||||
ring.push(np.linspace(-1.0, 1.0, n, dtype=np.float32))
|
ring.push(np.linspace(-1.0, 1.0, n, dtype=np.float32))
|
||||||
|
|
||||||
assert ring.last_fft_third_axes_m != (None, None, None)
|
assert ring.fft_depth_axis_m is not None
|
||||||
assert ring.last_fft_third_vals != (None, None, None)
|
assert ring.last_fft_vals is not None
|
||||||
for axis, vals in zip(ring.last_fft_third_axes_m, ring.last_fft_third_vals):
|
assert ring.fft_depth_axis_m.dtype == np.float32
|
||||||
assert axis is not None
|
assert ring.last_fft_vals.dtype == np.float32
|
||||||
assert vals is not None
|
assert ring.fft_depth_axis_m.size == ring.last_fft_vals.size
|
||||||
assert axis.dtype == np.float32
|
|
||||||
assert vals.dtype == np.float32
|
|
||||||
assert axis.size == vals.size
|
|
||||||
|
|||||||
110
tests/test_sweep_parser_core_binary_protocols.py
Normal file
110
tests/test_sweep_parser_core_binary_protocols.py
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
import math
|
||||||
|
|
||||||
|
from rfg_adc_plotter.io.sweep_parser_core import BinaryRecordStreamParser
|
||||||
|
|
||||||
|
|
||||||
|
def _u16le(word: int) -> bytes:
|
||||||
|
w = int(word) & 0xFFFF
|
||||||
|
return bytes((w & 0xFF, (w >> 8) & 0xFF))
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_signed_words_be(value: int, words: int) -> list[int]:
|
||||||
|
bits = 16 * int(words)
|
||||||
|
v = int(value)
|
||||||
|
if v < 0:
|
||||||
|
v = (1 << bits) + v
|
||||||
|
out: list[int] = []
|
||||||
|
for i in range(words):
|
||||||
|
shift = (words - 1 - i) * 16
|
||||||
|
out.append((v >> shift) & 0xFFFF)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_legacy_start(ch: int) -> bytes:
|
||||||
|
return b"\xff\xff" * 3 + bytes((0x0A, int(ch) & 0xFF))
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_legacy_point(ch: int, step: int, value_i32: int) -> bytes:
|
||||||
|
v = int(value_i32) & 0xFFFF_FFFF
|
||||||
|
return b"".join(
|
||||||
|
[
|
||||||
|
_u16le(step),
|
||||||
|
_u16le((v >> 16) & 0xFFFF),
|
||||||
|
_u16le(v & 0xFFFF),
|
||||||
|
bytes((0x0A, int(ch) & 0xFF)),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_log_start(ch: int) -> bytes:
|
||||||
|
return b"\xff\xff" * 5 + bytes((0x0A, int(ch) & 0xFF))
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_log_point(step: int, avg1: int, avg2: int, pair_words: int, ch: int = 0) -> bytes:
|
||||||
|
words = [int(step) & 0xFFFF]
|
||||||
|
words.extend(_pack_signed_words_be(avg1, pair_words))
|
||||||
|
words.extend(_pack_signed_words_be(avg2, pair_words))
|
||||||
|
words.append(((int(ch) & 0xFF) << 8) | 0x000A)
|
||||||
|
return b"".join(_u16le(w) for w in words)
|
||||||
|
|
||||||
|
|
||||||
|
def _log_pair_to_linear(avg1: int, avg2: int) -> float:
|
||||||
|
exp1 = max(-300.0, min(300.0, float(avg1) * 0.001))
|
||||||
|
exp2 = max(-300.0, min(300.0, float(avg2) * 0.001))
|
||||||
|
return (math.pow(10.0, exp1) - math.pow(10.0, exp2)) * 1000.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_binary_parser_parses_legacy_8_byte_records():
|
||||||
|
parser = BinaryRecordStreamParser()
|
||||||
|
stream = b"".join(
|
||||||
|
[
|
||||||
|
_pack_legacy_start(3),
|
||||||
|
_pack_legacy_point(3, 1, -2),
|
||||||
|
_pack_legacy_point(3, 2, 123456),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
events = []
|
||||||
|
events.extend(parser.feed(stream[:5]))
|
||||||
|
events.extend(parser.feed(stream[5:17]))
|
||||||
|
events.extend(parser.feed(stream[17:]))
|
||||||
|
|
||||||
|
assert events[0] == ("start", 3)
|
||||||
|
assert events[1] == ("point", 3, 1, -2.0)
|
||||||
|
assert events[2] == ("point", 3, 2, 123456.0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_binary_parser_parses_logdetector_32bit_pair_records():
|
||||||
|
parser = BinaryRecordStreamParser()
|
||||||
|
stream = b"".join(
|
||||||
|
[
|
||||||
|
_pack_log_start(0),
|
||||||
|
_pack_log_point(1, 1500, 700, pair_words=2, ch=0),
|
||||||
|
_pack_log_point(2, 1510, 710, pair_words=2, ch=0),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
events = parser.feed(stream)
|
||||||
|
assert events[0] == ("start", 0)
|
||||||
|
assert events[1][0:3] == ("point", 0, 1)
|
||||||
|
assert events[2][0:3] == ("point", 0, 2)
|
||||||
|
assert abs(float(events[1][3]) - _log_pair_to_linear(1500, 700)) < 1e-6
|
||||||
|
assert abs(float(events[2][3]) - _log_pair_to_linear(1510, 710)) < 1e-6
|
||||||
|
|
||||||
|
|
||||||
|
def test_binary_parser_parses_logdetector_128bit_pair_records():
|
||||||
|
parser = BinaryRecordStreamParser()
|
||||||
|
stream = b"".join(
|
||||||
|
[
|
||||||
|
_pack_log_start(5),
|
||||||
|
_pack_log_point(7, 1600, 800, pair_words=8, ch=5),
|
||||||
|
_pack_log_point(8, 1610, 810, pair_words=8, ch=5),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
events = parser.feed(stream)
|
||||||
|
assert events[0] == ("start", 5)
|
||||||
|
assert events[1][0:3] == ("point", 5, 7)
|
||||||
|
assert events[2][0:3] == ("point", 5, 8)
|
||||||
|
assert abs(float(events[1][3]) - _log_pair_to_linear(1600, 800)) < 1e-6
|
||||||
|
assert abs(float(events[2][3]) - _log_pair_to_linear(1610, 810)) < 1e-6
|
||||||
Reference in New Issue
Block a user