From 6b6689fa5f5deb6b14383b0c63ad5df6742b8fe6 Mon Sep 17 00:00:00 2001 From: Ayzen Date: Mon, 27 Apr 2026 16:28:48 +0300 Subject: [PATCH] usb connection improved --- README.md | 2 +- laser_control/protocol.py | 13 +++- laser_control/transport.py | 152 +++++++++++++++++++++++++++++++++---- 3 files changed, 148 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 62e5d3e..a9d0537 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ source .venv/bin/activate python3 -m laser_control.gui.main ``` -Автоподключение использует первый доступный USB UART-порт. +Автоподключение перебирает доступные UART-порты и выбирает тот, где отвечает прошивка платы. При подключении приложение только читает текущее состояние и телеметрию, без автоприменения ручных параметров. Совместимый launcher `_device_main.py` сохранён, но он только проксирует запуск в новый PyQt entrypoint. diff --git a/laser_control/protocol.py b/laser_control/protocol.py index 3e0a204..949433a 100644 --- a/laser_control/protocol.py +++ b/laser_control/protocol.py @@ -427,8 +427,8 @@ class Protocol: ) words = _unpack_words(data) - expected_crc = _payload_checksum(list(words[1:14])) - if words[14] != expected_crc: + if not Protocol.has_valid_response_crc(data): + expected_crc = _payload_checksum(list(words[1:14])) raise CRCError(expected=expected_crc, received=words[14]) return Measurements( @@ -448,6 +448,15 @@ class Protocol: timestamp=datetime.now(), ) + @staticmethod + def has_valid_response_crc(data: bytes) -> bool: + """Return True when a telemetry frame has the expected length and CRC.""" + if len(data) != GET_DATA_TOTAL_LENGTH: + return False + words = _unpack_words(data) + expected_crc = _payload_checksum(list(words[1:14])) + return words[14] == expected_crc + @staticmethod def decode_status(data: bytes) -> tuple[DeviceState, int]: """Decode the two-byte firmware status response into flags and detail.""" diff --git a/laser_control/transport.py b/laser_control/transport.py index d4cc9f0..a4d5c70 100644 --- a/laser_control/transport.py +++ b/laser_control/transport.py @@ -2,11 +2,37 @@ from __future__ import annotations +import logging +import re +import time + import serial import serial.tools.list_ports -from .constants import BAUDRATE, SERIAL_TIMEOUT_SEC +from .constants import ( + BAUDRATE, + GET_DATA_TOTAL_LENGTH, + SERIAL_TIMEOUT_SEC, + STATUS_RESPONSE_LENGTH, + WAIT_AFTER_SEND_SEC, +) from .exceptions import CommunicationError, PortNotFoundError +from .models import DeviceState +from .protocol import Protocol + + +logger = logging.getLogger(__name__) + +_PROBE_ATTEMPTS = 2 +_KNOWN_STATUS_MASK = int( + DeviceState.SD_ERROR + | DeviceState.UART_ERROR + | DeviceState.UART_DECODE_ERROR + | DeviceState.TEC1_ERROR + | DeviceState.TEC2_ERROR + | DeviceState.DEFAULT_ERROR + | DeviceState.AD9102_ERROR +) class SerialTransport: @@ -35,17 +61,18 @@ class SerialTransport: return self._serial is not None and self._serial.is_open def connect(self) -> None: - """Open the serial port, auto-detecting the first USB port when needed.""" - port = self._requested_port or self._detect_port() + """Open the serial port, auto-detecting the responding board when needed.""" + if self._requested_port is None: + self._active_port, self._serial = self._detect_and_open_port() + return + try: - self._serial = serial.Serial( - port=port, - baudrate=self._baudrate, - timeout=self._timeout, - ) + self._serial = self._open_serial(self._requested_port) except Exception as exc: # noqa: BLE001 - raise CommunicationError(f"Cannot connect to port '{port}': {exc}") from exc - self._active_port = port + raise CommunicationError( + f"Cannot connect to port '{self._requested_port}': {exc}" + ) from exc + self._active_port = self._requested_port def disconnect(self) -> None: """Close the serial port if it is open.""" @@ -67,12 +94,105 @@ class SerialTransport: assert self._serial is not None return self._serial.read(length) - def _detect_port(self) -> str: - ports = sorted(serial.tools.list_ports.comports(), key=lambda port: port.device) + def _open_serial(self, port: str) -> serial.Serial: + return serial.Serial( + port=port, + baudrate=self._baudrate, + timeout=self._timeout, + write_timeout=self._timeout, + ) + + def _detect_and_open_port(self) -> tuple[str, serial.Serial]: + ports = sorted(serial.tools.list_ports.comports(), key=self._port_sort_key) if not ports: raise PortNotFoundError() - usb_ports = [port.device for port in ports if "USB" in port.device.upper()] - if usb_ports: - return usb_ports[0] - return ports[0].device + checked_ports: list[str] = [] + for port_info in ports: + port = port_info.device + checked_ports.append(port) + connection: serial.Serial | None = None + matched = False + try: + connection = self._open_serial(port) + matched = self._probe_device(connection) + if matched: + return port, connection + except Exception as exc: # noqa: BLE001 + logger.debug("Serial auto-detect skipped %s: %s", port, exc) + finally: + if connection is not None and not matched: + connection.close() + + logger.info( + "Serial auto-detect found no responding controller; checked ports: %s", + ", ".join(checked_ports), + ) + raise PortNotFoundError() + + def _probe_device(self, connection: serial.Serial) -> bool: + for _attempt in range(_PROBE_ATTEMPTS): + self._reset_buffers(connection) + if not self._read_valid_status(connection): + continue + if self._read_valid_telemetry(connection): + return True + return False + + def _read_valid_status(self, connection: serial.Serial) -> bool: + raw = self._write_and_read( + connection, + Protocol.encode_state(), + STATUS_RESPONSE_LENGTH, + ) + if len(raw) != STATUS_RESPONSE_LENGTH: + return False + + state, _detail = Protocol.decode_status(raw) + return (int(state) & ~_KNOWN_STATUS_MASK) == 0 + + def _read_valid_telemetry(self, connection: serial.Serial) -> bool: + raw = self._write_and_read( + connection, + Protocol.encode_trans_enable(), + GET_DATA_TOTAL_LENGTH, + ) + return Protocol.has_valid_response_crc(raw) + + def _write_and_read( + self, + connection: serial.Serial, + command: bytes, + response_length: int, + ) -> bytes: + connection.write(command) + connection.flush() + time.sleep(WAIT_AFTER_SEND_SEC) + return connection.read(response_length) + + @staticmethod + def _reset_buffers(connection: serial.Serial) -> None: + connection.reset_input_buffer() + connection.reset_output_buffer() + + @staticmethod + def _port_sort_key(port_info) -> tuple[int, list[int | str]]: + text = " ".join( + str(value or "") + for value in ( + port_info.device, + getattr(port_info, "description", ""), + getattr(port_info, "hwid", ""), + getattr(port_info, "manufacturer", ""), + getattr(port_info, "product", ""), + ) + ).upper() + priority = 0 if "USB" in text or getattr(port_info, "vid", None) is not None else 1 + return priority, SerialTransport._natural_key(port_info.device) + + @staticmethod + def _natural_key(value: str) -> list[int | str]: + return [ + int(part) if part.isdigit() else part.lower() + for part in re.split(r"(\d+)", value) + ]