"""Serial transport for the laser controller board."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import serial
|
|
import serial.tools.list_ports
|
|
|
|
from .constants import BAUDRATE, SERIAL_TIMEOUT_SEC
|
|
from .exceptions import CommunicationError, PortNotFoundError
|
|
|
|
|
|
class SerialTransport:
    """Small serial wrapper with auto-detection and explicit lifecycle.

    The port is opened by :meth:`connect` and released by :meth:`disconnect`;
    nothing is opened at construction time.
    """

    def __init__(
        self,
        port: str | None = None,
        baudrate: int = BAUDRATE,
        timeout: float = SERIAL_TIMEOUT_SEC,
    ) -> None:
        """Store the connection settings without opening the port.

        Args:
            port: Device name to open. When ``None``, :meth:`connect`
                auto-detects a port.
            baudrate: Baud rate handed to ``serial.Serial``.
            timeout: Read timeout in seconds handed to ``serial.Serial``.
        """
        self._requested_port = port
        # Device actually opened by the last successful connect(); cleared
        # again on disconnect().
        self._active_port: str | None = None
        self._baudrate = baudrate
        self._timeout = timeout
        self._serial: serial.Serial | None = None

    @property
    def port_name(self) -> str | None:
        """Return the connected port or the requested port when disconnected."""
        return self._active_port or self._requested_port

    @property
    def is_connected(self) -> bool:
        """Return True when the serial port is currently open."""
        return self._serial is not None and self._serial.is_open

    def connect(self) -> None:
        """Open the serial port, auto-detecting the first USB port when needed.

        If a port is already open it is closed first, so calling this twice
        reconnects instead of leaking the previous handle.

        Raises:
            PortNotFoundError: No port was requested and none could be detected.
            CommunicationError: The port could not be opened.
        """
        port = self._requested_port or self._detect_port()
        # Release any handle left from an earlier connect(); overwriting
        # self._serial without closing it would leak the open descriptor.
        self.disconnect()
        try:
            self._serial = serial.Serial(
                port=port,
                baudrate=self._baudrate,
                timeout=self._timeout,
            )
        except Exception as exc:  # noqa: BLE001
            # Deliberately broad: every failure to open is reported uniformly.
            raise CommunicationError(f"Cannot connect to port '{port}': {exc}") from exc
        self._active_port = port

    def disconnect(self) -> None:
        """Close the serial port if it is open.

        Safe to call when already disconnected. Afterwards ``port_name``
        falls back to the requested port.
        """
        # Clear the state before closing so the transport reads as
        # disconnected even if close() raises.
        handle, self._serial = self._serial, None
        self._active_port = None
        if handle is not None and handle.is_open:
            handle.close()

    def send(self, data: bytes) -> None:
        """Write raw bytes to the serial port.

        Raises:
            CommunicationError: The port is not connected.
        """
        self._require_open().write(data)

    def read(self, length: int) -> bytes:
        """Read up to ``length`` bytes from the serial port.

        The result comes straight from ``serial.Serial.read``; per the
        pySerial documentation it may be shorter than ``length`` when the
        configured timeout expires first.

        Raises:
            CommunicationError: The port is not connected.
        """
        return self._require_open().read(length)

    def _require_open(self) -> serial.Serial:
        """Return the open serial handle or raise CommunicationError."""
        handle = self._serial
        if handle is None or not handle.is_open:
            raise CommunicationError("Serial port is not connected")
        return handle

    def _detect_port(self) -> str:
        """Pick a port from the system's serial port list.

        Ports are sorted by device name. The first one whose device name
        contains "USB" (case-insensitive) wins; otherwise the first port
        in sorted order is returned.

        Raises:
            PortNotFoundError: The system reports no serial ports.
        """
        ports = sorted(serial.tools.list_ports.comports(), key=lambda info: info.device)
        if not ports:
            raise PortNotFoundError()

        usb_device = next(
            (info.device for info in ports if "USB" in info.device.upper()),
            None,
        )
        return usb_device if usb_device is not None else ports[0].device