Files
RadioPhotonic_PCB_PC_software/device_commands.py
2026-02-03 19:29:40 +03:00

483 lines
16 KiB
Python

from enum import IntEnum
from serial import Serial
from serial.tools import list_ports
import device_conversion as cnv
from datetime import datetime
#### ---- Constants
GET_DATA_TOTAL_LENGTH = 30 # Total number of bytes in a DATA answer (15 two-byte words, see decode_DATA)
SEND_PARAMS_TOTAL_LENGTH = 30 # Total number of bytes in a DECODE_ENABLE command (15 two-byte words)
TASK_ENABLE_COMMAND_LENGTH = 32 # Total number of bytes in a TASK_ENABLE command (16 two-byte words)
# NOTE(review): the AD9833_* names are used for the x8888 waveform command, but every
# parameter of that command is an AD9102_* value - confirm which chip name is the right one.
AD9833_CMD_TOTAL_LENGTH = 10 # Total number of bytes in the x8888 command (5 two-byte words)
AD9833_CMD_HEADER = "8888" # Header word of the x8888 command, as a hex string
AD9102_SAW_STEP_DEFAULT = 1 # Fallback saw step (this file clamps saw steps to 1..63)
AD9102_PAT_PERIOD_DEFAULT = 0xFFFF # Fallback pattern period (largest 16-bit value)
AD9102_PAT_PERIOD_BASE_DEFAULT = 0x02 # Fallback pattern period base (this file clamps it to 0..15)
AD9102_DAC_CLK_HZ = None # set to actual DAC clock if you want freq->SAW_STEP conversion
AD9102_FLAG_SRAM = 0x0004 # Flag bit set in the x8888 command when SRAM mode is requested
AD9102_SRAM_SAMPLES_DEFAULT = 16 # Fallback SRAM sample count (this file clamps it to 2..4096)
AD9102_SRAM_HOLD_DEFAULT = 1 # Fallback SRAM hold value (this file clamps it to 1..15)
class TaskType(IntEnum):
    ''' Task selector written into word 2 of the TASK_ENABLE command.
    '''
    Manual = 0x00 # has no branch in create_TaskEnableCommand (it raises for this value)
    ChangeCurrentLD1 = 0x01 # current task for laser 1 (MinC1 / MaxC1 / DeltaC1 parameters)
    ChangeCurrentLD2 = 0x02 # current task for laser 2 (MinC2 / MaxC2 / DeltaC2 parameters)
    ChangeTemperatureLD1 = 0x03 # not implemented in create_TaskEnableCommand
    ChangeTemperatureLD2 = 0x04 # not implemented in create_TaskEnableCommand
#### ---- Auxiliary functions
def int_to_hex(inp):
    ''' Format an integer as exactly four lowercase hex digits, without "0x".

    Values outside [0, 65535] are reported on stdout and replaced by "0000".
    '''
    out_of_range = inp < 0 or inp > 0xFFFF
    if out_of_range:
        print("Error. Input should be within [0, 65535]. Returning N=0.")
        return "0000"
    return format(inp, "04x")
def crc(lst):
    ''' XOR all hex words of a non-empty sequence into one checksum word.

    Each item is a hex string without the "0x" prefix. The result is
    formatted by int_to_hex().
    '''
    checksum = int("0x" + lst[0], 16)
    for word in lst[1:]:
        checksum ^= int("0x" + word, 16)
    return int_to_hex(checksum)
def show_hex_string(string):
    ''' Render a hex string for display as escaped byte pairs.

    The input is cut into consecutive 2-character pieces and each piece is
    prefixed with a backslash and "x". A trailing single character (odd
    length) is kept as a short last piece; an empty input gives "".
    '''
    pairs = (string[i:i + 2] for i in range(0, len(string), 2))
    # Format the pair itself: formatting pair.encode() would embed the bytes
    # repr (b'..') in the output instead of the two hex digits.
    return "".join(f"\\x{pair}" for pair in pairs)
def flipfour(s):
    ''' Swap the two halves of a 4-character string: "abcd" becomes "cdab".

    Prints an error and returns None when the length is not 4.
    '''
    if len(s) == 4:
        first_half, second_half = s[:2], s[2:]
        return second_half + first_half
    print("Error. Trying to flip string with length not equal to 4.")
    return None
#### ---- Port Operations
def setup_port_connection(baudrate: int, port: str, timeout_sec: float):
    ''' Create a configured Serial object without opening it.

    The baud rate, port name and timeout are assigned on a fresh Serial()
    instance, which is then returned.
    '''
    connection = Serial()
    connection.baudrate = baudrate
    connection.port = port
    connection.timeout = timeout_sec
    return connection
def open_port(prt):
    ''' Open the port and report the result on stdout.

    Returns None when the port is open afterwards. If prt.is_open is still
    false after prt.open(), the program is terminated by raising SystemExit.
    Exceptions raised by prt.open() itself are not caught here.
    '''
    prt.open()
    if not prt.is_open:
        print("Can't open port. Exiting program.")
        # Raise SystemExit directly: the exit() helper is injected by the
        # site module for interactive use and is not guaranteed to exist.
        raise SystemExit
    print("Connection succesful. Port is opened.")
    print("Port parameters:", prt)
    print("")
def close_port(prt):
    ''' Close the port, report the result on stdout and terminate the program.

    This function never returns: it raises SystemExit whether or not the
    port was closed successfully.
    '''
    prt.close()
    print("")
    if prt.is_open:
        print("Can't close port. Exiting program.")
    else:
        print("Port is closed. Exiting program.")
    # Raise SystemExit directly: the exit() helper is injected by the site
    # module for interactive use and is not guaranteed to exist.
    raise SystemExit
#### ---- Interacting with device: low-level
# ---- Sending commands
def send_TASK_ENABLE(prt, bytestring):
    ''' Send a TASK_ENABLE command (x7777 + ...).
    Expected device answer: STATE.

    Nothing is written unless bytestring is exactly
    TASK_ENABLE_COMMAND_LENGTH bytes long. Always returns None.
    '''
    if len(bytestring) == TASK_ENABLE_COMMAND_LENGTH:
        prt.write(bytestring)
        print("Sent: Set control parameters (TASK_ENABLE).")
    else:
        print("Error. Wrong parameter string for TASK_ENABLE.")
    return None
def send_DECODE_ENABLE(prt, bytestring):
    ''' Send a DECODE_ENABLE command that sets control parameters (x1111 + ...).
    Expected device answer: STATE.

    Nothing is written unless bytestring is exactly
    SEND_PARAMS_TOTAL_LENGTH bytes long. Always returns None.
    '''
    length_ok = len(bytestring) == SEND_PARAMS_TOTAL_LENGTH
    if not length_ok:
        print("Error. Wrong parameter string for DECODE_ENABLE.")
        return None
    prt.write(bytestring)
    print("Sent: Set control parameters (DECODE_ENABLE).")
    return None
def send_DEFAULT_ENABLE(prt):
    ''' Send the reset command (x2222).
    Expected device answer: STATE.
    '''
    command = bytearray.fromhex(flipfour("2222"))
    prt.write(command)
    print("Sent: Reset device (DEFAULT_ENABLE).")
def send_TRANSS_ENABLE(prt):
    ''' Request all saved data (x3333). Not implemented: nothing is sent.
    Expected device answer: SAVED_DATA.
    '''
    # TODO later.
    return None
def send_TRANS_ENABLE(prt):
    ''' Send the request for the last piece of data (x4444).
    Expected device answer: DATA.
    '''
    request = bytearray.fromhex(flipfour("4444"))
    prt.write(request)
    print("Sent: Request last data (TRANS_ENABLE).")
def send_REMOVE_FILE(prt):
    ''' Send the command that deletes saved data (x5555).
    Expected device answer: STATE.
    '''
    prt.write(bytearray.fromhex(flipfour("5555")))
    print("Sent: Delete saved data (REMOVE_FILE).")
def send_STATE(prt):
    ''' Send the state request (x6666).
    Expected device answer: STATE.
    '''
    prt.write(bytearray.fromhex(flipfour("6666")))
    print("Sent: Request state (STATE).")
def send_AD9833(prt, bytestring):
    ''' Send the x8888 ramp command (0x8888 + ...).
    Expected device answer: STATE.

    Nothing is written unless bytestring is exactly
    AD9833_CMD_TOTAL_LENGTH bytes long. Always returns None.
    '''
    if len(bytestring) == AD9833_CMD_TOTAL_LENGTH:
        prt.write(bytestring)
        print("Sent: AD9833 ramp command.")
        return None
    print("Error. Wrong parameter string for AD9833 command.")
    return None
# ---- Getting data
def get_STATE(prt):
    ''' Read the raw STATE answer (2 bytes) from the port.

    Returns the 2 bytes, or None when the number of pending bytes is not
    exactly 2. In that case the pending input is read out and printed so
    that it does not pollute the next answer.
    '''
    # Query the pending byte count once, so the printed count, the length
    # check and the error message all refer to the same value.
    waiting = prt.inWaiting()
    print("Received "+str(waiting)+" bytes.")
    if waiting != 2:
        print("Error. Couldn't get STATE data. prt.inWaiting():", waiting)
        # Re-query here on purpose: drain everything pending right now,
        # including bytes that arrived after the first query.
        print("Flushing input data:", prt.read(prt.inWaiting()))
        return None
    return prt.read(2)
def get_DATA(prt):
    ''' Read one raw DATA answer (GET_DATA_TOTAL_LENGTH bytes) from the port.

    Returns the bytes, or None when the number of pending bytes is not
    exactly GET_DATA_TOTAL_LENGTH. Unlike get_STATE, pending bytes are left
    in the input buffer in that case.
    '''
    # Query the pending byte count once, so the printed count and the length
    # check refer to the same value.
    waiting = prt.inWaiting()
    print("Received "+str(waiting)+" bytes.\n")
    if waiting != GET_DATA_TOTAL_LENGTH:
        print("Error. Couldn't get DATA data.")
        print("receiven data len:", waiting)
        return None
    return prt.read(GET_DATA_TOTAL_LENGTH)
#### ---- Interacting with device: decode/encode messages
# ---- Encoding functions
def CalculateCRC(data):
    ''' Compute the checksum word of a message given as a hex string.

    The string is cut into 4-character words; word 0 (the header) is
    skipped and the remaining complete words are passed to crc().
    '''
    word_count = int(len(data) / 4)
    payload_words = [data[4 * k:4 * k + 4] for k in range(1, word_count)]
    return crc(payload_words)
def encode_Setup():
    ''' Build the 16-bit setup word and return it as four hex digits.

    The word is assembled as a 16-character binary string: index 0 is the
    most significant bit and index 15 the least significant one. The result
    is always zero-padded to 4 digits, because flipfour() rejects anything
    shorter.
    '''
    bits = ['0'] * 16
    bits[15] = "1"  # enable work
    bits[14] = "1"  # enable 5v1
    bits[13] = "1"  # enable 5v2
    bits[12] = "1"  # enable LD1
    bits[11] = "1"  # enable LD2
    bits[10] = "1"  # enable REF1
    bits[9] = "1"   # enable REF2
    bits[8] = "1"   # enable TEC1
    bits[7] = "1"   # enable TEC2
    bits[6] = "1"   # enable temp stab 1
    bits[5] = "1"   # enable temp stab 2
    bits[4] = "0"   # enable sd save
    bits[3] = "1"   # enable PI1 coef read
    bits[2] = "1"   # enable PI2 coef read
    bits[1] = "0"   # reserved
    bits[0] = "0"   # reserved
    setup_word = int("".join(bits), 2)
    # Fixed width: hex()[2:] would drop leading zeros once the top bits are
    # cleared, giving a word shorter than 4 digits.
    return f"{setup_word:04x}"
def create_TaskEnableCommand(sending_param):
data = flipfour("7777") # Word 0
data += flipfour(encode_Setup()) # Word 1
data += flipfour(int_to_hex(sending_param['TaskType'])) # Word 2
match sending_param['TaskType']:
case TaskType.ChangeCurrentLD1.value:
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC1']))) # Word 3
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC1']))) # Word 4
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['DeltaC1']))) # Word 5
data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 7
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9
case TaskType.ChangeCurrentLD2.value:
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC2']))) # Word 3
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC2']))) # Word 4
data += flipfour(int_to_hex(int(sending_param['DeltaC2']*100))) # Word 5
data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 7
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9
case TaskType.ChangeTemperatureLD1:
raise Exception("Temperature changing is not implemented yet")
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT1']))) # Word 3
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT1']))) # Word 4
data += flipfour(int_to_hex(sending_param['DeltaT1']*100)) # Word 5
data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I1']))) # Word 7
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9
case TaskType.ChangeTemperatureLD2:
raise Exception("Temperature changing is not implemented yet")
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT2']))) # Word 3
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT2']))) # Word 4
data += flipfour(int_to_hex(sending_param['DeltaT2']*100)) # Word 5
data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I2']))) # Word 7
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9
case _:
raise Exception(f"Undefined TaskType:{sending_param['TaskType']}")
data += flipfour(int_to_hex(int(sending_param['Tau']))) # Word 10
data += flipfour(int_to_hex(sending_param['ProportionalCoeff_1'])) # Word 11
data += flipfour(int_to_hex(sending_param['IntegralCoeff_1'])) # Word 12
data += flipfour(int_to_hex(sending_param['ProportionalCoeff_2'])) # Word 13
data += flipfour(int_to_hex(sending_param['IntegralCoeff_2'])) # Word 14
data += CalculateCRC(data) # Word 15
return bytearray.fromhex(data)
def calc_saw_step_for_freq(freq_hz: float, dac_clk_hz: float, triangle: bool):
    ''' Convert a target frequency into a saw step clamped to 1..63.

    Returns AD9102_SAW_STEP_DEFAULT when freq_hz or dac_clk_hz is None or
    not positive. Otherwise the step is
    dac_clk_hz / (freq_hz * n * 16384) rounded to the nearest integer,
    where n is 2 for a triangle and 1 otherwise.
    '''
    # freq_hz is None-checked as well, matching calc_sram_samples_for_freq;
    # without it a None frequency raised TypeError on the comparison.
    if freq_hz is None or freq_hz <= 0 or dac_clk_hz is None or dac_clk_hz <= 0:
        return AD9102_SAW_STEP_DEFAULT
    ramps_per_period = 2 if triangle else 1
    # NOTE(review): 16384 is presumably the number of DAC counts per ramp
    # (2**14) - confirm against the device documentation.
    step = int(round(dac_clk_hz / (freq_hz * ramps_per_period * 16384.0)))
    return max(1, min(63, step))
def calc_pat_period_for_duty(saw_step: int, duty: float, pat_period_base: int, triangle: bool):
    ''' Compute a pattern period for a duty value, clamped to 1..0xFFFF.

    Returns AD9102_PAT_PERIOD_DEFAULT when duty is None or outside (0, 1].
    A pat_period_base of 0 is treated as 16. saw_step is clamped to 1..63
    before use.
    '''
    if duty is None or duty <= 0 or duty > 1.0:
        return AD9102_PAT_PERIOD_DEFAULT
    ramps_per_period = 2 if triangle else 1
    if pat_period_base == 0:
        base_cycles = 16
    else:
        base_cycles = pat_period_base
    clamped_step = max(1, min(63, saw_step))
    ramp_cycles = ramps_per_period * 16384 * clamped_step
    period = int(round(ramp_cycles / (duty * base_cycles)))
    return min(max(period, 1), 0xFFFF)
def calc_sram_samples_for_freq(freq_hz: float, dac_clk_hz: float, hold: int = None):
    ''' Compute the SRAM sample count for a frequency, clamped to 2..4096.

    A hold that is None or not positive is replaced by
    AD9102_SRAM_HOLD_DEFAULT. Returns AD9102_SRAM_SAMPLES_DEFAULT when
    freq_hz or dac_clk_hz is None or not positive.
    '''
    if hold is None or hold <= 0:
        hold = AD9102_SRAM_HOLD_DEFAULT
    if freq_hz is None or freq_hz <= 0 or dac_clk_hz is None or dac_clk_hz <= 0:
        return AD9102_SRAM_SAMPLES_DEFAULT
    sample_count = int(round(dac_clk_hz / (freq_hz * hold)))
    return min(max(sample_count, 2), 4096)
def create_AD9833_ramp_command(saw_step: int = None,
                               pat_period: int = None,
                               pat_period_base: int = None,
                               enable: bool = True,
                               triangle: bool = True,
                               sram_mode: bool = False,
                               sram_samples: int = None,
                               sram_hold: int = None):
    ''' Build the x8888 command as a bytearray of 5 words.

    Words: header, flags, param0, param1, checksum (flags ^ param0 ^ param1).
    Flags: 0x0001 enable, 0x0002 triangle, AD9102_FLAG_SRAM for SRAM mode.

    In SRAM mode param0 is the sample count (default
    AD9102_SRAM_SAMPLES_DEFAULT, clamped to 2..4096) and param1 the hold
    value (default AD9102_SRAM_HOLD_DEFAULT, clamped to 1..15). Otherwise
    param0 packs pat_period_base (0..15) into bits 8-11 above saw_step
    (1..63), and param1 is pat_period (0..0xFFFF); values left as None take
    their AD9102_*_DEFAULT.
    '''
    def clamp(value, low, high):
        # Same comparisons, in the same order, as two consecutive range checks.
        if value < low:
            return low
        if value > high:
            return high
        return value

    flags = (0x0001 if enable else 0) | (0x0002 if triangle else 0)
    if sram_mode:
        flags |= AD9102_FLAG_SRAM
        samples = AD9102_SRAM_SAMPLES_DEFAULT if sram_samples is None else sram_samples
        samples = clamp(samples, 2, 4096)
        hold = sram_hold
        if hold is None or hold <= 0:
            hold = AD9102_SRAM_HOLD_DEFAULT
        if hold > 0x0F:
            hold = 0x0F
        param0 = samples & 0xFFFF
        param1 = hold & 0x000F
    else:
        step = AD9102_SAW_STEP_DEFAULT if saw_step is None else saw_step
        period = AD9102_PAT_PERIOD_DEFAULT if pat_period is None else pat_period
        base = AD9102_PAT_PERIOD_BASE_DEFAULT if pat_period_base is None else pat_period_base
        step = clamp(step, 1, 63)
        period = clamp(period, 0, 0xFFFF)
        base = clamp(base, 0, 0x0F)
        param0 = ((base & 0x0F) << 8) | (step & 0xFF)
        param1 = period
    crc_word = flags ^ param0 ^ param1
    data = flipfour(AD9833_CMD_HEADER)  # Word 0 (header)
    for payload in (flags, param0, param1, crc_word):  # Words 1-4
        data += flipfour(int_to_hex(payload))
    return bytearray.fromhex(data)
def encode_Input(params):
    ''' Build the DECODE_ENABLE command (x1111 + ...) as a bytearray of 15 words.

    Word layout: 0 header, 1 setup word from encode_Setup(), 2-3 target
    temperatures, 4-6 zero, 7-10 PI coefficients, 11 message ID, 12-13 set
    currents, 14 checksum from CalculateCRC(). Every word is four hex digits
    with its two halves swapped by flipfour().

    params is a dict with the keys 'Temp_1', 'Temp_2',
    'ProportionalCoeff_1', 'IntegralCoeff_1', 'ProportionalCoeff_2',
    'IntegralCoeff_2', 'Message_ID' (a 4-character hex string), 'Iset_1'
    and 'Iset_2'. When params is None, a frame holding the header followed
    by 14 all-zero words is returned.
    '''
    if params is None:
        # 1 header word + 14 zero words = 15 words = 30 bytes, the length
        # send_DECODE_ENABLE requires. The previous "00"*14 gave only 16
        # bytes. The checksum of all-zero words is itself zero.
        return bytearray.fromhex("1111" + "0000"*14)

    def word(value):
        # One protocol word: 4 hex digits with the two halves swapped.
        return flipfour(int_to_hex(value))

    words = [
        flipfour("1111"),                            # Word 0
        flipfour(encode_Setup()),                    # Word 1
        word(cnv.conv_T_C_to_N(params['Temp_1'])),   # Word 2
        word(cnv.conv_T_C_to_N(params['Temp_2'])),   # Word 3
        flipfour("0000")*3,                          # Words 4-6
        word(params['ProportionalCoeff_1']),         # Word 7
        word(params['IntegralCoeff_1']),             # Word 8
        word(params['ProportionalCoeff_2']),         # Word 9
        word(params['IntegralCoeff_2']),             # Word 10
        flipfour(params['Message_ID']),              # Word 11
        word(cnv.conv_I_mA_to_N(params['Iset_1'])),  # Word 12
        word(cnv.conv_I_mA_to_N(params['Iset_2'])),  # Word 13
    ]
    data = "".join(words)
    data += CalculateCRC(data)                       # Word 14
    return bytearray.fromhex(data)
# ---- Decoding functions
def decode_STATE(state):
    ''' Turn a STATE word (4 hex characters as received) into a status text.

    After the halves are swapped by flipfour(), the low byte holds one error
    flag per bit and the high byte is reported as AD9102_PAT_STATUS when it
    is not zero. Returns "All ok." when no error flag is set, or an error
    text when the input does not have 4 characters.
    '''
    error_flags = (
        (0x01, "SD Card reading/writing error (SD_ERR)"),
        (0x02, "Command error (UART_ERR)"),
        (0x04, "Wrong parameter value error (UART_DECODE_ERR)"),
        (0x08, "Laser 1: TEC driver overheat (TEC1_ERR)"),
        (0x10, "Laser 2: TEC driver overheat (TEC2_ERR)"),
        (0x20, "Resetting system error (DEFAULT_ERR)"),
        (0x40, "File deletion error (REMOVE_ERR)"),
        (0x80, "AD9102 status check failed (AD9102_ERR)"),
    )
    swapped = flipfour(state)
    if swapped is None or len(swapped) != 4:
        return "Error: invalid STATE length."
    high_byte = int(swapped[0:2], 16)
    low_byte = int(swapped[2:4], 16)
    active = [text for mask, text in error_flags if low_byte & mask]
    status = "; ".join(active) if active else "All ok."
    if high_byte != 0:
        status += f" | AD9102_PAT_STATUS=0x{high_byte:02X}"
    return status
def decode_DATA(dh):
    ''' Decode a DATA answer given as a hex string into a dict.

    The string is read as 15 words of 4 hex characters; each word has its
    halves swapped by flipfour() before use. Numeric words go through the
    matching cnv.conv_* function. 'Header', 'Message_ID' and 'CRC' stay hex
    strings, and 'datetime' is the local time of decoding.
    '''
    def word_at(index):
        start = 4 * index
        return flipfour(dh[start:start + 4])

    def int_at(index):
        return int(word_at(index), 16)

    return {
        'datetime': datetime.now(),
        'Header': word_at(0),
        'I1': cnv.conv_I_N_to_mA(int_at(1)),             # LD1_param.POWER
        'I2': cnv.conv_I_N_to_mA(int_at(2)),             # LD2_param.POWER
        'TO_LSB': int_at(3),                             # TO6_counter_LSB
        'TO_MSB': int_at(4),                             # TO6_counter_MSB
        'Temp_1': cnv.conv_T_N_to_C(int_at(5)),          # LD1_param.LD_CURR_TEMP
        'Temp_2': cnv.conv_T_N_to_C(int_at(6)),          # LD2_param.LD_CURR_TEMP
        'Temp_Ext_1': cnv.conv_TExt_N_to_C(int_at(7)),   # U_Rt1_ext_Gain
        'Temp_Ext_2': cnv.conv_TExt_N_to_C(int_at(8)),   # U_Rt2_ext_Gain
        'MON_3V3': cnv.conv_U3V3_N_to_V(int_at(9)),      # 3V_monitor
        'MON_5V1': cnv.conv_U5V_N_to_V(int_at(10)),      # 5V1_monitor
        'MON_5V2': cnv.conv_U5V_N_to_V(int_at(11)),      # 5V2_monitor
        'MON_7V0': cnv.conv_U7V_N_to_V(int_at(12)),      # 7V_monitor
        'Message_ID': word_at(13),                       # Last received command
        'CRC': word_at(14),
    }