add 32-bit binary sweep parsing and percentile scaling for raw waterfall

This commit is contained in:
2026-03-03 18:49:12 +03:00
parent 7d714530bc
commit f4a3e6546a

View File

@ -384,6 +384,7 @@ class SweepReader(threading.Thread):
out_queue: Queue[SweepPacket], out_queue: Queue[SweepPacket],
stop_event: threading.Event, stop_event: threading.Event,
fancy: bool = False, fancy: bool = False,
bin_mode: bool = False,
): ):
super().__init__(daemon=True) super().__init__(daemon=True)
self._port_path = port_path self._port_path = port_path
@ -392,11 +393,17 @@ class SweepReader(threading.Thread):
self._stop = stop_event self._stop = stop_event
self._src: Optional[SerialLineSource] = None self._src: Optional[SerialLineSource] = None
self._fancy = bool(fancy) self._fancy = bool(fancy)
self._bin_mode = bool(bin_mode)
self._max_width: int = 0 self._max_width: int = 0
self._sweep_idx: int = 0 self._sweep_idx: int = 0
self._last_sweep_ts: Optional[float] = None self._last_sweep_ts: Optional[float] = None
self._n_valid_hist = deque() self._n_valid_hist = deque()
@staticmethod
def _u32_to_i32(v: int) -> int:
"""Преобразование 32-bit слова в знаковое значение."""
return v - 0x1_0000_0000 if (v & 0x8000_0000) else v
def _finalize_current(self, xs, ys, channels: Optional[set[int]]): def _finalize_current(self, xs, ys, channels: Optional[set[int]]):
if not xs: if not xs:
return return
@ -446,8 +453,10 @@ class SweepReader(threading.Thread):
m = float(np.nanmean(sweep)) m = float(np.nanmean(sweep))
if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD: if np.isfinite(m) and m < DATA_INVERSION_THRASHOLD:
sweep *= -1.0 sweep *= -1.0
except Exception: except Exception:
pass pass
#sweep = np.abs(sweep)
#sweep -= float(np.nanmean(sweep)) #sweep -= float(np.nanmean(sweep))
# Метрики для статусной строки (вид словаря: переменная -> значение) # Метрики для статусной строки (вид словаря: переменная -> значение)
@ -502,13 +511,149 @@ class SweepReader(threading.Thread):
except Exception: except Exception:
pass pass
def run(self): def _run_ascii_stream(self, chunk_reader: SerialChunkReader):
# Состояние текущего свипа
xs: list[int] = [] xs: list[int] = []
ys: list[int] = [] ys: list[int] = []
cur_channel: Optional[int] = None cur_channel: Optional[int] = None
cur_channels: set[int] = set() cur_channels: set[int] = set()
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
time.sleep(0.0005)
continue
while True:
nl = buf.find(b"\n")
if nl == -1:
break
line = bytes(buf[:nl])
del buf[: nl + 1]
if line.endswith(b"\r"):
line = line[:-1]
if not line:
continue
if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channel = None
cur_channels.clear()
continue
# sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам.
if len(line) >= 3:
parts = line.split()
if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")):
try:
if parts[0].lower() == b"s":
if len(parts) >= 4:
ch = int(parts[1], 10)
x = int(parts[2], 10)
y = int(parts[3], 10) # поддержка знака: "+…" и "-…"
else:
ch = 0
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
else:
# формат вида "s0"
ch = int(parts[0][1:], 10)
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
except Exception:
continue
if cur_channel is None:
cur_channel = ch
cur_channels.add(ch)
xs.append(x)
ys.append(y)
if len(buf) > 1_000_000:
del buf[:-262144]
self._finalize_current(xs, ys, cur_channels)
def _run_binary_stream(self, chunk_reader: SerialChunkReader):
xs: list[int] = []
ys: list[int] = []
cur_channel: Optional[int] = None
cur_channels: set[int] = set()
words = deque()
buf = bytearray()
while not self._stop.is_set():
data = chunk_reader.read_available()
if data:
buf += data
else:
time.sleep(0.0005)
continue
usable = len(buf) & ~1
if usable == 0:
continue
i = 0
while i < usable:
w = int(buf[i]) | (int(buf[i + 1]) << 8)
words.append(w)
i += 2
# Бинарный протокол:
# старт свипа (актуальный): 0xFFFF, 0xFFFF, 0xFFFF, (ch<<8)|0x0A
# старт свипа (legacy): 0xFFFF, 0xFFFF, channel, 0x0A0A
# точка: step, value_hi, value_lo, 0x000A
while len(words) >= 4:
w0 = int(words[0])
w1 = int(words[1])
w2 = int(words[2])
w3 = int(words[3])
if w0 == 0xFFFF and w1 == 0xFFFF and w2 == 0xFFFF and (w3 & 0x00FF) == 0x000A:
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channels.clear()
cur_channel = (w3 >> 8) & 0x00FF
cur_channels.add(cur_channel)
for _ in range(4):
words.popleft()
continue
if w0 == 0xFFFF and w1 == 0xFFFF and w3 == 0x0A0A:
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channels.clear()
cur_channel = w2
cur_channels.add(cur_channel)
for _ in range(4):
words.popleft()
continue
if w3 == 0x000A:
if cur_channel is not None:
cur_channels.add(cur_channel)
xs.append(w0)
value_u32 = (w1 << 16) | w2
ys.append(self._u32_to_i32(value_u32))
for _ in range(4):
words.popleft()
continue
# Поток может начаться с середины пакета; сдвигаемся по слову до ресинхронизации.
words.popleft()
del buf[:usable]
if len(buf) > 1_000_000:
del buf[:-262144]
self._finalize_current(xs, ys, cur_channels)
def run(self):
try: try:
self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0) self._src = SerialLineSource(self._port_path, self._baud, timeout=1.0)
sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n") sys.stderr.write(f"[info] Открыл порт {self._port_path} ({self._src._using})\n")
@ -517,74 +662,12 @@ class SweepReader(threading.Thread):
return return
try: try:
# Быстрый неблокирующий дренаж порта с разбором по байтам
chunk_reader = SerialChunkReader(self._src) chunk_reader = SerialChunkReader(self._src)
buf = bytearray() if self._bin_mode:
while not self._stop.is_set(): self._run_binary_stream(chunk_reader)
data = chunk_reader.read_available() else:
if data: self._run_ascii_stream(chunk_reader)
buf += data
else:
# Короткая уступка CPU, если нет новых данных
time.sleep(0.0005)
continue
# Обрабатываем все полные строки
while True:
nl = buf.find(b"\n")
if nl == -1:
break
line = bytes(buf[:nl])
del buf[: nl + 1]
if line.endswith(b"\r"):
line = line[:-1]
if not line:
continue
if line.startswith(b"Sweep_start"):
self._finalize_current(xs, ys, cur_channels)
xs.clear()
ys.clear()
cur_channel = None
cur_channels.clear()
continue
# sCH X Y или s CH X Y (все целые со знаком). Разделяем по любым пробелам/табам.
if len(line) >= 3:
parts = line.split()
if len(parts) >= 3 and (parts[0].lower() == b"s" or parts[0].lower().startswith(b"s")):
try:
if parts[0].lower() == b"s":
if len(parts) >= 4:
ch = int(parts[1], 10)
x = int(parts[2], 10)
y = int(parts[3], 10) # поддержка знака: "+…" и "-…"
else:
ch = 0
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
else:
# формат вида "s0"
ch = int(parts[0][1:], 10)
x = int(parts[1], 10)
y = int(parts[2], 10) # поддержка знака: "+…" и "-…"
except Exception:
continue
if cur_channel is None:
cur_channel = ch
cur_channels.add(ch)
xs.append(x)
ys.append(y)
# Защита от переполнения буфера при отсутствии переводов строки
if len(buf) > 1_000_000:
del buf[:-262144]
finally: finally:
try:
# Завершаем оставшийся свип
self._finalize_current(xs, ys, cur_channels)
except Exception:
pass
try: try:
if self._src is not None: if self._src is not None:
self._src.close() self._src.close()
@ -648,6 +731,15 @@ def main():
default="projector", default="projector",
help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)", help="Тип нормировки: projector (по огибающим в [-1,+1]) или simple (raw/calib)",
) )
parser.add_argument(
"--bin",
dest="bin_mode",
action="store_true",
help=(
"Бинарный протокол: старт свипа 0xFFFF,0xFFFF,0xFFFF,(CH<<8)|0x0A; "
"точки step,uint32(hi16,lo16),0x000A"
),
)
args = parser.parse_args() args = parser.parse_args()
@ -673,7 +765,14 @@ def main():
# Очередь завершённых свипов и поток чтения # Очередь завершённых свипов и поток чтения
q: Queue[SweepPacket] = Queue(maxsize=1000) q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event() stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) reader = SweepReader(
args.port,
args.baud,
q,
stop_event,
fancy=bool(args.fancy),
bin_mode=bool(args.bin_mode),
)
reader.start() reader.start()
# Графика # Графика
@ -863,7 +962,7 @@ def main():
freq_shared = np.arange(fft_bins, dtype=np.int32) freq_shared = np.arange(fft_bins, dtype=np.int32)
def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]: def _visible_levels_matplotlib(data: np.ndarray, axis) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по текущей видимой области imshow (без накопления по времени).""" """(vmin, vmax) по центральным 90% значений в видимой области imshow."""
if data.size == 0: if data.size == 0:
return None return None
ny, nx = data.shape[0], data.shape[1] ny, nx = data.shape[0], data.shape[1]
@ -888,8 +987,8 @@ def main():
if not finite.any(): if not finite.any():
return None return None
vals = sub[finite] vals = sub[finite]
vmin = float(np.min(vals)) vmin = float(np.nanpercentile(vals, 5))
vmax = float(np.max(vals)) vmax = float(np.nanpercentile(vals, 95))
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
return None return None
return (vmin, vmax) return (vmin, vmax)
@ -1168,7 +1267,14 @@ def run_pyqtgraph(args):
# Очередь завершённых свипов и поток чтения # Очередь завершённых свипов и поток чтения
q: Queue[SweepPacket] = Queue(maxsize=1000) q: Queue[SweepPacket] = Queue(maxsize=1000)
stop_event = threading.Event() stop_event = threading.Event()
reader = SweepReader(args.port, args.baud, q, stop_event, fancy=bool(args.fancy)) reader = SweepReader(
args.port,
args.baud,
q,
stop_event,
fancy=bool(args.fancy),
bin_mode=bool(args.bin_mode),
)
reader.start() reader.start()
# Настройки скорости # Настройки скорости
@ -1310,7 +1416,7 @@ def run_pyqtgraph(args):
freq_shared = np.arange(fft_bins, dtype=np.int32) freq_shared = np.arange(fft_bins, dtype=np.int32)
def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]: def _visible_levels_pyqtgraph(data: np.ndarray) -> Optional[Tuple[float, float]]:
"""(vmin, vmax) по текущей видимой области ImageItem (без накопления по времени).""" """(vmin, vmax) по центральным 90% значений в видимой области ImageItem."""
if data.size == 0: if data.size == 0:
return None return None
ny, nx = data.shape[0], data.shape[1] ny, nx = data.shape[0], data.shape[1]
@ -1334,8 +1440,8 @@ def run_pyqtgraph(args):
if not finite.any(): if not finite.any():
return None return None
vals = sub[finite] vals = sub[finite]
vmin = float(np.min(vals)) vmin = float(np.nanpercentile(vals, 5))
vmax = float(np.max(vals)) vmax = float(np.nanpercentile(vals, 95))
if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax: if not (np.isfinite(vmin) and np.isfinite(vmax)) or vmin == vmax:
return None return None
return (vmin, vmax) return (vmin, vmax)