169 lines
5.2 KiB
Python
169 lines
5.2 KiB
Python
|
|
|
|
import time
|
|
|
|
from datetime import datetime
|
|
from . import device_commands as cmd
|
|
|
|
|
|
#### ---- Constants
|
|
|
|
WAIT_AFTER_SEND = 0.15 # Wait after sending command, before requesting input (in seconds).
|
|
|
|
|
|
#### ---- High-level port commands
|
|
'''
|
|
def create_port_connection():
|
|
prt = None
|
|
for port, _, _ in sorted(cmd.list_ports.comports()):
|
|
try:
|
|
prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
|
|
cmd.open_port(prt)
|
|
reset_port_settings(prt)
|
|
except:
|
|
prt.close()
|
|
continue
|
|
break
|
|
return prt
|
|
'''
|
|
def create_port_connection():
|
|
prt = None
|
|
print()
|
|
ports = []
|
|
for port, _,_ in sorted(cmd.list_ports.comports()):
|
|
ports.append(port)
|
|
|
|
#ONLY FOR LINUX!!!
|
|
have_ttyUSB = False
|
|
USB_ports = []
|
|
for port in ports:
|
|
if "USB" in port:
|
|
USB_ports.append(port)
|
|
if len(USB_ports):
|
|
ports = USB_ports
|
|
# print("ports:", ports)
|
|
|
|
|
|
# for port, _, _ in sorted(cmd.list_ports.comports()):
|
|
for port in ports:
|
|
try:
|
|
print("PORT:", port)
|
|
prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
|
|
cmd.open_port(prt)
|
|
reset_port_settings(prt)
|
|
except:
|
|
prt.close()
|
|
continue
|
|
break
|
|
return prt
|
|
|
|
|
|
|
|
|
|
# def setup_connection():
|
|
# prt = cmd.setup_port_connection()
|
|
# cmd.open_port(prt)
|
|
# return prt
|
|
|
|
|
|
def reset_port_settings(prt):
|
|
# Reset port settings and check status
|
|
# First, flush any pending data in the input buffer
|
|
if prt.inWaiting() > 0:
|
|
flushed = prt.read(prt.inWaiting())
|
|
print(f"Flushed {len(flushed)} bytes before reset: {flushed.hex()}")
|
|
|
|
cmd.send_DEFAULT_ENABLE(prt)
|
|
|
|
# Try multiple times with increasing delays to get response
|
|
max_attempts = 3
|
|
for attempt in range(max_attempts):
|
|
wait_time = WAIT_AFTER_SEND * (attempt + 1) # Increase wait time with each attempt
|
|
time.sleep(wait_time)
|
|
|
|
status_bytes = cmd.get_STATE(prt)
|
|
if status_bytes is not None:
|
|
status = status_bytes.hex()
|
|
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
|
|
print("")
|
|
return # Success
|
|
|
|
if attempt < max_attempts - 1:
|
|
print(f"Attempt {attempt + 1} failed, retrying with longer delay...")
|
|
|
|
# If all attempts failed, print warning but don't raise exception
|
|
print("Warning: Could not get STATE response after reset, but device may still be reset.")
|
|
print("")
|
|
|
|
|
|
def request_state(prt):
|
|
# Request data
|
|
cmd.send_STATE(prt)
|
|
time.sleep(WAIT_AFTER_SEND)
|
|
status_bytes = cmd.get_STATE(prt)
|
|
if status_bytes is not None:
|
|
status = status_bytes.hex()
|
|
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
|
|
print("")
|
|
|
|
|
|
def send_control_parameters(prt, params):
|
|
# Send control parameters
|
|
hexstring = cmd.encode_Input(params)
|
|
cmd.send_DECODE_ENABLE(prt,hexstring)
|
|
time.sleep(WAIT_AFTER_SEND)
|
|
status_bytes = cmd.get_STATE(prt)
|
|
if status_bytes is not None:
|
|
status = status_bytes.hex()
|
|
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
|
|
print("")
|
|
else:
|
|
print("")
|
|
|
|
def send_task_command(prt, sending_param):
|
|
# Send task command (TASK_ENABLE state in firmware)
|
|
hexstring = cmd.create_TaskEnableCommand(sending_param)
|
|
cmd.send_TASK_ENABLE(prt,hexstring)
|
|
time.sleep(WAIT_AFTER_SEND)
|
|
status_bytes = cmd.get_STATE(prt)
|
|
if status_bytes is not None:
|
|
status = status_bytes.hex()
|
|
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
|
|
print("")
|
|
else:
|
|
print("")
|
|
|
|
def request_data(prt):
|
|
# Request data
|
|
cmd.send_TRANS_ENABLE(prt)
|
|
time.sleep(WAIT_AFTER_SEND)
|
|
data_bytes = cmd.get_DATA(prt)
|
|
data_dict = []
|
|
if data_bytes is not None:
|
|
data = data_bytes.hex()
|
|
data_dict = cmd.decode_DATA(data)
|
|
return data_dict
|
|
|
|
|
|
def print_data(data):
|
|
def shorten(i):
|
|
return str(round(i, 2))
|
|
|
|
print("Data from device (time: "+datetime.now().strftime("%H:%M:%S:%f")+"):")
|
|
print("Message Header:", data['Header'], " Message ID:", data['Message_ID'])
|
|
print("Photodiode Current 1 ("+str(len(data['I1']))+" values):", \
|
|
shorten(data['I1']), shorten(data['I1'][1]), "...", \
|
|
shorten(data['I1']), shorten(data['I1'][-1]), "mA")
|
|
print("Photodiode Current 2 ("+str(len(data['I2']))+" values):", \
|
|
shorten(data['I2']), shorten(data['I2'][1]), "...", \
|
|
shorten(data['I2']), shorten(data['I2'][-1]), "mA")
|
|
print("Laser Temperature 1:", shorten(data['Temp_1']), "C")
|
|
print("Laser Temperature 2:", shorten(data['Temp_2']), "C")
|
|
print("Temperature of external thermistor 1:", shorten(data['Temp_Ext_1']), "C")
|
|
print("Temperature of external thermistor 2:", shorten(data['Temp_Ext_2']), "C")
|
|
print("Voltages 3V3: "+shorten(data['MON_3V3'])+"V 5V1: "+shorten(data['MON_5V1'])+ \
|
|
"V 5V2: "+shorten(data['MON_5V2'])+"V 7V0: "+shorten(data['MON_7V0'])+"V.")
|
|
|
|
def close_connection(prt):
|
|
cmd.close_port(prt)
|