1 Commits

Author SHA1 Message Date
awe
26c3dd7ad5 something working new format 2026-03-05 17:56:27 +03:00
10 changed files with 320 additions and 223 deletions

Binary file not shown.

View File

@ -146,9 +146,7 @@ def run_matplotlib(args):
ax_line.set_ylim(fixed_ylim)
# График спектра
fft_line_t1, = ax_fft.plot([], [], lw=1, color="tab:blue", label="1/3 (low f)")
fft_line_t2, = ax_fft.plot([], [], lw=1, color="tab:orange", label="2/3 (mid f)")
fft_line_t3, = ax_fft.plot([], [], lw=1, color="tab:green", label="3/3 (high f)")
fft_line_obj, = ax_fft.plot([], [], lw=1, color="tab:blue", label="full band")
ax_fft.set_title("FFT", pad=1)
ax_fft.set_xlabel("Глубина, м")
ax_fft.set_ylabel("Амплитуда")
@ -438,9 +436,7 @@ def run_matplotlib(args):
ring.set_fft_complex_mode(str(label))
except Exception:
pass
fft_line_t1.set_data([], [])
fft_line_t2.set_data([], [])
fft_line_t3.set_data([], [])
fft_line_obj.set_data([], [])
_refresh_status_texts()
try:
fig.canvas.draw_idle()
@ -590,31 +586,20 @@ def run_matplotlib(args):
ax_line.autoscale_view(scalex=False, scaley=True)
ax_line.set_ylabel("Y")
# Профиль по глубине: три линии для 1/3, 2/3, 3/3 частотного диапазона.
third_axes = ring.last_fft_third_axes_m
third_vals = ring.last_fft_third_vals
lines = (fft_line_t1, fft_line_t2, fft_line_t3)
xs_max = []
ys_min = []
ys_max = []
for line_fft, xs_fft, fft_vals in zip(lines, third_axes, third_vals):
if xs_fft is None or fft_vals is None:
line_fft.set_data([], [])
continue
n = min(int(xs_fft.size), int(fft_vals.size))
if n <= 0:
line_fft.set_data([], [])
continue
x_seg = xs_fft[:n]
y_seg = fft_vals[:n]
line_fft.set_data(x_seg, y_seg)
xs_max.append(float(x_seg[n - 1]))
ys_min.append(float(np.nanmin(y_seg)))
ys_max.append(float(np.nanmax(y_seg)))
if xs_max and ys_min and ys_max:
ax_fft.set_xlim(0, float(max(xs_max)))
ax_fft.set_ylim(float(min(ys_min)), float(max(ys_max)))
axis_fft = ring.fft_depth_axis_m
vals_fft = ring.last_fft_vals
if axis_fft is None or vals_fft is None:
fft_line_obj.set_data([], [])
else:
n_fft = min(int(axis_fft.size), int(vals_fft.size))
if n_fft <= 0:
fft_line_obj.set_data([], [])
else:
x_fft = axis_fft[:n_fft]
y_fft = vals_fft[:n_fft]
fft_line_obj.set_data(x_fft, y_fft)
ax_fft.set_xlim(0, float(x_fft[n_fft - 1]))
ax_fft.set_ylim(float(np.nanmin(y_fft)), float(np.nanmax(y_fft)))
# Водопад сырых данных
if changed and ring.is_ready:
@ -664,9 +649,7 @@ def run_matplotlib(args):
line_env_lo,
line_env_hi,
img_obj,
fft_line_t1,
fft_line_t2,
fft_line_t3,
fft_line_obj,
img_fft_obj,
status_text,
pipeline_text,

View File

@ -202,9 +202,7 @@ def run_pyqtgraph(args):
# FFT (слева-снизу)
p_fft = win.addPlot(row=1, col=0, title="FFT")
p_fft.showGrid(x=True, y=True, alpha=0.3)
curve_fft_t1 = p_fft.plot(pen=pg.mkPen((80, 120, 255), width=1))
curve_fft_t2 = p_fft.plot(pen=pg.mkPen((255, 140, 70), width=1))
curve_fft_t3 = p_fft.plot(pen=pg.mkPen((60, 180, 90), width=1))
curve_fft = p_fft.plot(pen=pg.mkPen((80, 120, 255), width=1))
p_fft.setLabel("bottom", "Глубина, м")
p_fft.setLabel("left", "Амплитуда")
@ -494,9 +492,7 @@ def run_pyqtgraph(args):
changed = False
if changed:
try:
curve_fft_t1.setData([], [])
curve_fft_t2.setData([], [])
curve_fft_t3.setData([], [])
curve_fft.setData([], [])
except Exception:
pass
_refresh_pipeline_label()
@ -630,31 +626,20 @@ def run_pyqtgraph(args):
p_line.enableAutoRange(axis="y", enable=True)
p_line.setLabel("left", "Y")
# Профиль по глубине: три линии для 1/3, 2/3, 3/3 частотного диапазона.
third_axes = ring.last_fft_third_axes_m
third_vals = ring.last_fft_third_vals
curves = (curve_fft_t1, curve_fft_t2, curve_fft_t3)
xs_max = []
ys_min = []
ys_max = []
for curve_fft, xs_fft, fft_vals in zip(curves, third_axes, third_vals):
if xs_fft is None or fft_vals is None:
axis_fft = ring.fft_depth_axis_m
vals_fft = ring.last_fft_vals
if axis_fft is None or vals_fft is None:
curve_fft.setData([], [])
continue
n = min(int(xs_fft.size), int(fft_vals.size))
if n <= 0:
else:
n_fft = min(int(axis_fft.size), int(vals_fft.size))
if n_fft <= 0:
curve_fft.setData([], [])
continue
x_seg = xs_fft[:n]
y_seg = fft_vals[:n]
curve_fft.setData(x_seg, y_seg)
xs_max.append(float(x_seg[n - 1]))
ys_min.append(float(np.nanmin(y_seg)))
ys_max.append(float(np.nanmax(y_seg)))
if xs_max and ys_min and ys_max:
p_fft.setXRange(0.0, float(max(xs_max)), padding=0)
p_fft.setYRange(float(min(ys_min)), float(max(ys_max)), padding=0)
else:
x_fft = axis_fft[:n_fft]
y_fft = vals_fft[:n_fft]
curve_fft.setData(x_fft, y_fft)
p_fft.setXRange(0.0, float(x_fft[n_fft - 1]), padding=0)
p_fft.setYRange(float(np.nanmin(y_fft)), float(np.nanmax(y_fft)), padding=0)
# Позиция подписи канала
try:

View File

@ -46,31 +46,23 @@ def detect_reference_file_format(path: str) -> Optional[str]:
size = os.path.getsize(p)
except Exception:
return None
if size <= 0 or (size % 8) != 0:
if size <= 0:
return None
try:
with open(p, "rb") as f:
sample = f.read(min(size, 8 * 2048))
sample = f.read(min(size, 256 * 1024))
except Exception:
return None
if len(sample) < 8:
return None
# Быстрый sniff aligned-записей: в валидных записях байт 6 == 0x0A.
recs = len(sample) // 8
if recs <= 0:
return None
marker_hits = 0
start_hits = 0
for i in range(0, recs * 8, 8):
b = sample[i : i + 8]
if b[6] == 0x0A:
marker_hits += 1
if b[:6] == b"\xff\xff\xff\xff\xff\xff":
start_hits += 1
if marker_hits >= max(4, int(recs * 0.8)) and start_hits >= 1:
# Универсальный sniff: прогоняем тем же потоковым парсером,
# который используется в realtime/capture-import.
parser = BinaryRecordStreamParser()
_ = parser.feed(sample)
if parser.start_count >= 1 and parser.point_count >= 16:
return "bin_capture"
return None

View File

@ -2,9 +2,10 @@
from __future__ import annotations
import math
from collections import deque
import time
from typing import Iterable, List, Optional, Sequence, Set, Tuple
from typing import List, Optional, Sequence, Set, Tuple
import numpy as np
@ -14,7 +15,13 @@ from rfg_adc_plotter.types import SweepInfo, SweepPacket
# Binary parser events:
# ("start", ch)
# ("point", ch, x, y)
BinaryEvent = Tuple[str, int] | Tuple[str, int, int, int]
BinaryEvent = Tuple[str, int] | Tuple[str, int, int, float]
# Параметры преобразования пары log-detector значений в линейную амплитуду.
_LOG_DETECTOR_BASE = 10.0
_LOG_DETECTOR_SCALER = 0.001
_LOG_DETECTOR_POSTSCALE = 1000.0
_LOG_DETECTOR_EXP_LIMIT = 300.0
def u32_to_i32(v: int) -> int:
@ -22,8 +29,44 @@ def u32_to_i32(v: int) -> int:
return v - 0x1_0000_0000 if (v & 0x8000_0000) else v
def u_bits_to_i(v: int, bits: int) -> int:
"""Преобразование беззнакового целого fixed-width в знаковое (two's complement)."""
if bits <= 0:
return 0
sign = 1 << (bits - 1)
full = 1 << bits
return v - full if (v & sign) else v
def words_be_to_i(words: Sequence[int]) -> int:
"""Собрать big-endian набор 16-bit слов в знаковое число."""
acc = 0
for w in words:
acc = (acc << 16) | (int(w) & 0xFFFF)
return u_bits_to_i(acc, 16 * int(len(words)))
def _log_pair_to_linear(avg_1: int, avg_2: int) -> float:
"""Разность двух логарифмических усреднений в линейной шкале."""
exp1 = max(-_LOG_DETECTOR_EXP_LIMIT, min(_LOG_DETECTOR_EXP_LIMIT, float(avg_1) * _LOG_DETECTOR_SCALER))
exp2 = max(-_LOG_DETECTOR_EXP_LIMIT, min(_LOG_DETECTOR_EXP_LIMIT, float(avg_2) * _LOG_DETECTOR_SCALER))
return (math.pow(_LOG_DETECTOR_BASE, exp1) - math.pow(_LOG_DETECTOR_BASE, exp2)) * _LOG_DETECTOR_POSTSCALE
class BinaryRecordStreamParser:
"""Инкрементальный парсер бинарных записей протокола (по 8 байт)."""
"""Инкрементальный парсер бинарных записей нескольких wire-форматов.
Поддерживаемые форматы:
1) legacy 8-byte:
старт: 0xFFFF,0xFFFF,0xFFFF,(ch<<8)|0x0A
точка: step,value_hi16,value_lo16,(ch<<8)|0x0A
2) log-detector:
старт: 0xFFFF x5, (ch<<8)|0x0A
точка: step, avg1, avg2, (ch<<8)|0x0A,
где avg1/avg2 кодируются фиксированной шириной в 16-bit словах:
- 2 слова (int32) или
- 8 слов (int128).
"""
def __init__(self):
self._buf = bytearray()
@ -31,6 +74,49 @@ class BinaryRecordStreamParser:
self.start_count: int = 0
self.point_count: int = 0
self.desync_count: int = 0
self._log_pair_words: Optional[int] = None
@staticmethod
def _u16_at(buf: bytearray, offset: int) -> int:
return int(buf[offset]) | (int(buf[offset + 1]) << 8)
def _try_parse_log_start(self, buf: bytearray) -> Optional[Tuple[int, int]]:
rec_bytes = 12 # 6 слов: FFFF x5 + terminator
if len(buf) < rec_bytes:
return None
for wi in range(5):
if self._u16_at(buf, wi * 2) != 0xFFFF:
return None
term = self._u16_at(buf, 10)
if (term & 0x00FF) != 0x000A:
return None
ch = int((term >> 8) & 0x00FF)
return ch, rec_bytes
def _try_parse_log_point(self, buf: bytearray, pair_words: int) -> Optional[Tuple[int, int, float, int]]:
if pair_words <= 0:
return None
rec_words = 2 + 2 * int(pair_words)
rec_bytes = 2 * rec_words
if len(buf) < rec_bytes:
return None
step = self._u16_at(buf, 0)
if step == 0xFFFF:
return None
term_off = rec_bytes - 2
term = self._u16_at(buf, term_off)
if (term & 0x00FF) != 0x000A:
return None
a1_words = [self._u16_at(buf, 2 + 2 * i) for i in range(pair_words)]
a2_words = [self._u16_at(buf, 2 + 2 * (pair_words + i)) for i in range(pair_words)]
avg_1 = words_be_to_i(a1_words)
avg_2 = words_be_to_i(a2_words)
y_val = _log_pair_to_linear(avg_1, avg_2)
ch = int((term >> 8) & 0x00FF)
return ch, int(step), float(y_val), rec_bytes
def feed(self, data: bytes) -> List[BinaryEvent]:
if data:
@ -39,22 +125,57 @@ class BinaryRecordStreamParser:
buf = self._buf
while len(buf) >= 8:
w0 = int(buf[0]) | (int(buf[1]) << 8)
w1 = int(buf[2]) | (int(buf[3]) << 8)
w2 = int(buf[4]) | (int(buf[5]) << 8)
# 1) log-detector start (12-byte): FFFF x5 + (ch<<8)|0x0A
parsed_log_start = self._try_parse_log_start(buf)
if parsed_log_start is not None:
ch, consumed = parsed_log_start
events.append(("start", ch))
del buf[:consumed]
self.bytes_consumed += consumed
self.start_count += 1
# Ширину пары (32/128) определим на ближайшей точке.
self._log_pair_words = None
continue
# 2) log-detector point:
# сперва в уже известной ширине пары, иначе авто-детект 128/32.
# В авто-режиме сначала пробуем 32-bit пару (наиболее частый формат),
# затем 128-bit. Это снижает риск ложного совпадения 128-bit длины на 32-bit потоке.
pair_candidates = [self._log_pair_words] if self._log_pair_words in (2, 8) else [2, 8]
parsed_log_point: Optional[Tuple[int, int, float, int]] = None
for pair_words in pair_candidates:
if pair_words is None:
continue
parsed_log_point = self._try_parse_log_point(buf, int(pair_words))
if parsed_log_point is not None:
self._log_pair_words = int(pair_words)
break
if parsed_log_point is not None:
ch, step, y_val, consumed = parsed_log_point
events.append(("point", ch, step, y_val))
del buf[:consumed]
self.bytes_consumed += consumed
self.point_count += 1
continue
# 3) legacy 8-byte start / point.
w0 = self._u16_at(buf, 0)
w1 = self._u16_at(buf, 2)
w2 = self._u16_at(buf, 4)
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and buf[6] == 0x0A:
ch = int(buf[7])
events.append(("start", ch))
del buf[:8]
self.bytes_consumed += 8
self.start_count += 1
# legacy не использует пару avg1/avg2.
self._log_pair_words = None
continue
if buf[6] == 0x0A:
ch = int(buf[7])
value_u32 = (w1 << 16) | w2
events.append(("point", ch, int(w0), u32_to_i32(value_u32)))
events.append(("point", ch, int(w0), float(u32_to_i32(value_u32))))
del buf[:8]
self.bytes_consumed += 8
self.point_count += 1
@ -88,7 +209,7 @@ class SweepAssembler:
self._n_valid_hist = deque()
self._xs: list[int] = []
self._ys: list[int] = []
self._ys: list[float] = []
self._cur_channel: Optional[int] = None
self._cur_channels: set[int] = set()
@ -98,12 +219,12 @@ class SweepAssembler:
self._cur_channel = None
self._cur_channels.clear()
def add_point(self, ch: int, x: int, y: int):
def add_point(self, ch: int, x: int, y: float):
if self._cur_channel is None:
self._cur_channel = int(ch)
self._cur_channels.add(int(ch))
self._xs.append(int(x))
self._ys.append(int(y))
self._ys.append(float(y))
def start_new_sweep(self, ch: int, now_ts: Optional[float] = None) -> Optional[SweepPacket]:
packet = self.finalize_current(now_ts=now_ts)
@ -122,13 +243,13 @@ class SweepAssembler:
return out
# point
_tag, ch, x, y = event # type: ignore[misc]
self.add_point(int(ch), int(x), int(y))
self.add_point(int(ch), int(x), float(y))
return out
def finalize_arrays(
self,
xs: Sequence[int],
ys: Sequence[int],
ys: Sequence[float],
channels: Optional[Set[int]],
now_ts: Optional[float] = None,
) -> Optional[SweepPacket]:

View File

@ -151,20 +151,17 @@ class SweepReader(threading.Thread):
def _run_binary_stream(self, chunk_reader: SerialChunkReader):
xs: list[int] = []
ys: list[int] = []
ys: list[float] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
parser = BinaryRecordStreamParser()
# Бинарный протокол (4 слова LE u16 = 8 байт на запись):
# старт свипа: 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
# Байты на проводе: ff ff ff ff ff ff 0a [ch]
# ch=0 → последнее слово=0x000A; ch=1 → 0x010A; и т.д.
# точка данных: step_u16, value_hi_u16, value_lo_u16, (ch<<8)|0x0A
# Байты на проводе: [step_lo step_hi] [hi_lo hi_hi] [lo_lo lo_hi] 0a [ch]
# value_i32 = sign_extend((value_hi<<16)|value_lo)
# Признак записи: байт 6 == 0x0A, байт 7 — номер канала.
# При десинхронизации сдвигаемся на 1 БАЙТ (не слово) для самосинхронизации.
# Поддерживаются оба wire-формата:
# 1) legacy: 8-byte записи (start/point с одним int32 значением).
# 2) log-detector: start = FFFF x5 + (ch<<8)|0x0A,
# point = step + (avg1, avg2), где avg1/avg2 имеют ширину 32-bit или 128-bit.
# Для point парсер сразу преобразует (avg1, avg2) в линейную амплитуду y.
# В обоих режимах при десинхронизации parser.feed() сдвигается на 1 байт.
_dbg_byte_count = 0
_dbg_desync_count = 0
@ -196,13 +193,13 @@ class SweepReader(threading.Thread):
_tag, ch_from_term, step, value_i32 = ev # type: ignore[misc]
if cur_channel is None:
cur_channel = int(ch_from_term)
cur_channels.add(int(cur_channel))
cur_channels.add(int(ch_from_term))
xs.append(int(step))
ys.append(int(value_i32))
ys.append(float(value_i32))
_dbg_point_count += 1
if self._debug and _dbg_point_count <= 3:
sys.stderr.write(
f"[debug] BIN точка: step={int(step)} ch={int(ch_from_term)} → value={int(value_i32)}\n"
f"[debug] BIN точка: step={int(step)} ch={int(ch_from_term)} → value={float(value_i32):.3f}\n"
)
_dbg_byte_count = parser.bytes_consumed

View File

@ -11,10 +11,7 @@ from rfg_adc_plotter.constants import (
WF_WIDTH,
)
from rfg_adc_plotter.processing.fourier import (
build_frequency_axis_hz,
compute_ifft_profile_from_sweep,
perform_ifft_depth_response,
reconstruct_complex_spectrum_from_real_trace,
)
@ -43,17 +40,6 @@ class RingBuffer:
self.y_max_fft: Optional[float] = None
# FFT последнего свипа (для отображения без повторного вычисления)
self.last_fft_vals: Optional[np.ndarray] = None
# FFT-профили по третям входного частотного диапазона (для line-plot).
self.last_fft_third_axes_m: tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray]] = (
None,
None,
None,
)
self.last_fft_third_vals: tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray]] = (
None,
None,
None,
)
@property
def is_ready(self) -> bool:
@ -80,8 +66,6 @@ class RingBuffer:
self.fft_depth_axis_m = None
self.fft_bins = 0
self.last_fft_vals = None
self.last_fft_third_axes_m = (None, None, None)
self.last_fft_third_vals = (None, None, None)
self.y_min_fft = None
self.y_max_fft = None
return True
@ -112,11 +96,6 @@ class RingBuffer:
self._push_fft(s)
def _push_fft(self, s: np.ndarray):
empty_thirds = (
np.zeros((0,), dtype=np.float32),
np.zeros((0,), dtype=np.float32),
np.zeros((0,), dtype=np.float32),
)
depth_axis_m, fft_row = compute_ifft_profile_from_sweep(
s,
complex_mode=self.fft_complex_mode,
@ -126,21 +105,13 @@ class RingBuffer:
n = min(int(fft_row.size), int(depth_axis_m.size))
if n <= 0:
self.last_fft_third_axes_m = empty_thirds
self.last_fft_third_vals = empty_thirds
self.last_fft_vals = None
return
if n != fft_row.size:
fft_row = fft_row[:n]
if n != depth_axis_m.size:
depth_axis_m = depth_axis_m[:n]
# Для отображения храним только первую половину IFFT-профиля:
# вторая половина для текущей схемы симметрична и визуально избыточна.
n_keep = max(1, (n + 1) // 2)
fft_row = fft_row[:n_keep]
depth_axis_m = depth_axis_m[:n_keep]
n = n_keep
needs_reset = (
self.ring_fft is None
or self.fft_depth_axis_m is None
@ -169,7 +140,6 @@ class RingBuffer:
prev_head = (self.head - 1) % self.ring_fft.shape[0]
self.ring_fft[prev_head, :] = fft_row
self.last_fft_vals = fft_row
self.last_fft_third_axes_m, self.last_fft_third_vals = self._compute_fft_thirds(s)
fr_min = np.nanmin(fft_row)
fr_max = float(np.nanpercentile(fft_row, 90))
@ -178,65 +148,6 @@ class RingBuffer:
if self.y_max_fft is None or (not np.isnan(fr_max) and fr_max > self.y_max_fft):
self.y_max_fft = float(fr_max)
def _compute_fft_thirds(
self, s: np.ndarray
) -> tuple[tuple[np.ndarray, np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray, np.ndarray]]:
sweep = np.asarray(s, dtype=np.float64).ravel()
total = int(sweep.size)
def _empty() -> np.ndarray:
return np.zeros((0,), dtype=np.float32)
if total <= 0:
return (_empty(), _empty(), _empty()), (_empty(), _empty(), _empty())
freq_hz = build_frequency_axis_hz(total)
edges = np.linspace(0, total, 4, dtype=np.int64)
axes: list[np.ndarray] = []
vals: list[np.ndarray] = []
for idx in range(3):
i0 = int(edges[idx])
i1 = int(edges[idx + 1])
if i1 - i0 < 2:
axes.append(_empty())
vals.append(_empty())
continue
seg = sweep[i0:i1]
seg_freq = freq_hz[i0:i1]
seg_complex = reconstruct_complex_spectrum_from_real_trace(
seg,
complex_mode=self.fft_complex_mode,
)
depth_m, seg_fft = perform_ifft_depth_response(seg_complex, seg_freq, axis="abs")
depth_m = np.asarray(depth_m, dtype=np.float32).ravel()
seg_fft = np.asarray(seg_fft, dtype=np.float32).ravel()
n = min(int(depth_m.size), int(seg_fft.size))
if n <= 0:
axes.append(_empty())
vals.append(_empty())
continue
depth_m = depth_m[:n]
seg_fft = seg_fft[:n]
n_keep = max(1, (n + 1) // 2)
axes.append(depth_m[:n_keep])
vals.append(seg_fft[:n_keep])
return (
axes[0],
axes[1],
axes[2],
), (
vals[0],
vals[1],
vals[2],
)
def get_display_ring(self) -> np.ndarray:
"""Кольцо в порядке от старого к новому, ось времени по X. Форма: (width, time)."""
if self.ring is None:

View File

@ -13,11 +13,13 @@ from rfg_adc_plotter.processing.pipeline import SweepPreprocessor
ROOT = Path(__file__).resolve().parents[1]
SAMPLE_BG = ROOT / "sample_data" / "empty"
SAMPLE_CALIB = ROOT / "sample_data" / "no_antennas_35dB_attenuators"
SAMPLE_NEW_FMT = ROOT / "sample_data" / "new_format" / "attenuators_50dB"
def test_detect_reference_file_format_for_sample_capture():
assert detect_reference_file_format(str(SAMPLE_BG)) == "bin_capture"
assert detect_reference_file_format(str(SAMPLE_CALIB)) == "bin_capture"
assert detect_reference_file_format(str(SAMPLE_NEW_FMT)) == "bin_capture"
def test_load_capture_sweeps_parses_binary_capture():
@ -33,6 +35,22 @@ def test_load_capture_sweeps_parses_binary_capture():
assert channels == {0}
def test_load_capture_sweeps_parses_new_format_logdetector_capture():
sweeps = load_capture_sweeps(str(SAMPLE_NEW_FMT), fancy=False, logscale=False)
assert len(sweeps) > 900
widths = [int(s.size) for s, _info in sweeps]
dominant_width = max(set(widths), key=widths.count)
# Должно совпадать с ожидаемой шириной свипа из штатных capture.
assert dominant_width in (758, 759)
channels = set()
for _s, info in sweeps:
chs = info.get("chs", [info.get("ch", 0)])
channels.update(int(v) for v in chs)
assert channels == {0}
def test_aggregate_capture_reference_filters_incomplete_sweeps():
sweeps = load_capture_sweeps(str(SAMPLE_BG), fancy=False, logscale=False)
vector, summary = aggregate_capture_reference(sweeps, channel=0, method="median", path=str(SAMPLE_BG))

View File

@ -1,5 +1,6 @@
import numpy as np
from rfg_adc_plotter.processing.fourier import compute_ifft_profile_from_sweep
from rfg_adc_plotter.state.ring_buffer import RingBuffer
@ -8,6 +9,7 @@ def test_ring_buffer_allocates_fft_buffers_from_first_push():
ring.ensure_init(64)
sweep = np.linspace(-1.0, 1.0, 64, dtype=np.float32)
depth_expected, vals_expected = compute_ifft_profile_from_sweep(sweep, complex_mode="arccos")
ring.push(sweep)
assert ring.ring_fft is not None
@ -16,14 +18,7 @@ def test_ring_buffer_allocates_fft_buffers_from_first_push():
assert ring.fft_bins == ring.ring_fft.shape[1]
assert ring.fft_bins == ring.fft_depth_axis_m.size
assert ring.fft_bins == ring.last_fft_vals.size
assert ring.last_fft_third_axes_m != (None, None, None)
assert ring.last_fft_third_vals != (None, None, None)
for axis, vals in zip(ring.last_fft_third_axes_m, ring.last_fft_third_vals):
assert axis is not None
assert vals is not None
assert axis.dtype == np.float32
assert vals.dtype == np.float32
assert axis.size == vals.size
assert ring.fft_bins == min(depth_expected.size, vals_expected.size)
# Legacy alias kept for compatibility with existing GUI code paths.
assert ring.fft_time_axis is ring.fft_depth_axis_m
@ -56,8 +51,6 @@ def test_ring_buffer_mode_switch_resets_fft_buffers_only():
assert ring.ring is not None
assert ring.ring_fft is not None
raw_before = ring.ring.copy()
assert ring.last_fft_third_axes_m != (None, None, None)
assert ring.last_fft_third_vals != (None, None, None)
changed = ring.set_fft_complex_mode("diff")
assert changed is True
@ -67,35 +60,22 @@ def test_ring_buffer_mode_switch_resets_fft_buffers_only():
assert ring.ring_fft is None
assert ring.fft_depth_axis_m is None
assert ring.last_fft_vals is None
assert ring.last_fft_third_axes_m == (None, None, None)
assert ring.last_fft_third_vals == (None, None, None)
assert ring.fft_bins == 0
ring.push(np.linspace(-1.0, 1.0, 128, dtype=np.float32))
assert ring.ring_fft is not None
assert ring.fft_depth_axis_m is not None
assert ring.last_fft_vals is not None
assert ring.last_fft_third_axes_m != (None, None, None)
assert ring.last_fft_third_vals != (None, None, None)
for axis, vals in zip(ring.last_fft_third_axes_m, ring.last_fft_third_vals):
assert axis is not None
assert vals is not None
assert axis.dtype == np.float32
assert vals.dtype == np.float32
assert axis.size == vals.size
def test_ring_buffer_short_sweeps_keep_third_profiles_well_formed():
def test_ring_buffer_short_sweeps_keep_fft_profile_well_formed():
for n in (1, 2, 3):
ring = RingBuffer(max_sweeps=4)
ring.ensure_init(n)
ring.push(np.linspace(-1.0, 1.0, n, dtype=np.float32))
assert ring.last_fft_third_axes_m != (None, None, None)
assert ring.last_fft_third_vals != (None, None, None)
for axis, vals in zip(ring.last_fft_third_axes_m, ring.last_fft_third_vals):
assert axis is not None
assert vals is not None
assert axis.dtype == np.float32
assert vals.dtype == np.float32
assert axis.size == vals.size
assert ring.fft_depth_axis_m is not None
assert ring.last_fft_vals is not None
assert ring.fft_depth_axis_m.dtype == np.float32
assert ring.last_fft_vals.dtype == np.float32
assert ring.fft_depth_axis_m.size == ring.last_fft_vals.size

View File

@ -0,0 +1,110 @@
import math
from rfg_adc_plotter.io.sweep_parser_core import BinaryRecordStreamParser
def _u16le(word: int) -> bytes:
w = int(word) & 0xFFFF
return bytes((w & 0xFF, (w >> 8) & 0xFF))
def _pack_signed_words_be(value: int, words: int) -> list[int]:
bits = 16 * int(words)
v = int(value)
if v < 0:
v = (1 << bits) + v
out: list[int] = []
for i in range(words):
shift = (words - 1 - i) * 16
out.append((v >> shift) & 0xFFFF)
return out
def _pack_legacy_start(ch: int) -> bytes:
return b"\xff\xff" * 3 + bytes((0x0A, int(ch) & 0xFF))
def _pack_legacy_point(ch: int, step: int, value_i32: int) -> bytes:
v = int(value_i32) & 0xFFFF_FFFF
return b"".join(
[
_u16le(step),
_u16le((v >> 16) & 0xFFFF),
_u16le(v & 0xFFFF),
bytes((0x0A, int(ch) & 0xFF)),
]
)
def _pack_log_start(ch: int) -> bytes:
return b"\xff\xff" * 5 + bytes((0x0A, int(ch) & 0xFF))
def _pack_log_point(step: int, avg1: int, avg2: int, pair_words: int, ch: int = 0) -> bytes:
words = [int(step) & 0xFFFF]
words.extend(_pack_signed_words_be(avg1, pair_words))
words.extend(_pack_signed_words_be(avg2, pair_words))
words.append(((int(ch) & 0xFF) << 8) | 0x000A)
return b"".join(_u16le(w) for w in words)
def _log_pair_to_linear(avg1: int, avg2: int) -> float:
exp1 = max(-300.0, min(300.0, float(avg1) * 0.001))
exp2 = max(-300.0, min(300.0, float(avg2) * 0.001))
return (math.pow(10.0, exp1) - math.pow(10.0, exp2)) * 1000.0
def test_binary_parser_parses_legacy_8_byte_records():
parser = BinaryRecordStreamParser()
stream = b"".join(
[
_pack_legacy_start(3),
_pack_legacy_point(3, 1, -2),
_pack_legacy_point(3, 2, 123456),
]
)
events = []
events.extend(parser.feed(stream[:5]))
events.extend(parser.feed(stream[5:17]))
events.extend(parser.feed(stream[17:]))
assert events[0] == ("start", 3)
assert events[1] == ("point", 3, 1, -2.0)
assert events[2] == ("point", 3, 2, 123456.0)
def test_binary_parser_parses_logdetector_32bit_pair_records():
parser = BinaryRecordStreamParser()
stream = b"".join(
[
_pack_log_start(0),
_pack_log_point(1, 1500, 700, pair_words=2, ch=0),
_pack_log_point(2, 1510, 710, pair_words=2, ch=0),
]
)
events = parser.feed(stream)
assert events[0] == ("start", 0)
assert events[1][0:3] == ("point", 0, 1)
assert events[2][0:3] == ("point", 0, 2)
assert abs(float(events[1][3]) - _log_pair_to_linear(1500, 700)) < 1e-6
assert abs(float(events[2][3]) - _log_pair_to_linear(1510, 710)) < 1e-6
def test_binary_parser_parses_logdetector_128bit_pair_records():
parser = BinaryRecordStreamParser()
stream = b"".join(
[
_pack_log_start(5),
_pack_log_point(7, 1600, 800, pair_words=8, ch=5),
_pack_log_point(8, 1610, 810, pair_words=8, ch=5),
]
)
events = parser.feed(stream)
assert events[0] == ("start", 5)
assert events[1][0:3] == ("point", 5, 7)
assert events[2][0:3] == ("point", 5, 8)
assert abs(float(events[1][3]) - _log_pair_to_linear(1600, 800)) < 1e-6
assert abs(float(events[2][3]) - _log_pair_to_linear(1610, 810)) < 1e-6