Files
RadioPhotonic_PCB_PC_software/laser_control/transport.py
2026-04-27 16:28:48 +03:00

199 lines
6.3 KiB
Python

"""Serial transport for the laser controller board."""
from __future__ import annotations
import logging
import re
import time
import serial
import serial.tools.list_ports
from .constants import (
BAUDRATE,
GET_DATA_TOTAL_LENGTH,
SERIAL_TIMEOUT_SEC,
STATUS_RESPONSE_LENGTH,
WAIT_AFTER_SEND_SEC,
)
from .exceptions import CommunicationError, PortNotFoundError
from .models import DeviceState
from .protocol import Protocol
logger = logging.getLogger(__name__)
_PROBE_ATTEMPTS = 2
_KNOWN_STATUS_MASK = int(
DeviceState.SD_ERROR
| DeviceState.UART_ERROR
| DeviceState.UART_DECODE_ERROR
| DeviceState.TEC1_ERROR
| DeviceState.TEC2_ERROR
| DeviceState.DEFAULT_ERROR
| DeviceState.AD9102_ERROR
)
class SerialTransport:
"""Small serial wrapper with auto-detection and explicit lifecycle."""
def __init__(
self,
port: str | None = None,
baudrate: int = BAUDRATE,
timeout: float = SERIAL_TIMEOUT_SEC,
) -> None:
self._requested_port = port
self._active_port: str | None = None
self._baudrate = baudrate
self._timeout = timeout
self._serial: serial.Serial | None = None
@property
def port_name(self) -> str | None:
"""Return the connected port or the requested port when disconnected."""
return self._active_port or self._requested_port
@property
def is_connected(self) -> bool:
"""Return True when the serial port is currently open."""
return self._serial is not None and self._serial.is_open
def connect(self) -> None:
"""Open the serial port, auto-detecting the responding board when needed."""
if self._requested_port is None:
self._active_port, self._serial = self._detect_and_open_port()
return
try:
self._serial = self._open_serial(self._requested_port)
except Exception as exc: # noqa: BLE001
raise CommunicationError(
f"Cannot connect to port '{self._requested_port}': {exc}"
) from exc
self._active_port = self._requested_port
def disconnect(self) -> None:
"""Close the serial port if it is open."""
if self._serial is not None and self._serial.is_open:
self._serial.close()
self._serial = None
def send(self, data: bytes) -> None:
"""Write raw bytes to the serial port."""
if not self.is_connected:
raise CommunicationError("Serial port is not connected")
assert self._serial is not None
self._serial.write(data)
def read(self, length: int) -> bytes:
"""Read a fixed number of bytes from the serial port."""
if not self.is_connected:
raise CommunicationError("Serial port is not connected")
assert self._serial is not None
return self._serial.read(length)
def _open_serial(self, port: str) -> serial.Serial:
return serial.Serial(
port=port,
baudrate=self._baudrate,
timeout=self._timeout,
write_timeout=self._timeout,
)
def _detect_and_open_port(self) -> tuple[str, serial.Serial]:
ports = sorted(serial.tools.list_ports.comports(), key=self._port_sort_key)
if not ports:
raise PortNotFoundError()
checked_ports: list[str] = []
for port_info in ports:
port = port_info.device
checked_ports.append(port)
connection: serial.Serial | None = None
matched = False
try:
connection = self._open_serial(port)
matched = self._probe_device(connection)
if matched:
return port, connection
except Exception as exc: # noqa: BLE001
logger.debug("Serial auto-detect skipped %s: %s", port, exc)
finally:
if connection is not None and not matched:
connection.close()
logger.info(
"Serial auto-detect found no responding controller; checked ports: %s",
", ".join(checked_ports),
)
raise PortNotFoundError()
def _probe_device(self, connection: serial.Serial) -> bool:
for _attempt in range(_PROBE_ATTEMPTS):
self._reset_buffers(connection)
if not self._read_valid_status(connection):
continue
if self._read_valid_telemetry(connection):
return True
return False
def _read_valid_status(self, connection: serial.Serial) -> bool:
raw = self._write_and_read(
connection,
Protocol.encode_state(),
STATUS_RESPONSE_LENGTH,
)
if len(raw) != STATUS_RESPONSE_LENGTH:
return False
state, _detail = Protocol.decode_status(raw)
return (int(state) & ~_KNOWN_STATUS_MASK) == 0
def _read_valid_telemetry(self, connection: serial.Serial) -> bool:
raw = self._write_and_read(
connection,
Protocol.encode_trans_enable(),
GET_DATA_TOTAL_LENGTH,
)
return Protocol.has_valid_response_crc(raw)
def _write_and_read(
self,
connection: serial.Serial,
command: bytes,
response_length: int,
) -> bytes:
connection.write(command)
connection.flush()
time.sleep(WAIT_AFTER_SEND_SEC)
return connection.read(response_length)
@staticmethod
def _reset_buffers(connection: serial.Serial) -> None:
connection.reset_input_buffer()
connection.reset_output_buffer()
@staticmethod
def _port_sort_key(port_info) -> tuple[int, list[int | str]]:
text = " ".join(
str(value or "")
for value in (
port_info.device,
getattr(port_info, "description", ""),
getattr(port_info, "hwid", ""),
getattr(port_info, "manufacturer", ""),
getattr(port_info, "product", ""),
)
).upper()
priority = 0 if "USB" in text or getattr(port_info, "vid", None) is not None else 1
return priority, SerialTransport._natural_key(port_info.device)
@staticmethod
def _natural_key(value: str) -> list[int | str]:
return [
int(part) if part.isdigit() else part.lower()
for part in re.split(r"(\d+)", value)
]