392 lines
13 KiB
Python
392 lines
13 KiB
Python
import time
|
|
import json
|
|
from datetime import datetime
|
|
|
|
import device_commands as cmd
|
|
|
|
|
|
#### ---- Constants
|
|
|
|
# Delay used by the simple request/reply helpers in this module: they sleep
# this long after sending a command before reading the STATE/DATA reply.
# Also the default timeout of _wait_for_state_reply().
WAIT_AFTER_SEND = 0.15  # Wait after sending command before requesting input (s).

# Timeout (seconds) passed to _wait_for_state_reply() for each step of the
# AD9102 waveform upload and its rollback.
FAST_STATE_TIMEOUT_S = 0.4

# Polling interval (seconds) passed to _wait_for_state_reply() for the same
# waveform upload / rollback steps.
FAST_STATE_POLL_S = 0.005
|
|
|
|
|
|
#### ---- High-level port commands
|
|
|
|
def _port_sort_key(port_info):
    """Return a sort key that orders serial ports by how likely they are ours.

    The key is ``(usb_rank, uart_rank, device)`` where ``usb_rank`` is 0 for
    ports that look like USB/ACM adapters (1 otherwise), ``uart_rank`` is 1
    for devices whose lower-cased path starts with ``/dev/ttys`` (0
    otherwise) and ``device`` is the lower-cased device path used as the
    final tie-breaker.  Missing or ``None`` attributes are treated as empty
    strings.
    """
    fields = {
        attr: str(getattr(port_info, attr, "") or "").lower()
        for attr in ("device", "description", "hwid")
    }
    device = fields["device"]

    looks_like_usb = any((
        "usb" in device,
        "acm" in device,
        "usb" in fields["description"],
        "vid:pid" in fields["hwid"],
    ))
    usb_rank = int(not looks_like_usb)
    uart_rank = int(device.startswith("/dev/ttys"))

    return usb_rank, uart_rank, device
|
|
|
|
|
|
def _is_preferred_serial_port(port_info):
    """Return True when *port_info* looks like a USB/ACM serial adapter.

    A port qualifies if its lower-cased device path contains ``usb`` or
    ``acm``, its description contains ``usb``, or its hardware id contains
    ``vid:pid``.  Missing or ``None`` attributes count as empty strings.
    """
    device, description, hwid = (
        str(getattr(port_info, attr, "") or "").lower()
        for attr in ("device", "description", "hwid")
    )
    markers = (
        "usb" in device,
        "acm" in device,
        "usb" in description,
        "vid:pid" in hwid,
    )
    return any(markers)
|
|
|
|
|
|
def create_port_connection():
    """Probe the available serial ports and return the first one that answers.

    Ports reported by ``cmd.list_ports.comports()`` are narrowed down to the
    USB/ACM-looking ones when any exist, then tried in ``_port_sort_key``
    order.  A port is accepted once it has been opened and
    ``reset_port_settings`` reports a STATE reply.  Any exception while
    probing a port closes it (if open) and moves on to the next candidate.

    Returns the opened port object, or ``None`` when no port responded.
    """
    candidates = list(cmd.list_ports.comports())
    usb_like = [info for info in candidates if _is_preferred_serial_port(info)]
    candidates = usb_like or candidates

    for info in sorted(candidates, key=_port_sort_key):
        device_path = getattr(info, "device", None)
        if not device_path:
            continue

        connection = None
        try:
            connection = cmd.setup_port_connection(port=device_path, baudrate=115200, timeout_sec=1)
            cmd.open_port(connection)
            if not reset_port_settings(connection):
                raise RuntimeError(f"No valid STATE reply on {device_path}")
        except Exception:
            # Probing is best-effort: release the port and try the next one.
            if connection is not None and connection.is_open:
                connection.close()
            continue

        return connection

    return None
|
|
|
|
|
|
def _print_state_reply(state_bytes):
    """Print a received STATE reply and report whether there was one.

    Returns ``False`` without printing when *state_bytes* is ``None``;
    otherwise prints the decoded status together with the ``cmd.flipfour``
    form of the raw hex string, followed by an empty line, and returns
    ``True``.
    """
    if state_bytes is not None:
        hex_status = state_bytes.hex()
        decoded = cmd.decode_STATE(hex_status)
        flipped = cmd.flipfour(hex_status)
        print("Received: STATE. State status:", decoded, "(" + flipped + ")")
        print("")
        return True
    return False
|
|
|
|
|
|
def _decode_state_word(state_bytes):
    """Convert a raw STATE reply into an integer status word.

    The reply's hex string is passed through ``cmd.flipfour`` and parsed as
    base-16.  Returns ``None`` when *state_bytes* is ``None`` or when any
    step of the conversion raises.
    """
    word = None
    if state_bytes is not None:
        try:
            word = int(cmd.flipfour(state_bytes.hex()), 16)
        except Exception:
            # An undecodable reply is reported the same way as a missing one.
            word = None
    return word
|
|
|
|
|
|
def _state_has_errors(state_bytes):
    """Return True when a STATE reply is missing, undecodable or flags errors.

    A reply counts as erroneous when any bit in the low byte (mask
    ``0x00FF``) of the decoded status word is set.
    """
    word = _decode_state_word(state_bytes)
    return True if word is None else bool(word & 0x00FF)
|
|
|
|
|
|
def _state_message(state_bytes):
    """Return a human-readable description of a STATE reply.

    Yields a fixed "not received" message for ``None``; otherwise whatever
    ``cmd.decode_STATE`` returns for the reply's hex string.
    """
    if state_bytes is not None:
        return cmd.decode_STATE(state_bytes.hex())
    return "STATE reply not received."
|
|
|
|
|
|
def _wait_for_state_reply(prt, timeout_s=WAIT_AFTER_SEND, poll_s=0.01, quiet=False):
    """Wait until a STATE reply is available on *prt* and read it.

    Polls (every *poll_s* seconds, for at most *timeout_s* seconds) until at
    least two bytes are pending, then reads the reply with
    ``cmd.get_STATE``.

    Returns the value from ``cmd.get_STATE``, or ``None`` on timeout.
    Unless *quiet* is true, a timeout message or the decoded reply is
    printed.
    """
    arrived = _wait_for_min_bytes(prt, expected_len=2, timeout_s=timeout_s, poll_s=poll_s)
    if arrived:
        reply = cmd.get_STATE(prt, quiet=quiet)
        if not quiet:
            _print_state_reply(reply)
        return reply

    if not quiet:
        print("Error. Timed out waiting for STATE.")
    return None
|
|
|
|
|
|
def _rollback_ad9102_waveform_upload(prt):
    """Abort an in-progress AD9102 waveform upload.

    Sends the CANCEL waveform-control command and waits for a STATE reply.
    If the cancel raises, times out or the reply flags errors, falls back to
    ``reset_port_settings``.
    """
    cancelled = False
    try:
        cancel_cmd = cmd.create_AD9102_waveform_control_command(cmd.AD9102_WAVE_OPCODE_CANCEL)
        cmd.send_AD9102_waveform_control(prt, cancel_cmd, quiet=True)
        reply = _wait_for_state_reply(
            prt,
            timeout_s=FAST_STATE_TIMEOUT_S,
            poll_s=FAST_STATE_POLL_S,
            quiet=True,
        )
        cancelled = reply is not None and not _state_has_errors(reply)
    except Exception:
        # Best-effort cancel: any failure here just triggers the reset below.
        cancelled = False

    if not cancelled:
        reset_port_settings(prt)
|
|
|
|
|
|
def reset_port_settings(prt):
    """Reset the port settings and check the reported status.

    Clears the port's input and output buffers (errors while doing so are
    ignored), sends DEFAULT_ENABLE and waits for a STATE reply for at least
    0.4 s.  A received reply is printed.

    Returns ``True`` when a STATE reply arrived, ``False`` otherwise.
    """
    try:
        prt.reset_input_buffer()
        prt.reset_output_buffer()
    except Exception:
        # Flushing is best-effort; the command below is sent regardless.
        pass

    cmd.send_DEFAULT_ENABLE(prt)
    reply_timeout = max(WAIT_AFTER_SEND, 0.4)
    reply = _wait_for_state_reply(prt, timeout_s=reply_timeout, poll_s=0.01, quiet=True)

    # Prints the reply and returns True, or returns False when reply is None.
    return _print_state_reply(reply)
|
|
|
|
|
|
def request_state(prt):
    """Send a STATE request, wait ``WAIT_AFTER_SEND`` and print the reply.

    Returns ``True`` if a STATE reply was read, ``False`` otherwise.
    """
    cmd.send_STATE(prt)
    time.sleep(WAIT_AFTER_SEND)
    reply = cmd.get_STATE(prt)
    return _print_state_reply(reply)
|
|
|
|
|
|
def send_control_parameters(prt, params):
    """Encode *params* with ``cmd.encode_Input`` and send them (DECODE_ENABLE).

    Waits ``WAIT_AFTER_SEND`` seconds, then reads and prints the STATE
    reply.  Returns ``True`` if a reply was read, ``False`` otherwise.
    """
    encoded = cmd.encode_Input(params)
    cmd.send_DECODE_ENABLE(prt, encoded)
    time.sleep(WAIT_AFTER_SEND)
    reply = cmd.get_STATE(prt)
    return _print_state_reply(reply)
|
|
|
|
|
|
def send_task_command(prt, sending_param):
    """Send a task command (TASK_ENABLE state in firmware).

    The command is built from *sending_param* by
    ``cmd.create_TaskEnableCommand``.  After ``WAIT_AFTER_SEND`` seconds the
    STATE reply is read and printed.  Returns ``True`` if a reply was read,
    ``False`` otherwise.
    """
    task_cmd = cmd.create_TaskEnableCommand(sending_param)
    cmd.send_TASK_ENABLE(prt, task_cmd)
    time.sleep(WAIT_AFTER_SEND)
    reply = cmd.get_STATE(prt)
    return _print_state_reply(reply)
|
|
|
|
|
|
def start_ramp_max(
    prt,
    freq_hz=None,
    duty=None,
    saw_step=None,
    pat_period=None,
    pat_period_base=None,
    dac_clk_hz=None,
    triangle=True,
    sram_mode=False,
    sram_samples=None,
    sram_hold=None,
    sram_amplitude=None,
):
    """Start the AD9102 sawtooth with configurable frequency/duty, or SRAM ramp mode.

    Sawtooth mode (``sram_mode`` false): ``saw_step`` is taken as given,
    else derived from ``freq_hz`` via ``cmd.calc_saw_step_for_freq``, else
    the module default.  ``pat_period`` is taken as given, else derived from
    ``duty`` via ``cmd.calc_pat_period_for_duty``, else the module default.

    SRAM mode (``sram_mode`` true): ``sram_samples`` is taken as given, else
    derived from ``freq_hz`` via ``cmd.calc_sram_samples_for_freq``;
    ``sram_hold`` falls back to the module default.

    In both modes ``dac_clk_hz`` defaults to ``cmd.AD9102_DAC_CLK_HZ`` when a
    frequency conversion is needed.  The command is sent, then after
    ``WAIT_AFTER_SEND`` seconds the STATE reply is read and printed.

    Returns ``True`` if a STATE reply was read, ``False`` otherwise.
    """
    wants_freq = freq_hz is not None

    if not sram_mode:
        base = cmd.AD9102_PAT_PERIOD_BASE_DEFAULT if pat_period_base is None else pat_period_base

        step = saw_step
        if step is None and wants_freq:
            clock = cmd.AD9102_DAC_CLK_HZ if dac_clk_hz is None else dac_clk_hz
            step = cmd.calc_saw_step_for_freq(freq_hz, clock, triangle)
        if step is None:
            step = cmd.AD9102_SAW_STEP_DEFAULT

        period = pat_period
        if period is None and duty is not None:
            period = cmd.calc_pat_period_for_duty(step, duty, base, triangle)
        if period is None:
            period = cmd.AD9102_PAT_PERIOD_DEFAULT

        hexstring = cmd.create_AD9102_ramp_command(
            step,
            period,
            base,
            enable=True,
            triangle=triangle,
        )
    else:
        hold = cmd.AD9102_SRAM_HOLD_DEFAULT if sram_hold is None else sram_hold

        samples = sram_samples
        if samples is None and wants_freq:
            clock = cmd.AD9102_DAC_CLK_HZ if dac_clk_hz is None else dac_clk_hz
            samples = cmd.calc_sram_samples_for_freq(freq_hz, clock, hold)

        hexstring = cmd.create_AD9102_ramp_command(
            enable=True,
            triangle=triangle,
            sram_mode=True,
            sram_samples=samples,
            sram_hold=hold,
            sram_amplitude=sram_amplitude,
        )

    cmd.send_AD9102(prt, hexstring)
    time.sleep(WAIT_AFTER_SEND)
    reply = cmd.get_STATE(prt)
    return _print_state_reply(reply)
|
|
|
|
|
|
def start_ad9833_ramp(prt, freq_hz=None, mclk_hz=None, triangle=True, enable=True):
    """Send an AD9833 ramp command and print the resulting STATE reply.

    A ``freq_hz`` of ``None`` is sent as ``0.0``; the other arguments are
    forwarded unchanged to ``cmd.create_AD9833_ramp_command``.

    Returns ``True`` if a STATE reply was read, ``False`` otherwise.
    """
    target_hz = 0.0 if freq_hz is None else freq_hz
    ramp_cmd = cmd.create_AD9833_ramp_command(
        freq_hz=target_hz,
        mclk_hz=mclk_hz,
        enable=enable,
        triangle=triangle,
    )
    cmd.send_AD9833(prt, ramp_cmd)
    time.sleep(WAIT_AFTER_SEND)
    reply = cmd.get_STATE(prt)
    return _print_state_reply(reply)
|
|
|
|
|
|
def set_stm32_dac(prt, dac_code, enable=True):
    """Send an STM32 DAC command built from *dac_code* and *enable*.

    Waits ``WAIT_AFTER_SEND`` seconds, then reads and prints the STATE
    reply.  Returns ``True`` if a reply was read, ``False`` otherwise.
    """
    dac_cmd = cmd.create_STM32_DAC_command(dac_code=dac_code, enable=enable)
    cmd.send_STM32_DAC(prt, dac_cmd)
    time.sleep(WAIT_AFTER_SEND)
    reply = cmd.get_STATE(prt)
    return _print_state_reply(reply)
|
|
|
|
|
|
def load_ad9102_waveform_file(filepath):
    """Load and validate an AD9102 waveform from a UTF-8 JSON file.

    The file must contain a JSON array of between 2 and
    ``cmd.AD9102_WAVE_MAX_SAMPLES`` integers (booleans are rejected), each
    within ``[cmd.AD9102_SAMPLE_MIN, cmd.AD9102_SAMPLE_MAX]``.

    Returns the samples as a list of ``int``.

    Raises:
        ValueError: the file is missing, is not valid JSON, is not an array,
            has an invalid length, or holds a non-integer or out-of-range
            sample.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as handle:
            document = json.load(handle)
    except FileNotFoundError as exc:
        raise ValueError(f"Waveform file not found: {filepath}") from exc
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON in waveform file: {exc.msg}") from exc

    if not isinstance(document, list):
        raise ValueError("Waveform file must contain a JSON array.")

    if not (2 <= len(document) <= cmd.AD9102_WAVE_MAX_SAMPLES):
        raise ValueError(f"Waveform length must be within [2, {cmd.AD9102_WAVE_MAX_SAMPLES}].")

    def checked(index, sample):
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(sample, bool) or not isinstance(sample, int):
            raise ValueError(f"Waveform sample #{index} is not an integer.")
        if not (cmd.AD9102_SAMPLE_MIN <= sample <= cmd.AD9102_SAMPLE_MAX):
            raise ValueError(
                f"Waveform sample #{index} is out of range "
                f"[{cmd.AD9102_SAMPLE_MIN}, {cmd.AD9102_SAMPLE_MAX}]."
            )
        return int(sample)

    return [checked(index, sample) for index, sample in enumerate(document)]
|
|
|
|
|
|
def upload_ad9102_waveform(prt, samples):
    """Upload a waveform to the AD9102 in BEGIN / DATA chunks / COMMIT steps.

    *samples* is copied into a list (``None`` counts as empty) and must hold
    between 2 and ``cmd.AD9102_WAVE_MAX_SAMPLES`` entries.  After every
    command a STATE reply is awaited; a missing or erroneous reply aborts
    the upload.

    Returns ``True`` on success.

    Raises:
        ValueError: the waveform length is out of range (nothing is sent).
        RuntimeError: a BEGIN, DATA or COMMIT step was not acknowledged.
        Any exception raised once the upload has started triggers
        ``_rollback_ad9102_waveform_upload`` and is then re-raised.
    """
    waveform = [] if samples is None else list(samples)
    total = len(waveform)
    if not (2 <= total <= cmd.AD9102_WAVE_MAX_SAMPLES):
        raise ValueError(f"Waveform length must be within [2, {cmd.AD9102_WAVE_MAX_SAMPLES}].")

    def await_ack(stage):
        # Wait for the STATE reply of the step just sent; fail loudly if bad.
        reply = _wait_for_state_reply(
            prt,
            timeout_s=FAST_STATE_TIMEOUT_S,
            poll_s=FAST_STATE_POLL_S,
            quiet=True,
        )
        if reply is None or _state_has_errors(reply):
            raise RuntimeError(f"Waveform {stage} failed: {_state_message(reply)}")

    try:
        begin_cmd = cmd.create_AD9102_waveform_control_command(
            cmd.AD9102_WAVE_OPCODE_BEGIN,
            param0=total,
            param1=0,
        )
        cmd.send_AD9102_waveform_control(prt, begin_cmd, quiet=True)
        await_ack("BEGIN")

        chunk_size = cmd.AD9102_WAVE_CHUNK_SAMPLES
        for chunk_no, start in enumerate(range(0, total, chunk_size), start=1):
            piece = waveform[start:start + chunk_size]
            data_cmd = cmd.create_AD9102_waveform_data_command(piece)
            cmd.send_AD9102_waveform_data(prt, data_cmd, quiet=True)
            await_ack(f"DATA chunk {chunk_no}")

        commit_cmd = cmd.create_AD9102_waveform_control_command(cmd.AD9102_WAVE_OPCODE_COMMIT)
        cmd.send_AD9102_waveform_control(prt, commit_cmd, quiet=True)
        await_ack("COMMIT")

        print(f"Uploaded AD9102 waveform ({total} samples).")
        return True
    except Exception:
        # Cancel the partial upload before reporting the failure.
        _rollback_ad9102_waveform_upload(prt)
        raise
|
|
|
|
|
|
def _wait_for_min_bytes(prt, expected_len, timeout_s, poll_s=0.01):
    """Poll *prt* until at least *expected_len* bytes are waiting to be read.

    Args:
        prt: port object exposing ``inWaiting()`` (number of pending bytes).
        expected_len: minimum number of pending bytes to wait for.
        timeout_s: maximum time to wait, in seconds.
        poll_s: pause between checks, in seconds.

    Returns:
        ``True`` as soon as enough bytes are pending, ``False`` if the
        timeout elapses first.  The port is always checked at least once,
        and once more after the deadline has passed.
    """
    # Monotonic clock: the deadline must not move when the wall clock is
    # adjusted (NTP sync, manual change) while we are waiting.
    deadline = time.monotonic() + timeout_s
    while True:
        if prt.inWaiting() >= expected_len:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_s, remaining))
|
|
|
|
|
|
def send_ds1809_pulse(prt, uc=False, dc=False, count=1, pulse_ms=None):
    """Send a DS1809 pulse command and print the resulting STATE reply.

    A missing or non-positive *count* becomes 1; a missing or non-positive
    *pulse_ms* becomes ``cmd.DS1809_PULSE_MS_DEFAULT``.

    Returns ``True`` if a STATE reply was read, ``False`` otherwise.
    """
    pulses = 1 if (count is None or count <= 0) else count
    width_ms = cmd.DS1809_PULSE_MS_DEFAULT if (pulse_ms is None or pulse_ms <= 0) else pulse_ms

    pulse_cmd = cmd.create_DS1809_pulse_command(uc=uc, dc=dc, count=pulses, pulse_ms=width_ms)
    cmd.send_DS1809(prt, pulse_cmd)

    # Firmware blocks while pulsing DS1809 lines: wait pulse train + safe margin.
    pulse_train_time = (2.0 * float(pulses) * float(width_ms)) / 1000.0
    settle_time = max(WAIT_AFTER_SEND, pulse_train_time + 0.35)
    time.sleep(settle_time)

    # Then poll shortly for STATE bytes; this avoids early read (0 bytes) on startup.
    _wait_for_min_bytes(prt, expected_len=2, timeout_s=0.8)

    reply = cmd.get_STATE(prt)
    return _print_state_reply(reply)
|
|
|
|
|
|
def request_data(prt):
    """Request a data packet from the device and decode it.

    Sends TRANS_ENABLE, waits ``WAIT_AFTER_SEND`` seconds and reads the
    reply with ``cmd.get_DATA``.

    Returns the result of ``cmd.decode_DATA`` for the reply's hex string, or
    ``None`` when no data was read.
    """
    cmd.send_TRANS_ENABLE(prt)
    time.sleep(WAIT_AFTER_SEND)

    raw = cmd.get_DATA(prt)
    return None if raw is None else cmd.decode_DATA(raw.hex())
|
|
|
|
|
|
def print_data(data):
    """Print a human-readable summary of a decoded data packet.

    *data* is a mapping with the keys ``Header``, ``Message_ID``, ``I1``,
    ``I2``, ``Temp_1``, ``Temp_2``, ``Temp_Ext_1``, ``Temp_Ext_2``,
    ``MON_3V3``, ``MON_5V1``, ``MON_5V2`` and ``MON_7V0``.  ``I1`` and ``I2``
    are sequences of numbers; every other measurement is a single number.
    All numbers are rounded to two decimals.

    For each current sequence the first two and last two values are shown
    around an ellipsis; sequences with fewer than two values are printed in
    full instead of raising ``IndexError``.
    """
    def shorten(value):
        return str(round(value, 2))

    def preview(values):
        # Individual samples must be indexed: round() does not accept the
        # whole sequence.
        if len(values) < 2:
            return tuple(shorten(value) for value in values)
        return (
            shorten(values[0]),
            shorten(values[1]),
            "...",
            shorten(values[-2]),
            shorten(values[-1]),
        )

    print("Data from device (time: " + datetime.now().strftime("%H:%M:%S:%f") + "):")
    print("Message Header:", data["Header"], " Message ID:", data["Message_ID"])

    for label, key in (("Photodiode Current 1", "I1"), ("Photodiode Current 2", "I2")):
        values = data[key]
        print(label + " (" + str(len(values)) + " values):", *preview(values), "mA")

    print("Laser Temperature 1:", shorten(data["Temp_1"]), "C")
    print("Laser Temperature 2:", shorten(data["Temp_2"]), "C")
    print("Temperature of external thermistor 1:", shorten(data["Temp_Ext_1"]), "C")
    print("Temperature of external thermistor 2:", shorten(data["Temp_Ext_2"]), "C")
    print(
        "Voltages 3V3: "
        + shorten(data["MON_3V3"])
        + "V 5V1: "
        + shorten(data["MON_5V1"])
        + "V 5V2: "
        + shorten(data["MON_5V2"])
        + "V 7V0: "
        + shorten(data["MON_7V0"])
        + "V."
    )
|
|
|
|
|
|
def close_connection(prt):
    """Close the port *prt* by delegating to ``cmd.close_port``."""
    cmd.close_port(prt)
|