# Files
# RadioPhotonic_PCB_PC_software/device_interaction.py
# 2026-02-18 16:04:20 +03:00
#
# 240 lines
# 7.0 KiB
# Python

import time
from datetime import datetime
import device_commands as cmd
#### ---- Constants
# Delay (seconds) between sending a command and requesting the device's reply.
# Every command helper in this module sleeps at least this long before reading.
WAIT_AFTER_SEND = 0.15
#### ---- High-level port commands
def create_port_connection():
    """Open the first serial port that can be set up, opened and reset.

    Candidate ports come from ``cmd.list_ports.comports()`` in sorted order.
    If any port name contains "USB", only those ports are tried; otherwise
    every listed port is tried. The STATE reply printed by
    ``reset_port_settings`` is not checked here.

    Returns:
        The port object created by ``cmd.setup_port_connection`` for the first
        port on which setup, open and reset all completed without raising, or
        ``None`` when no candidate port succeeded.
    """
    print()
    ports = [port for port, _, _ in sorted(cmd.list_ports.comports())]
    # Linux-only preference: when USB UART ports exist, try only those.
    usb_ports = [port for port in ports if "USB" in port]
    if usb_ports:
        ports = usb_ports
    for port in ports:
        # Start each attempt with a clean handle so that a failure inside
        # setup_port_connection() never re-closes the previous port's object.
        prt = None
        try:
            print("PORT:", port)
            prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
            cmd.open_port(prt)
            reset_port_settings(prt)
            return prt
        except Exception as exc:
            # Best-effort probing: say why this port was rejected, release it,
            # then move on to the next candidate.
            print("PORT:", port, "failed:", exc)
            if prt is not None:
                try:
                    prt.close()
                except Exception:
                    # Closing is cleanup only; a failure here must not stop
                    # the scan of the remaining ports.
                    pass
    return None
def _print_state_reply(state_bytes):
    """Print a STATE reply and report whether one was received.

    Args:
        state_bytes: Object with a ``hex()`` method holding the raw STATE
            reply, or ``None`` when nothing was read.

    Returns:
        ``False`` if ``state_bytes`` is ``None`` (nothing is printed),
        otherwise ``True`` after printing the decoded status.
    """
    if state_bytes is not None:
        hex_status = state_bytes.hex()
        decoded = cmd.decode_STATE(hex_status)
        flipped = cmd.flipfour(hex_status)
        print("Received: STATE. State status:", decoded, "(" + flipped + ")")
        print("")
        return True
    return False
def reset_port_settings(prt):
    """Send the DEFAULT_ENABLE command and print the STATE reply.

    Returns:
        ``True`` if a STATE reply was read and printed, ``False`` otherwise.
    """
    cmd.send_DEFAULT_ENABLE(prt)
    # Give the device time to answer before reading the reply.
    time.sleep(WAIT_AFTER_SEND)
    state_reply = cmd.get_STATE(prt)
    return _print_state_reply(state_reply)
def request_state(prt):
    """Send the STATE request and print the reply.

    Returns:
        ``True`` if a STATE reply was read and printed, ``False`` otherwise.
    """
    cmd.send_STATE(prt)
    # Give the device time to answer before reading the reply.
    time.sleep(WAIT_AFTER_SEND)
    state_reply = cmd.get_STATE(prt)
    return _print_state_reply(state_reply)
def send_control_parameters(prt, params):
    """Encode ``params`` and send them with the DECODE_ENABLE command.

    Args:
        prt: Open port object.
        params: Control parameters in the form accepted by
            ``cmd.encode_Input``.

    Returns:
        ``True`` if a STATE reply was read and printed, ``False`` otherwise.
    """
    encoded = cmd.encode_Input(params)
    cmd.send_DECODE_ENABLE(prt, encoded)
    time.sleep(WAIT_AFTER_SEND)
    state_reply = cmd.get_STATE(prt)
    return _print_state_reply(state_reply)
def send_task_command(prt, sending_param):
    """Build and send a task command (TASK_ENABLE state in firmware).

    Args:
        prt: Open port object.
        sending_param: Task description in the form accepted by
            ``cmd.create_TaskEnableCommand``.

    Returns:
        ``True`` if a STATE reply was read and printed, ``False`` otherwise.
    """
    task_payload = cmd.create_TaskEnableCommand(sending_param)
    cmd.send_TASK_ENABLE(prt, task_payload)
    time.sleep(WAIT_AFTER_SEND)
    state_reply = cmd.get_STATE(prt)
    return _print_state_reply(state_reply)
def start_ramp_max(
    prt,
    freq_hz=None,
    duty=None,
    saw_step=None,
    pat_period=None,
    pat_period_base=None,
    dac_clk_hz=None,
    triangle=True,
    sram_mode=False,
    sram_samples=None,
    sram_hold=None,
    sram_amplitude=None,
):
    """Start the AD9102 ramp in sawtooth or SRAM mode and print the STATE reply.

    Explicit arguments always win. A missing value is derived from
    ``freq_hz`` / ``duty`` when possible and otherwise falls back to the
    defaults defined in ``cmd``.

    Args:
        prt: Open port object.
        freq_hz: Target frequency; used to derive ``sram_samples`` (SRAM mode)
            or ``saw_step`` (sawtooth mode) when those are not given.
        duty: Used to derive ``pat_period`` in sawtooth mode when it is not
            given. Ignored in SRAM mode.
        saw_step, pat_period, pat_period_base: Sawtooth-mode settings.
        dac_clk_hz: DAC clock for the frequency calculations; defaults to
            ``cmd.AD9102_DAC_CLK_HZ`` when a calculation needs it.
        triangle: Passed through to the calculations and the command builder.
        sram_mode: Select SRAM ramp mode instead of the sawtooth generator.
        sram_samples, sram_hold, sram_amplitude: SRAM-mode settings.

    Returns:
        ``True`` if a STATE reply was read and printed, ``False`` otherwise.
    """
    if sram_mode:
        hold = cmd.AD9102_SRAM_HOLD_DEFAULT if sram_hold is None else sram_hold
        samples = sram_samples
        if samples is None and freq_hz is not None:
            clock = cmd.AD9102_DAC_CLK_HZ if dac_clk_hz is None else dac_clk_hz
            samples = cmd.calc_sram_samples_for_freq(freq_hz, clock, hold)
        payload = cmd.create_AD9102_ramp_command(
            enable=True,
            triangle=triangle,
            sram_mode=True,
            sram_samples=samples,
            sram_hold=hold,
            sram_amplitude=sram_amplitude,
        )
    else:
        base = cmd.AD9102_PAT_PERIOD_BASE_DEFAULT if pat_period_base is None else pat_period_base

        step = saw_step
        if step is None:
            if freq_hz is not None:
                clock = cmd.AD9102_DAC_CLK_HZ if dac_clk_hz is None else dac_clk_hz
                step = cmd.calc_saw_step_for_freq(freq_hz, clock, triangle)
            # Still nothing (no frequency given, or the calculation returned
            # None): use the default step.
            if step is None:
                step = cmd.AD9102_SAW_STEP_DEFAULT

        period = pat_period
        if period is None:
            if duty is not None:
                period = cmd.calc_pat_period_for_duty(step, duty, base, triangle)
            if period is None:
                period = cmd.AD9102_PAT_PERIOD_DEFAULT

        payload = cmd.create_AD9102_ramp_command(
            step,
            period,
            base,
            enable=True,
            triangle=triangle,
        )

    cmd.send_AD9102(prt, payload)
    time.sleep(WAIT_AFTER_SEND)
    state_reply = cmd.get_STATE(prt)
    return _print_state_reply(state_reply)
def start_ad9833_ramp(prt, freq_hz=None, mclk_hz=None, triangle=True, enable=True):
    """Build and send an AD9833 ramp command, then print the STATE reply.

    Args:
        prt: Open port object.
        freq_hz: Output frequency; ``None`` is sent as ``0.0``.
        mclk_hz: Passed unchanged to ``cmd.create_AD9833_ramp_command``.
        triangle: Passed unchanged to the command builder.
        enable: Passed unchanged to the command builder.

    Returns:
        ``True`` if a STATE reply was read and printed, ``False`` otherwise.
    """
    frequency = 0.0 if freq_hz is None else freq_hz
    payload = cmd.create_AD9833_ramp_command(
        freq_hz=frequency,
        mclk_hz=mclk_hz,
        enable=enable,
        triangle=triangle,
    )
    cmd.send_AD9833(prt, payload)
    time.sleep(WAIT_AFTER_SEND)
    state_reply = cmd.get_STATE(prt)
    return _print_state_reply(state_reply)
def set_stm32_dac(prt, dac_code, enable=True):
    """Build and send an STM32 DAC command, then print the STATE reply.

    Args:
        prt: Open port object.
        dac_code: DAC code passed to ``cmd.create_STM32_DAC_command``.
        enable: Enable flag passed to the command builder.

    Returns:
        ``True`` if a STATE reply was read and printed, ``False`` otherwise.
    """
    payload = cmd.create_STM32_DAC_command(dac_code=dac_code, enable=enable)
    cmd.send_STM32_DAC(prt, payload)
    time.sleep(WAIT_AFTER_SEND)
    state_reply = cmd.get_STATE(prt)
    return _print_state_reply(state_reply)
def _wait_for_min_bytes(prt, expected_len, timeout_s, poll_s=0.01):
    """Poll the port until enough bytes are waiting or the timeout expires.

    Args:
        prt: Port object exposing ``inWaiting()`` (number of buffered bytes).
        expected_len: Minimum number of waiting bytes to consider a success.
        timeout_s: Maximum time to keep polling, in seconds.
        poll_s: Sleep between polls, in seconds.

    Returns:
        ``True`` if at least ``expected_len`` bytes are waiting, else ``False``.
    """
    # Use the monotonic clock so that a wall-clock adjustment (NTP sync,
    # manual change) can neither shorten nor stretch the timeout.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if prt.inWaiting() >= expected_len:
            return True
        time.sleep(poll_s)
    # Final check covers bytes that arrived during the last sleep, and the
    # timeout_s <= 0 case in which the loop body never runs.
    return prt.inWaiting() >= expected_len
def send_ds1809_pulse(prt, uc=False, dc=False, count=1, pulse_ms=None):
    """Send a DS1809 pulse command and print the STATE reply.

    Args:
        prt: Open port object.
        uc: Passed unchanged to ``cmd.create_DS1809_pulse_command``.
        dc: Passed unchanged to ``cmd.create_DS1809_pulse_command``.
        count: Number of pulses; ``None`` or a non-positive value becomes 1.
        pulse_ms: Pulse length in milliseconds; ``None`` or a non-positive
            value becomes ``cmd.DS1809_PULSE_MS_DEFAULT``.

    Returns:
        ``True`` if a STATE reply was read and printed, ``False`` otherwise.
    """
    pulses = 1 if (count is None or count <= 0) else count
    width_ms = cmd.DS1809_PULSE_MS_DEFAULT if (pulse_ms is None or pulse_ms <= 0) else pulse_ms

    payload = cmd.create_DS1809_pulse_command(uc=uc, dc=dc, count=pulses, pulse_ms=width_ms)
    cmd.send_DS1809(prt, payload)

    # Firmware blocks while pulsing DS1809 lines: wait pulse train + safe margin.
    train_seconds = (2.0 * float(pulses) * float(width_ms)) / 1000.0
    settle_seconds = max(WAIT_AFTER_SEND, train_seconds + 0.35)
    time.sleep(settle_seconds)

    # Then poll shortly for STATE bytes; this avoids early read (0 bytes) on startup.
    _wait_for_min_bytes(prt, expected_len=2, timeout_s=0.8)
    state_reply = cmd.get_STATE(prt)
    return _print_state_reply(state_reply)
def request_data(prt):
    """Send the TRANS_ENABLE command and return the decoded data reply.

    Returns:
        The result of ``cmd.decode_DATA`` applied to the reply's hex string,
        or ``None`` when ``cmd.get_DATA`` returned ``None``.
    """
    cmd.send_TRANS_ENABLE(prt)
    # Give the device time to answer before reading the reply.
    time.sleep(WAIT_AFTER_SEND)
    raw_reply = cmd.get_DATA(prt)
    return None if raw_reply is None else cmd.decode_DATA(raw_reply.hex())
def print_data(data):
    """Print a human-readable summary of one decoded data message.

    Args:
        data: Mapping with the keys read below: "Header", "Message_ID",
            "I1" and "I2" (sequences of numbers), "Temp_1", "Temp_2",
            "Temp_Ext_1", "Temp_Ext_2", "MON_3V3", "MON_5V1", "MON_5V2" and
            "MON_7V0" (numbers).

    For each photodiode current sequence the first two and last two samples
    are printed around "..."; sequences with fewer than four samples are
    printed whole.
    """

    def shorten(i):
        return str(round(i, 2))

    def preview(values):
        # round() only accepts single numbers, so samples are picked by index
        # and never passed as a whole sequence. Short sequences are shown
        # whole: nothing is duplicated and no index goes out of range.
        if len(values) < 4:
            return [shorten(value) for value in values]
        return [
            shorten(values[0]),
            shorten(values[1]),
            "...",
            shorten(values[-2]),
            shorten(values[-1]),
        ]

    print("Data from device (time: " + datetime.now().strftime("%H:%M:%S:%f") + "):")
    print("Message Header:", data["Header"], " Message ID:", data["Message_ID"])
    for channel, key in ((1, "I1"), (2, "I2")):
        samples = data[key]
        print(
            "Photodiode Current " + str(channel) + " (" + str(len(samples)) + " values):",
            *preview(samples),
            "mA",
        )
    print("Laser Temperature 1:", shorten(data["Temp_1"]), "C")
    print("Laser Temperature 2:", shorten(data["Temp_2"]), "C")
    print("Temperature of external thermistor 1:", shorten(data["Temp_Ext_1"]), "C")
    print("Temperature of external thermistor 2:", shorten(data["Temp_Ext_2"]), "C")
    print(
        "Voltages 3V3: "
        + shorten(data["MON_3V3"])
        + "V 5V1: "
        + shorten(data["MON_5V1"])
        + "V 5V2: "
        + shorten(data["MON_5V2"])
        + "V 7V0: "
        + shorten(data["MON_7V0"])
        + "V."
    )
def close_connection(prt):
    """Close the port through ``cmd.close_port``.

    ``create_port_connection`` returns ``None`` when no port could be opened,
    so ``None`` is accepted here and treated as "nothing to close".
    """
    if prt is None:
        return
    cmd.close_port(prt)