usb connection improved
This commit is contained in:
@ -66,7 +66,7 @@ source .venv/bin/activate
|
|||||||
python3 -m laser_control.gui.main
|
python3 -m laser_control.gui.main
|
||||||
```
|
```
|
||||||
|
|
||||||
Автоподключение использует первый доступный USB UART-порт.
|
Автоподключение перебирает доступные UART-порты и выбирает тот, где отвечает прошивка платы.
|
||||||
При подключении приложение только читает текущее состояние и телеметрию, без автоприменения ручных параметров.
|
При подключении приложение только читает текущее состояние и телеметрию, без автоприменения ручных параметров.
|
||||||
Совместимый launcher `_device_main.py` сохранён, но он только проксирует запуск в новый PyQt entrypoint.
|
Совместимый launcher `_device_main.py` сохранён, но он только проксирует запуск в новый PyQt entrypoint.
|
||||||
|
|
||||||
|
|||||||
@ -427,8 +427,8 @@ class Protocol:
|
|||||||
)
|
)
|
||||||
|
|
||||||
words = _unpack_words(data)
|
words = _unpack_words(data)
|
||||||
expected_crc = _payload_checksum(list(words[1:14]))
|
if not Protocol.has_valid_response_crc(data):
|
||||||
if words[14] != expected_crc:
|
expected_crc = _payload_checksum(list(words[1:14]))
|
||||||
raise CRCError(expected=expected_crc, received=words[14])
|
raise CRCError(expected=expected_crc, received=words[14])
|
||||||
|
|
||||||
return Measurements(
|
return Measurements(
|
||||||
@ -448,6 +448,15 @@ class Protocol:
|
|||||||
timestamp=datetime.now(),
|
timestamp=datetime.now(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def has_valid_response_crc(data: bytes) -> bool:
|
||||||
|
"""Return True when a telemetry frame has the expected length and CRC."""
|
||||||
|
if len(data) != GET_DATA_TOTAL_LENGTH:
|
||||||
|
return False
|
||||||
|
words = _unpack_words(data)
|
||||||
|
expected_crc = _payload_checksum(list(words[1:14]))
|
||||||
|
return words[14] == expected_crc
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def decode_status(data: bytes) -> tuple[DeviceState, int]:
|
def decode_status(data: bytes) -> tuple[DeviceState, int]:
|
||||||
"""Decode the two-byte firmware status response into flags and detail."""
|
"""Decode the two-byte firmware status response into flags and detail."""
|
||||||
|
|||||||
@ -2,11 +2,37 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
import serial
|
import serial
|
||||||
import serial.tools.list_ports
|
import serial.tools.list_ports
|
||||||
|
|
||||||
from .constants import BAUDRATE, SERIAL_TIMEOUT_SEC
|
from .constants import (
|
||||||
|
BAUDRATE,
|
||||||
|
GET_DATA_TOTAL_LENGTH,
|
||||||
|
SERIAL_TIMEOUT_SEC,
|
||||||
|
STATUS_RESPONSE_LENGTH,
|
||||||
|
WAIT_AFTER_SEND_SEC,
|
||||||
|
)
|
||||||
from .exceptions import CommunicationError, PortNotFoundError
|
from .exceptions import CommunicationError, PortNotFoundError
|
||||||
|
from .models import DeviceState
|
||||||
|
from .protocol import Protocol
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_PROBE_ATTEMPTS = 2
|
||||||
|
_KNOWN_STATUS_MASK = int(
|
||||||
|
DeviceState.SD_ERROR
|
||||||
|
| DeviceState.UART_ERROR
|
||||||
|
| DeviceState.UART_DECODE_ERROR
|
||||||
|
| DeviceState.TEC1_ERROR
|
||||||
|
| DeviceState.TEC2_ERROR
|
||||||
|
| DeviceState.DEFAULT_ERROR
|
||||||
|
| DeviceState.AD9102_ERROR
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SerialTransport:
|
class SerialTransport:
|
||||||
@ -35,17 +61,18 @@ class SerialTransport:
|
|||||||
return self._serial is not None and self._serial.is_open
|
return self._serial is not None and self._serial.is_open
|
||||||
|
|
||||||
def connect(self) -> None:
|
def connect(self) -> None:
|
||||||
"""Open the serial port, auto-detecting the first USB port when needed."""
|
"""Open the serial port, auto-detecting the responding board when needed."""
|
||||||
port = self._requested_port or self._detect_port()
|
if self._requested_port is None:
|
||||||
|
self._active_port, self._serial = self._detect_and_open_port()
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._serial = serial.Serial(
|
self._serial = self._open_serial(self._requested_port)
|
||||||
port=port,
|
|
||||||
baudrate=self._baudrate,
|
|
||||||
timeout=self._timeout,
|
|
||||||
)
|
|
||||||
except Exception as exc: # noqa: BLE001
|
except Exception as exc: # noqa: BLE001
|
||||||
raise CommunicationError(f"Cannot connect to port '{port}': {exc}") from exc
|
raise CommunicationError(
|
||||||
self._active_port = port
|
f"Cannot connect to port '{self._requested_port}': {exc}"
|
||||||
|
) from exc
|
||||||
|
self._active_port = self._requested_port
|
||||||
|
|
||||||
def disconnect(self) -> None:
|
def disconnect(self) -> None:
|
||||||
"""Close the serial port if it is open."""
|
"""Close the serial port if it is open."""
|
||||||
@ -67,12 +94,105 @@ class SerialTransport:
|
|||||||
assert self._serial is not None
|
assert self._serial is not None
|
||||||
return self._serial.read(length)
|
return self._serial.read(length)
|
||||||
|
|
||||||
def _detect_port(self) -> str:
|
def _open_serial(self, port: str) -> serial.Serial:
|
||||||
ports = sorted(serial.tools.list_ports.comports(), key=lambda port: port.device)
|
return serial.Serial(
|
||||||
|
port=port,
|
||||||
|
baudrate=self._baudrate,
|
||||||
|
timeout=self._timeout,
|
||||||
|
write_timeout=self._timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _detect_and_open_port(self) -> tuple[str, serial.Serial]:
|
||||||
|
ports = sorted(serial.tools.list_ports.comports(), key=self._port_sort_key)
|
||||||
if not ports:
|
if not ports:
|
||||||
raise PortNotFoundError()
|
raise PortNotFoundError()
|
||||||
|
|
||||||
usb_ports = [port.device for port in ports if "USB" in port.device.upper()]
|
checked_ports: list[str] = []
|
||||||
if usb_ports:
|
for port_info in ports:
|
||||||
return usb_ports[0]
|
port = port_info.device
|
||||||
return ports[0].device
|
checked_ports.append(port)
|
||||||
|
connection: serial.Serial | None = None
|
||||||
|
matched = False
|
||||||
|
try:
|
||||||
|
connection = self._open_serial(port)
|
||||||
|
matched = self._probe_device(connection)
|
||||||
|
if matched:
|
||||||
|
return port, connection
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
logger.debug("Serial auto-detect skipped %s: %s", port, exc)
|
||||||
|
finally:
|
||||||
|
if connection is not None and not matched:
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Serial auto-detect found no responding controller; checked ports: %s",
|
||||||
|
", ".join(checked_ports),
|
||||||
|
)
|
||||||
|
raise PortNotFoundError()
|
||||||
|
|
||||||
|
def _probe_device(self, connection: serial.Serial) -> bool:
|
||||||
|
for _attempt in range(_PROBE_ATTEMPTS):
|
||||||
|
self._reset_buffers(connection)
|
||||||
|
if not self._read_valid_status(connection):
|
||||||
|
continue
|
||||||
|
if self._read_valid_telemetry(connection):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _read_valid_status(self, connection: serial.Serial) -> bool:
|
||||||
|
raw = self._write_and_read(
|
||||||
|
connection,
|
||||||
|
Protocol.encode_state(),
|
||||||
|
STATUS_RESPONSE_LENGTH,
|
||||||
|
)
|
||||||
|
if len(raw) != STATUS_RESPONSE_LENGTH:
|
||||||
|
return False
|
||||||
|
|
||||||
|
state, _detail = Protocol.decode_status(raw)
|
||||||
|
return (int(state) & ~_KNOWN_STATUS_MASK) == 0
|
||||||
|
|
||||||
|
def _read_valid_telemetry(self, connection: serial.Serial) -> bool:
|
||||||
|
raw = self._write_and_read(
|
||||||
|
connection,
|
||||||
|
Protocol.encode_trans_enable(),
|
||||||
|
GET_DATA_TOTAL_LENGTH,
|
||||||
|
)
|
||||||
|
return Protocol.has_valid_response_crc(raw)
|
||||||
|
|
||||||
|
def _write_and_read(
|
||||||
|
self,
|
||||||
|
connection: serial.Serial,
|
||||||
|
command: bytes,
|
||||||
|
response_length: int,
|
||||||
|
) -> bytes:
|
||||||
|
connection.write(command)
|
||||||
|
connection.flush()
|
||||||
|
time.sleep(WAIT_AFTER_SEND_SEC)
|
||||||
|
return connection.read(response_length)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _reset_buffers(connection: serial.Serial) -> None:
|
||||||
|
connection.reset_input_buffer()
|
||||||
|
connection.reset_output_buffer()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _port_sort_key(port_info) -> tuple[int, list[int | str]]:
|
||||||
|
text = " ".join(
|
||||||
|
str(value or "")
|
||||||
|
for value in (
|
||||||
|
port_info.device,
|
||||||
|
getattr(port_info, "description", ""),
|
||||||
|
getattr(port_info, "hwid", ""),
|
||||||
|
getattr(port_info, "manufacturer", ""),
|
||||||
|
getattr(port_info, "product", ""),
|
||||||
|
)
|
||||||
|
).upper()
|
||||||
|
priority = 0 if "USB" in text or getattr(port_info, "vid", None) is not None else 1
|
||||||
|
return priority, SerialTransport._natural_key(port_info.device)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _natural_key(value: str) -> list[int | str]:
|
||||||
|
return [
|
||||||
|
int(part) if part.isdigit() else part.lower()
|
||||||
|
for part in re.split(r"(\d+)", value)
|
||||||
|
]
|
||||||
|
|||||||
Reference in New Issue
Block a user