updated acquisition

This commit is contained in:
Ayzen
2025-09-23 20:41:52 +03:00
parent 32323e7743
commit 664314097f
13 changed files with 456 additions and 226 deletions

View File

@ -32,12 +32,7 @@ async def health_check():
async def device_status():
"""Get device connection status and details."""
acquisition = singletons.vna_data_acquisition_instance
device_info = acquisition.get_device_info()
return {
"device_info": device_info,
"acquisition_running": acquisition.is_running,
"last_sweep": acquisition.sweep_buffer.get_latest_sweep_number(),
"total_sweeps": len(acquisition.sweep_buffer.get_all_sweeps()),
"buffer_stats": acquisition.sweep_buffer.get_stats(),
}

View File

@ -13,7 +13,7 @@ from pathlib import Path
import vna_system.core.singletons as singletons
from vna_system.core.processing.sweep_processor import SweepProcessingManager
from vna_system.api.websockets.websocket_handler import WebSocketManager
from vna_system.core.processing.websocket_handler import WebSocketManager
from vna_system.api.endpoints import health, processing, web_ui
from vna_system.api.websockets import processing as ws_processing
@ -66,10 +66,10 @@ async def lifespan(app: FastAPI):
# Start acquisition
logger.info("Starting data acquisition...")
singletons.vna_data_acquisition_instance.start()
# Connect processing to acquisition
singletons.processing_manager.set_sweep_buffer(singletons.vna_data_acquisition_instance.sweep_buffer)
singletons.vna_data_acquisition_instance.start()
singletons.processing_manager.start()
logger.info("Sweep processing started")

View File

@ -5,8 +5,12 @@ Configuration file for VNA data acquisition system
import glob
import logging
from pathlib import Path
import serial.tools.list_ports
# Base directory for VNA system
BASE_DIR = Path(__file__).parent.parent
# Serial communication settings
DEFAULT_BAUD_RATE = 115200
DEFAULT_PORT = "/dev/ttyACM0"
@ -46,7 +50,7 @@ SERIAL_PEEK_SIZE = 32
SERIAL_IDLE_TIMEOUT = 0.5
SERIAL_DRAIN_DELAY = 0.05
SERIAL_DRAIN_CHECK_DELAY = 0.01
SERIAL_CONNECT_DELAY = 0.5
SERIAL_CONNECT_DELAY = 0.01
def find_vna_port():
@ -61,21 +65,21 @@ def find_vna_port():
# Method 1: Use pyserial port detection by VID/PID
try:
ports = list(serial.tools.list_ports.comports())
logger.debug(f"Found {len(ports)} serial ports")
for port in ports:
logger.debug(f"Checking port {port.device}: VID={port.vid:04X} PID={port.pid:04X} "
f"Manufacturer='{port.manufacturer}' Product='{port.description}'")
logger.debug(f"Checking port {port.device}")
# Check by VID/PID
if port.vid == VNA_VID and port.pid == VNA_PID:
logger.info(f"Found VNA device by VID/PID at {port.device}")
logger.debug(f"Found VNA device by VID/PID at {port.device}")
return port.device
# Fallback: Check by manufacturer/product strings
if (port.manufacturer and VNA_MANUFACTURER.lower() in port.manufacturer.lower() and
port.description and VNA_PRODUCT.lower() in port.description.lower()):
logger.info(f"Found VNA device by description at {port.device}")
logger.debug(f"Found VNA device by description at {port.device}")
return port.device
except Exception as e:

View File

@ -6,11 +6,10 @@ import os
import struct
import threading
import time
from typing import BinaryIO, List, Optional, Tuple
from typing import BinaryIO, List, Tuple
import serial
# Avoid wildcard imports: make config access explicit and discoverable.
from vna_system.config import config as cfg
from vna_system.core.acquisition.sweep_buffer import SweepBuffer
@ -21,25 +20,16 @@ logger = logging.getLogger(__name__)
class VNADataAcquisition:
"""Main data acquisition class with asynchronous sweep collection."""
def __init__(self, port: str = None, baud: int = cfg.DEFAULT_BAUD_RATE) -> None:
def __init__(self) -> None:
self.bin_log_path: str = cfg.BIN_LOG_FILE_PATH
# Auto-detect port if not specified
if port is None:
self.port: str = cfg.get_vna_port()
logger.info(f"Using auto-detected port: {self.port}")
else:
self.port: str = port
logger.info(f"Using specified port: {self.port}")
self.baud: int = baud
self.baud: int = cfg.DEFAULT_BAUD_RATE
self._sweep_buffer = SweepBuffer()
self.ser: Optional[serial.SerialBase] = None
# Control flags
self._running: bool = False
self._thread: Optional[threading.Thread] = None
self._thread: threading.Thread | None = None
self._stop_event: threading.Event = threading.Event()
# Sweep collection state
@ -76,7 +66,6 @@ class VNADataAcquisition:
self._thread.join(timeout=5.0)
logger.info("Acquisition thread joined.")
self._close_serial()
@property
def is_running(self) -> bool:
@ -88,99 +77,25 @@ class VNADataAcquisition:
"""Return a reference to the sweep buffer."""
return self._sweep_buffer
def get_device_info(self) -> dict:
"""Get information about the connected VNA device."""
info = {
'port': self.port,
'baud_rate': self.baud,
'is_connected': self.ser is not None and self.ser.is_open if self.ser else False,
'is_running': self.is_running,
'device_detected': False,
'device_details': None
}
# Try to get device details from port detection
try:
from serial.tools.list_ports import comports
for port in comports():
if port.device == self.port:
info['device_detected'] = True
info['device_details'] = {
'vid': f"0x{port.vid:04X}" if port.vid else None,
'pid': f"0x{port.pid:04X}" if port.pid else None,
'manufacturer': port.manufacturer,
'product': port.description,
'serial_number': port.serial_number
}
break
except Exception as e:
logger.debug(f"Could not get device details: {e}")
return info
# --------------------------------------------------------------------- #
# Serial management
# --------------------------------------------------------------------- #
def _connect_serial(self) -> bool:
"""Establish the serial connection."""
try:
logger.info(f"Attempting to connect to VNA device at {self.port} @ {self.baud} baud")
# Check if port exists
import os
if not os.path.exists(self.port):
logger.error(f"Port {self.port} does not exist. Device not connected?")
return False
# Check permissions
if not os.access(self.port, os.R_OK | os.W_OK):
logger.error(f"No read/write permissions for {self.port}. Try: sudo chmod 666 {self.port}")
logger.error("Or add your user to the dialout group: sudo usermod -a -G dialout $USER")
return False
self.ser = serial.Serial(self.port, baudrate=self.baud, timeout=0)
time.sleep(cfg.SERIAL_CONNECT_DELAY)
self._drain_serial_input()
logger.info(f"Serial connection established to {self.port}")
return True
except PermissionError as exc:
logger.error(f"Permission denied for {self.port}: {exc}")
logger.error("Try: sudo chmod 666 {self.port} or add user to dialout group")
return False
except serial.SerialException as exc:
logger.error(f"Serial connection failed for {self.port}: {exc}")
return False
except Exception as exc: # noqa: BLE001 (keep broad to preserve behavior)
logger.error(f"Unexpected error connecting to {self.port}: {exc}")
return False
def _close_serial(self) -> None:
"""Close the serial connection if open."""
if self.ser:
try:
self.ser.close()
logger.debug("Serial connection closed.")
except Exception: # noqa: BLE001
# Preserve original behavior: swallow any serial close errors.
logger.debug("Ignoring error while closing serial.", exc_info=True)
finally:
self.ser = None
def _drain_serial_input(self) -> None:
def _drain_serial_input(self, ser: serial.Serial) -> None:
"""Drain any pending bytes from the serial input buffer."""
if not self.ser:
if not ser:
return
drained = 0
while True:
bytes_waiting = getattr(self.ser, "in_waiting", 0)
bytes_waiting = getattr(ser, "in_waiting", 0)
if bytes_waiting <= 0:
break
drained += len(self.ser.read(bytes_waiting))
drained += len(ser.read(bytes_waiting))
time.sleep(cfg.SERIAL_DRAIN_CHECK_DELAY)
if drained:
logger.debug("Drained %d pending byte(s) from serial input.", drained)
logger.warning("Drained %d pending byte(s) from serial input.", drained)
# --------------------------------------------------------------------- #
# Acquisition loop
@ -190,80 +105,78 @@ class VNADataAcquisition:
"""Main acquisition loop executed by the background thread."""
while self._running and not self._stop_event.is_set():
try:
if not self._connect_serial():
time.sleep(1.0)
continue
# Auto-detect port
self.port: str = cfg.get_vna_port()
logger.info(f"Using auto-detected port: {self.port}")
with open(self.bin_log_path, "rb") as raw:
buffered = io.BufferedReader(raw, buffer_size=cfg.SERIAL_BUFFER_SIZE)
self._drain_serial_input()
self._process_sweep_data(buffered)
with serial.Serial(self.port, self.baud) as ser:
self._drain_serial_input(ser)
with open(self.bin_log_path, "rb") as raw:
buffered = io.BufferedReader(raw, buffer_size=cfg.SERIAL_BUFFER_SIZE)
self._process_sweep_data(buffered, ser)
except Exception as exc: # noqa: BLE001
logger.error("Acquisition error: %s", exc)
time.sleep(1.0)
finally:
self._close_serial()
# --------------------------------------------------------------------- #
# Log processing
# --------------------------------------------------------------------- #
def _process_sweep_data(self, f: BinaryIO) -> None:
def _process_sweep_data(self, f: BinaryIO, ser: serial.Serial) -> None:
"""Process the binary log file and collect sweep data one sweep at a time."""
while self._running and not self._stop_event.is_set():
try:
# Start from beginning of file for each sweep
f.seek(0)
try:
# Start from beginning of file for each sweep
f.seek(0)
# Validate header
header = self._read_exact(f, len(cfg.MAGIC))
if header != cfg.MAGIC:
raise ValueError("Invalid log format: MAGIC header mismatch.")
# Validate header
header = self._read_exact(f, len(cfg.MAGIC))
if header != cfg.MAGIC:
raise ValueError("Invalid log format: MAGIC header mismatch.")
self._reset_sweep_state()
self._reset_sweep_state()
# Process one complete sweep
sweep_completed = False
while not sweep_completed and self._running and not self._stop_event.is_set():
# Read record header
dir_byte = f.read(1)
if not dir_byte:
# EOF reached without completing sweep - wait and retry
logger.debug("EOF reached, waiting for more data...")
time.sleep(0.1)
break
# Process one complete sweep
sweep_completed = False
while not sweep_completed and self._running and not self._stop_event.is_set():
# Read record header
dir_byte = f.read(1)
if not dir_byte:
# EOF reached without completing sweep - wait and retry
logger.debug("EOF reached, waiting for more data...")
time.sleep(0.1)
break
direction = dir_byte[0]
(length,) = struct.unpack(">I", self._read_exact(f, 4))
direction = dir_byte[0]
(length,) = struct.unpack(">I", self._read_exact(f, 4))
if direction == cfg.DIR_TO_DEV:
# TX path: stream to device and inspect for sweep start
first = self._serial_write_from_file(f, length)
if direction == cfg.DIR_TO_DEV:
# TX path: stream to device and inspect for sweep start
first = self._serial_write_from_file(f, length, ser)
if not self._collecting and self._is_sweep_start_command(length, first):
self._collecting = True
self._collected_rx_payloads = []
self._meas_cmds_in_sweep = 0
logger.info("Starting sweep data collection from device")
if not self._collecting and self._is_sweep_start_command(length, first):
self._collecting = True
self._collected_rx_payloads = []
self._meas_cmds_in_sweep = 0
logger.info("Starting sweep data collection from device")
elif direction == cfg.DIR_FROM_DEV:
# RX path: read exact number of bytes from device
rx_bytes = self._serial_read_exact(length, capture=self._collecting)
self._skip_bytes(f, length) # Keep log file pointer in sync
elif direction == cfg.DIR_FROM_DEV:
# RX path: read exact number of bytes from device
rx_bytes = self._serial_read_exact(length, ser, capture=self._collecting)
self._skip_bytes(f, length) # Keep log file pointer in sync
if self._collecting:
self._collected_rx_payloads.append(rx_bytes)
self._meas_cmds_in_sweep += 1
if self._collecting:
self._collected_rx_payloads.append(rx_bytes)
self._meas_cmds_in_sweep += 1
# Check for sweep completion
if self._meas_cmds_in_sweep >= cfg.MEAS_CMDS_PER_SWEEP:
self._finalize_sweep()
sweep_completed = True
# Check for sweep completion
if self._meas_cmds_in_sweep >= cfg.MEAS_CMDS_PER_SWEEP:
self._finalize_sweep()
sweep_completed = True
except Exception as exc: # noqa: BLE001
logger.error("Processing error: %s", exc)
time.sleep(1.0)
except Exception as exc: # noqa: BLE001
logger.error("Processing error: %s", exc)
time.sleep(1.0)
def _finalize_sweep(self) -> None:
"""Parse collected payloads into points and push to the buffer."""
@ -320,7 +233,7 @@ class VNADataAcquisition:
raise EOFError(f"Unexpected EOF while skipping {n} bytes.")
remaining -= len(chunk)
def _serial_write_from_file(self, f: BinaryIO, nbytes: int) -> bytes:
def _serial_write_from_file(self, f: BinaryIO, nbytes: int, ser: serial.Serial) -> bytes:
"""
Stream *nbytes* from a file-like object to the serial port.
@ -343,9 +256,7 @@ class VNADataAcquisition:
# Write to serial
written = 0
while written < len(chunk):
if not self.ser:
break
n = self.ser.write(chunk[written:])
n = ser.write(chunk[written:])
if n is None:
n = 0
written += n
@ -354,29 +265,27 @@ class VNADataAcquisition:
return bytes(first)
def _serial_read_exact(self, nbytes: int, capture: bool = False) -> bytes:
def _serial_read_exact(self, nbytes: int, ser: serial.Serial, capture: bool = False) -> bytes:
"""Read exactly *nbytes* from the serial port; optionally capture and return them."""
if not self.ser:
return b""
deadline = time.monotonic() + cfg.RX_TIMEOUT
total = 0
out = bytearray() if capture else None
old_timeout = self.ser.timeout
self.ser.timeout = min(cfg.SERIAL_IDLE_TIMEOUT, cfg.RX_TIMEOUT)
old_timeout = ser.timeout
ser.timeout = min(cfg.SERIAL_IDLE_TIMEOUT, cfg.RX_TIMEOUT)
try:
while total < nbytes:
if time.monotonic() >= deadline:
raise TimeoutError(f"Timeout while waiting for {nbytes} bytes.")
chunk = self.ser.read(nbytes - total)
chunk = ser.read(nbytes - total)
if chunk:
total += len(chunk)
if capture and out is not None:
out.extend(chunk)
return bytes(out) if (capture and out is not None) else b""
finally:
self.ser.timeout = old_timeout
ser.timeout = old_timeout
# --------------------------------------------------------------------- #
# Parsing & detection

View File

@ -1,16 +1,9 @@
#!/usr/bin/env python3
"""
Sweep data structures and buffer management for VNA data acquisition.
This module contains classes for storing and managing sweep data in a thread-safe manner.
"""
import math
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Any
from typing import List, Tuple
from vna_system.config.config import SWEEP_BUFFER_MAX_SIZE
@ -55,35 +48,11 @@ class SweepBuffer:
self._buffer.append(sweep)
return self._sweep_counter
def get_latest_sweep(self) -> Optional[SweepData]:
def get_latest_sweep(self) -> SweepData | None:
"""Get the most recent sweep"""
with self._lock:
return self._buffer[-1] if self._buffer else None
def get_sweep_by_number(self, sweep_number: int) -> Optional[SweepData]:
"""Get a specific sweep by its number"""
with self._lock:
for sweep in reversed(self._buffer):
if sweep.sweep_number == sweep_number:
return sweep
return None
def get_all_sweeps(self) -> List[SweepData]:
"""Get all sweeps currently in buffer"""
with self._lock:
return list(self._buffer)
def get_buffer_info(self) -> Dict[str, Any]:
"""Get buffer status information"""
with self._lock:
return {
'current_size': len(self._buffer),
'max_size': self._buffer.maxlen,
'total_sweeps_processed': self._sweep_counter,
'latest_sweep_number': self._buffer[-1].sweep_number if self._buffer else None,
'oldest_sweep_number': self._buffer[0].sweep_number if self._buffer else None,
'next_sweep_number': self._sweep_counter + 1
}
def set_sweep_counter(self, sweep_number: int) -> None:
"""Set the sweep counter to continue from a specific number."""

View File

@ -6,7 +6,7 @@ Base sweep processor interface and utilities.
from __future__ import annotations
import abc
from typing import Any, Dict, List, Optional
from typing import Any, Dict
from vna_system.core.acquisition.sweep_buffer import SweepData
@ -26,7 +26,7 @@ class BaseSweepProcessor(abc.ABC):
pass
@abc.abstractmethod
def process(self, sweep: SweepData) -> Optional[ProcessingResult]:
def process(self, sweep: SweepData) -> ProcessingResult | None:
"""Process a sweep and return results."""
pass
@ -43,8 +43,8 @@ class ProcessingResult:
processor_name: str,
sweep_number: int,
data: Dict[str, Any],
plotly_figure: Optional[Dict[str, Any]] = None,
file_path: Optional[str] = None,
plotly_figure: Dict[str, Any] | None = None,
file_path: str | None = None,
) -> None:
self.processor_name = processor_name
self.sweep_number = sweep_number

View File

@ -7,7 +7,7 @@ from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Tuple
import numpy as np
import matplotlib.pyplot as plt
@ -38,7 +38,7 @@ class MagnitudePlotProcessor(BaseSweepProcessor):
def name(self) -> str:
return "magnitude_plot"
def process(self, sweep: SweepData) -> Optional[ProcessingResult]:
def process(self, sweep: SweepData) -> ProcessingResult | None:
"""Process sweep data and create magnitude plot."""
if not self.should_process(sweep):
return None
@ -107,7 +107,7 @@ class MagnitudePlotProcessor(BaseSweepProcessor):
return fig
def _save_matplotlib_image(self, sweep: SweepData, magnitude_data: List[Tuple[float, float]]) -> Optional[Path]:
def _save_matplotlib_image(self, sweep: SweepData, magnitude_data: List[Tuple[float, float]]) -> Path | None:
"""Save plot as image file using matplotlib."""
try:
filename = f"magnitude_sweep_{sweep.sweep_number:06d}.{self.image_format}"

View File

@ -4,7 +4,7 @@ import json
import logging
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from vna_system.core.processing.base_processor import ProcessingResult
@ -169,7 +169,7 @@ class ResultsStorage:
except Exception as e:
logger.error(f"Failed to remove sweep {sweep_number}: {e}")
def get_latest_results(self, processor_name: Optional[str] = None, limit: int = 10) -> List[ProcessingResult]:
def get_latest_results(self, processor_name: str | None = None, limit: int = 10) -> List[ProcessingResult]:
"""Get latest processing results."""
with self._lock:
results = []
@ -186,7 +186,7 @@ class ResultsStorage:
return results[:limit]
def get_result_by_sweep(self, sweep_number: int, processor_name: Optional[str] = None) -> List[ProcessingResult]:
def get_result_by_sweep(self, sweep_number: int, processor_name: str | None = None) -> List[ProcessingResult]:
"""Get processing results for specific sweep number."""
with self._lock:
results = []
@ -227,7 +227,7 @@ class ResultsStorage:
return results
def get_sweep_metadata(self, sweep_number: int) -> Optional[Dict[str, Any]]:
def get_sweep_metadata(self, sweep_number: int) -> Dict[str, Any] | None:
"""Get metadata for specific sweep."""
with self._lock:
if sweep_number not in self._cache:
@ -243,7 +243,7 @@ class ResultsStorage:
return None
def get_available_sweeps(self, limit: Optional[int] = None) -> List[int]:
def get_available_sweeps(self, limit: int | None = None) -> List[int]:
"""Get list of available sweep numbers."""
with self._lock:
sorted_sweeps = sorted(self._cache.keys(), reverse=True)

View File

@ -5,7 +5,7 @@ import logging
import queue
import threading
import time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from vna_system.core.acquisition.sweep_buffer import SweepBuffer, SweepData
from vna_system.core.processing.base_processor import BaseSweepProcessor, ProcessingResult
@ -22,11 +22,11 @@ class SweepProcessingManager:
def __init__(self, config_path: str = "vna_system/core/processing/config.json") -> None:
self.processors: List[BaseSweepProcessor] = []
self.config: Dict[str, Any] = {}
self.sweep_buffer: Optional[SweepBuffer] = None
self.sweep_buffer: SweepBuffer | None = None
# Processing control
self._running = False
self._thread: Optional[threading.Thread] = None
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
self._last_processed_sweep = 0
@ -174,7 +174,7 @@ class SweepProcessingManager:
except queue.Empty:
pass
def get_next_result(self, timeout: Optional[float] = None) -> Optional[ProcessingResult]:
def get_next_result(self, timeout: float | None = None) -> ProcessingResult | None:
"""Get next processing result from queue. Used by websocket handler."""
try:
return self.results_queue.get(timeout=timeout)
@ -196,11 +196,11 @@ class SweepProcessingManager:
break
return results
def get_latest_results(self, processor_name: Optional[str] = None, limit: int = 10) -> List[ProcessingResult]:
def get_latest_results(self, processor_name: str | None = None, limit: int = 10) -> List[ProcessingResult]:
"""Get latest processing results from storage, optionally filtered by processor name."""
return self.results_storage.get_latest_results(processor_name, limit)
def get_result_by_sweep(self, sweep_number: int, processor_name: Optional[str] = None) -> List[ProcessingResult]:
def get_result_by_sweep(self, sweep_number: int, processor_name: str | None = None) -> List[ProcessingResult]:
"""Get processing results for a specific sweep number from storage."""
return self.results_storage.get_result_by_sweep(sweep_number, processor_name)

View File

@ -0,0 +1,352 @@
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Tuple
from vna_system.config import config as cfg
from vna_system.core.processing.results_storage import ResultsStorage
logger = logging.getLogger(__name__)
# ----------------------- Minimal enums & dataclass -----------------------
class VNAMode(Enum):
S11 = "S11"
S21 = "S21"
class CalibrationStandard(Enum):
OPEN = "open"
SHORT = "short"
LOAD = "load"
THROUGH = "through"
@dataclass(frozen=True)
class LogConfig:
"""Parsed configuration from a selected config file."""
mode: VNAMode
file_path: Path
stem: str
start_hz: float | None
stop_hz: float | None
points: int | None
bw_hz: float | None
# ----------------------- Filename parsing helpers --------------------------
_UNIT_MULT = {
"hz": 1.0,
"khz": 1e3,
"mhz": 1e6,
"ghz": 1e9,
}
_PARAM_RE = re.compile(r"^(str|stp|pnts|bw)(?P<val>[0-9]+(?:\.[0-9]+)?)(?P<unit>[a-zA-Z]+)?$")
def _to_hz(val: float, unit: str | None, default_hz: float) -> float:
if unit:
m = _UNIT_MULT.get(unit.lower())
if m:
return float(val) * m
return float(val) * default_hz
def parse_config_filename(name: str, assume_mhz_for_freq: bool = True) -> Tuple[float | None, float | None, int | None, float] | None:
"""
Parse tokens like: str100_stp8800_pnts1000_bw1khz.[bin]
- str/stp default to MHz if no unit (configurable)
- bw defaults to Hz if no unit
"""
base = Path(name).stem
tokens = base.split("_")
start_hz = stop_hz = bw_hz = None
points: int | None = None
for t in tokens:
m = _PARAM_RE.match(t)
if not m:
continue
key = t[:3]
val = float(m.group("val"))
unit = m.group("unit")
if key == "str":
start_hz = _to_hz(val, unit, 1e6 if assume_mhz_for_freq else 1.0)
elif key == "stp":
stop_hz = _to_hz(val, unit, 1e6 if assume_mhz_for_freq else 1.0)
elif key == "pnt": # token 'pnts'
points = int(val)
elif key == "bw":
bw_hz = _to_hz(val, unit, 1.0)
return start_hz, stop_hz, points, bw_hz
# ----------------------- VNA Settings Manager ------------------------------
class VNASettingsManager:
"""
- Scans config_logs/{S11,S21}/ for available configs
- Controls current_log.bin symlink (must be a real symlink)
- Parses config params from filename
- Stores per-config calibration in:
calibration/<MODE>/<STEM>/<STANDARD>/<timestamp>_sweepNNNNNN/
- copies ALL processor result JSON files for that sweep (and metadata.json if present)
- UI helpers: select S11/S21, calibrate (through/open/short/load) by sweep number
"""
def __init__(
self,
base_dir: Path | None = None,
config_logs_subdir: str = "binary_logs/config_logs",
current_log_name: str = "current_log.bin",
calibration_subdir: str = "calibration",
assume_mhz_for_freq: bool = True,
results_storage: ResultsStorage | None = None,
):
self.base_dir = Path(base_dir or cfg.BASE_DIR)
self.cfg_logs_dir = self.base_dir / config_logs_subdir
self.current_log = self.cfg_logs_dir / current_log_name
self.calib_root = self.base_dir / calibration_subdir
self.assume_mhz_for_freq = assume_mhz_for_freq
# Ensure directory structure exists
(self.cfg_logs_dir / "S11").mkdir(parents=True, exist_ok=True)
(self.cfg_logs_dir / "S21").mkdir(parents=True, exist_ok=True)
self.calib_root.mkdir(parents=True, exist_ok=True)
# Results storage
self.results = results_storage or ResultsStorage(
storage_dir=str(self.base_dir / "processing_results")
)
# ---------- configuration selection & discovery ----------
def list_configs(self, mode: VNAMode | None = None) -> List[LogConfig]:
modes = [mode] if mode else [VNAMode.S11, VNAMode.S21]
out: List[LogConfig] = []
for m in modes:
d = self.cfg_logs_dir / m.value
if not d.exists():
continue
for fp in sorted(d.glob("*.bin")):
s, e, n, bw = parse_config_filename(fp.name, self.assume_mhz_for_freq)
out.append(LogConfig(
mode=m,
file_path=fp.resolve(),
stem=fp.stem,
start_hz=s,
stop_hz=e,
points=n,
bw_hz=bw,
))
return out
def set_current_config(self, mode: VNAMode, filename: str) -> LogConfig:
"""
Update current_log.bin symlink to point to config_logs/<MODE>/<filename>.
Real symlink only; will raise if not supported.
"""
target = (self.cfg_logs_dir / mode.value / filename).resolve()
if not target.exists():
raise FileNotFoundError(f"Config not found: {target}")
if self.current_log.exists() or self.current_log.is_symlink():
self.current_log.unlink()
# relative link if possible, else absolute (still a symlink)
try:
rel = target.relative_to(self.current_log.parent)
except ValueError:
rel = target
self.current_log.symlink_to(rel)
return self.get_current_config()
def get_current_config(self) -> LogConfig:
if not self.current_log.exists():
raise FileNotFoundError(f"{self.current_log} does not exist")
tgt = self.current_log.resolve()
mode = VNAMode(tgt.parent.name) # expects .../config_logs/<MODE>/<file>
s, e, n, bw = parse_config_filename(tgt.name, self.assume_mhz_for_freq)
return LogConfig(
mode=mode,
file_path=tgt,
stem=tgt.stem,
start_hz=s, stop_hz=e, points=n, bw_hz=bw
)
# ---------- calibration capture (ALL processors) ----------
@staticmethod
def required_standards(mode: VNAMode) -> List[CalibrationStandard]:
return (
[CalibrationStandard.OPEN, CalibrationStandard.SHORT, CalibrationStandard.LOAD]
if mode == VNAMode.S11
else [CalibrationStandard.THROUGH]
)
def _calib_dir(self, cfg: LogConfig, standard: CalibrationStandard | None = None) -> Path:
base = self.calib_root / cfg.mode.value / cfg.stem
return base / standard.value if standard else base
def _calib_sweep_dir(self, cfg: LogConfig, standard: CalibrationStandard, sweep_number: int, ts: str | None = None) -> Path:
"""
calibration/<MODE>/<STEM>/<STANDARD>/<timestamp>_sweepNNNNNN/
"""
ts = ts or datetime.now().strftime("%Y%m%d_%H%M%S")
d = self._calib_dir(cfg, standard) / f"{ts}_sweep{sweep_number:06d}"
d.mkdir(parents=True, exist_ok=True)
return d
def record_calibration_from_sweep(
self,
standard: CalibrationStandard,
sweep_number: int,
*,
cfg: LogConfig | None = None
) -> Path:
"""
Capture ALL processor JSON results for the given sweep and save under:
calibration/<MODE>/<STEM>/<STANDARD>/<timestamp>_sweepNNNNNN/
Also copy metadata.json if available.
Returns the created sweep calibration directory.
"""
cfg = cfg or self.get_current_config()
# Get ALL results for the sweep
results = self.results.get_result_by_sweep(sweep_number, processor_name=None)
if not results:
raise FileNotFoundError(f"No processor results found for sweep {sweep_number}")
# Determine destination dir
dst_dir = self._calib_sweep_dir(cfg, standard, sweep_number)
# Save processor files (re-serialize what ResultsStorage returns)
count = 0
for r in results:
try:
dst_file = dst_dir / f"{r.processor_name}.json"
payload = {
"processor_name": r.processor_name,
"sweep_number": r.sweep_number,
"data": r.data,
}
# keep optional fields if present
if getattr(r, "plotly_figure", None) is not None:
payload["plotly_figure"] = r.plotly_figure
if getattr(r, "file_path", None) is not None:
payload["file_path"] = r.file_path
with open(dst_file, "w") as f:
json.dump(payload, f, indent=2)
count += 1
except Exception as e:
logger.error(f"Failed to store processor '{r.processor_name}' for sweep {sweep_number}: {e}")
# Save metadata if available
try:
meta = self.results.get_sweep_metadata(sweep_number)
if meta:
with open(dst_dir / "metadata.json", "w") as f:
json.dump(meta, f, indent=2)
except Exception as e:
logger.warning(f"Failed to write metadata for sweep {sweep_number}: {e}")
if count == 0:
raise RuntimeError(f"Nothing was written for sweep {sweep_number}")
logger.info(f"Stored calibration (standard={standard.value}) from sweep {sweep_number} into {dst_dir}")
return dst_dir
def latest_calibration(self, cfg: LogConfig | None = None) -> Dict[CalibrationStandard, Path] | None:
"""
Returns the latest sweep directory per required standard for the current (or provided) config.
"""
cfg = cfg or self.get_current_config()
out: Dict[CalibrationStandard, Path] | None = {}
for std in self.required_standards(cfg.mode):
d = self._calib_dir(cfg, std)
if not d.exists():
out[std] = None
continue
subdirs = sorted([p for p in d.iterdir() if p.is_dir()])
out[std] = subdirs[-1] if subdirs else None
return out
def calibration_status(self, cfg: LogConfig | None = None) -> Dict[str, bool]:
cfg = cfg or self.get_current_config()
latest = self.latest_calibration(cfg)
return {std.value: (p is not None and p.exists()) for std, p in latest.items()}
def is_fully_calibrated(self, cfg: LogConfig | None = None) -> bool:
return all(self.calibration_status(cfg).values())
# ---------- UI helpers ----------
def summary(self) -> Dict[str, object]:
cfg = self.get_current_config()
latest = self.latest_calibration(cfg)
return {
"mode": cfg.mode.value,
"current_log": str(self.current_log),
"selected_file": str(cfg.file_path),
"stem": cfg.stem,
"params": {
"start_hz": cfg.start_hz,
"stop_hz": cfg.stop_hz,
"points": cfg.points,
"bw_hz": cfg.bw_hz,
},
"required_standards": [s.value for s in self.required_standards(cfg.mode)],
"calibration_latest": {k.value: (str(v) if v else None) for k, v in latest.items()},
"is_fully_calibrated": self.is_fully_calibrated(cfg),
}
def ui_select_S11(self, filename: str) -> Dict[str, object]:
self.set_current_config(VNAMode.S11, filename)
return self.summary()
def ui_select_S21(self, filename: str) -> Dict[str, object]:
self.set_current_config(VNAMode.S21, filename)
return self.summary()
# Calibration triggers (buttons)
def ui_calibrate_through(self, sweep_number: int) -> Dict[str, object]:
cfg = self.get_current_config()
if cfg.mode != VNAMode.S21:
raise RuntimeError("THROUGH is only valid in S21 mode")
self.record_calibration_from_sweep(CalibrationStandard.THROUGH, sweep_number)
return self.summary()
def ui_calibrate_open(self, sweep_number: int) -> Dict[str, object]:
cfg = self.get_current_config()
if cfg.mode != VNAMode.S11:
raise RuntimeError("OPEN is only valid in S11 mode")
self.record_calibration_from_sweep(CalibrationStandard.OPEN, sweep_number)
return self.summary()
def ui_calibrate_short(self, sweep_number: int) -> Dict[str, object]:
cfg = self.get_current_config()
if cfg.mode != VNAMode.S11:
raise RuntimeError("SHORT is only valid in S11 mode")
self.record_calibration_from_sweep(CalibrationStandard.SHORT, sweep_number)
return self.summary()
def ui_calibrate_load(self, sweep_number: int) -> Dict[str, object]:
cfg = self.get_current_config()
if cfg.mode != VNAMode.S11:
raise RuntimeError("LOAD is only valid in S11 mode")
self.record_calibration_from_sweep(CalibrationStandard.LOAD, sweep_number)
return self.summary()

View File

@ -7,9 +7,11 @@ scattered throughout the codebase.
"""
from vna_system.core.acquisition.data_acquisition import VNADataAcquisition
from vna_system.core.processing.sweep_processor import SweepProcessingManager
from vna_system.api.websockets.websocket_handler import WebSocketManager
from vna_system.core.processing.websocket_handler import WebSocketManager
from vna_system.core.settings.settings_manager import VNASettingsManager
# Global singleton instances
vna_data_acquisition_instance: VNADataAcquisition = VNADataAcquisition()
processing_manager: SweepProcessingManager = SweepProcessingManager()
websocket_manager: WebSocketManager = WebSocketManager(processing_manager)
websocket_manager: WebSocketManager = WebSocketManager(processing_manager)
settings_manager: VNASettingsManager = VNASettingsManager()

View File

@ -1 +0,0 @@
"""VNA System scripts module."""