working_laser control

This commit is contained in:
awe
2025-11-27 18:11:33 +03:00
parent c64f2a4d6b
commit 2742cfe856
4 changed files with 85 additions and 54 deletions

View File

@ -581,8 +581,8 @@ class VNADataAcquisition:
# Still process the data if it has valid bits
# Convert to dB if non-zero
if raw_value != 0:
points.append((float(int(raw_value)), 0.))
points.append((float(int(raw_value)), 0.))
if i == 0 or i == 100:
logger.debug(f"raw_value: {raw_value}, marker: {marker}, word= {word}" )
return points

View File

@ -54,7 +54,7 @@ SWEEP_CMD_LEN = 515
SWEEP_CMD_PREFIX = bytes([0xAA, 0x00, 0xDA])
MEAS_HEADER_LEN = 21
MEAS_CMDS_PER_SWEEP = 17
EXPECTED_POINTS_PER_SWEEP = 1000
EXPECTED_POINTS_PER_SWEEP = 1024
# -----------------------------------------------------------------------------
# Buffer settings

View File

@ -4,8 +4,8 @@ from typing import Optional, Dict, Any
from datetime import datetime
from ...api.models.laser import LaserParameters, LaserStatus
from .RadioPhotonic_PCB_PC_software import device_interaction as dev
from .RadioPhotonic_PCB_PC_software import device_commands as cmd
from .RFG_PCB_PC_controller_supersimple import device_interaction as dev
from .RFG_PCB_PC_controller_supersimple import device_commands as cmd
logger = logging.getLogger(__name__)
@ -14,8 +14,16 @@ class LaserController:
"""
Controller for laser control system.
Uses RFG_PCB_PC_controller_supersimple module for device communication.
Communicates with RadioPhotonic board via serial port (115200 baud).
Supports both manual control and automated scanning modes.
Control Logic (RFG implementation):
- Manual mode: send_control_parameters() to set steady current/temperature
- Scan mode: send_control_parameters() THEN send_task_command() for scanning
- Stop: send_control_parameters() to return to steady state (not reset)
TaskType format: String-based ("TT_CHANGE_CURR_1", "TT_CHANGE_CURR_2")
"""
def __init__(self):
@ -96,11 +104,11 @@ class LaserController:
def _start_manual_mode(self, parameters: LaserParameters) -> Dict[str, Any]:
"""
Start manual control mode with fixed T1, T2, I1, I2 values.
Uses DECODE_ENABLE (0x1111) command.
Uses DECODE_ENABLE (0x1111) command - simplified version from RFG example.
"""
try:
# Prepare control parameters
params = {
# Prepare control parameters (simplified - direct send)
ctrl_params = {
'Temp_1': parameters.min_temp_1,
'Temp_2': parameters.min_temp_2,
'Iset_1': parameters.min_current_1,
@ -113,12 +121,12 @@ class LaserController:
}
logger.info(f"Sending manual control parameters:")
logger.info(f" T1: {params['Temp_1']}°C, T2: {params['Temp_2']}°C")
logger.info(f" I1: {params['Iset_1']} mA, I2: {params['Iset_2']} mA")
# Send control parameters to device
dev.send_control_parameters(self.prt, params)
logger.info(f" T1: {ctrl_params['Temp_1']}°C, T2: {ctrl_params['Temp_2']}°C")
logger.info(f" I1: {ctrl_params['Iset_1']} mA, I2: {ctrl_params['Iset_2']} mA")
# Send control parameters to device (steady current mode)
dev.send_control_parameters(self.prt, ctrl_params)
logger.info(dev.request_data(self.prt))
return {
"success": True,
"message": "Ручное управление запущено",
@ -131,13 +139,14 @@ class LaserController:
def _start_scan_mode(self, parameters: LaserParameters) -> Dict[str, Any]:
"""
Start automated scan mode.
Uses TASK_ENABLE (0x7777) command with TaskType.
Start automated scan mode - simplified version from RFG example.
RFG flow: send_control_parameters() THEN send_task_command()
This initializes the device before starting the scan.
"""
try:
# First, send initial control parameters to set the device to starting values
logger.info("Setting initial control parameters before scan...")
initial_params = {
# Step 1: Start lasers with initial control parameters (from RFG example lines 72-75)
logger.info("Step 1: Starting lasers with control parameters...")
ctrl_params = {
'Temp_1': parameters.min_temp_1,
'Temp_2': parameters.min_temp_2,
'Iset_1': parameters.min_current_1,
@ -149,24 +158,20 @@ class LaserController:
'Message_ID': self.message_id
}
logger.info(f"Initial parameters: T1={initial_params['Temp_1']}°C, T2={initial_params['Temp_2']}°C, "
f"I1={initial_params['Iset_1']} mA, I2={initial_params['Iset_2']} mA")
dev.send_control_parameters(self.prt, ctrl_params)
logger.info(dev.request_data(self.prt))
logger.info("Control parameters sent - lasers started")
# Send initial control parameters
dev.send_control_parameters(self.prt, initial_params)
logger.info("Initial control parameters sent successfully")
# Small delay as in RFG example (line 75: sleep(2))
time.sleep(2)
# Small delay to allow device to process
import time
time.sleep(0.2)
# Determine which parameter to scan
# Step 2: Determine which parameter to scan
if parameters.enable_c1:
task_type = cmd.TaskType.ChangeCurrentLD1.value
task_type = "TT_CHANGE_CURR_1" # RFG uses string format
scan_param = "Current Laser 1"
logger.info(f"Scanning Current Laser 1: {parameters.min_current_1} to {parameters.max_current_1} mA")
elif parameters.enable_c2:
task_type = cmd.TaskType.ChangeCurrentLD2.value
task_type = "TT_CHANGE_CURR_2" # RFG uses string format
scan_param = "Current Laser 2"
logger.info(f"Scanning Current Laser 2: {parameters.min_current_2} to {parameters.max_current_2} mA")
elif parameters.enable_t1:
@ -188,10 +193,11 @@ class LaserController:
"parameters": None
}
# Build task parameters based on task type
sending_param = {
# Step 3: Build task parameters for scan (from RFG example lines 78-82)
logger.info("Step 2: Switching to current sweep mode...")
task_params = {
'TaskType': task_type,
'Dt': parameters.delta_time / 1000.0, # Convert μs to ms
'Dt': parameters.delta_time ,
'Tau': parameters.tau,
'ProportionalCoeff_1': self.proportional_coeff_1,
'ProportionalCoeff_2': self.proportional_coeff_2,
@ -200,8 +206,8 @@ class LaserController:
}
# Add scan-specific parameters
if task_type == cmd.TaskType.ChangeCurrentLD1.value:
sending_param.update({
if task_type == "TT_CHANGE_CURR_1":
task_params.update({
'MinC1': parameters.min_current_1,
'MaxC1': parameters.max_current_1,
'DeltaC1': parameters.delta_current_1,
@ -209,8 +215,8 @@ class LaserController:
'I2': parameters.min_current_2, # Fixed
'T2': parameters.min_temp_2 # Fixed
})
elif task_type == cmd.TaskType.ChangeCurrentLD2.value:
sending_param.update({
elif task_type == "TT_CHANGE_CURR_2":
task_params.update({
'MinC2': parameters.min_current_2,
'MaxC2': parameters.max_current_2,
'DeltaC2': parameters.delta_current_2,
@ -219,8 +225,10 @@ class LaserController:
'T1': parameters.min_temp_1 # Fixed
})
# Send task command to device
dev.send_task_command(self.prt, sending_param)
# Send task command to device (start current variation)
dev.send_task_command(self.prt, task_params)
# print(dev.request_data(self.prt))
logger.info("Task command sent successfully - scan started")
return {
"success": True,
@ -235,7 +243,8 @@ class LaserController:
def stop_cycle(self) -> Dict[str, Any]:
"""
Stop current laser control cycle.
Sends reset command to device.
Uses send_control_parameters to return to steady current mode (RFG example logic).
This stops any ongoing scan and maintains the last known parameters.
Returns:
Dictionary with success status and message
@ -247,20 +256,40 @@ class LaserController:
logger.info("=" * 60)
if self.is_connected and self.prt is not None:
# Send reset command to device
try:
dev.reset_port_settings(self.prt)
logger.info("Device reset command sent")
except Exception as e:
logger.warning(f"Failed to send reset command: {e}")
# Stop current variation - go to steady current mode
# Use last known parameters or defaults
if self.current_parameters:
ctrl_params = {
'Temp_1': self.current_parameters.min_temp_1,
'Temp_2': self.current_parameters.min_temp_2,
'Iset_1': self.current_parameters.min_current_1,
'Iset_2': self.current_parameters.min_current_2,
'ProportionalCoeff_1': self.proportional_coeff_1,
'ProportionalCoeff_2': self.proportional_coeff_2,
'IntegralCoeff_1': self.integral_coeff_1,
'IntegralCoeff_2': self.integral_coeff_2,
'Message_ID': self.message_id
}
logger.info(f"Stopping scan - returning to steady current mode:")
logger.info(f" T1: {ctrl_params['Temp_1']}°C, T2: {ctrl_params['Temp_2']}°C")
logger.info(f" I1: {ctrl_params['Iset_1']} mA, I2: {ctrl_params['Iset_2']} mA")
try:
dev.send_control_parameters(self.prt, ctrl_params)
logger.info(dev.request_data(self.prt))
logger.info("Control parameters sent - laser in steady state")
except Exception as e:
logger.warning(f"Failed to send control parameters: {e}")
else:
logger.warning("No current parameters stored, cannot send steady state command")
self.is_running = False
self.current_status.is_running = False
self.current_parameters = None
return {
"success": True,
"message": "Цикл управления лазером остановлен"
"message": "Цикл управления лазером остановлен (steady current mode)"
}
except Exception as e:
@ -480,15 +509,17 @@ class LaserController:
if self.is_running:
self.stop_cycle()
# Close serial port
# Close serial port using RFG close_connection
if self.prt is not None:
try:
# cmd.close_port(self.prt)
dev.close_connection(self.prt)
logger.info("Serial port closed")
except Exception as e:
logger.warning(f"Error closing serial port: {e}")
self.prt = None
# Fallback: just set to None
self.prt = None
else:
self.prt = None
self.is_connected = False
self.current_status.connected = False

View File

@ -1,6 +1,6 @@
{
"y_min": -65,
"y_max": 40,
"y_min": -80,
"y_max": -15,
"autoscale": true,
"show_magnitude": true,
"show_phase": false,