""" Main laser controller for the laser control module. Provides a high-level API for controlling dual laser systems. All input parameters are validated before being sent to the device. Can be embedded in any Python application without GUI dependencies. """ import time import logging from typing import Optional, Callable from .protocol import Protocol, TaskType as ProtoTaskType from .validators import ParameterValidator from .models import ( ManualModeParams, VariationParams, VariationType, Measurements, DeviceStatus, DeviceState, ) from .exceptions import ( ValidationError, CommunicationError, DeviceNotRespondingError, DeviceStateError, ) from .constants import WAIT_AFTER_SEND_SEC logger = logging.getLogger(__name__) # Default PI regulator coefficients (match firmware defaults) DEFAULT_PI_P = 2560 # 10 * 256 DEFAULT_PI_I = 128 # 0.5 * 256 class LaserController: """ High-level controller for the dual laser board. Usage example:: ctrl = LaserController(port='/dev/ttyUSB0') ctrl.connect() ctrl.set_manual_mode(temp1=25.0, temp2=30.0, current1=40.0, current2=35.0) data = ctrl.get_measurements() print(data.voltage_3v3) ctrl.disconnect() All public methods raise :class:`ValidationError` for bad parameters and :class:`CommunicationError` for transport-level problems. """ def __init__( self, port: Optional[str] = None, pi_coeff1_p: int = DEFAULT_PI_P, pi_coeff1_i: int = DEFAULT_PI_I, pi_coeff2_p: int = DEFAULT_PI_P, pi_coeff2_i: int = DEFAULT_PI_I, on_data: Optional[Callable[[Measurements], None]] = None, ): """ Args: port: Serial port (e.g. '/dev/ttyUSB0'). None = auto-detect. pi_coeff1_p: Proportional coefficient for laser 1 PI regulator. pi_coeff1_i: Integral coefficient for laser 1 PI regulator. pi_coeff2_p: Proportional coefficient for laser 2 PI regulator. pi_coeff2_i: Integral coefficient for laser 2 PI regulator. on_data: Optional callback called whenever new measurements are received. Signature: ``callback(Measurements)``. """ self._protocol = Protocol(port) self._pi1_p = pi_coeff1_p self._pi1_i = pi_coeff1_i self._pi2_p = pi_coeff2_p self._pi2_i = pi_coeff2_i self._on_data = on_data self._message_id = 0 self._last_measurements: Optional[Measurements] = None # ---- Connection ------------------------------------------------------- def connect(self) -> bool: """ Open connection to the device. Returns: True if connection succeeded. Raises: CommunicationError: If the port cannot be opened. """ self._protocol.connect() logger.info("Connected to laser controller on port %s", self._protocol._port_name or "auto") return True def disconnect(self) -> None: """Close the serial port gracefully.""" self._protocol.disconnect() logger.info("Disconnected from laser controller") @property def is_connected(self) -> bool: """True if the serial port is open.""" return self._protocol.is_connected # ---- Public API ------------------------------------------------------- def set_manual_mode( self, temp1: float, temp2: float, current1: float, current2: float, ) -> None: """ Set manual control parameters for both lasers. Args: temp1: Setpoint temperature for laser 1, °C. Valid range: [15.0 … 40.0] °C. temp2: Setpoint temperature for laser 2, °C. Valid range: [15.0 … 40.0] °C. current1: Drive current for laser 1, mA. Valid range: [15.0 … 60.0] mA. current2: Drive current for laser 2, mA. Valid range: [15.0 … 60.0] mA. Raises: ValidationError: If any parameter is out of range. CommunicationError: If the command cannot be sent. """ validated = ParameterValidator.validate_manual_mode_params( temp1, temp2, current1, current2 ) self._message_id = (self._message_id + 1) & 0xFFFF cmd = Protocol.encode_decode_enable( temp1=validated['temp1'], temp2=validated['temp2'], current1=validated['current1'], current2=validated['current2'], pi_coeff1_p=self._pi1_p, pi_coeff1_i=self._pi1_i, pi_coeff2_p=self._pi2_p, pi_coeff2_i=self._pi2_i, message_id=self._message_id, ) self._send_and_read_state(cmd) logger.debug("Manual mode set: T1=%.2f T2=%.2f I1=%.2f I2=%.2f", validated['temp1'], validated['temp2'], validated['current1'], validated['current2']) def start_variation( self, variation_type: VariationType, params: dict, ) -> None: """ Start a parameter variation task. Args: variation_type: Which parameter to vary (:class:`VariationType.CHANGE_CURRENT_LD1` or :class:`VariationType.CHANGE_CURRENT_LD2`). params: Dictionary with the following keys: - ``min_value`` – minimum value of the varied parameter. - ``max_value`` – maximum value of the varied parameter. - ``step`` – step size. - ``time_step`` – discretisation time step, µs [20 … 100]. - ``delay_time``– delay between pulses, ms [3 … 10]. - ``static_temp1`` – fixed temperature for laser 1, °C. - ``static_temp2`` – fixed temperature for laser 2, °C. - ``static_current1`` – fixed current for laser 1, mA. - ``static_current2`` – fixed current for laser 2, mA. Raises: ValidationError: If any parameter fails validation. CommunicationError: If the command cannot be sent. """ # Validate variation-specific params validated = ParameterValidator.validate_variation_params( params, variation_type ) # Validate static parameters static_temp1 = ParameterValidator.validate_temperature( params.get('static_temp1', 25.0), 'static_temp1' ) static_temp2 = ParameterValidator.validate_temperature( params.get('static_temp2', 25.0), 'static_temp2' ) static_current1 = ParameterValidator.validate_current( params.get('static_current1', 30.0), 'static_current1' ) static_current2 = ParameterValidator.validate_current( params.get('static_current2', 30.0), 'static_current2' ) # Map VariationType → protocol TaskType task_type_map = { VariationType.CHANGE_CURRENT_LD1: ProtoTaskType.CHANGE_CURRENT_LD1, VariationType.CHANGE_CURRENT_LD2: ProtoTaskType.CHANGE_CURRENT_LD2, VariationType.CHANGE_TEMPERATURE_LD1: ProtoTaskType.CHANGE_TEMPERATURE_LD1, VariationType.CHANGE_TEMPERATURE_LD2: ProtoTaskType.CHANGE_TEMPERATURE_LD2, } proto_task = task_type_map[validated['variation_type']] cmd = Protocol.encode_task_enable( task_type=proto_task, static_temp1=static_temp1, static_temp2=static_temp2, static_current1=static_current1, static_current2=static_current2, min_value=validated['min_value'], max_value=validated['max_value'], step=validated['step'], time_step=validated['time_step'], delay_time=validated['delay_time'], message_id=self._message_id, pi_coeff1_p=self._pi1_p, pi_coeff1_i=self._pi1_i, pi_coeff2_p=self._pi2_p, pi_coeff2_i=self._pi2_i, ) self._send_and_read_state(cmd) logger.info("Variation task started: type=%s min=%.3f max=%.3f step=%.3f", validated['variation_type'].name, validated['min_value'], validated['max_value'], validated['step']) def stop_task(self) -> None: """Stop the current task by sending DEFAULT_ENABLE (reset).""" cmd = Protocol.encode_default_enable() self._send_and_read_state(cmd) logger.info("Task stopped (DEFAULT_ENABLE sent)") def get_measurements(self) -> Optional[Measurements]: """ Request and return the latest measurements from the device. Returns: :class:`Measurements` dataclass, or None if no data available. Raises: CommunicationError: On transport errors. """ cmd = Protocol.encode_trans_enable() self._send(cmd) raw = self._protocol.receive_raw(30) if not raw or len(raw) != 30: logger.warning("No data received from device") return None response = Protocol.decode_response(raw) measurements = response.to_measurements() self._last_measurements = measurements if self._on_data: self._on_data(measurements) return measurements def get_status(self) -> DeviceStatus: """ Request and return the current device status. Returns: :class:`DeviceStatus` with state and latest measurements. Raises: CommunicationError: On transport errors. """ cmd = Protocol.encode_state() self._send(cmd) raw = self._protocol.receive_raw(2) if not raw or len(raw) < 2: raise DeviceNotRespondingError() state_code = Protocol.decode_state(raw) # Try to get measurements as well measurements = self._last_measurements return DeviceStatus( state=DeviceState(state_code) if state_code in DeviceState._value2member_map_ else DeviceState.ERROR, measurements=measurements, is_connected=self.is_connected, last_command_id=self._message_id, error_message=Protocol.state_to_description(f"{state_code:04x}") if state_code != 0 else None, ) def reset(self) -> None: """Send a hardware reset command to the device.""" cmd = Protocol.encode_default_enable() self._send_and_read_state(cmd) logger.info("Device reset command sent") # ---- Internal helpers ------------------------------------------------- def _send(self, cmd: bytes) -> None: """Send command bytes and wait for the device to process.""" if not self.is_connected: raise CommunicationError("Not connected to device. Call connect() first.") self._protocol.send_raw(cmd) time.sleep(WAIT_AFTER_SEND_SEC) def _send_and_read_state(self, cmd: bytes) -> int: """Send command and read the 2-byte STATE response the device always returns. Commands DECODE_ENABLE, TASK_ENABLE and DEFAULT_ENABLE each trigger a STATE reply from the firmware. If we don't consume those bytes here, they accumulate in the serial buffer and corrupt the next DATA read. Returns the decoded state code (0x0000 = OK). """ self._send(cmd) raw = self._protocol.receive_raw(2) if raw and len(raw) == 2: state = Protocol.decode_state(raw) logger.debug("STATE response after command: 0x%04x", state) return state return 0 # ---- Context manager support ----------------------------------------- def __enter__(self): self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): self.disconnect() return False