from enum import IntEnum from serial import Serial from serial.tools import list_ports import device_conversion as cnv from datetime import datetime #### ---- Constants GET_DATA_TOTAL_LENGTH = 30 # Total number of bytes when getting DATA SEND_PARAMS_TOTAL_LENGTH = 30 # Total number of bytes when sending parameters TASK_ENABLE_COMMAND_LENGTH = 32 # Total number of bytes when sending TASK_ENABLE command AD9102_CMD_TOTAL_LENGTH = 10 # Total bytes when sending AD9102 saw command AD9102_CMD_HEADER = "8888" AD9833_CMD_TOTAL_LENGTH = 10 # Total bytes when sending AD9833 command AD9833_CMD_HEADER = "9999" DS1809_CMD_TOTAL_LENGTH = 10 # Total bytes when sending DS1809 UC/DC pulse command DS1809_CMD_HEADER = "AAAA" AD9102_SAW_STEP_DEFAULT = 1 AD9102_PAT_PERIOD_DEFAULT = 0xFFFF AD9102_PAT_PERIOD_BASE_DEFAULT = 0x02 AD9102_DAC_CLK_HZ = None # set to actual DAC clock if you want freq->SAW_STEP conversion AD9102_FLAG_SRAM = 0x0004 AD9102_FLAG_SRAM_FMT = 0x0008 AD9102_SRAM_SAMPLES_DEFAULT = 16 AD9102_SRAM_HOLD_DEFAULT = 1 AD9102_SRAM_AMP_DEFAULT = 8191 AD9833_FLAG_ENABLE = 0x0001 AD9833_FLAG_TRIANGLE = 0x0002 AD9833_MCLK_HZ_DEFAULT = 20_000_000 DS1809_FLAG_UC = 0x0001 DS1809_FLAG_DC = 0x0002 DS1809_PULSE_MS_DEFAULT = 2 class TaskType(IntEnum): Manual = 0x00 ChangeCurrentLD1 = 0x01 ChangeCurrentLD2 = 0x02 ChangeTemperatureLD1 = 0x03 ChangeTemperatureLD2 = 0x04 #### ---- Auxiliary functions def int_to_hex(inp): if inp<0 or inp>65535: print("Error. Input should be within [0, 65535]. Returning N=0.") return "0000" return f"{inp:#0{6}x}"[2:] def crc(lst): crc=int("0x"+lst[0],16) for i in range(1,len(lst)): crc=crc^int("0x"+lst[i],16) return int_to_hex(crc) def show_hex_string(string): return "".join("\\x{}".format(char.encode()) for char in (string[i:i+2] for i in range(0, len(string), 2))) def flipfour(s): ''' Changes "abcd" to "cdba" ''' if len(s) != 4: print("Error. Trying to flip string with length not equal to 4.") return None return s[2:4]+s[0:2] #### ---- Port Operations def setup_port_connection(baudrate: int, port: str, timeout_sec: float): prt = Serial() prt.baudrate = baudrate prt.port = port prt.timeout = timeout_sec return prt def open_port(prt): prt.open() if prt.is_open: print("Connection succesful. Port is opened.") print("Port parameters:", prt) print("") else: print("Can't open port. Exiting program.") exit() def close_port(prt): prt.close() print("") if prt.is_open: print("Can't close port. Exiting program.") exit() else: print("Port is closed. Exiting program.") exit() #### ---- Interacting with device: low-level # ---- Sending commands def send_TASK_ENABLE(prt, bytestring): ''' Set task parameters (x7777 + ...). Expected device answer: STATE. ''' if len(bytestring) != TASK_ENABLE_COMMAND_LENGTH: print("Error. Wrong parameter string for TASK_ENABLE.") return None prt.write(bytestring) print("Sent: Set control parameters (TASK_ENABLE).") def send_DECODE_ENABLE(prt, bytestring): ''' Set control parameters (x1111 + ...). Expected device answer: STATE. ''' if len(bytestring) != SEND_PARAMS_TOTAL_LENGTH: print("Error. Wrong parameter string for DECODE_ENABLE.") return None prt.write(bytestring) print("Sent: Set control parameters (DECODE_ENABLE).") def send_DEFAULT_ENABLE(prt): ''' Reset the device (x2222). Expected device answer: STATE. ''' input = bytearray.fromhex(flipfour("2222")) prt.write(input) print("Sent: Reset device (DEFAULT_ENABLE).") def send_TRANSS_ENABLE(prt): ''' Request all saved data (x3333). Expected device answer: SAVED_DATA. ''' # TODO later. pass def send_TRANS_ENABLE(prt): ''' Request last piece of data (x4444). Expected device answer: DATA. ''' input = bytearray.fromhex(flipfour("4444")) prt.write(input) print("Sent: Request last data (TRANS_ENABLE).") def send_REMOVE_FILE(prt): ''' Delete saved data (x5555). Expected device answer: STATE. ''' input = bytearray.fromhex(flipfour("5555")) prt.write(input) print("Sent: Delete saved data (REMOVE_FILE).") pass def send_STATE(prt): ''' Request state (x6666). Expected device answer: STATE. ''' input = bytearray.fromhex(flipfour("6666")) prt.write(input) print("Sent: Request state (STATE).") pass def send_AD9102(prt, bytestring): ''' Start/stop AD9102 output with saw/triangle (0x8888 + ...). Expected device answer: STATE. ''' if len(bytestring) != AD9102_CMD_TOTAL_LENGTH: print("Error. Wrong parameter string for AD9102 command.") return None prt.write(bytestring) print("Sent: AD9102 ramp command.") def send_AD9833(prt, bytestring): ''' Start/stop AD9833 output with triangle (0x9999 + ...). Expected device answer: STATE. ''' if len(bytestring) != AD9833_CMD_TOTAL_LENGTH: print("Error. Wrong parameter string for AD9833 command.") return None prt.write(bytestring) print("Sent: AD9833 ramp command.") def send_DS1809(prt, bytestring): ''' Pulse DS1809 UC/DC control lines (0xAAAA + ...). Expected device answer: STATE. ''' if len(bytestring) != DS1809_CMD_TOTAL_LENGTH: print("Error. Wrong parameter string for DS1809 command.") return None prt.write(bytestring) print("Sent: DS1809 pulse command.") # ---- Getting data def get_STATE(prt): ''' Get decoded state of the device in byte format (2 bytes). ''' print("Received "+str(prt.inWaiting())+" bytes.") if prt.inWaiting()!=2: print("Error. Couldn't get STATE data. prt.inWaiting():", prt.inWaiting()) print("Flushing input data:", prt.read(prt.inWaiting())) # print("Flushing input data:", prt.read(2), prt.read(2)) return None out_bytes = prt.read(2) return out_bytes def get_DATA(prt): ''' Get decoded state of the device in byte format (30 bytes). ''' print("Received "+str(prt.inWaiting())+" bytes.\n") if prt.inWaiting()!=GET_DATA_TOTAL_LENGTH: print("Error. Couldn't get DATA data.") print("receiven data len:", prt.inWaiting()) if prt.inWaiting() > 0: print("Flushing input data:", prt.read(prt.inWaiting())) return None out_bytes = prt.read(GET_DATA_TOTAL_LENGTH) return out_bytes #### ---- Interacting with device: decode/encode messages # ---- Encoding functions def CalculateCRC(data): CRC_input = [] for i in range(1,int(len(data)/4)): CRC_input.append(data[4*i:4*i+4]) return crc(CRC_input) def encode_Setup(): bits=['0']*16 bits[15] = "1" # enable work bits[14] = "1" # enable 5v1 bits[13] = "1" # enable 5v2 bits[12] = "1" # enable LD1 bits[11] = "1" # enable LD2 bits[10] = "1" # enable REF1 bits[9] = "1" # enable REF2 bits[8] = "1" # enable TEC1 bits[7] = "1" # enable TEC2 bits[6] = "1" # enable temp stab 1 bits[5] = "1" # enable temp stab 2 bits[4] = "0" # enable sd save bits[3] = "1" # enable PI1 coef read bits[2] = "1" # enable PI2 coef read bits[1] = "0" # reserved bits[0] = "0" # reserved s="".join([str(i) for i in bits]) return hex(int(s,2))[2:] def create_TaskEnableCommand(sending_param): data = flipfour("7777") # Word 0 data += flipfour(encode_Setup()) # Word 1 data += flipfour(int_to_hex(sending_param['TaskType'])) # Word 2 match sending_param['TaskType']: case TaskType.ChangeCurrentLD1.value: data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC1']))) # Word 3 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC1']))) # Word 4 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['DeltaC1']))) # Word 5 data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 7 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9 case TaskType.ChangeCurrentLD2.value: data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC2']))) # Word 3 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC2']))) # Word 4 data += flipfour(int_to_hex(int(sending_param['DeltaC2']*100))) # Word 5 data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 7 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9 case TaskType.ChangeTemperatureLD1: raise Exception("Temperature changing is not implemented yet") data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT1']))) # Word 3 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT1']))) # Word 4 data += flipfour(int_to_hex(sending_param['DeltaT1']*100)) # Word 5 data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I1']))) # Word 7 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9 case TaskType.ChangeTemperatureLD2: raise Exception("Temperature changing is not implemented yet") data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT2']))) # Word 3 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT2']))) # Word 4 data += flipfour(int_to_hex(sending_param['DeltaT2']*100)) # Word 5 data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I2']))) # Word 7 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9 case _: raise Exception(f"Undefined TaskType:{sending_param['TaskType']}") data += flipfour(int_to_hex(int(sending_param['Tau']))) # Word 10 data += flipfour(int_to_hex(sending_param['ProportionalCoeff_1'])) # Word 11 data += flipfour(int_to_hex(sending_param['IntegralCoeff_1'])) # Word 12 data += flipfour(int_to_hex(sending_param['ProportionalCoeff_2'])) # Word 13 data += flipfour(int_to_hex(sending_param['IntegralCoeff_2'])) # Word 14 data += CalculateCRC(data) # Word 15 return bytearray.fromhex(data) def calc_saw_step_for_freq(freq_hz: float, dac_clk_hz: float, triangle: bool): if freq_hz <= 0 or dac_clk_hz is None or dac_clk_hz <= 0: return AD9102_SAW_STEP_DEFAULT n = 2 if triangle else 1 step = int(round(dac_clk_hz / (freq_hz * n * 16384.0))) if step < 1: step = 1 if step > 63: step = 63 return step def calc_pat_period_for_duty(saw_step: int, duty: float, pat_period_base: int, triangle: bool): if duty is None or duty <= 0 or duty > 1.0: return AD9102_PAT_PERIOD_DEFAULT n = 2 if triangle else 1 base_cycles = 16 if pat_period_base == 0 else pat_period_base ramp_cycles = n * 16384 * max(1, min(63, saw_step)) pat_period = int(round(ramp_cycles / (duty * base_cycles))) if pat_period < 1: pat_period = 1 if pat_period > 0xFFFF: pat_period = 0xFFFF return pat_period def calc_sram_samples_for_freq(freq_hz: float, dac_clk_hz: float, hold: int = None): if hold is None or hold <= 0: hold = AD9102_SRAM_HOLD_DEFAULT if freq_hz is None or freq_hz <= 0 or dac_clk_hz is None or dac_clk_hz <= 0: return AD9102_SRAM_SAMPLES_DEFAULT samples = int(round(dac_clk_hz / (freq_hz * hold))) if samples < 2: samples = 2 if samples > 4096: samples = 4096 return samples def create_AD9102_ramp_command(saw_step: int = None, pat_period: int = None, pat_period_base: int = None, enable: bool = True, triangle: bool = True, sram_mode: bool = False, sram_samples: int = None, sram_hold: int = None, sram_amplitude: int = None): flags = 0 if enable: flags |= 0x0001 if triangle: flags |= 0x0002 if sram_mode: flags |= AD9102_FLAG_SRAM if sram_mode: flags |= AD9102_FLAG_SRAM_FMT if sram_samples is None: sram_samples = AD9102_SRAM_SAMPLES_DEFAULT if sram_samples < 2: sram_samples = 2 if sram_samples > 4096: sram_samples = 4096 if sram_amplitude is None: sram_amplitude = AD9102_SRAM_AMP_DEFAULT if sram_amplitude < 0: sram_amplitude = 0 if sram_amplitude > AD9102_SRAM_AMP_DEFAULT: sram_amplitude = AD9102_SRAM_AMP_DEFAULT param0 = int(sram_amplitude) & 0xFFFF param1 = int(sram_samples) & 0xFFFF else: if saw_step is None: saw_step = AD9102_SAW_STEP_DEFAULT if pat_period is None: pat_period = AD9102_PAT_PERIOD_DEFAULT if pat_period_base is None: pat_period_base = AD9102_PAT_PERIOD_BASE_DEFAULT if saw_step < 1: saw_step = 1 if saw_step > 63: saw_step = 63 if pat_period < 0: pat_period = 0 if pat_period > 0xFFFF: pat_period = 0xFFFF if pat_period_base < 0: pat_period_base = 0 if pat_period_base > 0x0F: pat_period_base = 0x0F param0 = ((pat_period_base & 0x0F) << 8) | (saw_step & 0xFF) param1 = pat_period crc_word = flags ^ param0 ^ param1 data = flipfour(AD9102_CMD_HEADER) # Word 0 (header) data += flipfour(int_to_hex(flags)) data += flipfour(int_to_hex(param0)) data += flipfour(int_to_hex(param1)) data += flipfour(int_to_hex(crc_word)) return bytearray.fromhex(data) def create_AD9833_ramp_command(freq_hz: float, mclk_hz: float = None, enable: bool = True, triangle: bool = True): if mclk_hz is None or mclk_hz <= 0: mclk_hz = AD9833_MCLK_HZ_DEFAULT if mclk_hz is None or mclk_hz <= 0 or freq_hz is None or freq_hz < 0: freq_word = 0 else: freq_word = int(round((freq_hz * (1 << 28)) / float(mclk_hz))) if freq_word < 0: freq_word = 0 if freq_word > 0x0FFFFFFF: freq_word = 0x0FFFFFFF lsw = freq_word & 0x3FFF msw = (freq_word >> 14) & 0x3FFF flags = 0 if enable: flags |= AD9833_FLAG_ENABLE if triangle: flags |= AD9833_FLAG_TRIANGLE crc_word = flags ^ lsw ^ msw data = flipfour(AD9833_CMD_HEADER) # Word 0 (header) data += flipfour(int_to_hex(flags)) data += flipfour(int_to_hex(lsw)) data += flipfour(int_to_hex(msw)) data += flipfour(int_to_hex(crc_word)) return bytearray.fromhex(data) def create_DS1809_pulse_command(uc: bool = False, dc: bool = False, count: int = 1, pulse_ms: int = None): flags = 0 if uc: flags |= DS1809_FLAG_UC if dc: flags |= DS1809_FLAG_DC if count is None or count <= 0: count = 1 if count > 64: count = 64 if pulse_ms is None: pulse_ms = DS1809_PULSE_MS_DEFAULT if pulse_ms < 1: pulse_ms = 1 if pulse_ms > 500: pulse_ms = 500 param0 = int(count) & 0xFFFF param1 = int(pulse_ms) & 0xFFFF crc_word = flags ^ param0 ^ param1 data = flipfour(DS1809_CMD_HEADER) # Word 0 (header) data += flipfour(int_to_hex(flags)) data += flipfour(int_to_hex(param0)) data += flipfour(int_to_hex(param1)) data += flipfour(int_to_hex(crc_word)) return bytearray.fromhex(data) def encode_Input(params): if params is None: return bytearray.fromhex("1111"+"00"*14) data = flipfour("1111") # Word 0 data += flipfour(encode_Setup()) # Word 1 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_1']))) # Word 2 data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_2']))) # Word 3 data += flipfour("0000")*3 # Words 4-6 data += flipfour(int_to_hex(params['ProportionalCoeff_1'])) # Word 7 data += flipfour(int_to_hex(params['IntegralCoeff_1'])) # Word 8 data += flipfour(int_to_hex(params['ProportionalCoeff_2'])) # Word 9 data += flipfour(int_to_hex(params['IntegralCoeff_2'])) # Word 10 data += flipfour(params['Message_ID']) # Word 11 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_1']))) # Word 12 data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_2']))) # Word 13 CRC_input = [] for i in range(1,int(len(data)/4)): CRC_input.append(data[4*i:4*i+4]) CRC = crc(CRC_input) data += CRC # Word 14 return bytearray.fromhex(data) # ---- Decoding functions def decode_STATE(state): st = flipfour(state) if st is None or len(st) != 4: return "Error: invalid STATE length." hi = int(st[0:2], 16) lo = int(st[2:4], 16) errors = [] if lo & 0x01: errors.append("SD Card reading/writing error (SD_ERR)") if lo & 0x02: errors.append("Command error (UART_ERR)") if lo & 0x04: errors.append("Wrong parameter value error (UART_DECODE_ERR)") if lo & 0x08: errors.append("Laser 1: TEC driver overheat (TEC1_ERR)") if lo & 0x10: errors.append("Laser 2: TEC driver overheat (TEC2_ERR)") if lo & 0x20: errors.append("Resetting system error (DEFAULT_ERR)") if lo & 0x40: errors.append("File deletion error (REMOVE_ERR)") if lo & 0x80: errors.append("AD9102 status check failed (AD9102_ERR)") if not errors: status = "All ok." else: status = "; ".join(errors) if hi != 0: status += f" | AD9102_PAT_STATUS=0x{hi:02X}" return status def decode_DATA(dh): def get_word(s,num): return flipfour(s[num*2*2:num*2*2+4]) def get_int_word(s,num): return int(get_word(s,num),16) data = {} data['datetime'] = datetime.now() data['Header'] = get_word(dh, 0) i1_raw = get_int_word(dh, 1) i2_raw = get_int_word(dh, 2) data['I1_raw'] = i1_raw data['I2_raw'] = i2_raw data['I1'] = cnv.conv_I_N_to_mA(i1_raw) #LD1_param.POWER data['I2'] = cnv.conv_I_N_to_mA(i2_raw) #LD2_param.POWER data['TO_LSB'] = get_int_word(dh, 3) #TO6_counter_LSB data['TO_MSB'] = get_int_word(dh, 4) #TO6_counter_MSB data['Temp_1'] = cnv.conv_T_N_to_C(get_int_word(dh, 5)) #LD1_param.LD_CURR_TEMP data['Temp_2'] = cnv.conv_T_N_to_C(get_int_word(dh, 6)) #LD2_param.LD_CURR_TEMP data['Temp_Ext_1'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 7)) #U_Rt1_ext_Gain data['Temp_Ext_2'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 8)) #U_Rt2_ext_Gain data['MON_3V3'] = cnv.conv_U3V3_N_to_V(get_int_word(dh, 9)) #3V_monitor data['MON_5V1'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 10)) #5V1_monitor data['MON_5V2'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 11)) #5V2_monitor data['MON_7V0'] = cnv.conv_U7V_N_to_V(get_int_word(dh, 12)) #7V_monitor data['Message_ID'] = get_word(dh, 13) # Last received command data['CRC'] = get_word(dh, 14) return data