new adc
This commit is contained in:
@ -127,6 +127,12 @@ Logscale binary `16-bit x2`:
|
|||||||
.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --parser_test
|
.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --parser_test
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Комплексный ASCII-поток `step real imag`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/python -m rfg_adc_plotter.main /dev/ttyACM0 --parser_complex_ascii
|
||||||
|
```
|
||||||
|
|
||||||
## Локальная проверка через replay_pty
|
## Локальная проверка через replay_pty
|
||||||
|
|
||||||
Если есть лог-файл захвата, его можно воспроизвести как виртуальный последовательный порт.
|
Если есть лог-файл захвата, его можно воспроизвести как виртуальный последовательный порт.
|
||||||
|
|||||||
@ -95,6 +95,14 @@ def build_parser() -> argparse.ArgumentParser:
|
|||||||
"одиночный 0xFFFF завершает точку, серия 0xFFFF начинает новый свип"
|
"одиночный 0xFFFF завершает точку, серия 0xFFFF начинает новый свип"
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--parser_complex_ascii",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"ASCII-поток из трех чисел на строку: step real imag. "
|
||||||
|
"Новый свип определяется по сбросу/повтору step, FFT строится по комплексным данным"
|
||||||
|
),
|
||||||
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--calibrate",
|
"--calibrate",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
|
|||||||
@ -155,6 +155,41 @@ def apply_working_range_to_aux_curves(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_working_range_to_signal(
|
||||||
|
freqs: Optional[np.ndarray],
|
||||||
|
sweep: Optional[np.ndarray],
|
||||||
|
signal_values: Optional[np.ndarray],
|
||||||
|
range_min_ghz: float,
|
||||||
|
range_max_ghz: float,
|
||||||
|
) -> Optional[np.ndarray]:
|
||||||
|
"""Crop an arbitrary signal with the same mask used for the raw sweep."""
|
||||||
|
if freqs is None or sweep is None or signal_values is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
freq_arr = np.asarray(freqs, dtype=np.float64).reshape(-1)
|
||||||
|
sweep_arr = np.asarray(sweep, dtype=np.float32).reshape(-1)
|
||||||
|
signal_arr = np.asarray(signal_values).reshape(-1)
|
||||||
|
width = min(freq_arr.size, sweep_arr.size, signal_arr.size)
|
||||||
|
if width <= 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
freq_arr = freq_arr[:width]
|
||||||
|
sweep_arr = sweep_arr[:width]
|
||||||
|
signal_arr = signal_arr[:width]
|
||||||
|
valid = (
|
||||||
|
np.isfinite(freq_arr)
|
||||||
|
& np.isfinite(sweep_arr)
|
||||||
|
& (freq_arr >= float(range_min_ghz))
|
||||||
|
& (freq_arr <= float(range_max_ghz))
|
||||||
|
)
|
||||||
|
if not np.any(valid):
|
||||||
|
return None
|
||||||
|
|
||||||
|
if np.iscomplexobj(signal_arr):
|
||||||
|
return np.asarray(signal_arr[valid], dtype=np.complex64)
|
||||||
|
return np.asarray(signal_arr[valid], dtype=np.float32)
|
||||||
|
|
||||||
|
|
||||||
def resolve_visible_aux_curves(aux_curves: SweepAuxCurves, enabled: bool) -> SweepAuxCurves:
|
def resolve_visible_aux_curves(aux_curves: SweepAuxCurves, enabled: bool) -> SweepAuxCurves:
|
||||||
"""Return auxiliary curves only when their display is enabled."""
|
"""Return auxiliary curves only when their display is enabled."""
|
||||||
if (not enabled) or aux_curves is None:
|
if (not enabled) or aux_curves is None:
|
||||||
@ -198,6 +233,7 @@ def run_pyqtgraph(args) -> None:
|
|||||||
"""Start the PyQtGraph GUI."""
|
"""Start the PyQtGraph GUI."""
|
||||||
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
|
peak_calibrate_mode = bool(getattr(args, "calibrate", False))
|
||||||
peak_search_enabled = bool(getattr(args, "peak_search", False))
|
peak_search_enabled = bool(getattr(args, "peak_search", False))
|
||||||
|
complex_sweep_mode = bool(getattr(args, "parser_complex_ascii", False))
|
||||||
try:
|
try:
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore
|
from pyqtgraph.Qt import QtCore, QtWidgets # type: ignore
|
||||||
@ -218,6 +254,7 @@ def run_pyqtgraph(args) -> None:
|
|||||||
logscale=bool(args.logscale),
|
logscale=bool(args.logscale),
|
||||||
parser_16_bit_x2=bool(args.parser_16_bit_x2),
|
parser_16_bit_x2=bool(args.parser_16_bit_x2),
|
||||||
parser_test=bool(args.parser_test),
|
parser_test=bool(args.parser_test),
|
||||||
|
parser_complex_ascii=complex_sweep_mode,
|
||||||
)
|
)
|
||||||
reader.start()
|
reader.start()
|
||||||
|
|
||||||
@ -413,6 +450,12 @@ def run_pyqtgraph(args) -> None:
|
|||||||
background_buttons_row.addWidget(background_load_btn)
|
background_buttons_row.addWidget(background_load_btn)
|
||||||
background_group_layout.addLayout(background_buttons_row)
|
background_group_layout.addLayout(background_buttons_row)
|
||||||
parsed_data_cb = QtWidgets.QCheckBox("данные после парсинга")
|
parsed_data_cb = QtWidgets.QCheckBox("данные после парсинга")
|
||||||
|
if complex_sweep_mode:
|
||||||
|
try:
|
||||||
|
parsed_data_cb.setText("Re/Im после парсинга")
|
||||||
|
p_line.setTitle("Модуль комплексного сигнала")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
try:
|
try:
|
||||||
settings_layout.addWidget(QtWidgets.QLabel("Настройки"))
|
settings_layout.addWidget(QtWidgets.QLabel("Настройки"))
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -576,6 +619,8 @@ def run_pyqtgraph(args) -> None:
|
|||||||
if runtime.full_current_freqs is None or runtime.full_current_sweep_raw is None:
|
if runtime.full_current_freqs is None or runtime.full_current_sweep_raw is None:
|
||||||
runtime.current_freqs = None
|
runtime.current_freqs = None
|
||||||
runtime.current_sweep_raw = None
|
runtime.current_sweep_raw = None
|
||||||
|
runtime.current_fft_source = None
|
||||||
|
runtime.current_fft_input = None
|
||||||
runtime.current_aux_curves = None
|
runtime.current_aux_curves = None
|
||||||
runtime.current_sweep_norm = None
|
runtime.current_sweep_norm = None
|
||||||
runtime.current_fft_mag = None
|
runtime.current_fft_mag = None
|
||||||
@ -591,6 +636,13 @@ def run_pyqtgraph(args) -> None:
|
|||||||
)
|
)
|
||||||
runtime.current_freqs = current_freqs
|
runtime.current_freqs = current_freqs
|
||||||
runtime.current_sweep_raw = current_sweep
|
runtime.current_sweep_raw = current_sweep
|
||||||
|
runtime.current_fft_source = apply_working_range_to_signal(
|
||||||
|
runtime.full_current_freqs,
|
||||||
|
runtime.full_current_sweep_raw,
|
||||||
|
runtime.full_current_fft_source,
|
||||||
|
runtime.range_min_ghz,
|
||||||
|
runtime.range_max_ghz,
|
||||||
|
)
|
||||||
runtime.current_aux_curves = apply_working_range_to_aux_curves(
|
runtime.current_aux_curves = apply_working_range_to_aux_curves(
|
||||||
runtime.full_current_freqs,
|
runtime.full_current_freqs,
|
||||||
runtime.full_current_sweep_raw,
|
runtime.full_current_sweep_raw,
|
||||||
@ -604,6 +656,8 @@ def run_pyqtgraph(args) -> None:
|
|||||||
reset_ring_buffers()
|
reset_ring_buffers()
|
||||||
runtime.current_freqs = None
|
runtime.current_freqs = None
|
||||||
runtime.current_sweep_raw = None
|
runtime.current_sweep_raw = None
|
||||||
|
runtime.current_fft_source = None
|
||||||
|
runtime.current_fft_input = None
|
||||||
runtime.current_aux_curves = None
|
runtime.current_aux_curves = None
|
||||||
runtime.current_sweep_norm = None
|
runtime.current_sweep_norm = None
|
||||||
runtime.current_fft_mag = None
|
runtime.current_fft_mag = None
|
||||||
@ -615,6 +669,10 @@ def run_pyqtgraph(args) -> None:
|
|||||||
recompute_current_processed_sweep(push_to_ring=push_to_ring)
|
recompute_current_processed_sweep(push_to_ring=push_to_ring)
|
||||||
|
|
||||||
def recompute_current_processed_sweep(push_to_ring: bool = False) -> None:
|
def recompute_current_processed_sweep(push_to_ring: bool = False) -> None:
|
||||||
|
fft_source = runtime.current_fft_source
|
||||||
|
if fft_source is None and runtime.current_sweep_raw is not None:
|
||||||
|
fft_source = np.asarray(runtime.current_sweep_raw, dtype=np.float32)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
runtime.current_sweep_raw is not None
|
runtime.current_sweep_raw is not None
|
||||||
and runtime.current_sweep_raw.size > 0
|
and runtime.current_sweep_raw.size > 0
|
||||||
@ -625,6 +683,16 @@ def run_pyqtgraph(args) -> None:
|
|||||||
else:
|
else:
|
||||||
runtime.current_sweep_norm = None
|
runtime.current_sweep_norm = None
|
||||||
|
|
||||||
|
if fft_source is None or np.asarray(fft_source).size == 0:
|
||||||
|
runtime.current_fft_input = None
|
||||||
|
elif calib_enabled and runtime.calib_envelope is not None:
|
||||||
|
runtime.current_fft_input = normalize_by_envelope(fft_source, runtime.calib_envelope)
|
||||||
|
else:
|
||||||
|
runtime.current_fft_input = np.asarray(
|
||||||
|
fft_source,
|
||||||
|
dtype=np.complex64 if np.iscomplexobj(fft_source) else np.float32,
|
||||||
|
).copy()
|
||||||
|
|
||||||
runtime.current_fft_mag = None
|
runtime.current_fft_mag = None
|
||||||
runtime.current_fft_db = None
|
runtime.current_fft_db = None
|
||||||
if (
|
if (
|
||||||
@ -635,8 +703,9 @@ def run_pyqtgraph(args) -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
sweep_for_processing = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw
|
sweep_for_processing = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw
|
||||||
|
fft_input_for_processing = runtime.current_fft_input if runtime.current_fft_input is not None else sweep_for_processing
|
||||||
ensure_buffer(runtime.current_sweep_raw.size)
|
ensure_buffer(runtime.current_sweep_raw.size)
|
||||||
runtime.ring.push(sweep_for_processing, runtime.current_freqs)
|
runtime.ring.push(sweep_for_processing, runtime.current_freqs, fft_input=fft_input_for_processing)
|
||||||
runtime.current_distances = runtime.ring.distance_axis
|
runtime.current_distances = runtime.ring.distance_axis
|
||||||
runtime.current_fft_mag = runtime.ring.get_last_fft_linear()
|
runtime.current_fft_mag = runtime.ring.get_last_fft_linear()
|
||||||
runtime.current_fft_db = runtime.ring.last_fft_db
|
runtime.current_fft_db = runtime.ring.last_fft_db
|
||||||
@ -860,7 +929,7 @@ def run_pyqtgraph(args) -> None:
|
|||||||
runtime.mark_dirty()
|
runtime.mark_dirty()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
fft_mode_combo.setCurrentIndex(1)
|
fft_mode_combo.setCurrentIndex(2 if complex_sweep_mode else 1)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
restore_range_controls()
|
restore_range_controls()
|
||||||
@ -1060,6 +1129,26 @@ def run_pyqtgraph(args) -> None:
|
|||||||
break
|
break
|
||||||
drained += 1
|
drained += 1
|
||||||
base_freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, sweep.size, dtype=np.float64)
|
base_freqs = np.linspace(SWEEP_FREQ_MIN_GHZ, SWEEP_FREQ_MAX_GHZ, sweep.size, dtype=np.float64)
|
||||||
|
runtime.full_current_aux_curves = None
|
||||||
|
runtime.full_current_fft_source = None
|
||||||
|
if complex_sweep_mode and aux_curves is not None:
|
||||||
|
try:
|
||||||
|
aux_1, aux_2 = aux_curves
|
||||||
|
calibrated_aux_1_payload = calibrate_freqs({"F": base_freqs, "I": aux_1})
|
||||||
|
calibrated_aux_2 = calibrate_freqs({"F": base_freqs, "I": aux_2})["I"]
|
||||||
|
runtime.full_current_freqs = np.asarray(calibrated_aux_1_payload["F"], dtype=np.float64)
|
||||||
|
calibrated_aux_1 = np.asarray(calibrated_aux_1_payload["I"], dtype=np.float32)
|
||||||
|
calibrated_aux_2 = np.asarray(calibrated_aux_2, dtype=np.float32)
|
||||||
|
runtime.full_current_aux_curves = (calibrated_aux_1, calibrated_aux_2)
|
||||||
|
runtime.full_current_fft_source = (
|
||||||
|
calibrated_aux_1.astype(np.complex64) + (1j * calibrated_aux_2.astype(np.complex64))
|
||||||
|
)
|
||||||
|
runtime.full_current_sweep_raw = np.abs(runtime.full_current_fft_source).astype(np.float32)
|
||||||
|
except Exception:
|
||||||
|
runtime.full_current_aux_curves = None
|
||||||
|
runtime.full_current_fft_source = None
|
||||||
|
|
||||||
|
if runtime.full_current_fft_source is None:
|
||||||
calibrated = calibrate_freqs(
|
calibrated = calibrate_freqs(
|
||||||
{
|
{
|
||||||
"F": base_freqs,
|
"F": base_freqs,
|
||||||
@ -1068,9 +1157,8 @@ def run_pyqtgraph(args) -> None:
|
|||||||
)
|
)
|
||||||
runtime.full_current_freqs = np.asarray(calibrated["F"], dtype=np.float64)
|
runtime.full_current_freqs = np.asarray(calibrated["F"], dtype=np.float64)
|
||||||
runtime.full_current_sweep_raw = np.asarray(calibrated["I"], dtype=np.float32)
|
runtime.full_current_sweep_raw = np.asarray(calibrated["I"], dtype=np.float32)
|
||||||
if aux_curves is None:
|
runtime.full_current_fft_source = np.asarray(runtime.full_current_sweep_raw, dtype=np.float32).copy()
|
||||||
runtime.full_current_aux_curves = None
|
if aux_curves is not None:
|
||||||
else:
|
|
||||||
try:
|
try:
|
||||||
aux_1, aux_2 = aux_curves
|
aux_1, aux_2 = aux_curves
|
||||||
calibrated_aux_1 = calibrate_freqs({"F": base_freqs, "I": aux_1})["I"]
|
calibrated_aux_1 = calibrate_freqs({"F": base_freqs, "I": aux_1})["I"]
|
||||||
@ -1165,6 +1253,8 @@ def run_pyqtgraph(args) -> None:
|
|||||||
if finite_x.size > 0:
|
if finite_x.size > 0:
|
||||||
p_line.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
|
p_line.setXRange(float(np.min(finite_x)), float(np.max(finite_x)), padding=0)
|
||||||
|
|
||||||
|
sweep_for_fft = runtime.current_fft_input
|
||||||
|
if sweep_for_fft is None:
|
||||||
sweep_for_fft = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw
|
sweep_for_fft = runtime.current_sweep_norm if runtime.current_sweep_norm is not None else runtime.current_sweep_raw
|
||||||
distance_axis = runtime.current_distances if runtime.current_distances is not None else runtime.ring.distance_axis
|
distance_axis = runtime.current_distances if runtime.current_distances is not None else runtime.ring.distance_axis
|
||||||
if sweep_for_fft is not None and sweep_for_fft.size > 0 and distance_axis is not None:
|
if sweep_for_fft is not None and sweep_for_fft.size > 0 and distance_axis is not None:
|
||||||
|
|||||||
@ -82,6 +82,62 @@ class AsciiSweepParser:
|
|||||||
return events
|
return events
|
||||||
|
|
||||||
|
|
||||||
|
class ComplexAsciiSweepParser:
|
||||||
|
"""Incremental parser for ASCII ``step real imag`` streams."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._buf = bytearray()
|
||||||
|
self._last_step: Optional[int] = None
|
||||||
|
self._seen_points = False
|
||||||
|
|
||||||
|
def feed(self, data: bytes) -> List[ParserEvent]:
|
||||||
|
if data:
|
||||||
|
self._buf += data
|
||||||
|
events: List[ParserEvent] = []
|
||||||
|
while True:
|
||||||
|
nl = self._buf.find(b"\n")
|
||||||
|
if nl == -1:
|
||||||
|
break
|
||||||
|
line = bytes(self._buf[:nl])
|
||||||
|
del self._buf[: nl + 1]
|
||||||
|
if line.endswith(b"\r"):
|
||||||
|
line = line[:-1]
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if line.lower().startswith(b"sweep_start"):
|
||||||
|
self._last_step = None
|
||||||
|
self._seen_points = False
|
||||||
|
events.append(StartEvent())
|
||||||
|
continue
|
||||||
|
|
||||||
|
parts = line.split()
|
||||||
|
if len(parts) < 3:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
step = int(parts[0], 10)
|
||||||
|
real = float(parts[1])
|
||||||
|
imag = float(parts[2])
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if step < 0 or (not math.isfinite(real)) or (not math.isfinite(imag)):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if self._seen_points and self._last_step is not None and step <= self._last_step:
|
||||||
|
events.append(StartEvent())
|
||||||
|
self._seen_points = True
|
||||||
|
self._last_step = step
|
||||||
|
events.append(
|
||||||
|
PointEvent(
|
||||||
|
ch=0,
|
||||||
|
x=step,
|
||||||
|
y=float(abs(complex(real, imag))),
|
||||||
|
aux=(float(real), float(imag)),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return events
|
||||||
|
|
||||||
|
|
||||||
class LegacyBinaryParser:
|
class LegacyBinaryParser:
|
||||||
"""Byte-resynchronizing parser for legacy 8-byte binary records."""
|
"""Byte-resynchronizing parser for legacy 8-byte binary records."""
|
||||||
|
|
||||||
|
|||||||
@ -10,6 +10,7 @@ from queue import Full, Queue
|
|||||||
from rfg_adc_plotter.io.serial_source import SerialChunkReader, SerialLineSource
|
from rfg_adc_plotter.io.serial_source import SerialChunkReader, SerialLineSource
|
||||||
from rfg_adc_plotter.io.sweep_parser_core import (
|
from rfg_adc_plotter.io.sweep_parser_core import (
|
||||||
AsciiSweepParser,
|
AsciiSweepParser,
|
||||||
|
ComplexAsciiSweepParser,
|
||||||
LegacyBinaryParser,
|
LegacyBinaryParser,
|
||||||
LogScale16BitX2BinaryParser,
|
LogScale16BitX2BinaryParser,
|
||||||
LogScaleBinaryParser32,
|
LogScaleBinaryParser32,
|
||||||
@ -33,6 +34,7 @@ class SweepReader(threading.Thread):
|
|||||||
logscale: bool = False,
|
logscale: bool = False,
|
||||||
parser_16_bit_x2: bool = False,
|
parser_16_bit_x2: bool = False,
|
||||||
parser_test: bool = False,
|
parser_test: bool = False,
|
||||||
|
parser_complex_ascii: bool = False,
|
||||||
):
|
):
|
||||||
super().__init__(daemon=True)
|
super().__init__(daemon=True)
|
||||||
self._port_path = port_path
|
self._port_path = port_path
|
||||||
@ -44,9 +46,12 @@ class SweepReader(threading.Thread):
|
|||||||
self._logscale = bool(logscale)
|
self._logscale = bool(logscale)
|
||||||
self._parser_16_bit_x2 = bool(parser_16_bit_x2)
|
self._parser_16_bit_x2 = bool(parser_16_bit_x2)
|
||||||
self._parser_test = bool(parser_test)
|
self._parser_test = bool(parser_test)
|
||||||
|
self._parser_complex_ascii = bool(parser_complex_ascii)
|
||||||
self._src: SerialLineSource | None = None
|
self._src: SerialLineSource | None = None
|
||||||
|
|
||||||
def _build_parser(self):
|
def _build_parser(self):
|
||||||
|
if self._parser_complex_ascii:
|
||||||
|
return ComplexAsciiSweepParser(), SweepAssembler(fancy=self._fancy, apply_inversion=False)
|
||||||
if self._parser_test:
|
if self._parser_test:
|
||||||
return ParserTestStreamParser(), SweepAssembler(fancy=self._fancy, apply_inversion=False)
|
return ParserTestStreamParser(), SweepAssembler(fancy=self._fancy, apply_inversion=False)
|
||||||
if self._parser_16_bit_x2:
|
if self._parser_16_bit_x2:
|
||||||
|
|||||||
@ -65,13 +65,22 @@ def set_calibration_base_value(index: int, value: float) -> np.ndarray:
|
|||||||
def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData:
|
def calibrate_freqs(sweep: Mapping[str, Any]) -> SweepData:
|
||||||
"""Return a sweep copy with calibrated and resampled frequency axis."""
|
"""Return a sweep copy with calibrated and resampled frequency axis."""
|
||||||
freqs = np.asarray(sweep["F"], dtype=np.float64).copy()
|
freqs = np.asarray(sweep["F"], dtype=np.float64).copy()
|
||||||
values = np.asarray(sweep["I"], dtype=np.float64).copy()
|
values_in = np.asarray(sweep["I"]).reshape(-1)
|
||||||
|
values = np.asarray(
|
||||||
|
values_in,
|
||||||
|
dtype=np.complex128 if np.iscomplexobj(values_in) else np.float64,
|
||||||
|
).copy()
|
||||||
coeffs = np.asarray(CALIBRATION_C, dtype=np.float64)
|
coeffs = np.asarray(CALIBRATION_C, dtype=np.float64)
|
||||||
if freqs.size > 0:
|
if freqs.size > 0:
|
||||||
freqs = coeffs[0] + coeffs[1] * freqs + coeffs[2] * (freqs * freqs)
|
freqs = coeffs[0] + coeffs[1] * freqs + coeffs[2] * (freqs * freqs)
|
||||||
|
|
||||||
if freqs.size >= 2:
|
if freqs.size >= 2:
|
||||||
freqs_cal = np.linspace(float(freqs[0]), float(freqs[-1]), freqs.size, dtype=np.float64)
|
freqs_cal = np.linspace(float(freqs[0]), float(freqs[-1]), freqs.size, dtype=np.float64)
|
||||||
|
if np.iscomplexobj(values):
|
||||||
|
values_real = np.interp(freqs_cal, freqs, values.real.astype(np.float64, copy=False))
|
||||||
|
values_imag = np.interp(freqs_cal, freqs, values.imag.astype(np.float64, copy=False))
|
||||||
|
values_cal = (values_real + (1j * values_imag)).astype(np.complex64)
|
||||||
|
else:
|
||||||
values_cal = np.interp(freqs_cal, freqs, values).astype(np.float64)
|
values_cal = np.interp(freqs_cal, freqs, values).astype(np.float64)
|
||||||
else:
|
else:
|
||||||
freqs_cal = freqs.copy()
|
freqs_cal = freqs.copy()
|
||||||
|
|||||||
@ -24,6 +24,21 @@ def _finite_freq_bounds(freqs: Optional[np.ndarray]) -> Optional[Tuple[float, fl
|
|||||||
return f_min, f_max
|
return f_min, f_max
|
||||||
|
|
||||||
|
|
||||||
|
def _coerce_sweep_array(sweep: np.ndarray) -> np.ndarray:
|
||||||
|
values = np.asarray(sweep).reshape(-1)
|
||||||
|
if np.iscomplexobj(values):
|
||||||
|
return np.asarray(values, dtype=np.complex64)
|
||||||
|
return np.asarray(values, dtype=np.float32)
|
||||||
|
|
||||||
|
|
||||||
|
def _interp_signal(x_uniform: np.ndarray, x_known: np.ndarray, y_known: np.ndarray) -> np.ndarray:
|
||||||
|
if np.iscomplexobj(y_known):
|
||||||
|
real = np.interp(x_uniform, x_known, np.asarray(y_known.real, dtype=np.float64))
|
||||||
|
imag = np.interp(x_uniform, x_known, np.asarray(y_known.imag, dtype=np.float64))
|
||||||
|
return (real + (1j * imag)).astype(np.complex64)
|
||||||
|
return np.interp(x_uniform, x_known, np.asarray(y_known, dtype=np.float64)).astype(np.float32)
|
||||||
|
|
||||||
|
|
||||||
def prepare_fft_segment(
|
def prepare_fft_segment(
|
||||||
sweep: np.ndarray,
|
sweep: np.ndarray,
|
||||||
freqs: Optional[np.ndarray],
|
freqs: Optional[np.ndarray],
|
||||||
@ -34,8 +49,10 @@ def prepare_fft_segment(
|
|||||||
if take_fft <= 0:
|
if take_fft <= 0:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
sweep_seg = np.asarray(sweep[:take_fft], dtype=np.float32)
|
sweep_arr = _coerce_sweep_array(sweep)
|
||||||
fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(np.float32, copy=False)
|
sweep_seg = sweep_arr[:take_fft]
|
||||||
|
fallback_dtype = np.complex64 if np.iscomplexobj(sweep_seg) else np.float32
|
||||||
|
fallback = np.nan_to_num(sweep_seg, nan=0.0).astype(fallback_dtype, copy=False)
|
||||||
if freqs is None:
|
if freqs is None:
|
||||||
return fallback, take_fft
|
return fallback, take_fft
|
||||||
|
|
||||||
@ -59,7 +76,7 @@ def prepare_fft_segment(
|
|||||||
return fallback, take_fft
|
return fallback, take_fft
|
||||||
|
|
||||||
x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64)
|
x_uniform = np.linspace(float(x_unique[0]), float(x_unique[-1]), take_fft, dtype=np.float64)
|
||||||
resampled = np.interp(x_uniform, x_unique, y_unique).astype(np.float32)
|
resampled = _interp_signal(x_uniform, x_unique, y_unique)
|
||||||
return resampled, take_fft
|
return resampled, take_fft
|
||||||
|
|
||||||
|
|
||||||
@ -94,18 +111,20 @@ def build_symmetric_ifft_spectrum(
|
|||||||
|
|
||||||
fft_seg, take_fft = prepared
|
fft_seg, take_fft = prepared
|
||||||
if take_fft != band_len:
|
if take_fft != band_len:
|
||||||
fft_seg = np.asarray(fft_seg[:band_len], dtype=np.float32)
|
fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
||||||
|
fft_seg = np.asarray(fft_seg[:band_len], dtype=fft_dtype)
|
||||||
if fft_seg.size < band_len:
|
if fft_seg.size < band_len:
|
||||||
padded = np.zeros((band_len,), dtype=np.float32)
|
padded = np.zeros((band_len,), dtype=fft_dtype)
|
||||||
padded[: fft_seg.size] = fft_seg
|
padded[: fft_seg.size] = fft_seg
|
||||||
fft_seg = padded
|
fft_seg = padded
|
||||||
|
|
||||||
window = np.hanning(band_len).astype(np.float32)
|
window = np.hanning(band_len).astype(np.float32)
|
||||||
band = np.nan_to_num(fft_seg, nan=0.0).astype(np.float32, copy=False) * window
|
band_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
||||||
|
band = np.nan_to_num(fft_seg, nan=0.0).astype(band_dtype, copy=False) * window
|
||||||
|
|
||||||
spectrum = np.zeros((int(fft_len),), dtype=np.float32)
|
spectrum = np.zeros((int(fft_len),), dtype=band_dtype)
|
||||||
spectrum[pos_idx] = band
|
spectrum[pos_idx] = band
|
||||||
spectrum[neg_idx] = band[::-1]
|
spectrum[neg_idx] = np.conj(band[::-1]) if np.iscomplexobj(band) else band[::-1]
|
||||||
return spectrum
|
return spectrum
|
||||||
|
|
||||||
|
|
||||||
@ -137,16 +156,18 @@ def build_positive_only_centered_ifft_spectrum(
|
|||||||
|
|
||||||
fft_seg, take_fft = prepared
|
fft_seg, take_fft = prepared
|
||||||
if take_fft != band_len:
|
if take_fft != band_len:
|
||||||
fft_seg = np.asarray(fft_seg[:band_len], dtype=np.float32)
|
fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
||||||
|
fft_seg = np.asarray(fft_seg[:band_len], dtype=fft_dtype)
|
||||||
if fft_seg.size < band_len:
|
if fft_seg.size < band_len:
|
||||||
padded = np.zeros((band_len,), dtype=np.float32)
|
padded = np.zeros((band_len,), dtype=fft_dtype)
|
||||||
padded[: fft_seg.size] = fft_seg
|
padded[: fft_seg.size] = fft_seg
|
||||||
fft_seg = padded
|
fft_seg = padded
|
||||||
|
|
||||||
window = np.hanning(band_len).astype(np.float32)
|
window = np.hanning(band_len).astype(np.float32)
|
||||||
band = np.nan_to_num(fft_seg, nan=0.0).astype(np.float32, copy=False) * window
|
band_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
||||||
|
band = np.nan_to_num(fft_seg, nan=0.0).astype(band_dtype, copy=False) * window
|
||||||
|
|
||||||
spectrum = np.zeros((int(fft_len),), dtype=np.float32)
|
spectrum = np.zeros((int(fft_len),), dtype=band_dtype)
|
||||||
spectrum[pos_idx] = band
|
spectrum[pos_idx] = band
|
||||||
return spectrum
|
return spectrum
|
||||||
|
|
||||||
@ -168,7 +189,8 @@ def _compute_fft_mag_row_direct(
|
|||||||
return np.full((bins,), np.nan, dtype=np.float32)
|
return np.full((bins,), np.nan, dtype=np.float32)
|
||||||
|
|
||||||
fft_seg, take_fft = prepared
|
fft_seg, take_fft = prepared
|
||||||
fft_in = np.zeros((FFT_LEN,), dtype=np.float32)
|
fft_dtype = np.complex64 if np.iscomplexobj(fft_seg) else np.float32
|
||||||
|
fft_in = np.zeros((FFT_LEN,), dtype=fft_dtype)
|
||||||
window = np.hanning(take_fft).astype(np.float32)
|
window = np.hanning(take_fft).astype(np.float32)
|
||||||
fft_in[:take_fft] = fft_seg * window
|
fft_in[:take_fft] = fft_seg * window
|
||||||
spec = np.fft.ifft(fft_in)
|
spec = np.fft.ifft(fft_in)
|
||||||
|
|||||||
@ -150,18 +150,24 @@ def resample_envelope(envelope: np.ndarray, width: int) -> np.ndarray:
|
|||||||
|
|
||||||
def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray:
|
def normalize_by_envelope(raw: np.ndarray, envelope: np.ndarray) -> np.ndarray:
|
||||||
"""Normalize a sweep by an envelope with safe resampling and zero protection."""
|
"""Normalize a sweep by an envelope with safe resampling and zero protection."""
|
||||||
raw_arr = np.asarray(raw, dtype=np.float32).reshape(-1)
|
raw_in = np.asarray(raw).reshape(-1)
|
||||||
|
raw_dtype = np.complex64 if np.iscomplexobj(raw_in) else np.float32
|
||||||
|
raw_arr = np.asarray(raw_in, dtype=raw_dtype).reshape(-1)
|
||||||
if raw_arr.size == 0:
|
if raw_arr.size == 0:
|
||||||
return raw_arr.copy()
|
return raw_arr.copy()
|
||||||
|
|
||||||
env = resample_envelope(envelope, raw_arr.size)
|
env = resample_envelope(envelope, raw_arr.size)
|
||||||
out = np.full_like(raw_arr, np.nan, dtype=np.float32)
|
out = np.full(raw_arr.shape, np.nan + 0j if np.iscomplexobj(raw_arr) else np.nan, dtype=raw_dtype)
|
||||||
den_eps = np.float32(1e-9)
|
den_eps = np.float32(1e-9)
|
||||||
valid = np.isfinite(raw_arr) & np.isfinite(env)
|
valid = np.isfinite(raw_arr) & np.isfinite(env)
|
||||||
if np.any(valid):
|
if np.any(valid):
|
||||||
with np.errstate(divide="ignore", invalid="ignore"):
|
with np.errstate(divide="ignore", invalid="ignore"):
|
||||||
denom = env[valid] + np.where(env[valid] >= 0.0, den_eps, -den_eps)
|
denom = env[valid] + np.where(env[valid] >= 0.0, den_eps, -den_eps)
|
||||||
out[valid] = raw_arr[valid] / denom
|
out[valid] = raw_arr[valid] / denom
|
||||||
|
if np.iscomplexobj(out):
|
||||||
|
out_real = np.nan_to_num(out.real, nan=np.nan, posinf=np.nan, neginf=np.nan)
|
||||||
|
out_imag = np.nan_to_num(out.imag, nan=np.nan, posinf=np.nan, neginf=np.nan)
|
||||||
|
return (out_real + (1j * out_imag)).astype(np.complex64, copy=False)
|
||||||
return np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
|
return np.nan_to_num(out, nan=np.nan, posinf=np.nan, neginf=np.nan)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -23,6 +23,7 @@ class RingBuffer:
|
|||||||
self.ring: Optional[np.ndarray] = None
|
self.ring: Optional[np.ndarray] = None
|
||||||
self.ring_time: Optional[np.ndarray] = None
|
self.ring_time: Optional[np.ndarray] = None
|
||||||
self.ring_fft: Optional[np.ndarray] = None
|
self.ring_fft: Optional[np.ndarray] = None
|
||||||
|
self.ring_fft_input: Optional[np.ndarray] = None
|
||||||
self.x_shared: Optional[np.ndarray] = None
|
self.x_shared: Optional[np.ndarray] = None
|
||||||
self.distance_axis: Optional[np.ndarray] = None
|
self.distance_axis: Optional[np.ndarray] = None
|
||||||
self.last_fft_mag: Optional[np.ndarray] = None
|
self.last_fft_mag: Optional[np.ndarray] = None
|
||||||
@ -46,6 +47,7 @@ class RingBuffer:
|
|||||||
self.ring = None
|
self.ring = None
|
||||||
self.ring_time = None
|
self.ring_time = None
|
||||||
self.ring_fft = None
|
self.ring_fft = None
|
||||||
|
self.ring_fft_input = None
|
||||||
self.x_shared = None
|
self.x_shared = None
|
||||||
self.distance_axis = None
|
self.distance_axis = None
|
||||||
self.last_fft_mag = None
|
self.last_fft_mag = None
|
||||||
@ -63,13 +65,18 @@ class RingBuffer:
|
|||||||
self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32)
|
self.ring = np.full((self.max_sweeps, self.width), np.nan, dtype=np.float32)
|
||||||
self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64)
|
self.ring_time = np.full((self.max_sweeps,), np.nan, dtype=np.float64)
|
||||||
self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32)
|
self.ring_fft = np.full((self.max_sweeps, self.fft_bins), np.nan, dtype=np.float32)
|
||||||
|
self.ring_fft_input = np.full((self.max_sweeps, self.width), np.nan + 0j, dtype=np.complex64)
|
||||||
self.head = 0
|
self.head = 0
|
||||||
changed = True
|
changed = True
|
||||||
elif target_width != self.width:
|
elif target_width != self.width:
|
||||||
new_ring = np.full((self.max_sweeps, target_width), np.nan, dtype=np.float32)
|
new_ring = np.full((self.max_sweeps, target_width), np.nan, dtype=np.float32)
|
||||||
|
new_fft_input = np.full((self.max_sweeps, target_width), np.nan + 0j, dtype=np.complex64)
|
||||||
take = min(self.width, target_width)
|
take = min(self.width, target_width)
|
||||||
new_ring[:, :take] = self.ring[:, :take]
|
new_ring[:, :take] = self.ring[:, :take]
|
||||||
|
if self.ring_fft_input is not None:
|
||||||
|
new_fft_input[:, :take] = self.ring_fft_input[:, :take]
|
||||||
self.ring = new_ring
|
self.ring = new_ring
|
||||||
|
self.ring_fft_input = new_fft_input
|
||||||
self.width = target_width
|
self.width = target_width
|
||||||
changed = True
|
changed = True
|
||||||
|
|
||||||
@ -106,12 +113,18 @@ class RingBuffer:
|
|||||||
|
|
||||||
self.ring_fft.fill(np.nan)
|
self.ring_fft.fill(np.nan)
|
||||||
for row_idx in range(self.ring.shape[0]):
|
for row_idx in range(self.ring.shape[0]):
|
||||||
sweep_row = self.ring[row_idx]
|
fft_source_row = self.ring_fft_input[row_idx] if self.ring_fft_input is not None else self.ring[row_idx]
|
||||||
if not np.any(np.isfinite(sweep_row)):
|
if not np.any(np.isfinite(fft_source_row)):
|
||||||
continue
|
continue
|
||||||
|
finite_idx = np.flatnonzero(np.isfinite(fft_source_row))
|
||||||
|
if finite_idx.size <= 0:
|
||||||
|
continue
|
||||||
|
row_width = int(finite_idx[-1]) + 1
|
||||||
|
fft_source = fft_source_row[:row_width]
|
||||||
|
freqs = self.last_freqs[:row_width] if self.last_freqs is not None and self.last_freqs.size >= row_width else self.last_freqs
|
||||||
fft_mag = compute_fft_mag_row(
|
fft_mag = compute_fft_mag_row(
|
||||||
sweep_row,
|
fft_source,
|
||||||
self.last_freqs,
|
freqs,
|
||||||
self.fft_bins,
|
self.fft_bins,
|
||||||
mode=self.fft_mode,
|
mode=self.fft_mode,
|
||||||
)
|
)
|
||||||
@ -140,12 +153,18 @@ class RingBuffer:
|
|||||||
"""Backward-compatible wrapper for the old two-state FFT switch."""
|
"""Backward-compatible wrapper for the old two-state FFT switch."""
|
||||||
return self.set_fft_mode("symmetric" if enabled else "direct")
|
return self.set_fft_mode("symmetric" if enabled else "direct")
|
||||||
|
|
||||||
def push(self, sweep: np.ndarray, freqs: Optional[np.ndarray] = None) -> None:
|
def push(
|
||||||
|
self,
|
||||||
|
sweep: np.ndarray,
|
||||||
|
freqs: Optional[np.ndarray] = None,
|
||||||
|
*,
|
||||||
|
fft_input: Optional[np.ndarray] = None,
|
||||||
|
) -> None:
|
||||||
"""Push a processed sweep and refresh raw/FFT buffers."""
|
"""Push a processed sweep and refresh raw/FFT buffers."""
|
||||||
if sweep is None or sweep.size == 0:
|
if sweep is None or sweep.size == 0:
|
||||||
return
|
return
|
||||||
self.ensure_init(int(sweep.size))
|
self.ensure_init(int(sweep.size))
|
||||||
if self.ring is None or self.ring_time is None or self.ring_fft is None:
|
if self.ring is None or self.ring_time is None or self.ring_fft is None or self.ring_fft_input is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
row = np.full((self.width,), np.nan, dtype=np.float32)
|
row = np.full((self.width,), np.nan, dtype=np.float32)
|
||||||
@ -156,7 +175,13 @@ class RingBuffer:
|
|||||||
if freqs is not None:
|
if freqs is not None:
|
||||||
self.last_freqs = np.asarray(freqs, dtype=np.float64).copy()
|
self.last_freqs = np.asarray(freqs, dtype=np.float64).copy()
|
||||||
|
|
||||||
fft_mag = compute_fft_mag_row(sweep, freqs, self.fft_bins, mode=self.fft_mode)
|
fft_source = np.asarray(fft_input if fft_input is not None else sweep).reshape(-1)
|
||||||
|
fft_row = np.full((self.width,), np.nan + 0j, dtype=np.complex64)
|
||||||
|
fft_take = min(self.width, int(fft_source.size))
|
||||||
|
fft_row[:fft_take] = np.asarray(fft_source[:fft_take], dtype=np.complex64)
|
||||||
|
self.ring_fft_input[self.head, :] = fft_row
|
||||||
|
|
||||||
|
fft_mag = compute_fft_mag_row(fft_source, freqs, self.fft_bins, mode=self.fft_mode)
|
||||||
self.ring_fft[self.head, :] = fft_mag
|
self.ring_fft[self.head, :] = fft_mag
|
||||||
self.last_fft_mag = np.asarray(fft_mag, dtype=np.float32).copy()
|
self.last_fft_mag = np.asarray(fft_mag, dtype=np.float32).copy()
|
||||||
self.last_fft_db = fft_mag_to_db(fft_mag)
|
self.last_fft_db = fft_mag_to_db(fft_mag)
|
||||||
|
|||||||
@ -20,10 +20,13 @@ class RuntimeState:
|
|||||||
range_max_ghz: float = 0.0
|
range_max_ghz: float = 0.0
|
||||||
full_current_freqs: Optional[np.ndarray] = None
|
full_current_freqs: Optional[np.ndarray] = None
|
||||||
full_current_sweep_raw: Optional[np.ndarray] = None
|
full_current_sweep_raw: Optional[np.ndarray] = None
|
||||||
|
full_current_fft_source: Optional[np.ndarray] = None
|
||||||
full_current_aux_curves: SweepAuxCurves = None
|
full_current_aux_curves: SweepAuxCurves = None
|
||||||
current_freqs: Optional[np.ndarray] = None
|
current_freqs: Optional[np.ndarray] = None
|
||||||
current_distances: Optional[np.ndarray] = None
|
current_distances: Optional[np.ndarray] = None
|
||||||
current_sweep_raw: Optional[np.ndarray] = None
|
current_sweep_raw: Optional[np.ndarray] = None
|
||||||
|
current_fft_source: Optional[np.ndarray] = None
|
||||||
|
current_fft_input: Optional[np.ndarray] = None
|
||||||
current_aux_curves: SweepAuxCurves = None
|
current_aux_curves: SweepAuxCurves = None
|
||||||
current_sweep_norm: Optional[np.ndarray] = None
|
current_sweep_norm: Optional[np.ndarray] = None
|
||||||
current_fft_mag: Optional[np.ndarray] = None
|
current_fft_mag: Optional[np.ndarray] = None
|
||||||
|
|||||||
@ -31,6 +31,7 @@ class CliTests(unittest.TestCase):
|
|||||||
self.assertEqual(proc.returncode, 0)
|
self.assertEqual(proc.returncode, 0)
|
||||||
self.assertIn("usage:", proc.stdout)
|
self.assertIn("usage:", proc.stdout)
|
||||||
self.assertIn("--parser_16_bit_x2", proc.stdout)
|
self.assertIn("--parser_16_bit_x2", proc.stdout)
|
||||||
|
self.assertIn("--parser_complex_ascii", proc.stdout)
|
||||||
|
|
||||||
def test_backend_mpl_reports_removal(self):
|
def test_backend_mpl_reports_removal(self):
|
||||||
proc = _run("-m", "rfg_adc_plotter.main", "/dev/null", "--backend", "mpl")
|
proc = _run("-m", "rfg_adc_plotter.main", "/dev/null", "--backend", "mpl")
|
||||||
|
|||||||
@ -56,6 +56,18 @@ class ProcessingTests(unittest.TestCase):
|
|||||||
self.assertEqual(calibrated["I"].shape, (32,))
|
self.assertEqual(calibrated["I"].shape, (32,))
|
||||||
self.assertTrue(np.all(np.diff(calibrated["F"]) >= 0.0))
|
self.assertTrue(np.all(np.diff(calibrated["F"]) >= 0.0))
|
||||||
|
|
||||||
|
def test_calibrate_freqs_keeps_complex_payload(self):
|
||||||
|
sweep = {
|
||||||
|
"F": np.linspace(3.3, 14.3, 32),
|
||||||
|
"I": np.exp(1j * np.linspace(0.0, np.pi, 32)).astype(np.complex64),
|
||||||
|
}
|
||||||
|
calibrated = calibrate_freqs(sweep)
|
||||||
|
|
||||||
|
self.assertEqual(calibrated["F"].shape, (32,))
|
||||||
|
self.assertEqual(calibrated["I"].shape, (32,))
|
||||||
|
self.assertTrue(np.iscomplexobj(calibrated["I"]))
|
||||||
|
self.assertTrue(np.all(np.isfinite(calibrated["I"])))
|
||||||
|
|
||||||
def test_normalizers_and_envelopes_return_finite_ranges(self):
|
def test_normalizers_and_envelopes_return_finite_ranges(self):
|
||||||
calib = (np.sin(np.linspace(0.0, 4.0 * np.pi, 64)) * 5.0).astype(np.float32)
|
calib = (np.sin(np.linspace(0.0, 4.0 * np.pi, 64)) * 5.0).astype(np.float32)
|
||||||
raw = calib * 0.75
|
raw = calib * 0.75
|
||||||
@ -105,6 +117,15 @@ class ProcessingTests(unittest.TestCase):
|
|||||||
self.assertAlmostEqual(float(normalized[1]), 2.0, places=5)
|
self.assertAlmostEqual(float(normalized[1]), 2.0, places=5)
|
||||||
self.assertAlmostEqual(float(normalized[2]), -3.0, places=5)
|
self.assertAlmostEqual(float(normalized[2]), -3.0, places=5)
|
||||||
|
|
||||||
|
def test_normalize_by_envelope_supports_complex_input(self):
|
||||||
|
raw = np.asarray([1.0 + 1.0j, 2.0 - 2.0j], dtype=np.complex64)
|
||||||
|
envelope = np.asarray([1.0, 2.0], dtype=np.float32)
|
||||||
|
normalized = normalize_by_envelope(raw, envelope)
|
||||||
|
|
||||||
|
self.assertTrue(np.iscomplexobj(normalized))
|
||||||
|
self.assertTrue(np.all(np.isfinite(normalized)))
|
||||||
|
self.assertTrue(np.allclose(normalized, np.asarray([1.0 + 1.0j, 1.0 - 1.0j], dtype=np.complex64)))
|
||||||
|
|
||||||
def test_load_calib_envelope_rejects_empty_payload(self):
|
def test_load_calib_envelope_rejects_empty_payload(self):
|
||||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||||
path = os.path.join(tmp_dir, "empty.npy")
|
path = os.path.join(tmp_dir, "empty.npy")
|
||||||
@ -247,6 +268,33 @@ class ProcessingTests(unittest.TestCase):
|
|||||||
self.assertTrue(np.allclose(spectrum[zero_mask], 0.0))
|
self.assertTrue(np.allclose(spectrum[zero_mask], 0.0))
|
||||||
self.assertTrue(np.any(np.abs(spectrum[pos_idx]) > 0.0))
|
self.assertTrue(np.any(np.abs(spectrum[pos_idx]) > 0.0))
|
||||||
|
|
||||||
|
def test_complex_symmetric_ifft_spectrum_uses_conjugate_mirror(self):
|
||||||
|
sweep = np.exp(1j * np.linspace(0.0, np.pi, 128)).astype(np.complex64)
|
||||||
|
freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)
|
||||||
|
spectrum = build_symmetric_ifft_spectrum(sweep, freqs, fft_len=FFT_LEN)
|
||||||
|
|
||||||
|
self.assertIsNotNone(spectrum)
|
||||||
|
freq_axis = np.linspace(-10.0, 10.0, FFT_LEN, dtype=np.float64)
|
||||||
|
neg_idx_all = np.flatnonzero(freq_axis <= (-4.0))
|
||||||
|
pos_idx_all = np.flatnonzero(freq_axis >= 4.0)
|
||||||
|
band_len = int(min(neg_idx_all.size, pos_idx_all.size))
|
||||||
|
neg_idx = neg_idx_all[:band_len]
|
||||||
|
pos_idx = pos_idx_all[-band_len:]
|
||||||
|
|
||||||
|
self.assertTrue(np.iscomplexobj(spectrum))
|
||||||
|
self.assertTrue(np.allclose(spectrum[neg_idx], np.conj(spectrum[pos_idx][::-1])))
|
||||||
|
|
||||||
|
def test_compute_fft_helpers_accept_complex_input(self):
|
||||||
|
sweep = np.exp(1j * np.linspace(0.0, 2.0 * np.pi, 128)).astype(np.complex64)
|
||||||
|
freqs = np.linspace(3.3, 14.3, 128, dtype=np.float64)
|
||||||
|
mag = compute_fft_mag_row(sweep, freqs, 513, mode="positive_only")
|
||||||
|
row = compute_fft_row(sweep, freqs, 513, mode="positive_only")
|
||||||
|
|
||||||
|
self.assertEqual(mag.shape, (513,))
|
||||||
|
self.assertEqual(row.shape, (513,))
|
||||||
|
self.assertTrue(np.any(np.isfinite(mag)))
|
||||||
|
self.assertTrue(np.any(np.isfinite(row)))
|
||||||
|
|
||||||
def test_symmetric_distance_axis_uses_windowed_frequency_bounds(self):
|
def test_symmetric_distance_axis_uses_windowed_frequency_bounds(self):
|
||||||
freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)
|
freqs = np.linspace(4.0, 10.0, 128, dtype=np.float64)
|
||||||
axis = compute_distance_axis(freqs, 513, mode="symmetric")
|
axis = compute_distance_axis(freqs, 513, mode="symmetric")
|
||||||
|
|||||||
@ -3,6 +3,7 @@ from __future__ import annotations
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
from rfg_adc_plotter.processing.fft import compute_fft_mag_row
|
||||||
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
from rfg_adc_plotter.state.ring_buffer import RingBuffer
|
||||||
|
|
||||||
|
|
||||||
@ -72,6 +73,19 @@ class RingBufferTests(unittest.TestCase):
|
|||||||
self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,))
|
self.assertEqual(ring.last_fft_db.shape, (ring.fft_bins,))
|
||||||
self.assertIsNotNone(ring.distance_axis)
|
self.assertIsNotNone(ring.distance_axis)
|
||||||
|
|
||||||
|
def test_ring_buffer_rebuilds_fft_from_complex_input(self):
|
||||||
|
ring = RingBuffer(max_sweeps=2)
|
||||||
|
freqs = np.linspace(3.3, 14.3, 64, dtype=np.float64)
|
||||||
|
complex_input = np.exp(1j * np.linspace(0.0, 2.0 * np.pi, 64)).astype(np.complex64)
|
||||||
|
display_sweep = np.abs(complex_input).astype(np.float32)
|
||||||
|
ring.push(display_sweep, freqs, fft_input=complex_input)
|
||||||
|
|
||||||
|
ring.set_fft_mode("direct")
|
||||||
|
|
||||||
|
expected = compute_fft_mag_row(complex_input, freqs, ring.fft_bins, mode="direct")
|
||||||
|
self.assertTrue(np.allclose(ring.get_last_fft_linear(), expected))
|
||||||
|
self.assertTrue(np.allclose(ring.get_display_raw()[: display_sweep.size, -1], display_sweep))
|
||||||
|
|
||||||
def test_ring_buffer_reset_clears_cached_history(self):
|
def test_ring_buffer_reset_clears_cached_history(self):
|
||||||
ring = RingBuffer(max_sweeps=2)
|
ring = RingBuffer(max_sweeps=2)
|
||||||
ring.push(np.linspace(0.0, 1.0, 64, dtype=np.float32), np.linspace(4.0, 10.0, 64))
|
ring.push(np.linspace(0.0, 1.0, 64, dtype=np.float32), np.linspace(4.0, 10.0, 64))
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import unittest
|
|||||||
|
|
||||||
from rfg_adc_plotter.io.sweep_parser_core import (
|
from rfg_adc_plotter.io.sweep_parser_core import (
|
||||||
AsciiSweepParser,
|
AsciiSweepParser,
|
||||||
|
ComplexAsciiSweepParser,
|
||||||
LegacyBinaryParser,
|
LegacyBinaryParser,
|
||||||
LogScale16BitX2BinaryParser,
|
LogScale16BitX2BinaryParser,
|
||||||
LogScaleBinaryParser32,
|
LogScaleBinaryParser32,
|
||||||
@ -96,6 +97,20 @@ class SweepParserCoreTests(unittest.TestCase):
|
|||||||
self.assertEqual(events[1].x, 1)
|
self.assertEqual(events[1].x, 1)
|
||||||
self.assertEqual(events[1].y, -2.0)
|
self.assertEqual(events[1].y, -2.0)
|
||||||
|
|
||||||
|
def test_complex_ascii_parser_detects_new_sweep_on_step_reset(self):
|
||||||
|
parser = ComplexAsciiSweepParser()
|
||||||
|
events = parser.feed(b"0 3 4\n1 5 12\n0 8 15\n")
|
||||||
|
|
||||||
|
self.assertIsInstance(events[0], PointEvent)
|
||||||
|
self.assertEqual(events[0].x, 0)
|
||||||
|
self.assertEqual(events[0].y, 5.0)
|
||||||
|
self.assertEqual(events[0].aux, (3.0, 4.0))
|
||||||
|
self.assertIsInstance(events[1], PointEvent)
|
||||||
|
self.assertEqual(events[1].y, 13.0)
|
||||||
|
self.assertIsInstance(events[2], StartEvent)
|
||||||
|
self.assertIsInstance(events[3], PointEvent)
|
||||||
|
self.assertEqual(events[3].aux, (8.0, 15.0))
|
||||||
|
|
||||||
def test_logscale_32_parser_keeps_channel_and_aux_values(self):
|
def test_logscale_32_parser_keeps_channel_and_aux_values(self):
|
||||||
parser = LogScaleBinaryParser32()
|
parser = LogScaleBinaryParser32()
|
||||||
stream = _pack_log_start(5) + _pack_log_point(7, 1500, 700, ch=5)
|
stream = _pack_log_start(5) + _pack_log_point(7, 1500, 700, ch=5)
|
||||||
|
|||||||
Reference in New Issue
Block a user