import time import json from datetime import datetime import device_commands as cmd #### ---- Constants WAIT_AFTER_SEND = 0.15 # Wait after sending command before requesting input (s). FAST_STATE_TIMEOUT_S = 0.4 FAST_STATE_POLL_S = 0.005 #### ---- High-level port commands def _port_sort_key(port_info): device = str(getattr(port_info, "device", "") or "").lower() description = str(getattr(port_info, "description", "") or "").lower() hwid = str(getattr(port_info, "hwid", "") or "").lower() is_usb = ("usb" in device) or ("acm" in device) or ("usb" in description) or ("vid:pid" in hwid) is_builtin_uart = device.startswith("/dev/ttys") return ( 0 if is_usb else 1, 1 if is_builtin_uart else 0, device, ) def _is_preferred_serial_port(port_info): device = str(getattr(port_info, "device", "") or "").lower() description = str(getattr(port_info, "description", "") or "").lower() hwid = str(getattr(port_info, "hwid", "") or "").lower() return ("usb" in device) or ("acm" in device) or ("usb" in description) or ("vid:pid" in hwid) def create_port_connection(): prt = None port_infos = list(cmd.list_ports.comports()) preferred_ports = [port_info for port_info in port_infos if _is_preferred_serial_port(port_info)] if preferred_ports: port_infos = preferred_ports for port_info in sorted(port_infos, key=_port_sort_key): port = getattr(port_info, "device", None) if not port: continue try: prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1) cmd.open_port(prt) if not reset_port_settings(prt): raise RuntimeError(f"No valid STATE reply on {port}") except Exception: if prt is not None and prt.is_open: prt.close() prt = None continue break return prt def _print_state_reply(state_bytes): if state_bytes is None: return False status = state_bytes.hex() print("Received: STATE. State status:", cmd.decode_STATE(status), "(" + cmd.flipfour(status) + ")") print("") return True def _decode_state_word(state_bytes): if state_bytes is None: return None try: return int(cmd.flipfour(state_bytes.hex()), 16) except Exception: return None def _state_has_errors(state_bytes): state_word = _decode_state_word(state_bytes) if state_word is None: return True return (state_word & 0x00FF) != 0 def _state_message(state_bytes): if state_bytes is None: return "STATE reply not received." return cmd.decode_STATE(state_bytes.hex()) def _wait_for_state_reply(prt, timeout_s=WAIT_AFTER_SEND, poll_s=0.01, quiet=False): if not _wait_for_min_bytes(prt, expected_len=2, timeout_s=timeout_s, poll_s=poll_s): if not quiet: print("Error. Timed out waiting for STATE.") return None state_bytes = cmd.get_STATE(prt, quiet=quiet) if quiet: return state_bytes _print_state_reply(state_bytes) return state_bytes def _rollback_ad9102_waveform_upload(prt): try: cancel_cmd = cmd.create_AD9102_waveform_control_command(cmd.AD9102_WAVE_OPCODE_CANCEL) cmd.send_AD9102_waveform_control(prt, cancel_cmd, quiet=True) state_bytes = _wait_for_state_reply(prt, timeout_s=FAST_STATE_TIMEOUT_S, poll_s=FAST_STATE_POLL_S, quiet=True) if state_bytes is not None and not _state_has_errors(state_bytes): return except Exception: pass reset_port_settings(prt) def reset_port_settings(prt): # Reset port settings and check status try: prt.reset_input_buffer() prt.reset_output_buffer() except Exception: pass cmd.send_DEFAULT_ENABLE(prt) state_bytes = _wait_for_state_reply(prt, timeout_s=max(WAIT_AFTER_SEND, 0.4), poll_s=0.01, quiet=True) if state_bytes is not None: status = state_bytes.hex() print("Received: STATE. State status:", cmd.decode_STATE(status), "(" + cmd.flipfour(status) + ")") print("") return True return False def request_state(prt): cmd.send_STATE(prt) time.sleep(WAIT_AFTER_SEND) return _print_state_reply(cmd.get_STATE(prt)) def send_control_parameters(prt, params): hexstring = cmd.encode_Input(params) cmd.send_DECODE_ENABLE(prt, hexstring) time.sleep(WAIT_AFTER_SEND) return _print_state_reply(cmd.get_STATE(prt)) def send_task_command(prt, sending_param): # Send task command (TASK_ENABLE state in firmware) hexstring = cmd.create_TaskEnableCommand(sending_param) cmd.send_TASK_ENABLE(prt, hexstring) time.sleep(WAIT_AFTER_SEND) return _print_state_reply(cmd.get_STATE(prt)) def start_ramp_max( prt, freq_hz=None, duty=None, saw_step=None, pat_period=None, pat_period_base=None, dac_clk_hz=None, triangle=True, sram_mode=False, sram_samples=None, sram_hold=None, sram_amplitude=None, ): # Start AD9102 sawtooth with configurable frequency/duty or SRAM ramp mode if sram_mode: if sram_hold is None: sram_hold = cmd.AD9102_SRAM_HOLD_DEFAULT if sram_samples is None and freq_hz is not None: if dac_clk_hz is None: dac_clk_hz = cmd.AD9102_DAC_CLK_HZ sram_samples = cmd.calc_sram_samples_for_freq(freq_hz, dac_clk_hz, sram_hold) hexstring = cmd.create_AD9102_ramp_command( enable=True, triangle=triangle, sram_mode=True, sram_samples=sram_samples, sram_hold=sram_hold, sram_amplitude=sram_amplitude, ) else: if pat_period_base is None: pat_period_base = cmd.AD9102_PAT_PERIOD_BASE_DEFAULT if saw_step is None and freq_hz is not None: if dac_clk_hz is None: dac_clk_hz = cmd.AD9102_DAC_CLK_HZ saw_step = cmd.calc_saw_step_for_freq(freq_hz, dac_clk_hz, triangle) if saw_step is None: saw_step = cmd.AD9102_SAW_STEP_DEFAULT if pat_period is None and duty is not None: pat_period = cmd.calc_pat_period_for_duty(saw_step, duty, pat_period_base, triangle) if pat_period is None: pat_period = cmd.AD9102_PAT_PERIOD_DEFAULT hexstring = cmd.create_AD9102_ramp_command( saw_step, pat_period, pat_period_base, enable=True, triangle=triangle, ) cmd.send_AD9102(prt, hexstring) time.sleep(WAIT_AFTER_SEND) return _print_state_reply(cmd.get_STATE(prt)) def start_ad9833_ramp(prt, freq_hz=None, mclk_hz=None, triangle=True, enable=True): if freq_hz is None: freq_hz = 0.0 hexstring = cmd.create_AD9833_ramp_command( freq_hz=freq_hz, mclk_hz=mclk_hz, enable=enable, triangle=triangle, ) cmd.send_AD9833(prt, hexstring) time.sleep(WAIT_AFTER_SEND) return _print_state_reply(cmd.get_STATE(prt)) def set_stm32_dac(prt, dac_code, enable=True): hexstring = cmd.create_STM32_DAC_command(dac_code=dac_code, enable=enable) cmd.send_STM32_DAC(prt, hexstring) time.sleep(WAIT_AFTER_SEND) return _print_state_reply(cmd.get_STATE(prt)) def load_ad9102_waveform_file(filepath): try: with open(filepath, "r", encoding="utf-8") as fh: payload = json.load(fh) except FileNotFoundError as exc: raise ValueError(f"Waveform file not found: {filepath}") from exc except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON in waveform file: {exc.msg}") from exc if not isinstance(payload, list): raise ValueError("Waveform file must contain a JSON array.") if len(payload) < 2 or len(payload) > cmd.AD9102_WAVE_MAX_SAMPLES: raise ValueError(f"Waveform length must be within [2, {cmd.AD9102_WAVE_MAX_SAMPLES}].") samples = [] for index, sample in enumerate(payload): if isinstance(sample, bool) or not isinstance(sample, int): raise ValueError(f"Waveform sample #{index} is not an integer.") if sample < cmd.AD9102_SAMPLE_MIN or sample > cmd.AD9102_SAMPLE_MAX: raise ValueError( f"Waveform sample #{index} is out of range " f"[{cmd.AD9102_SAMPLE_MIN}, {cmd.AD9102_SAMPLE_MAX}]." ) samples.append(int(sample)) return samples def upload_ad9102_waveform(prt, samples): waveform = list(samples) if samples is not None else [] if len(waveform) < 2 or len(waveform) > cmd.AD9102_WAVE_MAX_SAMPLES: raise ValueError(f"Waveform length must be within [2, {cmd.AD9102_WAVE_MAX_SAMPLES}].") try: begin_cmd = cmd.create_AD9102_waveform_control_command( cmd.AD9102_WAVE_OPCODE_BEGIN, param0=len(waveform), param1=0, ) cmd.send_AD9102_waveform_control(prt, begin_cmd, quiet=True) state_bytes = _wait_for_state_reply(prt, timeout_s=FAST_STATE_TIMEOUT_S, poll_s=FAST_STATE_POLL_S, quiet=True) if state_bytes is None or _state_has_errors(state_bytes): raise RuntimeError(f"Waveform BEGIN failed: {_state_message(state_bytes)}") for offset in range(0, len(waveform), cmd.AD9102_WAVE_CHUNK_SAMPLES): chunk = waveform[offset:offset + cmd.AD9102_WAVE_CHUNK_SAMPLES] chunk_cmd = cmd.create_AD9102_waveform_data_command(chunk) cmd.send_AD9102_waveform_data(prt, chunk_cmd, quiet=True) state_bytes = _wait_for_state_reply(prt, timeout_s=FAST_STATE_TIMEOUT_S, poll_s=FAST_STATE_POLL_S, quiet=True) if state_bytes is None or _state_has_errors(state_bytes): chunk_no = (offset // cmd.AD9102_WAVE_CHUNK_SAMPLES) + 1 raise RuntimeError(f"Waveform DATA chunk {chunk_no} failed: {_state_message(state_bytes)}") commit_cmd = cmd.create_AD9102_waveform_control_command(cmd.AD9102_WAVE_OPCODE_COMMIT) cmd.send_AD9102_waveform_control(prt, commit_cmd, quiet=True) state_bytes = _wait_for_state_reply(prt, timeout_s=FAST_STATE_TIMEOUT_S, poll_s=FAST_STATE_POLL_S, quiet=True) if state_bytes is None or _state_has_errors(state_bytes): raise RuntimeError(f"Waveform COMMIT failed: {_state_message(state_bytes)}") print(f"Uploaded AD9102 waveform ({len(waveform)} samples).") return True except Exception: _rollback_ad9102_waveform_upload(prt) raise def _wait_for_min_bytes(prt, expected_len, timeout_s, poll_s=0.01): deadline = time.time() + timeout_s while time.time() < deadline: waiting = prt.inWaiting() if waiting >= expected_len: return True time.sleep(poll_s) return prt.inWaiting() >= expected_len def send_ds1809_pulse(prt, uc=False, dc=False, count=1, pulse_ms=None): if count is None or count <= 0: count = 1 if pulse_ms is None or pulse_ms <= 0: pulse_ms = cmd.DS1809_PULSE_MS_DEFAULT hexstring = cmd.create_DS1809_pulse_command(uc=uc, dc=dc, count=count, pulse_ms=pulse_ms) cmd.send_DS1809(prt, hexstring) # Firmware blocks while pulsing DS1809 lines: wait pulse train + safe margin. pulse_train_time = (2.0 * float(count) * float(pulse_ms)) / 1000.0 time.sleep(max(WAIT_AFTER_SEND, pulse_train_time + 0.35)) # Then poll shortly for STATE bytes; this avoids early read (0 bytes) on startup. _wait_for_min_bytes(prt, expected_len=2, timeout_s=0.8) return _print_state_reply(cmd.get_STATE(prt)) def request_data(prt): cmd.send_TRANS_ENABLE(prt) time.sleep(WAIT_AFTER_SEND) data_bytes = cmd.get_DATA(prt) if data_bytes is None: return None return cmd.decode_DATA(data_bytes.hex()) def print_data(data): def shorten(i): return str(round(i, 2)) print("Data from device (time: " + datetime.now().strftime("%H:%M:%S:%f") + "):") print("Message Header:", data["Header"], " Message ID:", data["Message_ID"]) print( "Photodiode Current 1 (" + str(len(data["I1"])) + " values):", shorten(data["I1"]), shorten(data["I1"][1]), "...", shorten(data["I1"]), shorten(data["I1"][-1]), "mA", ) print( "Photodiode Current 2 (" + str(len(data["I2"])) + " values):", shorten(data["I2"]), shorten(data["I2"][1]), "...", shorten(data["I2"]), shorten(data["I2"][-1]), "mA", ) print("Laser Temperature 1:", shorten(data["Temp_1"]), "C") print("Laser Temperature 2:", shorten(data["Temp_2"]), "C") print("Temperature of external thermistor 1:", shorten(data["Temp_Ext_1"]), "C") print("Temperature of external thermistor 2:", shorten(data["Temp_Ext_2"]), "C") print( "Voltages 3V3: " + shorten(data["MON_3V3"]) + "V 5V1: " + shorten(data["MON_5V1"]) + "V 5V2: " + shorten(data["MON_5V2"]) + "V 7V0: " + shorten(data["MON_7V0"]) + "V." ) def close_connection(prt): cmd.close_port(prt)