from enum import IntEnum from serial import Serial from serial.tools import list_ports import device_conversion as cnv from datetime import datetime #### ---- Constants GET_DATA_TOTAL_LENGTH = 30 # Total number of bytes when getting DATA SEND_PARAMS_TOTAL_LENGTH = 30 # Total number of bytes when sending parameters TASK_ENABLE_COMMAND_LENGTH = 32 # Total number of bytes when sending TASK_ENABLE command class TaskType(IntEnum): Manual = 0x00 ChangeCurrentLD1 = 0x01 ChangeCurrentLD2 = 0x02 ChangeTemperatureLD1 = 0x03 ChangeTemperatureLD2 = 0x04 #### ---- Auxiliary functions def int_to_hex(inp): if inp<0 or inp>65535: print("Error. Input should be within [0, 65535]. Returning N=0.") return "0000" return f"{inp:#0{6}x}"[2:] def crc(lst): crc=int("0x"+lst[0],16) for i in range(1,len(lst)): crc=crc^int("0x"+lst[i],16) return int_to_hex(crc) def show_hex_string(string): return "".join("\\x{}".format(char.encode()) for char in (string[i:i+2] for i in range(0, len(string), 2))) def flipfour(s): ''' Changes "abcd" to "cdba" ''' if len(s) != 4: print("Error. Trying to flip string with length not equal to 4.") return None return s[2:4]+s[0:2] #### ---- Port Operations def setup_port_connection(baudrate: int, port: str, timeout_sec: float): prt = Serial() prt.baudrate = baudrate prt.port = port prt.timeout = timeout_sec return prt def open_port(prt): prt.open() if prt.is_open: print("Connection succesful. Port is opened.") print("Port parameters:", prt) print("") else: print("Can't open port. Exiting program.") exit() def close_port(prt): prt.close() print("") if prt.is_open: print("Can't close port. Exiting program.") exit() else: print("Port is closed. Exiting program.") exit() #### ---- Interacting with device: low-level # ---- Sending commands def send_TASK_ENABLE(prt, bytestring): ''' Set task parameters (x7777 + ...). Expected device answer: STATE. ''' if len(bytestring) != TASK_ENABLE_COMMAND_LENGTH: print("Error. Wrong parameter string for TASK_ENABLE.") return None prt.write(bytestring) print("Sent: Set control parameters (TASK_ENABLE).") def send_DECODE_ENABLE(prt, bytestring): ''' Set control parameters (x1111 + ...). Expected device answer: STATE. ''' if len(bytestring) != SEND_PARAMS_TOTAL_LENGTH: print("Error. Wrong parameter string for DECODE_ENABLE.") return None prt.write(bytestring) print("Sent: Set control parameters (DECODE_ENABLE).") def send_DEFAULT_ENABLE(prt): ''' Reset the device (x2222). Expected device answer: STATE. ''' input = bytearray.fromhex(flipfour("2222")) prt.write(input) print("Sent: Reset device (DEFAULT_ENABLE).") def send_TRANSS_ENABLE(prt): ''' Request all saved data (x3333). Expected device answer: SAVED_DATA. ''' # TODO later. pass def send_TRANS_ENABLE(prt): ''' Request last piece of data (x4444). Expected device answer: DATA. ''' input = bytearray.fromhex(flipfour("4444")) prt.write(input) print("Sent: Request last data (TRANS_ENABLE).") def send_REMOVE_FILE(prt): ''' Delete saved data (x5555). Expected device answer: STATE. ''' input = bytearray.fromhex(flipfour("5555")) prt.write(input) print("Sent: Delete saved data (REMOVE_FILE).") pass def send_STATE(prt): ''' Request state (x6666). Expected device answer: STATE. ''' input = bytearray.fromhex(flipfour("6666")) prt.write(input) print("Sent: Request state (STATE).") pass # ---- Getting data def get_STATE(prt): ''' Get decoded state of the device in byte format (2 bytes). ''' print("Received "+str(prt.inWaiting())+" bytes.") if prt.inWaiting()!=2: print("Error. Couldn't get STATE data. prt.inWaiting():", prt.inWaiting()) print("Flushing input data:", prt.read(prt.inWaiting())) # print("Flushing input data:", prt.read(2), prt.read(2)) return None out_bytes = prt.read(2) return out_bytes def get_DATA(prt): ''' Get decoded state of the device in byte format (426 bytes). ''' print("Received "+str(prt.inWaiting())+" bytes.\n") if prt.inWaiting()!=GET_DATA_TOTAL_LENGTH: print("Error. Couldn't get DATA data.") print("receiven data len:", prt.inWaiting()) return None out_bytes = prt.read(GET_DATA_TOTAL_LENGTH) return out_bytes #### ---- Interacting with device: decode/encode messages # ---- Encoding functions def CalculateCRC(data): CRC_input = [] for i in range(1,int(len(data)/4)): CRC_input.append(data[4*i:4*i+4]) return crc(CRC_input) def encode_Setup(): bits=['0']*16 bits[15] = "1" # enable work bits[14] = "1" # enable 5v1 bits[13] = "1" # enable 5v2 bits[12] = "1" # enable LD1 bits[11] = "1" # enable LD2 bits[10] = "1" # enable REF1 bits[9] = "1" # enable REF2 bits[8] = "1" # enable TEC1 bits[7] = "1" # enable TEC2 bits[6] = "1" # enable temp stab 1 bits[5] = "1" # enable temp stab 2 bits[4] = "0" # enable sd save bits[3] = "1" # enable PI1 coef read bits[2] = "1" # enable PI2 coef read bits[1] = "0" # reserved bits[0] = "0" # reserved s="".join([str(i) for i in bits]) return hex(int(s,2))[2:] def create_TaskEnableCommand(sending_param): data = flipfour("7777") # Word 0 data += flipfour(encode_Setup()) # Word 1 data += flipfour(int_to_hex(sending_param['TaskType'])) # Word 2 match sending_param['TaskType']: case TaskType.ChangeCurrentLD1.value: data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC1']))) # Word 3 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC1']))) # Word 4 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['DeltaC1']))) # Word 5 data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 7 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9 case TaskType.ChangeCurrentLD2.value: data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC2']))) # Word 3 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC2']))) # Word 4 data += flipfour(int_to_hex(int(sending_param['DeltaC2']*100))) # Word 5 data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 7 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9 case TaskType.ChangeTemperatureLD1: raise Exception("Temperature changing is not implemented yet") data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT1']))) # Word 3 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT1']))) # Word 4 data += flipfour(int_to_hex(sending_param['DeltaT1']*100)) # Word 5 data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I1']))) # Word 7 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9 case TaskType.ChangeTemperatureLD2: raise Exception("Temperature changing is not implemented yet") data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT2']))) # Word 3 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT2']))) # Word 4 data += flipfour(int_to_hex(sending_param['DeltaT2']*100)) # Word 5 data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I2']))) # Word 7 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9 case _: raise Exception(f"Undefined TaskType:{sending_param['TaskType']}") data += flipfour(int_to_hex(int(sending_param['Tau']))) # Word 10 data += flipfour(int_to_hex(sending_param['ProportionalCoeff_1'])) # Word 11 data += flipfour(int_to_hex(sending_param['IntegralCoeff_1'])) # Word 12 data += flipfour(int_to_hex(sending_param['ProportionalCoeff_2'])) # Word 13 data += flipfour(int_to_hex(sending_param['IntegralCoeff_2'])) # Word 14 data += CalculateCRC(data) # Word 15 return bytearray.fromhex(data) def encode_Input(params): if params is None: return bytearray.fromhex("1111"+"00"*14) data = flipfour("1111") # Word 0 data += flipfour(encode_Setup()) # Word 1 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_1']))) # Word 2 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_2']))) # Word 3 data += flipfour("0000")*3 # Words 4-6 data += flipfour(int_to_hex(params['ProportionalCoeff_1'])) # Word 7 data += flipfour(int_to_hex(params['IntegralCoeff_1'])) # Word 8 data += flipfour(int_to_hex(params['ProportionalCoeff_2'])) # Word 9 data += flipfour(int_to_hex(params['IntegralCoeff_2'])) # Word 10 data += flipfour(params['Message_ID']) # Word 11 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_1']))) # Word 12 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_2']))) # Word 13 CRC_input = [] for i in range(1,int(len(data)/4)): CRC_input.append(data[4*i:4*i+4]) CRC = crc(CRC_input) data += CRC # Word 14 return bytearray.fromhex(data) # ---- Decoding functions def decode_STATE(state): st = flipfour(state) if st == '0000': status = "All ok." elif st == '0001': status = "SD Card reading/writing error (SD_ERR)." elif st == '0002': status = "Command error (UART_ERR)." elif st == '0004': status = "Wrong parameter value error (UART_DECODE_ERR)." elif st == '0008': status = "Laser 1: TEC driver overheat (TEC1_ERR)." elif st == '0010': status = "Laser 2: TEC driver overheat (TEC2_ERR)." elif st == '0020': status = "Resetting system error (DEFAULT_ERR)." elif st == '0040': status = "File deletion error (REMOVE_ERR)." else: status = "Unknown or reserved error." return status def decode_DATA(dh): def get_word(s,num): return flipfour(s[num*2*2:num*2*2+4]) def get_int_word(s,num): return int(get_word(s,num),16) data = {} data['datetime'] = datetime.now() data['Header'] = get_word(dh, 0) data['I1'] = cnv.conv_I_N_to_mA(get_int_word(dh, 1)) #LD1_param.POWER data['I2'] = cnv.conv_I_N_to_mA(get_int_word(dh, 2)) #LD2_param.POWER data['TO_LSB'] = get_int_word(dh, 3) #TO6_counter_LSB data['TO_MSB'] = get_int_word(dh, 4) #TO6_counter_MSB data['Temp_1'] = cnv.conv_T_N_to_C(get_int_word(dh, 5)) #LD1_param.LD_CURR_TEMP data['Temp_2'] = cnv.conv_T_N_to_C(get_int_word(dh, 6)) #LD2_param.LD_CURR_TEMP data['Temp_Ext_1'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 7)) #U_Rt1_ext_Gain data['Temp_Ext_2'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 8)) #U_Rt2_ext_Gain data['MON_3V3'] = cnv.conv_U3V3_N_to_V(get_int_word(dh, 9)) #3V_monitor data['MON_5V1'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 10)) #5V1_monitor data['MON_5V2'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 11)) #5V2_monitor data['MON_7V0'] = cnv.conv_U7V_N_to_V(get_int_word(dh, 12)) #7V_monitor data['Message_ID'] = get_word(dh, 13) # Last received command data['CRC'] = get_word(dh, 14) return data