works: setup generator_PCB, set params (temps, currents, PI coeffs) (switches PCB to constant current on lasers), set current sweep params (switches PCB to varying laser currents).
This commit is contained in:
358
device_commands.py
Normal file
358
device_commands.py
Normal file
@ -0,0 +1,358 @@
|
|||||||
|
from enum import IntEnum
|
||||||
|
from serial import Serial
|
||||||
|
from serial.tools import list_ports
|
||||||
|
import device_conversion as cnv
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
#### ---- Constants
|
||||||
|
|
||||||
|
GET_DATA_TOTAL_LENGTH = 30 # Total number of bytes when getting DATA
|
||||||
|
SEND_PARAMS_TOTAL_LENGTH = 30 # Total number of bytes when sending parameters
|
||||||
|
TASK_ENABLE_COMMAND_LENGTH = 32 # Total number of bytes when sending TASK_ENABLE command
|
||||||
|
|
||||||
|
#defines from Generator_PCB .c code
|
||||||
|
TT_CHANGE_CURR_1 = 0x1
|
||||||
|
TT_CHANGE_CURR_2 = 0x2
|
||||||
|
TT_CHANGE_TEMP_1 = 0x3
|
||||||
|
TT_CHANGE_TEMP_2 = 0x4
|
||||||
|
|
||||||
|
|
||||||
|
class TaskType(IntEnum):
|
||||||
|
Manual = 0x00
|
||||||
|
ChangeCurrentLD1 = 0x01
|
||||||
|
ChangeCurrentLD2 = 0x02
|
||||||
|
ChangeTemperatureLD1 = 0x03
|
||||||
|
ChangeTemperatureLD2 = 0x04
|
||||||
|
#### ---- Auxiliary functions
|
||||||
|
|
||||||
|
def int_to_hex(inp):
|
||||||
|
if inp<0 or inp>65535:
|
||||||
|
print("Error. Input should be within [0, 65535]. Returning N=0.")
|
||||||
|
return "0000"
|
||||||
|
return f"{inp:#0{6}x}"[2:]
|
||||||
|
|
||||||
|
def crc(lst):
|
||||||
|
crc=int("0x"+lst[0],16)
|
||||||
|
for i in range(1,len(lst)):
|
||||||
|
crc=crc^int("0x"+lst[i],16)
|
||||||
|
return int_to_hex(crc)
|
||||||
|
|
||||||
|
def show_hex_string(string):
|
||||||
|
return "".join("\\x{}".format(char.encode()) for char in (string[i:i+2] for i in range(0, len(string), 2)))
|
||||||
|
|
||||||
|
|
||||||
|
def flipfour(s):
|
||||||
|
''' Changes "abcd" to "cdba"
|
||||||
|
'''
|
||||||
|
if len(s) != 4:
|
||||||
|
print("Error. Trying to flip string with length not equal to 4.")
|
||||||
|
return None
|
||||||
|
return s[2:4]+s[0:2]
|
||||||
|
|
||||||
|
|
||||||
|
#### ---- Port Operations
|
||||||
|
|
||||||
|
|
||||||
|
def setup_port_connection(baudrate: int, port: str, timeout_sec: float):
|
||||||
|
prt = Serial()
|
||||||
|
prt.baudrate = baudrate
|
||||||
|
prt.port = port
|
||||||
|
prt.timeout = timeout_sec
|
||||||
|
return prt
|
||||||
|
|
||||||
|
|
||||||
|
def open_port(prt):
|
||||||
|
prt.open()
|
||||||
|
|
||||||
|
if prt.is_open:
|
||||||
|
print("Connection succesful. Port is opened.")
|
||||||
|
print("Port parameters:", prt)
|
||||||
|
print("")
|
||||||
|
else:
|
||||||
|
print("Can't open port. Exiting program.")
|
||||||
|
exit()
|
||||||
|
|
||||||
|
|
||||||
|
def close_port(prt):
|
||||||
|
prt.close()
|
||||||
|
print("")
|
||||||
|
if prt.is_open:
|
||||||
|
print("Can't close port. Exiting program.")
|
||||||
|
exit()
|
||||||
|
else:
|
||||||
|
print("Port is closed. Exiting program.")
|
||||||
|
exit()
|
||||||
|
|
||||||
|
|
||||||
|
#### ---- Interacting with device: low-level
|
||||||
|
|
||||||
|
# ---- Sending commands
|
||||||
|
|
||||||
|
def send_TASK_ENABLE(prt, bytestring):
|
||||||
|
''' Set task parameters (x7777 + ...).
|
||||||
|
Expected device answer: STATE.
|
||||||
|
'''
|
||||||
|
if len(bytestring) != TASK_ENABLE_COMMAND_LENGTH:
|
||||||
|
print("Error. Wrong parameter string for TASK_ENABLE.")
|
||||||
|
return None
|
||||||
|
prt.write(bytestring)
|
||||||
|
print("Sent: Set control parameters (TASK_ENABLE).")
|
||||||
|
|
||||||
|
def send_DECODE_ENABLE(prt, bytestring):
|
||||||
|
''' Set control parameters (x1111 + ...).
|
||||||
|
Expected device answer: STATE.
|
||||||
|
'''
|
||||||
|
if len(bytestring) != SEND_PARAMS_TOTAL_LENGTH:
|
||||||
|
print("Error. Wrong parameter string for DECODE_ENABLE.")
|
||||||
|
return None
|
||||||
|
prt.write(bytestring)
|
||||||
|
print("Sent: Set control parameters (DECODE_ENABLE).")
|
||||||
|
|
||||||
|
|
||||||
|
def send_DEFAULT_ENABLE(prt):
|
||||||
|
''' Reset the device (x2222).
|
||||||
|
Expected device answer: STATE.
|
||||||
|
'''
|
||||||
|
input = bytearray.fromhex(flipfour("2222"))
|
||||||
|
prt.write(input)
|
||||||
|
print("Sent: Reset device (DEFAULT_ENABLE).")
|
||||||
|
|
||||||
|
|
||||||
|
def send_TRANSS_ENABLE(prt):
|
||||||
|
''' Request all saved data (x3333).
|
||||||
|
Expected device answer: SAVED_DATA.
|
||||||
|
'''
|
||||||
|
# TODO later.
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def send_TRANS_ENABLE(prt):
|
||||||
|
''' Request last piece of data (x4444).
|
||||||
|
Expected device answer: DATA.
|
||||||
|
'''
|
||||||
|
input = bytearray.fromhex(flipfour("4444"))
|
||||||
|
prt.write(input)
|
||||||
|
print("Sent: Request last data (TRANS_ENABLE).")
|
||||||
|
|
||||||
|
|
||||||
|
def send_REMOVE_FILE(prt):
|
||||||
|
''' Delete saved data (x5555).
|
||||||
|
Expected device answer: STATE.
|
||||||
|
'''
|
||||||
|
input = bytearray.fromhex(flipfour("5555"))
|
||||||
|
prt.write(input)
|
||||||
|
print("Sent: Delete saved data (REMOVE_FILE).")
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def send_STATE(prt):
|
||||||
|
''' Request state (x6666).
|
||||||
|
Expected device answer: STATE.
|
||||||
|
'''
|
||||||
|
input = bytearray.fromhex(flipfour("6666"))
|
||||||
|
prt.write(input)
|
||||||
|
print("Sent: Request state (STATE).")
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Getting data
|
||||||
|
|
||||||
|
|
||||||
|
def get_STATE(prt):
|
||||||
|
''' Get decoded state of the device in byte format (2 bytes).
|
||||||
|
'''
|
||||||
|
|
||||||
|
print("Received "+str(prt.inWaiting())+" bytes.")
|
||||||
|
if prt.inWaiting()!=2:
|
||||||
|
print("Error. Couldn't get STATE data. prt.inWaiting():", prt.inWaiting())
|
||||||
|
print("Flushing input data:", prt.read(prt.inWaiting()))
|
||||||
|
# print("Flushing input data:", prt.read(2), prt.read(2))
|
||||||
|
return None
|
||||||
|
|
||||||
|
out_bytes = prt.read(2)
|
||||||
|
return out_bytes
|
||||||
|
|
||||||
|
|
||||||
|
def get_DATA(prt):
|
||||||
|
''' Get decoded state of the device in byte format (426 bytes).
|
||||||
|
'''
|
||||||
|
|
||||||
|
print("Received "+str(prt.inWaiting())+" bytes.\n")
|
||||||
|
if prt.inWaiting()!=GET_DATA_TOTAL_LENGTH:
|
||||||
|
print("Error. Couldn't get DATA data.")
|
||||||
|
print("receiven data len:", prt.inWaiting())
|
||||||
|
return None
|
||||||
|
|
||||||
|
out_bytes = prt.read(GET_DATA_TOTAL_LENGTH)
|
||||||
|
return out_bytes
|
||||||
|
|
||||||
|
|
||||||
|
#### ---- Interacting with device: decode/encode messages
|
||||||
|
|
||||||
|
# ---- Encoding functions
|
||||||
|
def CalculateCRC(data):
|
||||||
|
CRC_input = []
|
||||||
|
for i in range(1,int(len(data)/4)):
|
||||||
|
CRC_input.append(data[4*i:4*i+4])
|
||||||
|
return crc(CRC_input)
|
||||||
|
|
||||||
|
def encode_Setup():
|
||||||
|
bits=['0']*16
|
||||||
|
|
||||||
|
bits[15] = "1" # enable work
|
||||||
|
bits[14] = "1" # enable 5v1
|
||||||
|
bits[13] = "1" # enable 5v2
|
||||||
|
bits[12] = "1" # enable LD1
|
||||||
|
bits[11] = "1" # enable LD2
|
||||||
|
bits[10] = "1" # enable REF1
|
||||||
|
bits[9] = "1" # enable REF2
|
||||||
|
bits[8] = "1" # enable TEC1
|
||||||
|
bits[7] = "1" # enable TEC2
|
||||||
|
bits[6] = "1" # enable temp stab 1
|
||||||
|
bits[5] = "1" # enable temp stab 2
|
||||||
|
bits[4] = "0" # enable sd save
|
||||||
|
bits[3] = "1" # enable PI1 coef read
|
||||||
|
bits[2] = "1" # enable PI2 coef read
|
||||||
|
bits[1] = "0" # reserved
|
||||||
|
bits[0] = "0" # reserved
|
||||||
|
|
||||||
|
s="".join([str(i) for i in bits])
|
||||||
|
return hex(int(s,2))[2:]
|
||||||
|
|
||||||
|
def create_TaskEnableCommand(sending_param):
|
||||||
|
data = flipfour("7777") # Word 0
|
||||||
|
data += flipfour(encode_Setup()) # Word 1
|
||||||
|
# data += flipfour(int_to_hex(sending_param['TaskType'])) # Word 2
|
||||||
|
match sending_param['TaskType']:
|
||||||
|
case "TT_CHANGE_CURR_1":
|
||||||
|
data += flipfour(int_to_hex(1))
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC1']))) # Word 3
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC1']))) # Word 4
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['DeltaC1']))) # Word 5
|
||||||
|
data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 7
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9
|
||||||
|
case "TT_CHANGE_CURR_2":
|
||||||
|
data += flipfour(int_to_hex(2))
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC2']))) # Word 3
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC2']))) # Word 4
|
||||||
|
data += flipfour(int_to_hex(int(sending_param['DeltaC2']*100))) # Word 5
|
||||||
|
data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 7
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9
|
||||||
|
case "TT_CHANGE_TEMP_1":
|
||||||
|
data += flipfour(int_to_hex(3))
|
||||||
|
raise Exception("Temperature changing is not implemented yet")
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT1']))) # Word 3
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT1']))) # Word 4
|
||||||
|
data += flipfour(int_to_hex(sending_param['DeltaT1']*100)) # Word 5
|
||||||
|
data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I1']))) # Word 7
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9
|
||||||
|
case "TT_CHANGE_TEMP_2":
|
||||||
|
data += flipfour(int_to_hex(4))
|
||||||
|
raise Exception("Temperature changing is not implemented yet")
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT2']))) # Word 3
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT2']))) # Word 4
|
||||||
|
data += flipfour(int_to_hex(sending_param['DeltaT2']*100)) # Word 5
|
||||||
|
data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I2']))) # Word 7
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9
|
||||||
|
case _:
|
||||||
|
raise Exception(f"Undefined TaskType:{sending_param['TaskType']}")
|
||||||
|
data += flipfour(int_to_hex(int(sending_param['Tau']))) # Word 10
|
||||||
|
data += flipfour(int_to_hex(sending_param['ProportionalCoeff_1'])) # Word 11
|
||||||
|
data += flipfour(int_to_hex(sending_param['IntegralCoeff_1'])) # Word 12
|
||||||
|
data += flipfour(int_to_hex(sending_param['ProportionalCoeff_2'])) # Word 13
|
||||||
|
data += flipfour(int_to_hex(sending_param['IntegralCoeff_2'])) # Word 14
|
||||||
|
data += CalculateCRC(data) # Word 15
|
||||||
|
|
||||||
|
return bytearray.fromhex(data)
|
||||||
|
|
||||||
|
def encode_Input(params):
|
||||||
|
|
||||||
|
if params is None:
|
||||||
|
return bytearray.fromhex("1111"+"00"*14)
|
||||||
|
|
||||||
|
data = flipfour("1111") # Word 0
|
||||||
|
data += flipfour(encode_Setup()) # Word 1
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_1']))) # Word 2
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_2']))) # Word 3
|
||||||
|
data += flipfour("0000")*3 # Words 4-6
|
||||||
|
data += flipfour(int_to_hex(params['ProportionalCoeff_1'])) # Word 7
|
||||||
|
data += flipfour(int_to_hex(params['IntegralCoeff_1'])) # Word 8
|
||||||
|
data += flipfour(int_to_hex(params['ProportionalCoeff_2'])) # Word 9
|
||||||
|
data += flipfour(int_to_hex(params['IntegralCoeff_2'])) # Word 10
|
||||||
|
data += flipfour(params['Message_ID']) # Word 11
|
||||||
|
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_1']))) # Word 12
|
||||||
|
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_2']))) # Word 13
|
||||||
|
|
||||||
|
CRC_input = []
|
||||||
|
for i in range(1,int(len(data)/4)):
|
||||||
|
CRC_input.append(data[4*i:4*i+4])
|
||||||
|
|
||||||
|
CRC = crc(CRC_input)
|
||||||
|
data += CRC # Word 14
|
||||||
|
|
||||||
|
return bytearray.fromhex(data)
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Decoding functions
|
||||||
|
|
||||||
|
def decode_STATE(state):
|
||||||
|
st = flipfour(state)
|
||||||
|
if st == '0000':
|
||||||
|
status = "All ok."
|
||||||
|
elif st == '0001':
|
||||||
|
status = "SD Card reading/writing error (SD_ERR)."
|
||||||
|
elif st == '0002':
|
||||||
|
status = "Command error (UART_ERR)."
|
||||||
|
elif st == '0004':
|
||||||
|
status = "Wrong parameter value error (UART_DECODE_ERR)."
|
||||||
|
elif st == '0008':
|
||||||
|
status = "Laser 1: TEC driver overheat (TEC1_ERR)."
|
||||||
|
elif st == '0010':
|
||||||
|
status = "Laser 2: TEC driver overheat (TEC2_ERR)."
|
||||||
|
elif st == '0020':
|
||||||
|
status = "Resetting system error (DEFAULT_ERR)."
|
||||||
|
elif st == '0040':
|
||||||
|
status = "File deletion error (REMOVE_ERR)."
|
||||||
|
else:
|
||||||
|
status = "Unknown or reserved error."
|
||||||
|
return status
|
||||||
|
|
||||||
|
|
||||||
|
def decode_DATA(dh):
|
||||||
|
def get_word(s,num):
|
||||||
|
return flipfour(s[num*2*2:num*2*2+4])
|
||||||
|
def get_int_word(s,num):
|
||||||
|
return int(get_word(s,num),16)
|
||||||
|
|
||||||
|
data = {}
|
||||||
|
data['datetime'] = datetime.now()
|
||||||
|
data['Header'] = get_word(dh, 0)
|
||||||
|
data['I1'] = cnv.conv_I_N_to_mA(get_int_word(dh, 1)) #LD1_param.POWER
|
||||||
|
data['I2'] = cnv.conv_I_N_to_mA(get_int_word(dh, 2)) #LD2_param.POWER
|
||||||
|
data['TO_LSB'] = get_int_word(dh, 3) #TO6_counter_LSB
|
||||||
|
data['TO_MSB'] = get_int_word(dh, 4) #TO6_counter_MSB
|
||||||
|
data['Temp_1'] = cnv.conv_T_N_to_C(get_int_word(dh, 5)) #LD1_param.LD_CURR_TEMP
|
||||||
|
data['Temp_2'] = cnv.conv_T_N_to_C(get_int_word(dh, 6)) #LD2_param.LD_CURR_TEMP
|
||||||
|
data['Temp_Ext_1'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 7)) #U_Rt1_ext_Gain
|
||||||
|
data['Temp_Ext_2'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 8)) #U_Rt2_ext_Gain
|
||||||
|
data['MON_3V3'] = cnv.conv_U3V3_N_to_V(get_int_word(dh, 9)) #3V_monitor
|
||||||
|
data['MON_5V1'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 10)) #5V1_monitor
|
||||||
|
data['MON_5V2'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 11)) #5V2_monitor
|
||||||
|
data['MON_7V0'] = cnv.conv_U7V_N_to_V(get_int_word(dh, 12)) #7V_monitor
|
||||||
|
data['Message_ID'] = get_word(dh, 13) # Last received command
|
||||||
|
data['CRC'] = get_word(dh, 14)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
68
device_conversion.py
Normal file
68
device_conversion.py
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
# ---- Conversion functions
|
||||||
|
|
||||||
|
VREF = 2.5 # Volts
|
||||||
|
|
||||||
|
R1 = 10000 # Ohm
|
||||||
|
R2 = 2200 # Ohm
|
||||||
|
R3 = 27000 # Ohm
|
||||||
|
R4 = 30000 # Ohm
|
||||||
|
R5 = 27000 # Ohm
|
||||||
|
R6 = 56000 # Ohm
|
||||||
|
|
||||||
|
RREF = 10 # Ohm (current-setting resistor) @1550 nm - 28.7 Ohm; @840 nm - 10 Ohm
|
||||||
|
|
||||||
|
R7 = 22000 # Ohm
|
||||||
|
R8 = 22000 # Ohm
|
||||||
|
R9 = 5100 # Ohm
|
||||||
|
R10 = 180000 # Ohm
|
||||||
|
|
||||||
|
class Task:
|
||||||
|
def __init__(self):
|
||||||
|
self.task_type = 0
|
||||||
|
# Here should be fields, contained task parameters
|
||||||
|
|
||||||
|
def conv_T_C_to_N(T):
|
||||||
|
Rt = 10000 * math.exp( 3900/(T+273) - 3900/298 )
|
||||||
|
U = VREF/(R5*(R3+R4)) * ( R1*R4*(R5+R6) - Rt*(R3*R6-R4*R5) ) / (Rt+R1)
|
||||||
|
N = int(U * 65535 / VREF)
|
||||||
|
if N<0 or N>65535:
|
||||||
|
print("Error converting T=" + str(T) + " to N=" + str(N) + ". N should be within [0, 65535]. Returning N=0.")
|
||||||
|
return N
|
||||||
|
|
||||||
|
def conv_T_N_to_C(N):
|
||||||
|
U = N*VREF/65535 # Volts
|
||||||
|
Rt = R1 * (VREF*R4*(R5+R6) - U*R5*(R3+R4)) / (U*R5*(R3+R4) + VREF*R3*R6 - VREF*R4*R5) # Ohm
|
||||||
|
T = 1 / (1/298 + 1/3900 * math.log(Rt/10000)) - 273 # In Celsius
|
||||||
|
return T
|
||||||
|
|
||||||
|
def conv_TExt_N_to_C(N):
|
||||||
|
U = N*VREF/4095*1/(1+100000/R10) + VREF*R9/(R8+R9) # Volts
|
||||||
|
Rt = R7*U/(VREF-U) # Ohm
|
||||||
|
T = 1 / (1/298 + 1/3455 * math.log(Rt/10000)) - 273 # In Celsius
|
||||||
|
return T
|
||||||
|
|
||||||
|
def conv_I_mA_to_N(I):
|
||||||
|
N = int(65535/2000 * RREF * I) # I in mA
|
||||||
|
if N<0 or N>65535:
|
||||||
|
print("Error converting I=" + str(I) + " to N=" + str(N) + ". N should be within [0, 65535]. Returning N=0.")
|
||||||
|
N=0
|
||||||
|
return N
|
||||||
|
|
||||||
|
def conv_I_N_to_mA(N):
|
||||||
|
return N*2.5/(65535*4.4) - 1/20.4 # I in mA
|
||||||
|
|
||||||
|
|
||||||
|
def conv_U3V3_N_to_V(u_int):
|
||||||
|
return u_int * 1.221 * 0.001 # Volts
|
||||||
|
|
||||||
|
def conv_U5V_N_to_V(u_int):
|
||||||
|
return u_int * 1.8315 * 0.001 # Volts
|
||||||
|
|
||||||
|
def conv_U7V_N_to_V(u_int):
|
||||||
|
return u_int * 6.72 * 0.001 # Volts
|
||||||
|
|
||||||
|
|
||||||
144
device_interaction.py
Normal file
144
device_interaction.py
Normal file
@ -0,0 +1,144 @@
|
|||||||
|
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
import device_commands as cmd
|
||||||
|
|
||||||
|
|
||||||
|
#### ---- Constants
|
||||||
|
|
||||||
|
WAIT_AFTER_SEND = 0.15 # Wait after sending command, before requesting input (in seconds).
|
||||||
|
|
||||||
|
|
||||||
|
#### ---- High-level port commands
|
||||||
|
'''
|
||||||
|
def create_port_connection():
|
||||||
|
prt = None
|
||||||
|
for port, _, _ in sorted(cmd.list_ports.comports()):
|
||||||
|
try:
|
||||||
|
prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
|
||||||
|
cmd.open_port(prt)
|
||||||
|
reset_port_settings(prt)
|
||||||
|
except:
|
||||||
|
prt.close()
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
return prt
|
||||||
|
'''
|
||||||
|
def create_port_connection():
|
||||||
|
prt = None
|
||||||
|
print()
|
||||||
|
ports = []
|
||||||
|
for port, _,_ in sorted(cmd.list_ports.comports()):
|
||||||
|
ports.append(port)
|
||||||
|
|
||||||
|
#ONLY FOR LINUX!!!
|
||||||
|
have_ttyUSB = False
|
||||||
|
USB_ports = []
|
||||||
|
for port in ports:
|
||||||
|
if "USB" in port:
|
||||||
|
USB_ports.append(port)
|
||||||
|
if len(USB_ports):
|
||||||
|
ports = USB_ports
|
||||||
|
# print("ports:", ports)
|
||||||
|
|
||||||
|
|
||||||
|
# for port, _, _ in sorted(cmd.list_ports.comports()):
|
||||||
|
for port in ports:
|
||||||
|
try:
|
||||||
|
print("PORT:", port)
|
||||||
|
prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
|
||||||
|
cmd.open_port(prt)
|
||||||
|
reset_port_settings(prt)
|
||||||
|
except:
|
||||||
|
prt.close()
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
return prt
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# def setup_connection():
|
||||||
|
# prt = cmd.setup_port_connection()
|
||||||
|
# cmd.open_port(prt)
|
||||||
|
# return prt
|
||||||
|
|
||||||
|
|
||||||
|
def reset_port_settings(prt):
|
||||||
|
# Reset port settings and check status
|
||||||
|
cmd.send_DEFAULT_ENABLE(prt)
|
||||||
|
time.sleep(WAIT_AFTER_SEND)
|
||||||
|
status = cmd.get_STATE(prt).hex()
|
||||||
|
if status is not None:
|
||||||
|
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
|
||||||
|
print("")
|
||||||
|
|
||||||
|
|
||||||
|
def request_state(prt):
|
||||||
|
# Request data
|
||||||
|
cmd.send_STATE(prt)
|
||||||
|
time.sleep(WAIT_AFTER_SEND)
|
||||||
|
status = cmd.get_STATE(prt).hex()
|
||||||
|
if status is not None:
|
||||||
|
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
|
||||||
|
print("")
|
||||||
|
|
||||||
|
|
||||||
|
def send_control_parameters(prt, params):
|
||||||
|
# Send control parameters
|
||||||
|
hexstring = cmd.encode_Input(params)
|
||||||
|
cmd.send_DECODE_ENABLE(prt,hexstring)
|
||||||
|
time.sleep(WAIT_AFTER_SEND)
|
||||||
|
status = cmd.get_STATE(prt).hex()
|
||||||
|
if status is not None:
|
||||||
|
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
|
||||||
|
print("")
|
||||||
|
else:
|
||||||
|
print("")
|
||||||
|
|
||||||
|
def send_task_command(prt, sending_param):
|
||||||
|
# Send task command (TASK_ENABLE state in firmware)
|
||||||
|
hexstring = cmd.create_TaskEnableCommand(sending_param)
|
||||||
|
cmd.send_TASK_ENABLE(prt,hexstring)
|
||||||
|
time.sleep(WAIT_AFTER_SEND)
|
||||||
|
status = cmd.get_STATE(prt).hex()
|
||||||
|
if status is not None:
|
||||||
|
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
|
||||||
|
print("")
|
||||||
|
else:
|
||||||
|
print("")
|
||||||
|
|
||||||
|
def request_data(prt):
|
||||||
|
# Request data
|
||||||
|
cmd.send_TRANS_ENABLE(prt)
|
||||||
|
time.sleep(WAIT_AFTER_SEND)
|
||||||
|
data = cmd.get_DATA(prt).hex()
|
||||||
|
data_dict = []
|
||||||
|
if data is not None:
|
||||||
|
data_dict = cmd.decode_DATA(data)
|
||||||
|
return data_dict
|
||||||
|
|
||||||
|
|
||||||
|
def print_data(data):
|
||||||
|
def shorten(i):
|
||||||
|
return str(round(i, 2))
|
||||||
|
|
||||||
|
print("Data from device (time: "+datetime.now().strftime("%H:%M:%S:%f")+"):")
|
||||||
|
print("Message Header:", data['Header'], " Message ID:", data['Message_ID'])
|
||||||
|
print("Photodiode Current 1 ("+str(len(data['I1']))+" values):", \
|
||||||
|
shorten(data['I1']), shorten(data['I1'][1]), "...", \
|
||||||
|
shorten(data['I1']), shorten(data['I1'][-1]), "mA")
|
||||||
|
print("Photodiode Current 2 ("+str(len(data['I2']))+" values):", \
|
||||||
|
shorten(data['I2']), shorten(data['I2'][1]), "...", \
|
||||||
|
shorten(data['I2']), shorten(data['I2'][-1]), "mA")
|
||||||
|
print("Laser Temperature 1:", shorten(data['Temp_1']), "C")
|
||||||
|
print("Laser Temperature 2:", shorten(data['Temp_2']), "C")
|
||||||
|
print("Temperature of external thermistor 1:", shorten(data['Temp_Ext_1']), "C")
|
||||||
|
print("Temperature of external thermistor 2:", shorten(data['Temp_Ext_2']), "C")
|
||||||
|
print("Voltages 3V3: "+shorten(data['MON_3V3'])+"V 5V1: "+shorten(data['MON_5V1'])+ \
|
||||||
|
"V 5V2: "+shorten(data['MON_5V2'])+"V 7V0: "+shorten(data['MON_7V0'])+"V.")
|
||||||
|
|
||||||
|
def close_connection(prt):
|
||||||
|
cmd.close_port(prt)
|
||||||
89
main.py
Executable file
89
main.py
Executable file
@ -0,0 +1,89 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
from device_interaction import *
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
|
||||||
|
INITIAL_TEMPERATURE_1 = 28 # Set initial temperature for Laser 1 in Celsius: from -1 to 45 C ??
|
||||||
|
INITIAL_TEMPERATURE_2 = 28.9 # Set initial temperature for Laser 2 in Celsius: from -1 to 45 C ??
|
||||||
|
INITIAL_CURRENT_1 = 33 # 64.0879 max # Set initial current for Laser 1, in mA
|
||||||
|
INITIAL_CURRENT_2 = 35 # 64.0879 max # Set initial current for Laser 2, in mA
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def set_initial_params():
|
||||||
|
params = {}
|
||||||
|
params['Temp_1'] = INITIAL_TEMPERATURE_1 # Initial temperature for Laser 1
|
||||||
|
params['Temp_2'] = INITIAL_TEMPERATURE_2 # Initial temperature for Laser 2
|
||||||
|
params['ProportionalCoeff_1'] = int(10*256) # Proportional coefficient for temperature stabilizatoin for Laser 1 <-- ToDo (why int?)
|
||||||
|
params['ProportionalCoeff_2'] = int(10*256) # Proportional coefficient for temperature stabilizatoin for Laser 2 <-- ToDo (why int?)
|
||||||
|
params['IntegralCoeff_1'] = int(0.5*256) # Integral coefficient for temperature stabilizatoin for Laser 1 <-- ToDo (why int?)
|
||||||
|
params['IntegralCoeff_2'] = int(0.5*256) # Integral coefficient for temperature stabilizatoin for Laser 2 <-- ToDo (why int?)
|
||||||
|
params['Message_ID'] = "00FF" # Send Message ID (hex format)
|
||||||
|
params['Iset_1'] = INITIAL_CURRENT_1 # Currency value array for Laser 1, in mA
|
||||||
|
params['Iset_2'] = INITIAL_CURRENT_2 # Currency value array for Laser 2, in mA
|
||||||
|
params['Min_Temp_1'] = INITIAL_TEMPERATURE_1
|
||||||
|
params['Max_Temp_1'] = 28
|
||||||
|
params['Min_Current_1'] = INITIAL_CURRENT_1
|
||||||
|
params['Max_Current_1'] = 70.0 #50
|
||||||
|
params['Delta_Temp_1'] = 0.05
|
||||||
|
params['Delta_Current_1'] = 0.05
|
||||||
|
params['Min_Temp_2'] = INITIAL_TEMPERATURE_2
|
||||||
|
params['Max_Temp_2'] = 28
|
||||||
|
params['Min_Current_2'] = INITIAL_CURRENT_2
|
||||||
|
params['Max_Current_2'] = 60 # 50
|
||||||
|
params['Delta_Temp_2'] = 0.05
|
||||||
|
params['Delta_Current_2'] = 0.05
|
||||||
|
params['Delta_Time'] = 50
|
||||||
|
params['Tau'] = 10
|
||||||
|
return params
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def set_task_params():
|
||||||
|
task_params = {}
|
||||||
|
task_params["ProportionalCoeff_1"] = 2560 #
|
||||||
|
task_params["IntegralCoeff_1"] = 128#
|
||||||
|
task_params["ProportionalCoeff_2"] = 2560#
|
||||||
|
task_params["IntegralCoeff_2"] = 128#
|
||||||
|
task_params["TaskType"] = "TT_CHANGE_CURR_1" # TT_CHANGE_CURR_1, TT_CHANGE_CURR_2, TT_CHANGE_TEMP_1, TT_CHANGE_TEMP_2
|
||||||
|
task_params["MinC1"] = 33.0#
|
||||||
|
task_params["MaxC1"] = 70.0#
|
||||||
|
task_params["DeltaC1"] = 0.05#
|
||||||
|
task_params["T1"] = 28.0#
|
||||||
|
task_params["T2"] = 28.9#
|
||||||
|
task_params["I2"] = 35.0#
|
||||||
|
task_params["Dt"] = 50#
|
||||||
|
task_params["Tau"] = 10.0#
|
||||||
|
return task_params
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
port = create_port_connection()
|
||||||
|
reset_port_settings(port)
|
||||||
|
# rcvd_data = request_data(port)
|
||||||
|
# print_data(rcvd_data)
|
||||||
|
print(request_data(port))
|
||||||
|
ctrl_params = set_initial_params()
|
||||||
|
|
||||||
|
#start lasers
|
||||||
|
send_control_parameters(port, ctrl_params)
|
||||||
|
print(request_data(port))
|
||||||
|
sleep(2)
|
||||||
|
|
||||||
|
#start current variation
|
||||||
|
task_params = set_task_params()
|
||||||
|
|
||||||
|
#switch to laser_current sweep mode
|
||||||
|
send_task_command(port, task_params)
|
||||||
|
print(request_data(port))
|
||||||
|
sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
#stop current variation (goto steady current)
|
||||||
|
send_control_parameters(port, ctrl_params)
|
||||||
|
print(request_data(port))
|
||||||
|
sleep(2)
|
||||||
Reference in New Issue
Block a user