294 lines
11 KiB
Python
294 lines
11 KiB
Python
"""
|
|
Integration tests for the laser control module.
|
|
|
|
Tests the full call chain: LaserController → Protocol → Serial,
|
|
using mock serial ports. No real hardware required.
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
from unittest.mock import MagicMock, patch, call
|
|
from laser_control.controller import LaserController
|
|
from laser_control.models import VariationType, DeviceState
|
|
from laser_control.exceptions import (
|
|
ValidationError,
|
|
CommunicationError,
|
|
TemperatureOutOfRangeError,
|
|
CurrentOutOfRangeError,
|
|
)
|
|
from laser_control.protocol import Protocol, CommandCode
|
|
from .conftest import make_valid_response
|
|
|
|
|
|
class TestManualModeIntegration:
|
|
"""Integration tests for manual mode operation."""
|
|
|
|
def test_full_manual_mode_flow(self, connected_controller, mock_serial):
|
|
"""Test complete manual mode command flow."""
|
|
connected_controller.set_manual_mode(
|
|
temp1=25.0, temp2=30.0,
|
|
current1=40.0, current2=35.0
|
|
)
|
|
|
|
# Verify command was sent
|
|
assert mock_serial.write.called
|
|
sent_data = mock_serial.write.call_args[0][0]
|
|
assert len(sent_data) == 30 # SEND_PARAMS_TOTAL_LENGTH
|
|
|
|
# Verify command code (bytes 0-1, little-endian 0x1111 → 0x11 0x11)
|
|
assert sent_data[0] == 0x11
|
|
assert sent_data[1] == 0x11
|
|
|
|
def test_manual_mode_validation_rejects_invalid_temp(self, connected_controller):
|
|
"""Test that manual mode rejects out-of-range temperature."""
|
|
with pytest.raises(TemperatureOutOfRangeError) as exc_info:
|
|
connected_controller.set_manual_mode(
|
|
temp1=50.0, # Too high
|
|
temp2=30.0,
|
|
current1=40.0,
|
|
current2=35.0
|
|
)
|
|
assert "temp1" in str(exc_info.value)
|
|
assert "50.0" in str(exc_info.value)
|
|
|
|
def test_manual_mode_validation_rejects_invalid_current(self, connected_controller):
|
|
"""Test that manual mode rejects out-of-range current."""
|
|
with pytest.raises(CurrentOutOfRangeError) as exc_info:
|
|
connected_controller.set_manual_mode(
|
|
temp1=25.0,
|
|
temp2=30.0,
|
|
current1=40.0,
|
|
current2=70.0 # Too high
|
|
)
|
|
assert "current2" in str(exc_info.value)
|
|
|
|
def test_manual_mode_no_serial_call_on_validation_failure(
|
|
self, connected_controller, mock_serial
|
|
):
|
|
"""Serial write must not be called when validation fails."""
|
|
mock_serial.write.reset_mock()
|
|
with pytest.raises(ValidationError):
|
|
connected_controller.set_manual_mode(
|
|
temp1=5.0, # Invalid
|
|
temp2=30.0,
|
|
current1=40.0,
|
|
current2=35.0
|
|
)
|
|
mock_serial.write.assert_not_called()
|
|
|
|
def test_message_id_increments(self, connected_controller, mock_serial):
|
|
"""Message ID should increment with each command."""
|
|
initial_id = connected_controller._message_id
|
|
connected_controller.set_manual_mode(25.0, 30.0, 40.0, 35.0)
|
|
assert connected_controller._message_id == (initial_id + 1) & 0xFFFF
|
|
|
|
connected_controller.set_manual_mode(26.0, 31.0, 41.0, 36.0)
|
|
assert connected_controller._message_id == (initial_id + 2) & 0xFFFF
|
|
|
|
|
|
class TestVariationModeIntegration:
|
|
"""Integration tests for variation mode operation."""
|
|
|
|
def test_current_ld1_variation_flow(self, connected_controller, mock_serial):
|
|
"""Test complete current variation for laser 1."""
|
|
params = {
|
|
'min_value': 20.0,
|
|
'max_value': 50.0,
|
|
'step': 0.5,
|
|
'time_step': 50,
|
|
'delay_time': 5,
|
|
'static_temp1': 25.0,
|
|
'static_temp2': 30.0,
|
|
'static_current1': 35.0,
|
|
'static_current2': 35.0,
|
|
}
|
|
connected_controller.start_variation(VariationType.CHANGE_CURRENT_LD1, params)
|
|
|
|
assert mock_serial.write.called
|
|
sent_data = mock_serial.write.call_args[0][0]
|
|
assert len(sent_data) == 32 # TASK_ENABLE_COMMAND_LENGTH
|
|
|
|
# Verify command code (0x7777)
|
|
assert sent_data[0] == 0x77
|
|
assert sent_data[1] == 0x77
|
|
|
|
def test_current_ld2_variation_flow(self, connected_controller, mock_serial):
|
|
"""Test complete current variation for laser 2."""
|
|
params = {
|
|
'min_value': 20.0,
|
|
'max_value': 50.0,
|
|
'step': 0.5,
|
|
'time_step': 50,
|
|
'delay_time': 5,
|
|
'static_temp1': 25.0,
|
|
'static_temp2': 30.0,
|
|
'static_current1': 35.0,
|
|
'static_current2': 35.0,
|
|
}
|
|
connected_controller.start_variation(VariationType.CHANGE_CURRENT_LD2, params)
|
|
assert mock_serial.write.called
|
|
|
|
def test_variation_rejects_invalid_step(self, connected_controller, mock_serial):
|
|
"""Variation must reject step below minimum."""
|
|
mock_serial.write.reset_mock()
|
|
params = {
|
|
'min_value': 20.0,
|
|
'max_value': 50.0,
|
|
'step': 0.001, # Too small
|
|
'time_step': 50,
|
|
'delay_time': 5,
|
|
'static_temp1': 25.0,
|
|
'static_temp2': 30.0,
|
|
'static_current1': 35.0,
|
|
'static_current2': 35.0,
|
|
}
|
|
with pytest.raises(ValidationError):
|
|
connected_controller.start_variation(VariationType.CHANGE_CURRENT_LD1, params)
|
|
mock_serial.write.assert_not_called()
|
|
|
|
def test_variation_rejects_inverted_range(self, connected_controller):
|
|
"""Variation must reject min > max."""
|
|
params = {
|
|
'min_value': 50.0, # min > max
|
|
'max_value': 20.0,
|
|
'step': 0.5,
|
|
'time_step': 50,
|
|
'delay_time': 5,
|
|
'static_temp1': 25.0,
|
|
'static_temp2': 30.0,
|
|
'static_current1': 35.0,
|
|
'static_current2': 35.0,
|
|
}
|
|
with pytest.raises(ValidationError) as exc_info:
|
|
connected_controller.start_variation(VariationType.CHANGE_CURRENT_LD1, params)
|
|
assert "min" in str(exc_info.value).lower()
|
|
|
|
|
|
class TestMeasurementsIntegration:
|
|
"""Integration tests for measurement retrieval."""
|
|
|
|
def test_get_measurements_returns_data(self, connected_controller, mock_serial):
|
|
"""get_measurements should decode and return device data."""
|
|
mock_serial.read.return_value = make_valid_response()
|
|
measurements = connected_controller.get_measurements()
|
|
|
|
assert measurements is not None
|
|
assert isinstance(measurements.current1, float)
|
|
assert isinstance(measurements.current2, float)
|
|
assert isinstance(measurements.temp1, float)
|
|
assert isinstance(measurements.temp2, float)
|
|
assert isinstance(measurements.voltage_3v3, float)
|
|
|
|
def test_get_measurements_calls_callback(self, mock_serial):
|
|
"""on_data callback should be triggered on new measurements."""
|
|
received = []
|
|
mock_serial.read.return_value = make_valid_response()
|
|
mock_serial.is_open = True
|
|
|
|
ctrl = LaserController(
|
|
port='/dev/ttyUSB0',
|
|
on_data=lambda m: received.append(m)
|
|
)
|
|
ctrl._protocol._serial = mock_serial
|
|
|
|
ctrl.get_measurements()
|
|
assert len(received) == 1
|
|
assert received[0].voltage_3v3 > 0
|
|
|
|
def test_get_measurements_no_data(self, connected_controller, mock_serial):
|
|
"""get_measurements returns None when no data received."""
|
|
mock_serial.read.return_value = b''
|
|
result = connected_controller.get_measurements()
|
|
assert result is None
|
|
|
|
def test_voltage_rail_check(self, connected_controller, mock_serial):
|
|
"""Test power rail health check on measurements."""
|
|
mock_serial.read.return_value = make_valid_response(
|
|
mon_3v3_n=2703, # ~3.3V
|
|
mon_5v1_n=2731, # ~5.0V
|
|
mon_5v2_n=2731,
|
|
mon_7v0_n=1042, # ~7.0V
|
|
)
|
|
measurements = connected_controller.get_measurements()
|
|
if measurements:
|
|
rails = measurements.check_power_rails()
|
|
assert isinstance(rails, dict)
|
|
assert '3v3' in rails
|
|
assert '5v1' in rails
|
|
assert '5v2' in rails
|
|
assert '7v0' in rails
|
|
|
|
|
|
class TestConnectionManagement:
|
|
"""Integration tests for connection handling."""
|
|
|
|
def test_context_manager(self, mock_serial):
|
|
"""Test using LaserController as context manager."""
|
|
mock_serial.is_open = True
|
|
with patch('serial.Serial', return_value=mock_serial):
|
|
with LaserController(port='/dev/ttyUSB0') as ctrl:
|
|
assert ctrl.is_connected
|
|
mock_serial.close.assert_called()
|
|
|
|
def test_send_without_connection_raises(self):
|
|
"""Sending command without connection raises CommunicationError."""
|
|
ctrl = LaserController(port='/dev/ttyUSB0')
|
|
# Don't call connect()
|
|
with pytest.raises(CommunicationError) as exc_info:
|
|
ctrl.set_manual_mode(25.0, 30.0, 40.0, 35.0)
|
|
assert "connect" in str(exc_info.value).lower()
|
|
|
|
def test_stop_task_sends_default_enable(self, connected_controller, mock_serial):
|
|
"""stop_task should send DEFAULT_ENABLE (0x2222)."""
|
|
mock_serial.write.reset_mock()
|
|
connected_controller.stop_task()
|
|
|
|
assert mock_serial.write.called
|
|
sent_data = mock_serial.write.call_args[0][0]
|
|
# DEFAULT_ENABLE: 0x2222 → flipped to bytes 0x22 0x22
|
|
assert sent_data[0] == 0x22
|
|
assert sent_data[1] == 0x22
|
|
|
|
def test_reset_sends_default_enable(self, connected_controller, mock_serial):
|
|
"""reset() should also send DEFAULT_ENABLE."""
|
|
mock_serial.write.reset_mock()
|
|
connected_controller.reset()
|
|
assert mock_serial.write.called
|
|
|
|
|
|
class TestConversionsRoundtrip:
|
|
"""Test that physical unit conversions are self-consistent."""
|
|
|
|
def test_temperature_roundtrip(self):
|
|
"""temp_c_to_n and temp_n_to_c should be inverse of each other."""
|
|
from laser_control.conversions import temp_c_to_n, temp_n_to_c
|
|
for temp in [15.0, 20.0, 25.0, 30.0, 35.0, 40.0]:
|
|
n = temp_c_to_n(temp)
|
|
recovered = temp_n_to_c(n)
|
|
assert abs(recovered - temp) < 0.05, \
|
|
f"Temperature roundtrip failed for {temp}°C: got {recovered}°C"
|
|
|
|
def test_current_roundtrip(self):
|
|
"""current_ma_to_n and current_n_to_ma should be approximately inverse."""
|
|
from laser_control.conversions import current_ma_to_n, current_n_to_ma
|
|
# Note: current_n_to_ma is for photodiode readback, not exact inverse
|
|
# of current_ma_to_n (different calibration paths).
|
|
# We just test that values are in plausible range.
|
|
for current in [15.0, 30.0, 45.0, 60.0]:
|
|
n = current_ma_to_n(current)
|
|
assert 0 <= n <= 65535
|
|
|
|
def test_voltage_conversions_nominal(self):
|
|
"""Test voltage conversions at nominal counts."""
|
|
from laser_control.conversions import (
|
|
voltage_3v3_n_to_v, voltage_5v_n_to_v, voltage_7v_n_to_v
|
|
)
|
|
# Approximate nominal ADC counts for each rail
|
|
v33 = voltage_3v3_n_to_v(2703)
|
|
assert 3.1 <= v33 <= 3.5, f"3.3V rail: {v33}"
|
|
|
|
v5 = voltage_5v_n_to_v(2731)
|
|
assert 4.8 <= v5 <= 5.3, f"5V rail: {v5}"
|
|
|
|
v7 = voltage_7v_n_to_v(1042)
|
|
assert 6.5 <= v7 <= 7.5, f"7V rail: {v7}" |