вроде воркает

This commit is contained in:
awe
2025-11-24 22:21:30 +03:00
parent 5aea37ac63
commit 88ef718f31
7 changed files with 491 additions and 103 deletions

View File

@ -6,24 +6,21 @@ from ...api.models.laser import (
LaserParameters, LaserParameters,
LaserStatus, LaserStatus,
LaserStartResponse, LaserStartResponse,
LaserStopResponse LaserStopResponse,
ManualLaserParameters,
ManualLaserStartResponse
) )
from ...core.laser import LaserController from ...core.laser.laser_controller import LaserController
from ...core import singletons
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/laser", tags=["laser"]) router = APIRouter(prefix="/api/v1/laser", tags=["laser"])
# Singleton laser controller instance
_laser_controller: LaserController = None
def get_laser_controller():
def get_laser_controller() -> LaserController: """Dependency to get laser controller instance from singletons"""
"""Dependency to get laser controller instance""" return singletons.laser_controller_instance
global _laser_controller
if _laser_controller is None:
_laser_controller = LaserController()
return _laser_controller
@router.post("/start", response_model=LaserStartResponse) @router.post("/start", response_model=LaserStartResponse)
@ -40,19 +37,8 @@ async def start_laser_cycle(
try: try:
logger.info("Received request to start laser cycle") logger.info("Received request to start laser cycle")
# Validate that at least one control mode is enabled # Validate that only one scan mode is enabled at a time
if not any([ # (Manual mode = all disabled, Scan mode = one enabled)
parameters.enable_t1,
parameters.enable_t2,
parameters.enable_c1,
parameters.enable_c2
]):
raise HTTPException(
status_code=400,
detail="Необходимо включить хотя бы один режим управления (температура или ток)"
)
# Validate that only one mode is enabled at a time
enabled_modes = sum([ enabled_modes = sum([
parameters.enable_t1, parameters.enable_t1,
parameters.enable_t2, parameters.enable_t2,
@ -62,7 +48,7 @@ async def start_laser_cycle(
if enabled_modes > 1: if enabled_modes > 1:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail="Можно включить только один режим управления одновременно" detail="Можно включить только один режим сканирования одновременно"
) )
# Start the cycle # Start the cycle
@ -84,6 +70,51 @@ async def start_laser_cycle(
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}") raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@router.post("/start-manual", response_model=ManualLaserStartResponse)
async def start_manual_laser_control(
parameters: ManualLaserParameters,
controller: LaserController = Depends(get_laser_controller)
) -> ManualLaserStartResponse:
"""
Start manual laser control with simplified parameters (t1, t2, i1, i2).
This endpoint is designed for direct manual control without scan modes.
It accepts only 4 parameters and immediately applies them to the device.
Args:
parameters: Manual control parameters with t1, t2, i1, i2
Returns:
ManualLaserStartResponse with success status and message
"""
try:
logger.info("Received request to start manual laser control")
logger.info(f"Parameters: T1={parameters.t1}°C, T2={parameters.t2}°C, I1={parameters.i1}mA, I2={parameters.i2}mA")
# Call the simplified manual control method
result = controller.start_manual_direct(
t1=parameters.t1,
t2=parameters.t2,
i1=parameters.i1,
i2=parameters.i2
)
if not result["success"]:
raise HTTPException(status_code=500, detail=result["message"])
return ManualLaserStartResponse(
success=True,
message=result["message"],
parameters=parameters
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error starting manual laser control: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@router.post("/stop", response_model=LaserStopResponse) @router.post("/stop", response_model=LaserStopResponse)
async def stop_laser_cycle( async def stop_laser_cycle(
controller: LaserController = Depends(get_laser_controller) controller: LaserController = Depends(get_laser_controller)
@ -139,7 +170,12 @@ async def connect_laser(
port: Serial port (e.g., '/dev/ttyUSB0'). If not specified, auto-detect. port: Serial port (e.g., '/dev/ttyUSB0'). If not specified, auto-detect.
""" """
try: try:
logger.info(f"Received request to connect to laser hardware on port: {port}") logger.info(f"Received request to connect to laser hardware on port: {port or 'auto-detect'}")
# If already connected, disconnect first
if controller.is_connected:
logger.info("Already connected, disconnecting first...")
controller.disconnect()
result = controller.connect(port) result = controller.connect(port)

View File

@ -101,3 +101,20 @@ class LaserStopResponse(BaseModel):
success: bool = Field(..., description="Успешность операции") success: bool = Field(..., description="Успешность операции")
message: str = Field(..., description="Сообщение о результате") message: str = Field(..., description="Сообщение о результате")
class ManualLaserParameters(BaseModel):
"""Model for manual laser control with simple parameters"""
t1: float = Field(..., ge=-1, le=45, description="Температура лазера 1 (°C)")
t2: float = Field(..., ge=-1, le=45, description="Температура лазера 2 (°C)")
i1: float = Field(..., ge=15, le=70, description="Ток лазера 1 (мА)")
i2: float = Field(..., ge=15, le=60, description="Ток лазера 2 (мА)")
class ManualLaserStartResponse(BaseModel):
"""Response model for manual laser start endpoint"""
success: bool = Field(..., description="Успешность операции")
message: str = Field(..., description="Сообщение о результате")
parameters: Optional[ManualLaserParameters] = Field(None, description="Примененные параметры")

View File

@ -1,8 +1,11 @@
import logging import logging
import time
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from datetime import datetime from datetime import datetime
from ...api.models.laser import LaserParameters, LaserStatus from ...api.models.laser import LaserParameters, LaserStatus
from .RadioPhotonic_PCB_PC_software import device_interaction as dev
from .RadioPhotonic_PCB_PC_software import device_commands as cmd
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -11,22 +14,35 @@ class LaserController:
""" """
Controller for laser control system. Controller for laser control system.
This is a stub implementation that logs all actions. Communicates with RadioPhotonic board via serial port (115200 baud).
Future integration with actual hardware would use serial communication Supports both manual control and automated scanning modes.
similar to the RadioPhotonic_PCB_PC_software project.
""" """
def __init__(self): def __init__(self):
self.prt = None # Serial port object
self.is_connected = False self.is_connected = False
self.is_running = False self.is_running = False
self.current_parameters: Optional[LaserParameters] = None self.current_parameters: Optional[LaserParameters] = None
self.current_status = LaserStatus() self.current_status = LaserStatus()
self.last_data: Optional[Dict[str, Any]] = None
# Default PI coefficients (multiplied by 256 as per device protocol)
self.proportional_coeff_1 = int(10 * 256)
self.proportional_coeff_2 = int(10 * 256)
self.integral_coeff_1 = int(0.5 * 256)
self.integral_coeff_2 = int(0.5 * 256)
self.message_id = "00FF"
logger.info("LaserController initialized") logger.info("LaserController initialized")
def start_cycle(self, parameters: LaserParameters) -> Dict[str, Any]: def start_cycle(self, parameters: LaserParameters) -> Dict[str, Any]:
""" """
Start laser control cycle with given parameters. Start laser control cycle with given parameters.
Determines mode (manual vs scan) based on enable flags:
- Manual mode: all enable_* flags are False
- Scan mode: one enable_* flag is True
Args: Args:
parameters: LaserParameters object with control settings parameters: LaserParameters object with control settings
@ -34,54 +50,40 @@ class LaserController:
Dictionary with success status and message Dictionary with success status and message
""" """
try: try:
if not self.is_connected or self.prt is None:
return {
"success": False,
"message": "Устройство не подключено. Сначала выполните подключение.",
"parameters": None
}
logger.info("=" * 60) logger.info("=" * 60)
logger.info("LASER CONTROL: START CYCLE") logger.info("LASER CONTROL: START CYCLE")
logger.info(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}") logger.info(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}")
logger.info("-" * 60) logger.info("-" * 60)
# Log all parameters # Determine operation mode
logger.info("Laser 1 Temperature Parameters:") is_manual = not any([
logger.info(f" Min Temperature: {parameters.min_temp_1}°C") parameters.enable_t1,
logger.info(f" Max Temperature: {parameters.max_temp_1}°C") parameters.enable_t2,
logger.info(f" Delta Temperature: {parameters.delta_temp_1}°C") parameters.enable_c1,
parameters.enable_c2
])
logger.info("Laser 1 Current Parameters:") if is_manual:
logger.info(f" Min Current: {parameters.min_current_1} mA") logger.info("Mode: MANUAL CONTROL")
logger.info(f" Max Current: {parameters.max_current_1} mA") result = self._start_manual_mode(parameters)
logger.info(f" Delta Current: {parameters.delta_current_1} mA") else:
logger.info("Mode: AUTOMATED SCAN")
result = self._start_scan_mode(parameters)
logger.info("Laser 2 Temperature Parameters:") if result["success"]:
logger.info(f" Min Temperature: {parameters.min_temp_2}°C") self.current_parameters = parameters
logger.info(f" Max Temperature: {parameters.max_temp_2}°C") self.is_running = True
logger.info(f" Delta Temperature: {parameters.delta_temp_2}°C") self.current_status.is_running = True
logger.info("Laser 2 Current Parameters:")
logger.info(f" Min Current: {parameters.min_current_2} mA")
logger.info(f" Max Current: {parameters.max_current_2} mA")
logger.info(f" Delta Current: {parameters.delta_current_2} mA")
logger.info("Time Parameters:")
logger.info(f" Delta Time: {parameters.delta_time} μs")
logger.info(f" Tau (Delay): {parameters.tau} ms")
logger.info("Control Mode Flags:")
logger.info(f" Enable Temperature Control Laser 1: {parameters.enable_t1}")
logger.info(f" Enable Temperature Control Laser 2: {parameters.enable_t2}")
logger.info(f" Enable Current Control Laser 1: {parameters.enable_c1}")
logger.info(f" Enable Current Control Laser 2: {parameters.enable_c2}")
logger.info("=" * 60) logger.info("=" * 60)
return result
# Store current parameters
self.current_parameters = parameters
self.is_running = True
self.current_status.is_running = True
return {
"success": True,
"message": "Цикл управления лазером успешно запущен (stub mode - без реального устройства)",
"parameters": parameters.model_dump()
}
except Exception as e: except Exception as e:
logger.error(f"Error starting laser cycle: {e}", exc_info=True) logger.error(f"Error starting laser cycle: {e}", exc_info=True)
@ -91,9 +93,124 @@ class LaserController:
"parameters": None "parameters": None
} }
def _start_manual_mode(self, parameters: LaserParameters) -> Dict[str, Any]:
"""
Start manual control mode with fixed T1, T2, I1, I2 values.
Uses DECODE_ENABLE (0x1111) command.
"""
try:
# Prepare control parameters
params = {
'Temp_1': parameters.min_temp_1,
'Temp_2': parameters.min_temp_2,
'Iset_1': parameters.min_current_1,
'Iset_2': parameters.min_current_2,
'ProportionalCoeff_1': self.proportional_coeff_1,
'ProportionalCoeff_2': self.proportional_coeff_2,
'IntegralCoeff_1': self.integral_coeff_1,
'IntegralCoeff_2': self.integral_coeff_2,
'Message_ID': self.message_id
}
logger.info(f"Sending manual control parameters:")
logger.info(f" T1: {params['Temp_1']}°C, T2: {params['Temp_2']}°C")
logger.info(f" I1: {params['Iset_1']} mA, I2: {params['Iset_2']} mA")
# Send control parameters to device
dev.send_control_parameters(self.prt, params)
return {
"success": True,
"message": "Ручное управление запущено",
"parameters": parameters.model_dump()
}
except Exception as e:
logger.error(f"Error in manual mode: {e}", exc_info=True)
raise
def _start_scan_mode(self, parameters: LaserParameters) -> Dict[str, Any]:
"""
Start automated scan mode.
Uses TASK_ENABLE (0x7777) command with TaskType.
"""
try:
# Determine which parameter to scan
if parameters.enable_c1:
task_type = cmd.TaskType.ChangeCurrentLD1.value
scan_param = "Current Laser 1"
logger.info(f"Scanning Current Laser 1: {parameters.min_current_1} to {parameters.max_current_1} mA")
elif parameters.enable_c2:
task_type = cmd.TaskType.ChangeCurrentLD2.value
scan_param = "Current Laser 2"
logger.info(f"Scanning Current Laser 2: {parameters.min_current_2} to {parameters.max_current_2} mA")
elif parameters.enable_t1:
return {
"success": False,
"message": "Сканирование температуры Laser 1 не поддерживается устройством",
"parameters": None
}
elif parameters.enable_t2:
return {
"success": False,
"message": "Сканирование температуры Laser 2 не поддерживается устройством",
"parameters": None
}
else:
return {
"success": False,
"message": "Не выбран параметр для сканирования",
"parameters": None
}
# Build task parameters based on task type
sending_param = {
'TaskType': task_type,
'Dt': parameters.delta_time / 1000.0, # Convert μs to ms
'Tau': parameters.tau,
'ProportionalCoeff_1': self.proportional_coeff_1,
'ProportionalCoeff_2': self.proportional_coeff_2,
'IntegralCoeff_1': self.integral_coeff_1,
'IntegralCoeff_2': self.integral_coeff_2
}
# Add scan-specific parameters
if task_type == cmd.TaskType.ChangeCurrentLD1.value:
sending_param.update({
'MinC1': parameters.min_current_1,
'MaxC1': parameters.max_current_1,
'DeltaC1': parameters.delta_current_1,
'T1': parameters.min_temp_1, # Fixed
'I2': parameters.min_current_2, # Fixed
'T2': parameters.min_temp_2 # Fixed
})
elif task_type == cmd.TaskType.ChangeCurrentLD2.value:
sending_param.update({
'MinC2': parameters.min_current_2,
'MaxC2': parameters.max_current_2,
'DeltaC2': parameters.delta_current_2,
'T2': parameters.min_temp_2, # Fixed
'I1': parameters.min_current_1, # Fixed
'T1': parameters.min_temp_1 # Fixed
})
# Send task command to device
dev.send_task_command(self.prt, sending_param)
return {
"success": True,
"message": f"Сканирование запущено: {scan_param}",
"parameters": parameters.model_dump()
}
except Exception as e:
logger.error(f"Error in scan mode: {e}", exc_info=True)
raise
def stop_cycle(self) -> Dict[str, Any]: def stop_cycle(self) -> Dict[str, Any]:
""" """
Stop current laser control cycle. Stop current laser control cycle.
Sends reset command to device.
Returns: Returns:
Dictionary with success status and message Dictionary with success status and message
@ -104,6 +221,14 @@ class LaserController:
logger.info(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}") logger.info(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}")
logger.info("=" * 60) logger.info("=" * 60)
if self.is_connected and self.prt is not None:
# Send reset command to device
try:
dev.reset_port_settings(self.prt)
logger.info("Device reset command sent")
except Exception as e:
logger.warning(f"Failed to send reset command: {e}")
self.is_running = False self.is_running = False
self.current_status.is_running = False self.current_status.is_running = False
self.current_parameters = None self.current_parameters = None
@ -120,39 +245,131 @@ class LaserController:
"message": f"Ошибка при остановке цикла: {str(e)}" "message": f"Ошибка при остановке цикла: {str(e)}"
} }
def start_manual_direct(self, t1: float, t2: float, i1: float, i2: float) -> Dict[str, Any]:
"""
Start manual control mode with direct T1, T2, I1, I2 parameters.
Simplified interface for manual control.
Args:
t1: Temperature for Laser 1 in Celsius
t2: Temperature for Laser 2 in Celsius
i1: Current for Laser 1 in mA
i2: Current for Laser 2 in mA
Returns:
Dictionary with success status and message
"""
try:
# Check connection status
if not self.is_connected:
logger.error("Device not connected")
return {
"success": False,
"message": "Устройство не подключено. Нажмите кнопку 'Подключить' в настройках.",
}
if self.prt is None:
logger.error("Serial port is None")
return {
"success": False,
"message": "Порт не инициализирован. Переподключите устройство.",
}
# Check if port is actually open
if not self.prt.is_open:
logger.error("Serial port is not open")
return {
"success": False,
"message": "Порт не открыт. Переподключите устройство.",
}
logger.info("=" * 60)
logger.info("LASER CONTROL: START MANUAL MODE (Direct)")
logger.info(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}")
logger.info(f"Port: {self.prt.port}, Open: {self.prt.is_open}")
logger.info(f" T1: {t1}°C, T2: {t2}°C")
logger.info(f" I1: {i1} mA, I2: {i2} mA")
logger.info("=" * 60)
# Prepare control parameters
params = {
'Temp_1': t1,
'Temp_2': t2,
'Iset_1': i1,
'Iset_2': i2,
'ProportionalCoeff_1': self.proportional_coeff_1,
'ProportionalCoeff_2': self.proportional_coeff_2,
'IntegralCoeff_1': self.integral_coeff_1,
'IntegralCoeff_2': self.integral_coeff_2,
'Message_ID': self.message_id
}
# Send control parameters to device
logger.info("Sending control parameters to device...")
dev.send_control_parameters(self.prt, params)
logger.info("Control parameters sent successfully")
self.is_running = True
self.current_status.is_running = True
return {
"success": True,
"message": "Ручное управление запущено",
}
except Exception as e:
logger.error(f"Error starting manual control: {e}", exc_info=True)
# Reset connection status if error
self.is_connected = False
return {
"success": False,
"message": f"Ошибка при запуске: {str(e)}. Попробуйте переподключить устройство.",
}
def get_status(self) -> LaserStatus: def get_status(self) -> LaserStatus:
""" """
Get current laser status. Get current laser status by requesting data from device.
Uses TRANS_ENABLE (0x4444) command.
Returns: Returns:
LaserStatus object with current state LaserStatus object with current state
""" """
# In real implementation, this would query the hardware
# For now, return stub data
self.current_status.connected = self.is_connected self.current_status.connected = self.is_connected
self.current_status.is_running = self.is_running self.current_status.is_running = self.is_running
# Stub values (would come from actual hardware) if self.is_connected and self.prt is not None:
if self.is_connected: try:
self.current_status.temp_1 = 28.0 # Request data from device
self.current_status.temp_2 = 28.9 data = dev.request_data(self.prt)
self.current_status.current_1 = -0.02
self.current_status.current_2 = -0.02 if data and isinstance(data, dict):
self.current_status.temp_ext_1 = 30.95 # Update status from device data
self.current_status.temp_ext_2 = 29.58 self.last_data = data
self.current_status.voltage_3v3 = 3.30 self.current_status.temp_1 = data.get('Temp_1', 0.0)
self.current_status.voltage_5v1 = 4.92 self.current_status.temp_2 = data.get('Temp_2', 0.0)
self.current_status.voltage_5v2 = 4.96 self.current_status.current_1 = data.get('I1', 0.0)
self.current_status.voltage_7v0 = 7.57 self.current_status.current_2 = data.get('I2', 0.0)
self.current_status.temp_ext_1 = data.get('Temp_Ext_1', 0.0)
self.current_status.temp_ext_2 = data.get('Temp_Ext_2', 0.0)
self.current_status.voltage_3v3 = data.get('MON_3V3', 0.0)
self.current_status.voltage_5v1 = data.get('MON_5V1', 0.0)
self.current_status.voltage_5v2 = data.get('MON_5V2', 0.0)
self.current_status.voltage_7v0 = data.get('MON_7V0', 0.0)
except Exception as e:
logger.warning(f"Error requesting status from device: {e}")
# Keep previous status values on error
return self.current_status return self.current_status
def connect(self, port: Optional[str] = None) -> Dict[str, Any]: def connect(self, port: Optional[str] = None) -> Dict[str, Any]:
""" """
Connect to laser control hardware. Connect to laser control hardware via serial port.
Auto-detects USB serial ports if port not specified.
Args: Args:
port: Serial port to connect to (e.g., '/dev/ttyUSB0') port: Serial port to connect to (e.g., '/dev/ttyUSB0')
If None, will auto-detect USB ports
Returns: Returns:
Dictionary with success status and message Dictionary with success status and message
@ -160,21 +377,63 @@ class LaserController:
try: try:
logger.info(f"Attempting to connect to laser hardware on port: {port or 'auto-detect'}") logger.info(f"Attempting to connect to laser hardware on port: {port or 'auto-detect'}")
# In real implementation, would use serial communication here if self.is_connected:
# For now, just simulate connection logger.warning("Already connected to device")
return {
"success": True,
"message": "Уже подключено к устройству",
"port": str(self.prt.port) if self.prt else "unknown"
}
# Create port connection (auto-detect if port not specified)
if port:
# Manual port specification
try:
self.prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
cmd.open_port(self.prt)
dev.reset_port_settings(self.prt)
except Exception as e:
logger.error(f"Failed to connect to specified port {port}: {e}")
return {
"success": False,
"message": f"Не удалось подключиться к порту {port}: {str(e)}",
"port": port
}
else:
# Auto-detect USB ports
self.prt = dev.create_port_connection()
if self.prt is None:
logger.error("Failed to create port connection")
return {
"success": False,
"message": "Не удалось найти устройство. Проверьте подключение USB.",
"port": None
}
self.is_connected = True self.is_connected = True
self.current_status.connected = True self.current_status.connected = True
logger.info("Successfully connected to laser hardware (stub mode)") port_name = self.prt.port if hasattr(self.prt, 'port') else "unknown"
logger.info(f"Successfully connected to laser hardware on {port_name}")
# Request initial status
try:
time.sleep(0.2) # Give device time to initialize
self.get_status()
except Exception as e:
logger.warning(f"Failed to get initial status: {e}")
return { return {
"success": True, "success": True,
"message": f"Подключено к устройству (stub mode)", "message": f"Подключено к устройству на порту {port_name}",
"port": port or "auto-detected" "port": port_name
} }
except Exception as e: except Exception as e:
logger.error(f"Error connecting to laser hardware: {e}", exc_info=True) logger.error(f"Error connecting to laser hardware: {e}", exc_info=True)
self.is_connected = False
self.prt = None
return { return {
"success": False, "success": False,
"message": f"Ошибка подключения: {str(e)}", "message": f"Ошибка подключения: {str(e)}",
@ -184,6 +443,7 @@ class LaserController:
def disconnect(self) -> Dict[str, Any]: def disconnect(self) -> Dict[str, Any]:
""" """
Disconnect from laser control hardware. Disconnect from laser control hardware.
Stops any running cycle and closes serial port.
Returns: Returns:
Dictionary with success status and message Dictionary with success status and message
@ -195,8 +455,19 @@ class LaserController:
if self.is_running: if self.is_running:
self.stop_cycle() self.stop_cycle()
# Close serial port
if self.prt is not None:
try:
cmd.close_port(self.prt)
logger.info("Serial port closed")
except Exception as e:
logger.warning(f"Error closing serial port: {e}")
self.prt = None
self.is_connected = False self.is_connected = False
self.current_status.connected = False self.current_status.connected = False
self.last_data = None
logger.info("Successfully disconnected from laser hardware") logger.info("Successfully disconnected from laser hardware")

View File

@ -11,6 +11,7 @@ from vna_system.core.processors.storage.data_storage import DataStorage
from vna_system.core.settings.settings_manager import VNASettingsManager from vna_system.core.settings.settings_manager import VNASettingsManager
from vna_system.core.processors.manager import ProcessorManager from vna_system.core.processors.manager import ProcessorManager
from vna_system.core.processors.websocket_handler import ProcessorWebSocketHandler from vna_system.core.processors.websocket_handler import ProcessorWebSocketHandler
from vna_system.core.laser.laser_controller import LaserController
from vna_system.core.config import PROCESSORS_CONFIG_DIR_PATH from vna_system.core.config import PROCESSORS_CONFIG_DIR_PATH
# Global singleton instances # Global singleton instances
@ -23,3 +24,6 @@ data_storage = DataStorage()
processor_websocket_handler: ProcessorWebSocketHandler = ProcessorWebSocketHandler( processor_websocket_handler: ProcessorWebSocketHandler = ProcessorWebSocketHandler(
processor_manager, data_storage processor_manager, data_storage
) )
# Laser control system
laser_controller_instance: LaserController = LaserController()

View File

@ -40,6 +40,17 @@ async def lifespan(app: FastAPI):
processors=singletons.processor_manager.list_processors(), processors=singletons.processor_manager.list_processors(),
) )
# Try to connect to laser controller (optional, non-blocking)
logger.info("Attempting to connect to laser control hardware")
try:
result = singletons.laser_controller_instance.connect()
if result["success"]:
logger.info("Laser controller connected", port=result.get("port"))
else:
logger.warning("Laser controller connection failed (will retry on demand)", message=result.get("message"))
except Exception as e:
logger.warning("Failed to connect to laser controller on startup (will retry on demand)", error=str(e))
logger.info("VNA API Server started successfully") logger.info("VNA API Server started successfully")
yield yield
except Exception as exc: except Exception as exc:
@ -47,6 +58,14 @@ async def lifespan(app: FastAPI):
raise raise
logger.info("Shutting down VNA API Server") logger.info("Shutting down VNA API Server")
# Disconnect laser controller
if singletons.laser_controller_instance and singletons.laser_controller_instance.is_connected:
try:
singletons.laser_controller_instance.disconnect()
logger.info("Laser controller disconnected")
except Exception as e:
logger.warning("Error disconnecting laser controller", error=str(e))
if singletons.processor_manager: if singletons.processor_manager:
singletons.processor_manager.stop_processing() singletons.processor_manager.stop_processing()
logger.info("Processor system stopped") logger.info("Processor system stopped")

View File

@ -93,6 +93,7 @@ export const API = {
LASER: { LASER: {
START: `${API_BASE}/laser/start`, START: `${API_BASE}/laser/start`,
START_MANUAL: `${API_BASE}/laser/start-manual`,
STOP: `${API_BASE}/laser/stop`, STOP: `${API_BASE}/laser/stop`,
STATUS: `${API_BASE}/laser/status`, STATUS: `${API_BASE}/laser/status`,
CONNECT: `${API_BASE}/laser/connect`, CONNECT: `${API_BASE}/laser/connect`,

View File

@ -3,7 +3,6 @@
* Handles laser control interface with two modes: manual and scan * Handles laser control interface with two modes: manual and scan
*/ */
import { ButtonState } from '../utils.js';
import { apiPost } from '../api-client.js'; import { apiPost } from '../api-client.js';
import { API, NOTIFICATION_TYPES } from '../constants.js'; import { API, NOTIFICATION_TYPES } from '../constants.js';
@ -111,26 +110,30 @@ export class LaserManager {
} }
try { try {
ButtonState.disable(this.elements.startBtn); // Disable start button during request
this.elements.startBtn.disabled = true;
let parameters; let parameters;
let endpoint;
if (this.isManualMode) { if (this.isManualMode) {
// Manual mode - set fixed values // Manual mode - use simplified endpoint with only t1, t2, i1, i2
parameters = this.collectManualParameters(); parameters = this.collectManualParametersSimple();
endpoint = API.LASER.START_MANUAL;
} else { } else {
// Scan mode - set scan parameters // Scan mode - use full endpoint with scan parameters
parameters = this.collectScanParameters(); parameters = this.collectScanParameters();
endpoint = API.LASER.START;
} }
// Validate parameters // Validate parameters
if (!this.validateParameters(parameters)) { if (!this.validateParameters(parameters)) {
ButtonState.enable(this.elements.startBtn); this.elements.startBtn.disabled = false;
return; return;
} }
// Send start request // Send start request to appropriate endpoint
const response = await apiPost(API.LASER.START, parameters); const response = await apiPost(endpoint, parameters);
if (response.success) { if (response.success) {
this.isRunning = true; this.isRunning = true;
@ -146,13 +149,13 @@ export class LaserManager {
console.log('Laser cycle started:', parameters); console.log('Laser cycle started:', parameters);
} else { } else {
this.notify(ERROR, 'Ошибка', response.message); this.notify(ERROR, 'Ошибка', response.message);
ButtonState.enable(this.elements.startBtn); this.elements.startBtn.disabled = false;
} }
} catch (error) { } catch (error) {
console.error('Failed to start laser cycle:', error); console.error('Failed to start laser cycle:', error);
this.notify(ERROR, 'Ошибка', `Не удалось запустить цикл: ${error.message}`); this.notify(ERROR, 'Ошибка', `Не удалось запустить цикл: ${error.message}`);
ButtonState.enable(this.elements.startBtn); this.elements.startBtn.disabled = false;
} }
} }
@ -163,7 +166,8 @@ export class LaserManager {
} }
try { try {
ButtonState.disable(this.elements.stopBtn); // Disable stop button during request
this.elements.stopBtn.disabled = true;
const response = await apiPost(API.LASER.STOP, {}); const response = await apiPost(API.LASER.STOP, {});
@ -178,13 +182,13 @@ export class LaserManager {
console.log('Laser cycle stopped'); console.log('Laser cycle stopped');
} else { } else {
this.notify(ERROR, 'Ошибка', response.message); this.notify(ERROR, 'Ошибка', response.message);
ButtonState.enable(this.elements.stopBtn); this.elements.stopBtn.disabled = false;
} }
} catch (error) { } catch (error) {
console.error('Failed to stop laser cycle:', error); console.error('Failed to stop laser cycle:', error);
this.notify(ERROR, 'Ошибка', `Не удалось остановить цикл: ${error.message}`); this.notify(ERROR, 'Ошибка', `Не удалось остановить цикл: ${error.message}`);
ButtonState.enable(this.elements.stopBtn); this.elements.stopBtn.disabled = false;
} }
} }
@ -225,6 +229,16 @@ export class LaserManager {
}; };
} }
collectManualParametersSimple() {
// Simplified manual mode: only 4 parameters (t1, t2, i1, i2)
return {
t1: parseFloat(this.elements.temp1.value),
t2: parseFloat(this.elements.temp2.value),
i1: parseFloat(this.elements.current1.value),
i2: parseFloat(this.elements.current2.value)
};
}
collectScanParameters() { collectScanParameters() {
// Scan mode: scan current 1 while keeping other parameters fixed // Scan mode: scan current 1 while keeping other parameters fixed
return { return {
@ -260,6 +274,32 @@ export class LaserManager {
} }
validateParameters(params) { validateParameters(params) {
// Check if simplified format (t1, t2, i1, i2)
if ('t1' in params && 't2' in params && 'i1' in params && 'i2' in params) {
// Simplified format validation
const values = [params.t1, params.t2, params.i1, params.i2];
if (values.some(v => isNaN(v))) {
this.notify(ERROR, 'Ошибка валидации', 'Все поля должны быть заполнены корректными числами');
return false;
}
// Check temperature ranges
if (params.t1 < -1 || params.t1 > 45 || params.t2 < -1 || params.t2 > 45) {
this.notify(ERROR, 'Ошибка валидации', 'Температура должна быть от -1 до 45°C');
return false;
}
// Check current ranges
if (params.i1 < 15 || params.i1 > 70 || params.i2 < 15 || params.i2 > 60) {
this.notify(ERROR, 'Ошибка валидации', 'Ток лазера 1: 15-70мА, лазера 2: 15-60мА');
return false;
}
return true;
}
// Full format validation (original)
// Check for NaN values // Check for NaN values
const values = [ const values = [
params.min_temp_1, params.max_temp_1, params.min_temp_1, params.max_temp_1,