Save current local changes

This commit is contained in:
Ayzen
2026-03-13 17:50:11 +03:00
parent ecdad1b583
commit cb7f966081
8 changed files with 548 additions and 11 deletions

View File

@ -1,4 +1,5 @@
import time
import json
from datetime import datetime
import device_commands as cmd
@ -7,19 +8,54 @@ import device_commands as cmd
#### ---- Constants
WAIT_AFTER_SEND = 0.15 # Wait after sending command before requesting input (s).
FAST_STATE_TIMEOUT_S = 0.4
FAST_STATE_POLL_S = 0.005
#### ---- High-level port commands
def _port_sort_key(port_info):
device = str(getattr(port_info, "device", "") or "").lower()
description = str(getattr(port_info, "description", "") or "").lower()
hwid = str(getattr(port_info, "hwid", "") or "").lower()
is_usb = ("usb" in device) or ("acm" in device) or ("usb" in description) or ("vid:pid" in hwid)
is_builtin_uart = device.startswith("/dev/ttys")
return (
0 if is_usb else 1,
1 if is_builtin_uart else 0,
device,
)
def _is_preferred_serial_port(port_info):
device = str(getattr(port_info, "device", "") or "").lower()
description = str(getattr(port_info, "description", "") or "").lower()
hwid = str(getattr(port_info, "hwid", "") or "").lower()
return ("usb" in device) or ("acm" in device) or ("usb" in description) or ("vid:pid" in hwid)
def create_port_connection():
prt = None
for port, _, _ in sorted(cmd.list_ports.comports()):
port_infos = list(cmd.list_ports.comports())
preferred_ports = [port_info for port_info in port_infos if _is_preferred_serial_port(port_info)]
if preferred_ports:
port_infos = preferred_ports
for port_info in sorted(port_infos, key=_port_sort_key):
port = getattr(port_info, "device", None)
if not port:
continue
try:
prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
cmd.open_port(prt)
reset_port_settings(prt)
except:
prt.close()
if not reset_port_settings(prt):
raise RuntimeError(f"No valid STATE reply on {port}")
except Exception:
if prt is not None and prt.is_open:
prt.close()
prt = None
continue
break
return prt
@ -35,14 +71,71 @@ def _print_state_reply(state_bytes):
return True
def _decode_state_word(state_bytes):
if state_bytes is None:
return None
try:
return int(cmd.flipfour(state_bytes.hex()), 16)
except Exception:
return None
def _state_has_errors(state_bytes):
state_word = _decode_state_word(state_bytes)
if state_word is None:
return True
return (state_word & 0x00FF) != 0
def _state_message(state_bytes):
if state_bytes is None:
return "STATE reply not received."
return cmd.decode_STATE(state_bytes.hex())
def _wait_for_state_reply(prt, timeout_s=WAIT_AFTER_SEND, poll_s=0.01, quiet=False):
if not _wait_for_min_bytes(prt, expected_len=2, timeout_s=timeout_s, poll_s=poll_s):
if not quiet:
print("Error. Timed out waiting for STATE.")
return None
state_bytes = cmd.get_STATE(prt, quiet=quiet)
if quiet:
return state_bytes
_print_state_reply(state_bytes)
return state_bytes
def _rollback_ad9102_waveform_upload(prt):
try:
cancel_cmd = cmd.create_AD9102_waveform_control_command(cmd.AD9102_WAVE_OPCODE_CANCEL)
cmd.send_AD9102_waveform_control(prt, cancel_cmd, quiet=True)
state_bytes = _wait_for_state_reply(prt, timeout_s=FAST_STATE_TIMEOUT_S,
poll_s=FAST_STATE_POLL_S, quiet=True)
if state_bytes is not None and not _state_has_errors(state_bytes):
return
except Exception:
pass
reset_port_settings(prt)
def reset_port_settings(prt):
# Reset port settings and check status
try:
prt.reset_input_buffer()
prt.reset_output_buffer()
except Exception:
pass
cmd.send_DEFAULT_ENABLE(prt)
time.sleep(WAIT_AFTER_SEND)
status = cmd.get_STATE(prt).hex()
if status is not None:
state_bytes = _wait_for_state_reply(prt, timeout_s=max(WAIT_AFTER_SEND, 0.4), poll_s=0.01, quiet=True)
if state_bytes is not None:
status = state_bytes.hex()
print("Received: STATE. State status:", cmd.decode_STATE(status), "(" + cmd.flipfour(status) + ")")
print("")
return True
return False
def request_state(prt):
@ -143,6 +236,76 @@ def set_stm32_dac(prt, dac_code, enable=True):
return _print_state_reply(cmd.get_STATE(prt))
def load_ad9102_waveform_file(filepath):
try:
with open(filepath, "r", encoding="utf-8") as fh:
payload = json.load(fh)
except FileNotFoundError as exc:
raise ValueError(f"Waveform file not found: {filepath}") from exc
except json.JSONDecodeError as exc:
raise ValueError(f"Invalid JSON in waveform file: {exc.msg}") from exc
if not isinstance(payload, list):
raise ValueError("Waveform file must contain a JSON array.")
if len(payload) < 2 or len(payload) > cmd.AD9102_WAVE_MAX_SAMPLES:
raise ValueError(f"Waveform length must be within [2, {cmd.AD9102_WAVE_MAX_SAMPLES}].")
samples = []
for index, sample in enumerate(payload):
if isinstance(sample, bool) or not isinstance(sample, int):
raise ValueError(f"Waveform sample #{index} is not an integer.")
if sample < cmd.AD9102_SAMPLE_MIN or sample > cmd.AD9102_SAMPLE_MAX:
raise ValueError(
f"Waveform sample #{index} is out of range "
f"[{cmd.AD9102_SAMPLE_MIN}, {cmd.AD9102_SAMPLE_MAX}]."
)
samples.append(int(sample))
return samples
def upload_ad9102_waveform(prt, samples):
waveform = list(samples) if samples is not None else []
if len(waveform) < 2 or len(waveform) > cmd.AD9102_WAVE_MAX_SAMPLES:
raise ValueError(f"Waveform length must be within [2, {cmd.AD9102_WAVE_MAX_SAMPLES}].")
try:
begin_cmd = cmd.create_AD9102_waveform_control_command(
cmd.AD9102_WAVE_OPCODE_BEGIN,
param0=len(waveform),
param1=0,
)
cmd.send_AD9102_waveform_control(prt, begin_cmd, quiet=True)
state_bytes = _wait_for_state_reply(prt, timeout_s=FAST_STATE_TIMEOUT_S,
poll_s=FAST_STATE_POLL_S, quiet=True)
if state_bytes is None or _state_has_errors(state_bytes):
raise RuntimeError(f"Waveform BEGIN failed: {_state_message(state_bytes)}")
for offset in range(0, len(waveform), cmd.AD9102_WAVE_CHUNK_SAMPLES):
chunk = waveform[offset:offset + cmd.AD9102_WAVE_CHUNK_SAMPLES]
chunk_cmd = cmd.create_AD9102_waveform_data_command(chunk)
cmd.send_AD9102_waveform_data(prt, chunk_cmd, quiet=True)
state_bytes = _wait_for_state_reply(prt, timeout_s=FAST_STATE_TIMEOUT_S,
poll_s=FAST_STATE_POLL_S, quiet=True)
if state_bytes is None or _state_has_errors(state_bytes):
chunk_no = (offset // cmd.AD9102_WAVE_CHUNK_SAMPLES) + 1
raise RuntimeError(f"Waveform DATA chunk {chunk_no} failed: {_state_message(state_bytes)}")
commit_cmd = cmd.create_AD9102_waveform_control_command(cmd.AD9102_WAVE_OPCODE_COMMIT)
cmd.send_AD9102_waveform_control(prt, commit_cmd, quiet=True)
state_bytes = _wait_for_state_reply(prt, timeout_s=FAST_STATE_TIMEOUT_S,
poll_s=FAST_STATE_POLL_S, quiet=True)
if state_bytes is None or _state_has_errors(state_bytes):
raise RuntimeError(f"Waveform COMMIT failed: {_state_message(state_bytes)}")
print(f"Uploaded AD9102 waveform ({len(waveform)} samples).")
return True
except Exception:
_rollback_ad9102_waveform_upload(prt)
raise
def _wait_for_min_bytes(prt, expected_len, timeout_s, poll_s=0.01):
deadline = time.time() + timeout_s
while time.time() < deadline: