diff --git a/vna_system/api/endpoints/health.py b/vna_system/api/endpoints/health.py index f7fc5b3..12c11f6 100644 --- a/vna_system/api/endpoints/health.py +++ b/vna_system/api/endpoints/health.py @@ -32,12 +32,7 @@ async def health_check(): async def device_status(): """Get device connection status and details.""" acquisition = singletons.vna_data_acquisition_instance - device_info = acquisition.get_device_info() return { - "device_info": device_info, "acquisition_running": acquisition.is_running, - "last_sweep": acquisition.sweep_buffer.get_latest_sweep_number(), - "total_sweeps": len(acquisition.sweep_buffer.get_all_sweeps()), - "buffer_stats": acquisition.sweep_buffer.get_stats(), } \ No newline at end of file diff --git a/vna_system/api/main.py b/vna_system/api/main.py index 914f0e7..fda7b9c 100644 --- a/vna_system/api/main.py +++ b/vna_system/api/main.py @@ -13,7 +13,7 @@ from pathlib import Path import vna_system.core.singletons as singletons from vna_system.core.processing.sweep_processor import SweepProcessingManager -from vna_system.api.websockets.websocket_handler import WebSocketManager +from vna_system.core.processing.websocket_handler import WebSocketManager from vna_system.api.endpoints import health, processing, web_ui from vna_system.api.websockets import processing as ws_processing @@ -66,10 +66,10 @@ async def lifespan(app: FastAPI): # Start acquisition logger.info("Starting data acquisition...") - singletons.vna_data_acquisition_instance.start() - # Connect processing to acquisition singletons.processing_manager.set_sweep_buffer(singletons.vna_data_acquisition_instance.sweep_buffer) + singletons.vna_data_acquisition_instance.start() + singletons.processing_manager.start() logger.info("Sweep processing started") diff --git a/vna_system/config/config.py b/vna_system/config/config.py index 5800627..b8bfb31 100644 --- a/vna_system/config/config.py +++ b/vna_system/config/config.py @@ -5,8 +5,12 @@ Configuration file for VNA data acquisition system import glob import logging +from pathlib import Path import serial.tools.list_ports +# Base directory for VNA system +BASE_DIR = Path(__file__).parent.parent + # Serial communication settings DEFAULT_BAUD_RATE = 115200 DEFAULT_PORT = "/dev/ttyACM0" @@ -46,7 +50,7 @@ SERIAL_PEEK_SIZE = 32 SERIAL_IDLE_TIMEOUT = 0.5 SERIAL_DRAIN_DELAY = 0.05 SERIAL_DRAIN_CHECK_DELAY = 0.01 -SERIAL_CONNECT_DELAY = 0.5 +SERIAL_CONNECT_DELAY = 0.01 def find_vna_port(): @@ -61,21 +65,21 @@ def find_vna_port(): # Method 1: Use pyserial port detection by VID/PID try: ports = list(serial.tools.list_ports.comports()) + logger.debug(f"Found {len(ports)} serial ports") for port in ports: - logger.debug(f"Checking port {port.device}: VID={port.vid:04X} PID={port.pid:04X} " - f"Manufacturer='{port.manufacturer}' Product='{port.description}'") - + logger.debug(f"Checking port {port.device}") + # Check by VID/PID if port.vid == VNA_VID and port.pid == VNA_PID: - logger.info(f"Found VNA device by VID/PID at {port.device}") + logger.debug(f"Found VNA device by VID/PID at {port.device}") return port.device # Fallback: Check by manufacturer/product strings if (port.manufacturer and VNA_MANUFACTURER.lower() in port.manufacturer.lower() and port.description and VNA_PRODUCT.lower() in port.description.lower()): - logger.info(f"Found VNA device by description at {port.device}") + logger.debug(f"Found VNA device by description at {port.device}") return port.device except Exception as e: diff --git a/vna_system/core/acquisition/data_acquisition.py b/vna_system/core/acquisition/data_acquisition.py index cc49bd8..6e4bd11 100644 --- a/vna_system/core/acquisition/data_acquisition.py +++ b/vna_system/core/acquisition/data_acquisition.py @@ -6,11 +6,10 @@ import os import struct import threading import time -from typing import BinaryIO, List, Optional, Tuple +from typing import BinaryIO, List, Tuple import serial -# Avoid wildcard imports: make config access explicit and discoverable. from vna_system.config import config as cfg from vna_system.core.acquisition.sweep_buffer import SweepBuffer @@ -21,25 +20,16 @@ logger = logging.getLogger(__name__) class VNADataAcquisition: """Main data acquisition class with asynchronous sweep collection.""" - def __init__(self, port: str = None, baud: int = cfg.DEFAULT_BAUD_RATE) -> None: + def __init__(self) -> None: self.bin_log_path: str = cfg.BIN_LOG_FILE_PATH - # Auto-detect port if not specified - if port is None: - self.port: str = cfg.get_vna_port() - logger.info(f"Using auto-detected port: {self.port}") - else: - self.port: str = port - logger.info(f"Using specified port: {self.port}") - - self.baud: int = baud + self.baud: int = cfg.DEFAULT_BAUD_RATE self._sweep_buffer = SweepBuffer() - self.ser: Optional[serial.SerialBase] = None # Control flags self._running: bool = False - self._thread: Optional[threading.Thread] = None + self._thread: threading.Thread | None = None self._stop_event: threading.Event = threading.Event() # Sweep collection state @@ -76,7 +66,6 @@ class VNADataAcquisition: self._thread.join(timeout=5.0) logger.info("Acquisition thread joined.") - self._close_serial() @property def is_running(self) -> bool: @@ -88,99 +77,25 @@ class VNADataAcquisition: """Return a reference to the sweep buffer.""" return self._sweep_buffer - def get_device_info(self) -> dict: - """Get information about the connected VNA device.""" - info = { - 'port': self.port, - 'baud_rate': self.baud, - 'is_connected': self.ser is not None and self.ser.is_open if self.ser else False, - 'is_running': self.is_running, - 'device_detected': False, - 'device_details': None - } - - # Try to get device details from port detection - try: - from serial.tools.list_ports import comports - for port in comports(): - if port.device == self.port: - info['device_detected'] = True - info['device_details'] = { - 'vid': f"0x{port.vid:04X}" if port.vid else None, - 'pid': f"0x{port.pid:04X}" if port.pid else None, - 'manufacturer': port.manufacturer, - 'product': port.description, - 'serial_number': port.serial_number - } - break - except Exception as e: - logger.debug(f"Could not get device details: {e}") - - return info - # --------------------------------------------------------------------- # # Serial management # --------------------------------------------------------------------- # - def _connect_serial(self) -> bool: - """Establish the serial connection.""" - try: - logger.info(f"Attempting to connect to VNA device at {self.port} @ {self.baud} baud") - # Check if port exists - import os - if not os.path.exists(self.port): - logger.error(f"Port {self.port} does not exist. Device not connected?") - return False - - # Check permissions - if not os.access(self.port, os.R_OK | os.W_OK): - logger.error(f"No read/write permissions for {self.port}. Try: sudo chmod 666 {self.port}") - logger.error("Or add your user to the dialout group: sudo usermod -a -G dialout $USER") - return False - - self.ser = serial.Serial(self.port, baudrate=self.baud, timeout=0) - time.sleep(cfg.SERIAL_CONNECT_DELAY) - self._drain_serial_input() - logger.info(f"Serial connection established to {self.port}") - return True - except PermissionError as exc: - logger.error(f"Permission denied for {self.port}: {exc}") - logger.error("Try: sudo chmod 666 {self.port} or add user to dialout group") - return False - except serial.SerialException as exc: - logger.error(f"Serial connection failed for {self.port}: {exc}") - return False - except Exception as exc: # noqa: BLE001 (keep broad to preserve behavior) - logger.error(f"Unexpected error connecting to {self.port}: {exc}") - return False - - def _close_serial(self) -> None: - """Close the serial connection if open.""" - if self.ser: - try: - self.ser.close() - logger.debug("Serial connection closed.") - except Exception: # noqa: BLE001 - # Preserve original behavior: swallow any serial close errors. - logger.debug("Ignoring error while closing serial.", exc_info=True) - finally: - self.ser = None - - def _drain_serial_input(self) -> None: + def _drain_serial_input(self, ser: serial.Serial) -> None: """Drain any pending bytes from the serial input buffer.""" - if not self.ser: + if not ser: return drained = 0 while True: - bytes_waiting = getattr(self.ser, "in_waiting", 0) + bytes_waiting = getattr(ser, "in_waiting", 0) if bytes_waiting <= 0: break - drained += len(self.ser.read(bytes_waiting)) + drained += len(ser.read(bytes_waiting)) time.sleep(cfg.SERIAL_DRAIN_CHECK_DELAY) if drained: - logger.debug("Drained %d pending byte(s) from serial input.", drained) + logger.warning("Drained %d pending byte(s) from serial input.", drained) # --------------------------------------------------------------------- # # Acquisition loop @@ -190,80 +105,78 @@ class VNADataAcquisition: """Main acquisition loop executed by the background thread.""" while self._running and not self._stop_event.is_set(): try: - if not self._connect_serial(): - time.sleep(1.0) - continue + # Auto-detect port + self.port: str = cfg.get_vna_port() + logger.info(f"Using auto-detected port: {self.port}") - with open(self.bin_log_path, "rb") as raw: - buffered = io.BufferedReader(raw, buffer_size=cfg.SERIAL_BUFFER_SIZE) - self._drain_serial_input() - self._process_sweep_data(buffered) + with serial.Serial(self.port, self.baud) as ser: + self._drain_serial_input(ser) + with open(self.bin_log_path, "rb") as raw: + buffered = io.BufferedReader(raw, buffer_size=cfg.SERIAL_BUFFER_SIZE) + self._process_sweep_data(buffered, ser) except Exception as exc: # noqa: BLE001 logger.error("Acquisition error: %s", exc) time.sleep(1.0) - finally: - self._close_serial() # --------------------------------------------------------------------- # # Log processing # --------------------------------------------------------------------- # - def _process_sweep_data(self, f: BinaryIO) -> None: + def _process_sweep_data(self, f: BinaryIO, ser: serial.Serial) -> None: """Process the binary log file and collect sweep data one sweep at a time.""" - while self._running and not self._stop_event.is_set(): - try: - # Start from beginning of file for each sweep - f.seek(0) + try: + # Start from beginning of file for each sweep + f.seek(0) - # Validate header - header = self._read_exact(f, len(cfg.MAGIC)) - if header != cfg.MAGIC: - raise ValueError("Invalid log format: MAGIC header mismatch.") + # Validate header + header = self._read_exact(f, len(cfg.MAGIC)) + if header != cfg.MAGIC: + raise ValueError("Invalid log format: MAGIC header mismatch.") - self._reset_sweep_state() + self._reset_sweep_state() - # Process one complete sweep - sweep_completed = False - while not sweep_completed and self._running and not self._stop_event.is_set(): - # Read record header - dir_byte = f.read(1) - if not dir_byte: - # EOF reached without completing sweep - wait and retry - logger.debug("EOF reached, waiting for more data...") - time.sleep(0.1) - break + # Process one complete sweep + sweep_completed = False + while not sweep_completed and self._running and not self._stop_event.is_set(): + # Read record header + dir_byte = f.read(1) + if not dir_byte: + # EOF reached without completing sweep - wait and retry + logger.debug("EOF reached, waiting for more data...") + time.sleep(0.1) + break - direction = dir_byte[0] - (length,) = struct.unpack(">I", self._read_exact(f, 4)) + direction = dir_byte[0] + (length,) = struct.unpack(">I", self._read_exact(f, 4)) - if direction == cfg.DIR_TO_DEV: - # TX path: stream to device and inspect for sweep start - first = self._serial_write_from_file(f, length) + if direction == cfg.DIR_TO_DEV: + # TX path: stream to device and inspect for sweep start + first = self._serial_write_from_file(f, length, ser) - if not self._collecting and self._is_sweep_start_command(length, first): - self._collecting = True - self._collected_rx_payloads = [] - self._meas_cmds_in_sweep = 0 - logger.info("Starting sweep data collection from device") + if not self._collecting and self._is_sweep_start_command(length, first): + self._collecting = True + self._collected_rx_payloads = [] + self._meas_cmds_in_sweep = 0 + logger.info("Starting sweep data collection from device") - elif direction == cfg.DIR_FROM_DEV: - # RX path: read exact number of bytes from device - rx_bytes = self._serial_read_exact(length, capture=self._collecting) - self._skip_bytes(f, length) # Keep log file pointer in sync + elif direction == cfg.DIR_FROM_DEV: + # RX path: read exact number of bytes from device + rx_bytes = self._serial_read_exact(length, ser, capture=self._collecting) + self._skip_bytes(f, length) # Keep log file pointer in sync - if self._collecting: - self._collected_rx_payloads.append(rx_bytes) - self._meas_cmds_in_sweep += 1 + if self._collecting: + self._collected_rx_payloads.append(rx_bytes) + self._meas_cmds_in_sweep += 1 - # Check for sweep completion - if self._meas_cmds_in_sweep >= cfg.MEAS_CMDS_PER_SWEEP: - self._finalize_sweep() - sweep_completed = True + # Check for sweep completion + if self._meas_cmds_in_sweep >= cfg.MEAS_CMDS_PER_SWEEP: + self._finalize_sweep() + sweep_completed = True - except Exception as exc: # noqa: BLE001 - logger.error("Processing error: %s", exc) - time.sleep(1.0) + except Exception as exc: # noqa: BLE001 + logger.error("Processing error: %s", exc) + time.sleep(1.0) def _finalize_sweep(self) -> None: """Parse collected payloads into points and push to the buffer.""" @@ -320,7 +233,7 @@ class VNADataAcquisition: raise EOFError(f"Unexpected EOF while skipping {n} bytes.") remaining -= len(chunk) - def _serial_write_from_file(self, f: BinaryIO, nbytes: int) -> bytes: + def _serial_write_from_file(self, f: BinaryIO, nbytes: int, ser: serial.Serial) -> bytes: """ Stream *nbytes* from a file-like object to the serial port. @@ -343,9 +256,7 @@ class VNADataAcquisition: # Write to serial written = 0 while written < len(chunk): - if not self.ser: - break - n = self.ser.write(chunk[written:]) + n = ser.write(chunk[written:]) if n is None: n = 0 written += n @@ -354,29 +265,27 @@ class VNADataAcquisition: return bytes(first) - def _serial_read_exact(self, nbytes: int, capture: bool = False) -> bytes: + def _serial_read_exact(self, nbytes: int, ser: serial.Serial, capture: bool = False) -> bytes: """Read exactly *nbytes* from the serial port; optionally capture and return them.""" - if not self.ser: - return b"" deadline = time.monotonic() + cfg.RX_TIMEOUT total = 0 out = bytearray() if capture else None - old_timeout = self.ser.timeout - self.ser.timeout = min(cfg.SERIAL_IDLE_TIMEOUT, cfg.RX_TIMEOUT) + old_timeout = ser.timeout + ser.timeout = min(cfg.SERIAL_IDLE_TIMEOUT, cfg.RX_TIMEOUT) try: while total < nbytes: if time.monotonic() >= deadline: raise TimeoutError(f"Timeout while waiting for {nbytes} bytes.") - chunk = self.ser.read(nbytes - total) + chunk = ser.read(nbytes - total) if chunk: total += len(chunk) if capture and out is not None: out.extend(chunk) return bytes(out) if (capture and out is not None) else b"" finally: - self.ser.timeout = old_timeout + ser.timeout = old_timeout # --------------------------------------------------------------------- # # Parsing & detection diff --git a/vna_system/core/acquisition/sweep_buffer.py b/vna_system/core/acquisition/sweep_buffer.py index 9f45ae8..c37519f 100644 --- a/vna_system/core/acquisition/sweep_buffer.py +++ b/vna_system/core/acquisition/sweep_buffer.py @@ -1,16 +1,9 @@ -#!/usr/bin/env python3 -""" -Sweep data structures and buffer management for VNA data acquisition. - -This module contains classes for storing and managing sweep data in a thread-safe manner. -""" - import math import threading import time from collections import deque from dataclasses import dataclass -from typing import List, Tuple, Optional, Dict, Any +from typing import List, Tuple from vna_system.config.config import SWEEP_BUFFER_MAX_SIZE @@ -55,35 +48,11 @@ class SweepBuffer: self._buffer.append(sweep) return self._sweep_counter - def get_latest_sweep(self) -> Optional[SweepData]: + def get_latest_sweep(self) -> SweepData | None: """Get the most recent sweep""" with self._lock: return self._buffer[-1] if self._buffer else None - def get_sweep_by_number(self, sweep_number: int) -> Optional[SweepData]: - """Get a specific sweep by its number""" - with self._lock: - for sweep in reversed(self._buffer): - if sweep.sweep_number == sweep_number: - return sweep - return None - - def get_all_sweeps(self) -> List[SweepData]: - """Get all sweeps currently in buffer""" - with self._lock: - return list(self._buffer) - - def get_buffer_info(self) -> Dict[str, Any]: - """Get buffer status information""" - with self._lock: - return { - 'current_size': len(self._buffer), - 'max_size': self._buffer.maxlen, - 'total_sweeps_processed': self._sweep_counter, - 'latest_sweep_number': self._buffer[-1].sweep_number if self._buffer else None, - 'oldest_sweep_number': self._buffer[0].sweep_number if self._buffer else None, - 'next_sweep_number': self._sweep_counter + 1 - } def set_sweep_counter(self, sweep_number: int) -> None: """Set the sweep counter to continue from a specific number.""" diff --git a/vna_system/core/processing/base_processor.py b/vna_system/core/processing/base_processor.py index 14900c0..d5f27ec 100644 --- a/vna_system/core/processing/base_processor.py +++ b/vna_system/core/processing/base_processor.py @@ -6,7 +6,7 @@ Base sweep processor interface and utilities. from __future__ import annotations import abc -from typing import Any, Dict, List, Optional +from typing import Any, Dict from vna_system.core.acquisition.sweep_buffer import SweepData @@ -26,7 +26,7 @@ class BaseSweepProcessor(abc.ABC): pass @abc.abstractmethod - def process(self, sweep: SweepData) -> Optional[ProcessingResult]: + def process(self, sweep: SweepData) -> ProcessingResult | None: """Process a sweep and return results.""" pass @@ -43,8 +43,8 @@ class ProcessingResult: processor_name: str, sweep_number: int, data: Dict[str, Any], - plotly_figure: Optional[Dict[str, Any]] = None, - file_path: Optional[str] = None, + plotly_figure: Dict[str, Any] | None = None, + file_path: str | None = None, ) -> None: self.processor_name = processor_name self.sweep_number = sweep_number diff --git a/vna_system/core/processing/processors/magnitude_plot.py b/vna_system/core/processing/processors/magnitude_plot.py index 86f4f45..c7868ab 100644 --- a/vna_system/core/processing/processors/magnitude_plot.py +++ b/vna_system/core/processing/processors/magnitude_plot.py @@ -7,7 +7,7 @@ from __future__ import annotations import os from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Tuple import numpy as np import matplotlib.pyplot as plt @@ -38,7 +38,7 @@ class MagnitudePlotProcessor(BaseSweepProcessor): def name(self) -> str: return "magnitude_plot" - def process(self, sweep: SweepData) -> Optional[ProcessingResult]: + def process(self, sweep: SweepData) -> ProcessingResult | None: """Process sweep data and create magnitude plot.""" if not self.should_process(sweep): return None @@ -107,7 +107,7 @@ class MagnitudePlotProcessor(BaseSweepProcessor): return fig - def _save_matplotlib_image(self, sweep: SweepData, magnitude_data: List[Tuple[float, float]]) -> Optional[Path]: + def _save_matplotlib_image(self, sweep: SweepData, magnitude_data: List[Tuple[float, float]]) -> Path | None: """Save plot as image file using matplotlib.""" try: filename = f"magnitude_sweep_{sweep.sweep_number:06d}.{self.image_format}" diff --git a/vna_system/core/processing/results_storage.py b/vna_system/core/processing/results_storage.py index aa125ef..a5636ff 100644 --- a/vna_system/core/processing/results_storage.py +++ b/vna_system/core/processing/results_storage.py @@ -4,7 +4,7 @@ import json import logging import threading from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List from vna_system.core.processing.base_processor import ProcessingResult @@ -169,7 +169,7 @@ class ResultsStorage: except Exception as e: logger.error(f"Failed to remove sweep {sweep_number}: {e}") - def get_latest_results(self, processor_name: Optional[str] = None, limit: int = 10) -> List[ProcessingResult]: + def get_latest_results(self, processor_name: str | None = None, limit: int = 10) -> List[ProcessingResult]: """Get latest processing results.""" with self._lock: results = [] @@ -186,7 +186,7 @@ class ResultsStorage: return results[:limit] - def get_result_by_sweep(self, sweep_number: int, processor_name: Optional[str] = None) -> List[ProcessingResult]: + def get_result_by_sweep(self, sweep_number: int, processor_name: str | None = None) -> List[ProcessingResult]: """Get processing results for specific sweep number.""" with self._lock: results = [] @@ -227,7 +227,7 @@ class ResultsStorage: return results - def get_sweep_metadata(self, sweep_number: int) -> Optional[Dict[str, Any]]: + def get_sweep_metadata(self, sweep_number: int) -> Dict[str, Any] | None: """Get metadata for specific sweep.""" with self._lock: if sweep_number not in self._cache: @@ -243,7 +243,7 @@ class ResultsStorage: return None - def get_available_sweeps(self, limit: Optional[int] = None) -> List[int]: + def get_available_sweeps(self, limit: int | None = None) -> List[int]: """Get list of available sweep numbers.""" with self._lock: sorted_sweeps = sorted(self._cache.keys(), reverse=True) diff --git a/vna_system/core/processing/sweep_processor.py b/vna_system/core/processing/sweep_processor.py index 026db98..03be78a 100644 --- a/vna_system/core/processing/sweep_processor.py +++ b/vna_system/core/processing/sweep_processor.py @@ -5,7 +5,7 @@ import logging import queue import threading import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List from vna_system.core.acquisition.sweep_buffer import SweepBuffer, SweepData from vna_system.core.processing.base_processor import BaseSweepProcessor, ProcessingResult @@ -22,11 +22,11 @@ class SweepProcessingManager: def __init__(self, config_path: str = "vna_system/core/processing/config.json") -> None: self.processors: List[BaseSweepProcessor] = [] self.config: Dict[str, Any] = {} - self.sweep_buffer: Optional[SweepBuffer] = None + self.sweep_buffer: SweepBuffer | None = None # Processing control self._running = False - self._thread: Optional[threading.Thread] = None + self._thread: threading.Thread | None = None self._stop_event = threading.Event() self._last_processed_sweep = 0 @@ -174,7 +174,7 @@ class SweepProcessingManager: except queue.Empty: pass - def get_next_result(self, timeout: Optional[float] = None) -> Optional[ProcessingResult]: + def get_next_result(self, timeout: float | None = None) -> ProcessingResult | None: """Get next processing result from queue. Used by websocket handler.""" try: return self.results_queue.get(timeout=timeout) @@ -196,11 +196,11 @@ class SweepProcessingManager: break return results - def get_latest_results(self, processor_name: Optional[str] = None, limit: int = 10) -> List[ProcessingResult]: + def get_latest_results(self, processor_name: str | None = None, limit: int = 10) -> List[ProcessingResult]: """Get latest processing results from storage, optionally filtered by processor name.""" return self.results_storage.get_latest_results(processor_name, limit) - def get_result_by_sweep(self, sweep_number: int, processor_name: Optional[str] = None) -> List[ProcessingResult]: + def get_result_by_sweep(self, sweep_number: int, processor_name: str | None = None) -> List[ProcessingResult]: """Get processing results for a specific sweep number from storage.""" return self.results_storage.get_result_by_sweep(sweep_number, processor_name) diff --git a/vna_system/api/websockets/websocket_handler.py b/vna_system/core/processing/websocket_handler.py similarity index 100% rename from vna_system/api/websockets/websocket_handler.py rename to vna_system/core/processing/websocket_handler.py diff --git a/vna_system/core/settings/settings_manager.py b/vna_system/core/settings/settings_manager.py new file mode 100644 index 0000000..261c290 --- /dev/null +++ b/vna_system/core/settings/settings_manager.py @@ -0,0 +1,352 @@ +from __future__ import annotations + +import json +import logging +import re +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import Dict, List, Tuple + +from vna_system.config import config as cfg +from vna_system.core.processing.results_storage import ResultsStorage + +logger = logging.getLogger(__name__) + + +# ----------------------- Minimal enums & dataclass ----------------------- + +class VNAMode(Enum): + S11 = "S11" + S21 = "S21" + + +class CalibrationStandard(Enum): + OPEN = "open" + SHORT = "short" + LOAD = "load" + THROUGH = "through" + + +@dataclass(frozen=True) +class LogConfig: + """Parsed configuration from a selected config file.""" + mode: VNAMode + file_path: Path + stem: str + start_hz: float | None + stop_hz: float | None + points: int | None + bw_hz: float | None + + +# ----------------------- Filename parsing helpers -------------------------- + +_UNIT_MULT = { + "hz": 1.0, + "khz": 1e3, + "mhz": 1e6, + "ghz": 1e9, +} +_PARAM_RE = re.compile(r"^(str|stp|pnts|bw)(?P[0-9]+(?:\.[0-9]+)?)(?P[a-zA-Z]+)?$") + + +def _to_hz(val: float, unit: str | None, default_hz: float) -> float: + if unit: + m = _UNIT_MULT.get(unit.lower()) + if m: + return float(val) * m + return float(val) * default_hz + + +def parse_config_filename(name: str, assume_mhz_for_freq: bool = True) -> Tuple[float | None, float | None, int | None, float] | None: + """ + Parse tokens like: str100_stp8800_pnts1000_bw1khz.[bin] + - str/stp default to MHz if no unit (configurable) + - bw defaults to Hz if no unit + """ + base = Path(name).stem + tokens = base.split("_") + + start_hz = stop_hz = bw_hz = None + points: int | None = None + + for t in tokens: + m = _PARAM_RE.match(t) + if not m: + continue + key = t[:3] + val = float(m.group("val")) + unit = m.group("unit") + + if key == "str": + start_hz = _to_hz(val, unit, 1e6 if assume_mhz_for_freq else 1.0) + elif key == "stp": + stop_hz = _to_hz(val, unit, 1e6 if assume_mhz_for_freq else 1.0) + elif key == "pnt": # token 'pnts' + points = int(val) + elif key == "bw": + bw_hz = _to_hz(val, unit, 1.0) + + return start_hz, stop_hz, points, bw_hz + + +# ----------------------- VNA Settings Manager ------------------------------ + +class VNASettingsManager: + """ + - Scans config_logs/{S11,S21}/ for available configs + - Controls current_log.bin symlink (must be a real symlink) + - Parses config params from filename + - Stores per-config calibration in: + calibration////_sweepNNNNNN/ + - copies ALL processor result JSON files for that sweep (and metadata.json if present) + - UI helpers: select S11/S21, calibrate (through/open/short/load) by sweep number + """ + + def __init__( + self, + base_dir: Path | None = None, + config_logs_subdir: str = "binary_logs/config_logs", + current_log_name: str = "current_log.bin", + calibration_subdir: str = "calibration", + assume_mhz_for_freq: bool = True, + results_storage: ResultsStorage | None = None, + ): + self.base_dir = Path(base_dir or cfg.BASE_DIR) + self.cfg_logs_dir = self.base_dir / config_logs_subdir + self.current_log = self.cfg_logs_dir / current_log_name + self.calib_root = self.base_dir / calibration_subdir + self.assume_mhz_for_freq = assume_mhz_for_freq + + # Ensure directory structure exists + (self.cfg_logs_dir / "S11").mkdir(parents=True, exist_ok=True) + (self.cfg_logs_dir / "S21").mkdir(parents=True, exist_ok=True) + self.calib_root.mkdir(parents=True, exist_ok=True) + + # Results storage + self.results = results_storage or ResultsStorage( + storage_dir=str(self.base_dir / "processing_results") + ) + + # ---------- configuration selection & discovery ---------- + + def list_configs(self, mode: VNAMode | None = None) -> List[LogConfig]: + modes = [mode] if mode else [VNAMode.S11, VNAMode.S21] + out: List[LogConfig] = [] + for m in modes: + d = self.cfg_logs_dir / m.value + if not d.exists(): + continue + for fp in sorted(d.glob("*.bin")): + s, e, n, bw = parse_config_filename(fp.name, self.assume_mhz_for_freq) + out.append(LogConfig( + mode=m, + file_path=fp.resolve(), + stem=fp.stem, + start_hz=s, + stop_hz=e, + points=n, + bw_hz=bw, + )) + return out + + def set_current_config(self, mode: VNAMode, filename: str) -> LogConfig: + """ + Update current_log.bin symlink to point to config_logs//. + Real symlink only; will raise if not supported. + """ + target = (self.cfg_logs_dir / mode.value / filename).resolve() + if not target.exists(): + raise FileNotFoundError(f"Config not found: {target}") + + if self.current_log.exists() or self.current_log.is_symlink(): + self.current_log.unlink() + + # relative link if possible, else absolute (still a symlink) + try: + rel = target.relative_to(self.current_log.parent) + except ValueError: + rel = target + + self.current_log.symlink_to(rel) + return self.get_current_config() + + def get_current_config(self) -> LogConfig: + if not self.current_log.exists(): + raise FileNotFoundError(f"{self.current_log} does not exist") + + tgt = self.current_log.resolve() + mode = VNAMode(tgt.parent.name) # expects .../config_logs// + s, e, n, bw = parse_config_filename(tgt.name, self.assume_mhz_for_freq) + return LogConfig( + mode=mode, + file_path=tgt, + stem=tgt.stem, + start_hz=s, stop_hz=e, points=n, bw_hz=bw + ) + + # ---------- calibration capture (ALL processors) ---------- + + @staticmethod + def required_standards(mode: VNAMode) -> List[CalibrationStandard]: + return ( + [CalibrationStandard.OPEN, CalibrationStandard.SHORT, CalibrationStandard.LOAD] + if mode == VNAMode.S11 + else [CalibrationStandard.THROUGH] + ) + + def _calib_dir(self, cfg: LogConfig, standard: CalibrationStandard | None = None) -> Path: + base = self.calib_root / cfg.mode.value / cfg.stem + return base / standard.value if standard else base + + def _calib_sweep_dir(self, cfg: LogConfig, standard: CalibrationStandard, sweep_number: int, ts: str | None = None) -> Path: + """ + calibration////_sweepNNNNNN/ + """ + ts = ts or datetime.now().strftime("%Y%m%d_%H%M%S") + d = self._calib_dir(cfg, standard) / f"{ts}_sweep{sweep_number:06d}" + d.mkdir(parents=True, exist_ok=True) + return d + + def record_calibration_from_sweep( + self, + standard: CalibrationStandard, + sweep_number: int, + *, + cfg: LogConfig | None = None + ) -> Path: + """ + Capture ALL processor JSON results for the given sweep and save under: + calibration////_sweepNNNNNN/ + Also copy metadata.json if available. + Returns the created sweep calibration directory. + """ + cfg = cfg or self.get_current_config() + + # Get ALL results for the sweep + results = self.results.get_result_by_sweep(sweep_number, processor_name=None) + if not results: + raise FileNotFoundError(f"No processor results found for sweep {sweep_number}") + + # Determine destination dir + dst_dir = self._calib_sweep_dir(cfg, standard, sweep_number) + + # Save processor files (re-serialize what ResultsStorage returns) + count = 0 + for r in results: + try: + dst_file = dst_dir / f"{r.processor_name}.json" + payload = { + "processor_name": r.processor_name, + "sweep_number": r.sweep_number, + "data": r.data, + } + # keep optional fields if present + if getattr(r, "plotly_figure", None) is not None: + payload["plotly_figure"] = r.plotly_figure + if getattr(r, "file_path", None) is not None: + payload["file_path"] = r.file_path + + with open(dst_file, "w") as f: + json.dump(payload, f, indent=2) + count += 1 + except Exception as e: + logger.error(f"Failed to store processor '{r.processor_name}' for sweep {sweep_number}: {e}") + + # Save metadata if available + try: + meta = self.results.get_sweep_metadata(sweep_number) + if meta: + with open(dst_dir / "metadata.json", "w") as f: + json.dump(meta, f, indent=2) + except Exception as e: + logger.warning(f"Failed to write metadata for sweep {sweep_number}: {e}") + + if count == 0: + raise RuntimeError(f"Nothing was written for sweep {sweep_number}") + + logger.info(f"Stored calibration (standard={standard.value}) from sweep {sweep_number} into {dst_dir}") + return dst_dir + + def latest_calibration(self, cfg: LogConfig | None = None) -> Dict[CalibrationStandard, Path] | None: + """ + Returns the latest sweep directory per required standard for the current (or provided) config. + """ + cfg = cfg or self.get_current_config() + out: Dict[CalibrationStandard, Path] | None = {} + for std in self.required_standards(cfg.mode): + d = self._calib_dir(cfg, std) + if not d.exists(): + out[std] = None + continue + subdirs = sorted([p for p in d.iterdir() if p.is_dir()]) + out[std] = subdirs[-1] if subdirs else None + return out + + def calibration_status(self, cfg: LogConfig | None = None) -> Dict[str, bool]: + cfg = cfg or self.get_current_config() + latest = self.latest_calibration(cfg) + return {std.value: (p is not None and p.exists()) for std, p in latest.items()} + + def is_fully_calibrated(self, cfg: LogConfig | None = None) -> bool: + return all(self.calibration_status(cfg).values()) + + # ---------- UI helpers ---------- + + def summary(self) -> Dict[str, object]: + cfg = self.get_current_config() + latest = self.latest_calibration(cfg) + return { + "mode": cfg.mode.value, + "current_log": str(self.current_log), + "selected_file": str(cfg.file_path), + "stem": cfg.stem, + "params": { + "start_hz": cfg.start_hz, + "stop_hz": cfg.stop_hz, + "points": cfg.points, + "bw_hz": cfg.bw_hz, + }, + "required_standards": [s.value for s in self.required_standards(cfg.mode)], + "calibration_latest": {k.value: (str(v) if v else None) for k, v in latest.items()}, + "is_fully_calibrated": self.is_fully_calibrated(cfg), + } + + def ui_select_S11(self, filename: str) -> Dict[str, object]: + self.set_current_config(VNAMode.S11, filename) + return self.summary() + + def ui_select_S21(self, filename: str) -> Dict[str, object]: + self.set_current_config(VNAMode.S21, filename) + return self.summary() + + # Calibration triggers (buttons) + def ui_calibrate_through(self, sweep_number: int) -> Dict[str, object]: + cfg = self.get_current_config() + if cfg.mode != VNAMode.S21: + raise RuntimeError("THROUGH is only valid in S21 mode") + self.record_calibration_from_sweep(CalibrationStandard.THROUGH, sweep_number) + return self.summary() + + def ui_calibrate_open(self, sweep_number: int) -> Dict[str, object]: + cfg = self.get_current_config() + if cfg.mode != VNAMode.S11: + raise RuntimeError("OPEN is only valid in S11 mode") + self.record_calibration_from_sweep(CalibrationStandard.OPEN, sweep_number) + return self.summary() + + def ui_calibrate_short(self, sweep_number: int) -> Dict[str, object]: + cfg = self.get_current_config() + if cfg.mode != VNAMode.S11: + raise RuntimeError("SHORT is only valid in S11 mode") + self.record_calibration_from_sweep(CalibrationStandard.SHORT, sweep_number) + return self.summary() + + def ui_calibrate_load(self, sweep_number: int) -> Dict[str, object]: + cfg = self.get_current_config() + if cfg.mode != VNAMode.S11: + raise RuntimeError("LOAD is only valid in S11 mode") + self.record_calibration_from_sweep(CalibrationStandard.LOAD, sweep_number) + return self.summary() diff --git a/vna_system/core/singletons.py b/vna_system/core/singletons.py index a5393e6..b0957f6 100644 --- a/vna_system/core/singletons.py +++ b/vna_system/core/singletons.py @@ -7,9 +7,11 @@ scattered throughout the codebase. """ from vna_system.core.acquisition.data_acquisition import VNADataAcquisition from vna_system.core.processing.sweep_processor import SweepProcessingManager -from vna_system.api.websockets.websocket_handler import WebSocketManager +from vna_system.core.processing.websocket_handler import WebSocketManager +from vna_system.core.settings.settings_manager import VNASettingsManager # Global singleton instances vna_data_acquisition_instance: VNADataAcquisition = VNADataAcquisition() processing_manager: SweepProcessingManager = SweepProcessingManager() -websocket_manager: WebSocketManager = WebSocketManager(processing_manager) \ No newline at end of file +websocket_manager: WebSocketManager = WebSocketManager(processing_manager) +settings_manager: VNASettingsManager = VNASettingsManager() \ No newline at end of file diff --git a/vna_system/scripts/__init__.py b/vna_system/scripts/__init__.py deleted file mode 100644 index 55dbf87..0000000 --- a/vna_system/scripts/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""VNA System scripts module.""" \ No newline at end of file