init. Not working due to the malunction of PySimpleGUI lib
This commit is contained in:
346
device_commands.py
Normal file
346
device_commands.py
Normal file
@ -0,0 +1,346 @@
|
||||
from enum import IntEnum
|
||||
from serial import Serial
|
||||
from serial.tools import list_ports
|
||||
import device_conversion as cnv
|
||||
from datetime import datetime
|
||||
|
||||
#### ---- Constants
|
||||
|
||||
GET_DATA_TOTAL_LENGTH = 30 # Total number of bytes when getting DATA
|
||||
SEND_PARAMS_TOTAL_LENGTH = 30 # Total number of bytes when sending parameters
|
||||
TASK_ENABLE_COMMAND_LENGTH = 32 # Total number of bytes when sending TASK_ENABLE command
|
||||
|
||||
class TaskType(IntEnum):
|
||||
Manual = 0x00
|
||||
ChangeCurrentLD1 = 0x01
|
||||
ChangeCurrentLD2 = 0x02
|
||||
ChangeTemperatureLD1 = 0x03
|
||||
ChangeTemperatureLD2 = 0x04
|
||||
#### ---- Auxiliary functions
|
||||
|
||||
def int_to_hex(inp):
|
||||
if inp<0 or inp>65535:
|
||||
print("Error. Input should be within [0, 65535]. Returning N=0.")
|
||||
return "0000"
|
||||
return f"{inp:#0{6}x}"[2:]
|
||||
|
||||
def crc(lst):
|
||||
crc=int("0x"+lst[0],16)
|
||||
for i in range(1,len(lst)):
|
||||
crc=crc^int("0x"+lst[i],16)
|
||||
return int_to_hex(crc)
|
||||
|
||||
def show_hex_string(string):
|
||||
return "".join("\\x{}".format(char.encode()) for char in (string[i:i+2] for i in range(0, len(string), 2)))
|
||||
|
||||
|
||||
def flipfour(s):
|
||||
''' Changes "abcd" to "cdba"
|
||||
'''
|
||||
if len(s) != 4:
|
||||
print("Error. Trying to flip string with length not equal to 4.")
|
||||
return None
|
||||
return s[2:4]+s[0:2]
|
||||
|
||||
|
||||
#### ---- Port Operations
|
||||
|
||||
|
||||
def setup_port_connection(baudrate: int, port: str, timeout_sec: float):
|
||||
prt = Serial()
|
||||
prt.baudrate = baudrate
|
||||
prt.port = port
|
||||
prt.timeout = timeout_sec
|
||||
return prt
|
||||
|
||||
|
||||
def open_port(prt):
|
||||
prt.open()
|
||||
|
||||
if prt.is_open:
|
||||
print("Connection succesful. Port is opened.")
|
||||
print("Port parameters:", prt)
|
||||
print("")
|
||||
else:
|
||||
print("Can't open port. Exiting program.")
|
||||
exit()
|
||||
|
||||
|
||||
def close_port(prt):
|
||||
prt.close()
|
||||
print("")
|
||||
if prt.is_open:
|
||||
print("Can't close port. Exiting program.")
|
||||
exit()
|
||||
else:
|
||||
print("Port is closed. Exiting program.")
|
||||
exit()
|
||||
|
||||
|
||||
#### ---- Interacting with device: low-level
|
||||
|
||||
# ---- Sending commands
|
||||
|
||||
def send_TASK_ENABLE(prt, bytestring):
|
||||
''' Set task parameters (x7777 + ...).
|
||||
Expected device answer: STATE.
|
||||
'''
|
||||
if len(bytestring) != TASK_ENABLE_COMMAND_LENGTH:
|
||||
print("Error. Wrong parameter string for TASK_ENABLE.")
|
||||
return None
|
||||
prt.write(bytestring)
|
||||
print("Sent: Set control parameters (TASK_ENABLE).")
|
||||
|
||||
def send_DECODE_ENABLE(prt, bytestring):
|
||||
''' Set control parameters (x1111 + ...).
|
||||
Expected device answer: STATE.
|
||||
'''
|
||||
if len(bytestring) != SEND_PARAMS_TOTAL_LENGTH:
|
||||
print("Error. Wrong parameter string for DECODE_ENABLE.")
|
||||
return None
|
||||
prt.write(bytestring)
|
||||
print("Sent: Set control parameters (DECODE_ENABLE).")
|
||||
|
||||
|
||||
def send_DEFAULT_ENABLE(prt):
|
||||
''' Reset the device (x2222).
|
||||
Expected device answer: STATE.
|
||||
'''
|
||||
input = bytearray.fromhex(flipfour("2222"))
|
||||
prt.write(input)
|
||||
print("Sent: Reset device (DEFAULT_ENABLE).")
|
||||
|
||||
|
||||
def send_TRANSS_ENABLE(prt):
|
||||
''' Request all saved data (x3333).
|
||||
Expected device answer: SAVED_DATA.
|
||||
'''
|
||||
# TODO later.
|
||||
pass
|
||||
|
||||
|
||||
def send_TRANS_ENABLE(prt):
|
||||
''' Request last piece of data (x4444).
|
||||
Expected device answer: DATA.
|
||||
'''
|
||||
input = bytearray.fromhex(flipfour("4444"))
|
||||
prt.write(input)
|
||||
print("Sent: Request last data (TRANS_ENABLE).")
|
||||
|
||||
|
||||
def send_REMOVE_FILE(prt):
|
||||
''' Delete saved data (x5555).
|
||||
Expected device answer: STATE.
|
||||
'''
|
||||
input = bytearray.fromhex(flipfour("5555"))
|
||||
prt.write(input)
|
||||
print("Sent: Delete saved data (REMOVE_FILE).")
|
||||
pass
|
||||
|
||||
|
||||
def send_STATE(prt):
|
||||
''' Request state (x6666).
|
||||
Expected device answer: STATE.
|
||||
'''
|
||||
input = bytearray.fromhex(flipfour("6666"))
|
||||
prt.write(input)
|
||||
print("Sent: Request state (STATE).")
|
||||
pass
|
||||
|
||||
|
||||
# ---- Getting data
|
||||
|
||||
|
||||
def get_STATE(prt):
|
||||
''' Get decoded state of the device in byte format (2 bytes).
|
||||
'''
|
||||
|
||||
print("Received "+str(prt.inWaiting())+" bytes.")
|
||||
if prt.inWaiting()!=2:
|
||||
print("Error. Couldn't get STATE data. prt.inWaiting():", prt.inWaiting())
|
||||
print("Flushing input data:", prt.read(prt.inWaiting()))
|
||||
# print("Flushing input data:", prt.read(2), prt.read(2))
|
||||
return None
|
||||
|
||||
out_bytes = prt.read(2)
|
||||
return out_bytes
|
||||
|
||||
|
||||
def get_DATA(prt):
|
||||
''' Get decoded state of the device in byte format (426 bytes).
|
||||
'''
|
||||
|
||||
print("Received "+str(prt.inWaiting())+" bytes.\n")
|
||||
if prt.inWaiting()!=GET_DATA_TOTAL_LENGTH:
|
||||
print("Error. Couldn't get DATA data.")
|
||||
return None
|
||||
|
||||
out_bytes = prt.read(GET_DATA_TOTAL_LENGTH)
|
||||
return out_bytes
|
||||
|
||||
|
||||
#### ---- Interacting with device: decode/encode messages
|
||||
|
||||
# ---- Encoding functions
|
||||
def CalculateCRC(data):
|
||||
CRC_input = []
|
||||
for i in range(1,int(len(data)/4)):
|
||||
CRC_input.append(data[4*i:4*i+4])
|
||||
return crc(CRC_input)
|
||||
|
||||
def encode_Setup():
|
||||
bits=['0']*16
|
||||
|
||||
bits[15] = "1" # enable work
|
||||
bits[14] = "1" # enable 5v1
|
||||
bits[13] = "1" # enable 5v2
|
||||
bits[12] = "1" # enable LD1
|
||||
bits[11] = "1" # enable LD2
|
||||
bits[10] = "1" # enable REF1
|
||||
bits[9] = "1" # enable REF2
|
||||
bits[8] = "1" # enable TEC1
|
||||
bits[7] = "1" # enable TEC2
|
||||
bits[6] = "1" # enable temp stab 1
|
||||
bits[5] = "1" # enable temp stab 2
|
||||
bits[4] = "0" # enable sd save
|
||||
bits[3] = "1" # enable PI1 coef read
|
||||
bits[2] = "1" # enable PI2 coef read
|
||||
bits[1] = "0" # reserved
|
||||
bits[0] = "0" # reserved
|
||||
|
||||
s="".join([str(i) for i in bits])
|
||||
return hex(int(s,2))[2:]
|
||||
|
||||
def create_TaskEnableCommand(sending_param):
|
||||
data = flipfour("7777") # Word 0
|
||||
data += flipfour(encode_Setup()) # Word 1
|
||||
data += flipfour(int_to_hex(sending_param['TaskType'])) # Word 2
|
||||
match sending_param['TaskType']:
|
||||
case TaskType.ChangeCurrentLD1.value:
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC1']))) # Word 3
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC1']))) # Word 4
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['DeltaC1']))) # Word 5
|
||||
data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 7
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9
|
||||
case TaskType.ChangeCurrentLD2.value:
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC2']))) # Word 3
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC2']))) # Word 4
|
||||
data += flipfour(int_to_hex(int(sending_param['DeltaC2']*100))) # Word 5
|
||||
data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 7
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9
|
||||
case TaskType.ChangeTemperatureLD1:
|
||||
raise Exception("Temperature changing is not implemented yet")
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT1']))) # Word 3
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT1']))) # Word 4
|
||||
data += flipfour(int_to_hex(sending_param['DeltaT1']*100)) # Word 5
|
||||
data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I1']))) # Word 7
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9
|
||||
case TaskType.ChangeTemperatureLD2:
|
||||
raise Exception("Temperature changing is not implemented yet")
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT2']))) # Word 3
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT2']))) # Word 4
|
||||
data += flipfour(int_to_hex(sending_param['DeltaT2']*100)) # Word 5
|
||||
data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I2']))) # Word 7
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9
|
||||
case _:
|
||||
raise Exception(f"Undefined TaskType:{sending_param['TaskType']}")
|
||||
data += flipfour(int_to_hex(int(sending_param['Tau']))) # Word 10
|
||||
data += flipfour(int_to_hex(sending_param['ProportionalCoeff_1'])) # Word 11
|
||||
data += flipfour(int_to_hex(sending_param['IntegralCoeff_1'])) # Word 12
|
||||
data += flipfour(int_to_hex(sending_param['ProportionalCoeff_2'])) # Word 13
|
||||
data += flipfour(int_to_hex(sending_param['IntegralCoeff_2'])) # Word 14
|
||||
data += CalculateCRC(data) # Word 15
|
||||
|
||||
return bytearray.fromhex(data)
|
||||
|
||||
def encode_Input(params):
|
||||
|
||||
if params is None:
|
||||
return bytearray.fromhex("1111"+"00"*14)
|
||||
|
||||
data = flipfour("1111") # Word 0
|
||||
data += flipfour(encode_Setup()) # Word 1
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_1']))) # Word 2
|
||||
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_2']))) # Word 3
|
||||
data += flipfour("0000")*3 # Words 4-6
|
||||
data += flipfour(int_to_hex(params['ProportionalCoeff_1'])) # Word 7
|
||||
data += flipfour(int_to_hex(params['IntegralCoeff_1'])) # Word 8
|
||||
data += flipfour(int_to_hex(params['ProportionalCoeff_2'])) # Word 9
|
||||
data += flipfour(int_to_hex(params['IntegralCoeff_2'])) # Word 10
|
||||
data += flipfour(params['Message_ID']) # Word 11
|
||||
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_1']))) # Word 12
|
||||
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_2']))) # Word 13
|
||||
|
||||
CRC_input = []
|
||||
for i in range(1,int(len(data)/4)):
|
||||
CRC_input.append(data[4*i:4*i+4])
|
||||
|
||||
CRC = crc(CRC_input)
|
||||
data += CRC # Word 14
|
||||
|
||||
return bytearray.fromhex(data)
|
||||
|
||||
|
||||
# ---- Decoding functions
|
||||
|
||||
def decode_STATE(state):
|
||||
st = flipfour(state)
|
||||
if st == '0000':
|
||||
status = "All ok."
|
||||
elif st == '0001':
|
||||
status = "SD Card reading/writing error (SD_ERR)."
|
||||
elif st == '0002':
|
||||
status = "Command error (UART_ERR)."
|
||||
elif st == '0004':
|
||||
status = "Wrong parameter value error (UART_DECODE_ERR)."
|
||||
elif st == '0008':
|
||||
status = "Laser 1: TEC driver overheat (TEC1_ERR)."
|
||||
elif st == '0010':
|
||||
status = "Laser 2: TEC driver overheat (TEC2_ERR)."
|
||||
elif st == '0020':
|
||||
status = "Resetting system error (DEFAULT_ERR)."
|
||||
elif st == '0040':
|
||||
status = "File deletion error (REMOVE_ERR)."
|
||||
else:
|
||||
status = "Unknown or reserved error."
|
||||
return status
|
||||
|
||||
|
||||
def decode_DATA(dh):
|
||||
def get_word(s,num):
|
||||
return flipfour(s[num*2*2:num*2*2+4])
|
||||
def get_int_word(s,num):
|
||||
return int(get_word(s,num),16)
|
||||
|
||||
data = {}
|
||||
data['datetime'] = datetime.now()
|
||||
data['Header'] = get_word(dh, 0)
|
||||
data['I1'] = cnv.conv_I_N_to_mA(get_int_word(dh, 1))
|
||||
data['I2'] = cnv.conv_I_N_to_mA(get_int_word(dh, 2))
|
||||
data['TO_LSB'] = get_int_word(dh, 3)
|
||||
data['TO_MSB'] = get_int_word(dh, 4)
|
||||
data['Temp_1'] = cnv.conv_T_N_to_C(get_int_word(dh, 5))
|
||||
data['Temp_2'] = cnv.conv_T_N_to_C(get_int_word(dh, 6))
|
||||
data['Temp_Ext_1'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 7))
|
||||
data['Temp_Ext_2'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 8))
|
||||
data['MON_3V3'] = cnv.conv_U3V3_N_to_V(get_int_word(dh, 9))
|
||||
data['MON_5V1'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 10))
|
||||
data['MON_5V2'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 11))
|
||||
data['MON_7V0'] = cnv.conv_U7V_N_to_V(get_int_word(dh, 12))
|
||||
data['Message_ID'] = get_word(dh, 13) # Last received command
|
||||
data['CRC'] = get_word(dh, 14)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user