343 lines
12 KiB
Python
343 lines
12 KiB
Python
"""
|
||
Main laser controller for the laser control module.
|
||
|
||
Provides a high-level API for controlling dual laser systems.
|
||
All input parameters are validated before being sent to the device.
|
||
Can be embedded in any Python application without GUI dependencies.
|
||
"""
|
||
|
||
import time
|
||
import logging
|
||
from typing import Optional, Callable
|
||
|
||
from .protocol import Protocol, TaskType as ProtoTaskType
|
||
from .validators import ParameterValidator
|
||
from .models import (
|
||
ManualModeParams,
|
||
VariationParams,
|
||
VariationType,
|
||
Measurements,
|
||
DeviceStatus,
|
||
DeviceState,
|
||
)
|
||
from .exceptions import (
|
||
ValidationError,
|
||
CommunicationError,
|
||
DeviceNotRespondingError,
|
||
DeviceStateError,
|
||
)
|
||
from .constants import WAIT_AFTER_SEND_SEC
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Default PI regulator coefficients (match firmware defaults)
|
||
DEFAULT_PI_P = 2560 # 10 * 256
|
||
DEFAULT_PI_I = 128 # 0.5 * 256
|
||
|
||
|
||
class LaserController:
|
||
"""
|
||
High-level controller for the dual laser board.
|
||
|
||
Usage example::
|
||
|
||
ctrl = LaserController(port='/dev/ttyUSB0')
|
||
ctrl.connect()
|
||
ctrl.set_manual_mode(temp1=25.0, temp2=30.0,
|
||
current1=40.0, current2=35.0)
|
||
data = ctrl.get_measurements()
|
||
print(data.voltage_3v3)
|
||
ctrl.disconnect()
|
||
|
||
All public methods raise :class:`ValidationError` for bad parameters
|
||
and :class:`CommunicationError` for transport-level problems.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
port: Optional[str] = None,
|
||
pi_coeff1_p: int = DEFAULT_PI_P,
|
||
pi_coeff1_i: int = DEFAULT_PI_I,
|
||
pi_coeff2_p: int = DEFAULT_PI_P,
|
||
pi_coeff2_i: int = DEFAULT_PI_I,
|
||
on_data: Optional[Callable[[Measurements], None]] = None,
|
||
):
|
||
"""
|
||
Args:
|
||
port: Serial port (e.g. '/dev/ttyUSB0'). None = auto-detect.
|
||
pi_coeff1_p: Proportional coefficient for laser 1 PI regulator.
|
||
pi_coeff1_i: Integral coefficient for laser 1 PI regulator.
|
||
pi_coeff2_p: Proportional coefficient for laser 2 PI regulator.
|
||
pi_coeff2_i: Integral coefficient for laser 2 PI regulator.
|
||
on_data: Optional callback called whenever new measurements
|
||
are received. Signature: ``callback(Measurements)``.
|
||
"""
|
||
self._protocol = Protocol(port)
|
||
self._pi1_p = pi_coeff1_p
|
||
self._pi1_i = pi_coeff1_i
|
||
self._pi2_p = pi_coeff2_p
|
||
self._pi2_i = pi_coeff2_i
|
||
self._on_data = on_data
|
||
self._message_id = 0
|
||
self._last_measurements: Optional[Measurements] = None
|
||
|
||
# ---- Connection -------------------------------------------------------
|
||
|
||
def connect(self) -> bool:
|
||
"""
|
||
Open connection to the device.
|
||
|
||
Returns:
|
||
True if connection succeeded.
|
||
|
||
Raises:
|
||
CommunicationError: If the port cannot be opened.
|
||
"""
|
||
self._protocol.connect()
|
||
logger.info("Connected to laser controller on port %s",
|
||
self._protocol._port_name or "auto")
|
||
return True
|
||
|
||
def disconnect(self) -> None:
|
||
"""Close the serial port gracefully."""
|
||
self._protocol.disconnect()
|
||
logger.info("Disconnected from laser controller")
|
||
|
||
@property
|
||
def is_connected(self) -> bool:
|
||
"""True if the serial port is open."""
|
||
return self._protocol.is_connected
|
||
|
||
# ---- Public API -------------------------------------------------------
|
||
|
||
def set_manual_mode(
|
||
self,
|
||
temp1: float,
|
||
temp2: float,
|
||
current1: float,
|
||
current2: float,
|
||
) -> None:
|
||
"""
|
||
Set manual control parameters for both lasers.
|
||
|
||
Args:
|
||
temp1: Setpoint temperature for laser 1, °C.
|
||
Valid range: [15.0 … 40.0] °C.
|
||
temp2: Setpoint temperature for laser 2, °C.
|
||
Valid range: [15.0 … 40.0] °C.
|
||
current1: Drive current for laser 1, mA.
|
||
Valid range: [15.0 … 60.0] mA.
|
||
current2: Drive current for laser 2, mA.
|
||
Valid range: [15.0 … 60.0] mA.
|
||
|
||
Raises:
|
||
ValidationError: If any parameter is out of range.
|
||
CommunicationError: If the command cannot be sent.
|
||
"""
|
||
validated = ParameterValidator.validate_manual_mode_params(
|
||
temp1, temp2, current1, current2
|
||
)
|
||
self._message_id = (self._message_id + 1) & 0xFFFF
|
||
|
||
cmd = Protocol.encode_decode_enable(
|
||
temp1=validated['temp1'],
|
||
temp2=validated['temp2'],
|
||
current1=validated['current1'],
|
||
current2=validated['current2'],
|
||
pi_coeff1_p=self._pi1_p,
|
||
pi_coeff1_i=self._pi1_i,
|
||
pi_coeff2_p=self._pi2_p,
|
||
pi_coeff2_i=self._pi2_i,
|
||
message_id=self._message_id,
|
||
)
|
||
self._send_and_read_state(cmd)
|
||
logger.debug("Manual mode set: T1=%.2f T2=%.2f I1=%.2f I2=%.2f",
|
||
validated['temp1'], validated['temp2'],
|
||
validated['current1'], validated['current2'])
|
||
|
||
def start_variation(
|
||
self,
|
||
variation_type: VariationType,
|
||
params: dict,
|
||
) -> None:
|
||
"""
|
||
Start a parameter variation task.
|
||
|
||
Args:
|
||
variation_type: Which parameter to vary
|
||
(:class:`VariationType.CHANGE_CURRENT_LD1` or
|
||
:class:`VariationType.CHANGE_CURRENT_LD2`).
|
||
params: Dictionary with the following keys:
|
||
|
||
- ``min_value`` – minimum value of the varied parameter.
|
||
- ``max_value`` – maximum value of the varied parameter.
|
||
- ``step`` – step size.
|
||
- ``time_step`` – discretisation time step, µs [20 … 100].
|
||
- ``delay_time``– delay between pulses, ms [3 … 10].
|
||
- ``static_temp1`` – fixed temperature for laser 1, °C.
|
||
- ``static_temp2`` – fixed temperature for laser 2, °C.
|
||
- ``static_current1`` – fixed current for laser 1, mA.
|
||
- ``static_current2`` – fixed current for laser 2, mA.
|
||
|
||
Raises:
|
||
ValidationError: If any parameter fails validation.
|
||
CommunicationError: If the command cannot be sent.
|
||
"""
|
||
# Validate variation-specific params
|
||
validated = ParameterValidator.validate_variation_params(
|
||
params, variation_type
|
||
)
|
||
|
||
# Validate static parameters
|
||
static_temp1 = ParameterValidator.validate_temperature(
|
||
params.get('static_temp1', 25.0), 'static_temp1'
|
||
)
|
||
static_temp2 = ParameterValidator.validate_temperature(
|
||
params.get('static_temp2', 25.0), 'static_temp2'
|
||
)
|
||
static_current1 = ParameterValidator.validate_current(
|
||
params.get('static_current1', 30.0), 'static_current1'
|
||
)
|
||
static_current2 = ParameterValidator.validate_current(
|
||
params.get('static_current2', 30.0), 'static_current2'
|
||
)
|
||
|
||
# Map VariationType → protocol TaskType
|
||
task_type_map = {
|
||
VariationType.CHANGE_CURRENT_LD1: ProtoTaskType.CHANGE_CURRENT_LD1,
|
||
VariationType.CHANGE_CURRENT_LD2: ProtoTaskType.CHANGE_CURRENT_LD2,
|
||
VariationType.CHANGE_TEMPERATURE_LD1: ProtoTaskType.CHANGE_TEMPERATURE_LD1,
|
||
VariationType.CHANGE_TEMPERATURE_LD2: ProtoTaskType.CHANGE_TEMPERATURE_LD2,
|
||
}
|
||
proto_task = task_type_map[validated['variation_type']]
|
||
|
||
cmd = Protocol.encode_task_enable(
|
||
task_type=proto_task,
|
||
static_temp1=static_temp1,
|
||
static_temp2=static_temp2,
|
||
static_current1=static_current1,
|
||
static_current2=static_current2,
|
||
min_value=validated['min_value'],
|
||
max_value=validated['max_value'],
|
||
step=validated['step'],
|
||
time_step=validated['time_step'],
|
||
delay_time=validated['delay_time'],
|
||
message_id=self._message_id,
|
||
pi_coeff1_p=self._pi1_p,
|
||
pi_coeff1_i=self._pi1_i,
|
||
pi_coeff2_p=self._pi2_p,
|
||
pi_coeff2_i=self._pi2_i,
|
||
)
|
||
self._send_and_read_state(cmd)
|
||
logger.info("Variation task started: type=%s min=%.3f max=%.3f step=%.3f",
|
||
validated['variation_type'].name,
|
||
validated['min_value'],
|
||
validated['max_value'],
|
||
validated['step'])
|
||
|
||
def stop_task(self) -> None:
|
||
"""Stop the current task by sending DEFAULT_ENABLE (reset)."""
|
||
cmd = Protocol.encode_default_enable()
|
||
self._send_and_read_state(cmd)
|
||
logger.info("Task stopped (DEFAULT_ENABLE sent)")
|
||
|
||
def get_measurements(self) -> Optional[Measurements]:
|
||
"""
|
||
Request and return the latest measurements from the device.
|
||
|
||
Returns:
|
||
:class:`Measurements` dataclass, or None if no data available.
|
||
|
||
Raises:
|
||
CommunicationError: On transport errors.
|
||
"""
|
||
cmd = Protocol.encode_trans_enable()
|
||
self._send(cmd)
|
||
|
||
raw = self._protocol.receive_raw(30)
|
||
if not raw or len(raw) != 30:
|
||
logger.warning("No data received from device")
|
||
return None
|
||
|
||
response = Protocol.decode_response(raw)
|
||
measurements = response.to_measurements()
|
||
self._last_measurements = measurements
|
||
|
||
if self._on_data:
|
||
self._on_data(measurements)
|
||
|
||
return measurements
|
||
|
||
def get_status(self) -> DeviceStatus:
|
||
"""
|
||
Request and return the current device status.
|
||
|
||
Returns:
|
||
:class:`DeviceStatus` with state and latest measurements.
|
||
|
||
Raises:
|
||
CommunicationError: On transport errors.
|
||
"""
|
||
cmd = Protocol.encode_state()
|
||
self._send(cmd)
|
||
|
||
raw = self._protocol.receive_raw(2)
|
||
if not raw or len(raw) < 2:
|
||
raise DeviceNotRespondingError()
|
||
|
||
state_code = Protocol.decode_state(raw)
|
||
|
||
# Try to get measurements as well
|
||
measurements = self._last_measurements
|
||
|
||
return DeviceStatus(
|
||
state=DeviceState(state_code) if state_code in DeviceState._value2member_map_
|
||
else DeviceState.ERROR,
|
||
measurements=measurements,
|
||
is_connected=self.is_connected,
|
||
last_command_id=self._message_id,
|
||
error_message=Protocol.state_to_description(f"{state_code:04x}")
|
||
if state_code != 0 else None,
|
||
)
|
||
|
||
def reset(self) -> None:
|
||
"""Send a hardware reset command to the device."""
|
||
cmd = Protocol.encode_default_enable()
|
||
self._send_and_read_state(cmd)
|
||
logger.info("Device reset command sent")
|
||
|
||
# ---- Internal helpers -------------------------------------------------
|
||
|
||
def _send(self, cmd: bytes) -> None:
|
||
"""Send command bytes and wait for the device to process."""
|
||
if not self.is_connected:
|
||
raise CommunicationError("Not connected to device. Call connect() first.")
|
||
self._protocol.send_raw(cmd)
|
||
time.sleep(WAIT_AFTER_SEND_SEC)
|
||
|
||
def _send_and_read_state(self, cmd: bytes) -> int:
|
||
"""Send command and read the 2-byte STATE response the device always returns.
|
||
|
||
Commands DECODE_ENABLE, TASK_ENABLE and DEFAULT_ENABLE each trigger a
|
||
STATE reply from the firmware. If we don't consume those bytes here,
|
||
they accumulate in the serial buffer and corrupt the next DATA read.
|
||
|
||
Returns the decoded state code (0x0000 = OK).
|
||
"""
|
||
self._send(cmd)
|
||
raw = self._protocol.receive_raw(2)
|
||
if raw and len(raw) == 2:
|
||
state = Protocol.decode_state(raw)
|
||
logger.debug("STATE response after command: 0x%04x", state)
|
||
return state
|
||
return 0
|
||
|
||
# ---- Context manager support -----------------------------------------
|
||
|
||
def __enter__(self):
|
||
self.connect()
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
self.disconnect()
|
||
return False |