13 Commits

Author SHA1 Message Date
0ec504ffa9 Add new PyQt UI 2026-04-26 18:39:55 +03:00
c92745d2bc microfix and bat file added 2026-04-22 13:00:32 +03:00
awe
43d490e2f8 upd readme requirements 2026-02-18 20:07:52 +03:00
awe
dd63383c39 fix timings 2026-02-18 19:09:53 +03:00
awe
0b9eed566e fix variation 2026-02-18 19:01:28 +03:00
awe
9b82077b64 upd gui 2026-02-18 18:48:13 +03:00
awe
71e5eb9ecb fix flip 2026-02-18 18:41:19 +03:00
awe
662a42776f fix 2026-02-18 18:26:41 +03:00
awe
584dea1623 update project structure 2026-02-18 17:44:18 +03:00
awe
2476a68096 initial commit 2026-02-18 17:28:02 +03:00
awe
620bef8c88 initial commit 2025-11-24 15:57:57 +03:00
awe
c8b6aed434 remove junk from repo 2025-11-24 15:57:35 +03:00
awe
d3e39ec3b1 gitignore upd 2025-11-24 15:56:36 +03:00
33 changed files with 4043 additions and 1272 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
*.venv
.venv/
*.pyc
__pycache__/
.pytest_cache/
*.egg-info/
.env

101
README.md
View File

@ -1,4 +1,99 @@
files description:
deploy -- creates venv and installs python libs in it
run -- resets Generator_PCB by toggling PRi`s GPIO pin, activates venv and runs main program
# RadioPhotonic PCB PC Software
PyQt6-приложение для управления лазерной платой по UART.
Вся рабочая логика сосредоточена в пакете `laser_control`; старый FreeSimpleGUI и legacy-модули удалены.
## Структура
```text
.
├── _device_main.py
├── run
├── run_device_main.bat
├── requirements.txt
├── laser_control/
│ ├── __init__.py
│ ├── constants.py
│ ├── controller.py
│ ├── conversions.py
│ ├── exceptions.py
│ ├── models.py
│ ├── protocol.py
│ ├── transport.py
│ ├── validators.py
│ ├── example_usage.py
│ └── gui/
│ ├── main.py
│ ├── theme.py
│ ├── window.py
│ ├── sections.py
│ └── worker.py
```
## Что поддерживается
- ручной режим: `T1/T2/I1/I2`
- live telemetry: `T1/T2`, внешние термисторы, фотодиоды, `3V3/5V1/5V2/7V0`
- AD9102: saw/SRAM режимы и загрузка custom waveform
- AD9833, DS1809 и STM32 DAC через отдельные firmware-команды
- сохранение профиля на SD-карту устройства
- сброс платы командой `DEFAULT_ENABLE`
Не поддерживается и удалено из PC-кода:
- legacy-команды `0x3333` и `0x5555`
- старый flow про `saved data` и `remove file`
- task/sweep-режим как публичный сценарий
## Установка
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
## Запуск GUI
```bash
source .venv/bin/activate
./run
```
или
```bash
python3 -m laser_control.gui.main
```
Автоподключение использует первый доступный USB UART-порт.
При подключении приложение только читает текущее состояние и телеметрию, без автоприменения ручных параметров.
Совместимый launcher `_device_main.py` сохранён, но он только проксирует запуск в новый PyQt entrypoint.
## Публичный API
```python
from laser_control import (
LaserController,
Measurements,
DeviceStatus,
DeviceState,
ValidationError,
CommunicationError,
)
```
## Пример встраивания
```python
from laser_control import LaserController
with LaserController(port="/dev/ttyUSB0") as controller:
controller.set_manual_mode(
temp1=25.0,
temp2=30.0,
current1=40.0,
current2=35.0,
)
print(controller.get_measurements())
```

Binary file not shown.

View File

@ -1,469 +1,7 @@
from FreeSimpleGUI import TIMEOUT_KEY, WIN_CLOSED
import json
import math
import socket
import subprocess
"""Compatibility launcher for the PyQt GUI."""
import device_interaction as dev
from laser_control.gui.main import main
import gui
use_client = False
sending_param = {}
#### ---- Constants
GUI_TIMEOUT_INTERVAL = 5#505 - dev.WAIT_AFTER_SEND*1000 # GUI refresh time in milliseconds
SAVE_POINTS_NUMBER = 1000 # Number of most recent data points kept in memory
INITIAL_TEMPERATURE_1 = 28 # Set initial temperature for Laser 1 in Celsius: from -1 to 45 C ??
INITIAL_TEMPERATURE_2 = 28.9 # Set initial temperature for Laser 2 in Celsius: from -1 to 45 C ??
INITIAL_CURRENT_1 = 33 # 64.0879 max # Set initial current for Laser 1, in mA
INITIAL_CURRENT_2 = 35 # 64.0879 max # Set initial current for Laser 2, in mA
#### ---- Functions
def start_task(prt):
global sending_param
dev.send_task_command(prt, sending_param)
def stop_task(prt):
global sending_param
sending_param = {}
dev.reset_port_settings(prt)
dev.send_control_parameters(prt, params)
def get_float(values, strId):
value = 0.0
try:
value = float(values[strId])
except:
value = float("nan")
window['-StartCycle-'].update(disabled = True)
return value
def shorten(i):
return "{:.2f}".format(round(i, 2))
def set_initial_params():
params = {}
params['Temp_1'] = INITIAL_TEMPERATURE_1 # Initial temperature for Laser 1
params['Temp_2'] = INITIAL_TEMPERATURE_2 # Initial temperature for Laser 2
params['ProportionalCoeff_1'] = int(10*256) # Proportional coefficient for temperature stabilizatoin for Laser 1 <-- ToDo (why int?)
params['ProportionalCoeff_2'] = int(10*256) # Proportional coefficient for temperature stabilizatoin for Laser 2 <-- ToDo (why int?)
params['IntegralCoeff_1'] = int(0.5*256) # Integral coefficient for temperature stabilizatoin for Laser 1 <-- ToDo (why int?)
params['IntegralCoeff_2'] = int(0.5*256) # Integral coefficient for temperature stabilizatoin for Laser 2 <-- ToDo (why int?)
params['Message_ID'] = "00FF" # Send Message ID (hex format)
params['Iset_1'] = INITIAL_CURRENT_1 # Currency value array for Laser 1, in mA
params['Iset_2'] = INITIAL_CURRENT_2 # Currency value array for Laser 2, in mA
params['Min_Temp_1'] = INITIAL_TEMPERATURE_1
params['Max_Temp_1'] = 28
params['Min_Current_1'] = INITIAL_CURRENT_1
params['Max_Current_1'] = 70.0 #50
params['Delta_Temp_1'] = 0.05
params['Delta_Current_1'] = 0.05
params['Min_Temp_2'] = INITIAL_TEMPERATURE_2
params['Max_Temp_2'] = 28
params['Min_Current_2'] = INITIAL_CURRENT_2
params['Max_Current_2'] = 60 # 50
params['Delta_Temp_2'] = 0.05
params['Delta_Current_2'] = 0.05
params['Delta_Time'] = 50
params['Tau'] = 10
return params
def update_data_lists():
saved_data.append(data)
if len(saved_data)>SAVE_POINTS_NUMBER:
saved_data.pop(0)
draw_data.append(data)
if len(draw_data)>gui.GRAPH_POINTS_NUMBER:
draw_data.pop(0)
######## ---- Main program
if __name__ == "__main__":
saved_data = []
draw_data = []
params = set_initial_params()
prt = dev.create_port_connection()
if prt is None:
print('Can\'t create connection. Closing program...')
exit(1)
# dev.request_state(prt)
dev.send_control_parameters(prt, params)
saved_data.append(dev.request_data(prt))
draw_data.append(saved_data[0])
window = gui.setup_gui(params)
axes_signs = gui.sign_axes(window)
current_and_temperature_settings_available = True
disableStartButton = False
if use_client:
p = subprocess.Popen("path/to/oscilloscope.exe")
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sck.bind(("127.0.0.1", 9090))
sck.listen()
conn, _ = sck.accept()
while True:
event, values = window.read(timeout=GUI_TIMEOUT_INTERVAL)
enable_manual_settings = window['-EnableManualSettings-'].get()
if current_and_temperature_settings_available:
window['-EnableT1-'].update(disabled = enable_manual_settings)
window['-EnableT2-'].update(disabled = enable_manual_settings)
window['-EnableC1-'].update(disabled = enable_manual_settings)
window['-EnableC2-'].update(disabled = enable_manual_settings)
window['-InputMinT1-'].update(disabled = enable_manual_settings)
window['-InputMaxT1-'].update(disabled = enable_manual_settings)
window['-InputDeltaT1-'].update(disabled = enable_manual_settings)
window['-InputMinT2-'].update(disabled = enable_manual_settings)
window['-InputMaxT2-'].update(disabled = enable_manual_settings)
window['-InputDeltaT2-'].update(disabled = enable_manual_settings)
window['-InputMinC1-'].update(disabled = enable_manual_settings)
window['-InputMaxC1-'].update(disabled = enable_manual_settings)
window['-InputDeltaC1-'].update(disabled = enable_manual_settings)
window['-InputMinC2-'].update(disabled = enable_manual_settings)
window['-InputMaxC2-'].update(disabled = enable_manual_settings)
window['-InputDeltaC2-'].update(disabled = enable_manual_settings)
window['-InputT1-'].update(disabled = not enable_manual_settings)
window['-InputT2-'].update(disabled = not enable_manual_settings)
window['-InputI1-'].update(disabled = not enable_manual_settings)
window['-InputI2-'].update(disabled = not enable_manual_settings)
window['-StartCycle-'].update(disabled = not enable_manual_settings)
if current_and_temperature_settings_available and not enable_manual_settings:
enable_t1 = window['-EnableT1-'].get()
enable_t2 = window['-EnableT2-'].get()
enable_c1 = window['-EnableC1-'].get()
enable_c2 = window['-EnableC2-'].get()
sending_param['ProportionalCoeff_1'] = params['ProportionalCoeff_1']
sending_param['IntegralCoeff_1'] = params['IntegralCoeff_1']
sending_param['ProportionalCoeff_2'] = params['ProportionalCoeff_2']
sending_param['IntegralCoeff_2'] = params['IntegralCoeff_2']
if enable_t1 and \
not enable_t2 and \
not enable_c1 and \
not enable_c2:
sending_param['TaskType'] = dev.cmd.TaskType.ChangeTemperatureLD1
sending_param['MinT1'] = get_float(values, '-InputMinT1-')
sending_param['MaxT1'] = get_float(values, '-InputMaxT1-')
sending_param['DeltaT1'] = get_float(values, '-InputDeltaT1-')
sending_param['I1'] = get_float(values, '-InputI1-')
sending_param['I2'] = get_float(values, '-InputI2-')
sending_param['T2'] = get_float(values, '-InputT2-')
sending_param['Dt'] = get_float(values ,'-InputDeltaTime-')
sending_param['Tau'] = get_float(values ,'-InputTau-')
disableStartButton = math.isnan(sending_param['MinT1']) or \
math.isnan(sending_param['MaxT1']) or \
math.isnan(sending_param['DeltaT1']) or \
math.isnan(sending_param['I1']) or \
math.isnan(sending_param['I2']) or \
math.isnan(sending_param['T2']) or \
math.isnan(sending_param['Dt']) or \
math.isnan(sending_param['Tau'])
window['-EnableT2-'].update(disabled = enable_t1)
window['-EnableC1-'].update(disabled = enable_t1)
window['-EnableC2-'].update(disabled = enable_t1)
enable_t2 = window['-EnableT2-'].get()
enable_c1 = window['-EnableC1-'].get()
enable_c2 = window['-EnableC2-'].get()
window['-InputMinT1-'].update(disabled = not enable_t1)
window['-InputMaxT1-'].update(disabled = not enable_t1)
window['-InputDeltaT1-'].update(disabled = not enable_t1)
window['-InputI1-'].update(disabled = not enable_t1)
window['-InputI2-'].update(disabled = not enable_t1)
window['-InputT2-'].update(disabled = not enable_t1)
window['-InputMinT2-'].update(disabled = enable_t1)
window['-InputMaxT2-'].update(disabled = enable_t1)
window['-InputDeltaT2-'].update(disabled = enable_t1)
window['-InputMinC1-'].update(disabled = enable_t1)
window['-InputMaxC1-'].update(disabled = enable_t1)
window['-InputDeltaC1-'].update(disabled = enable_t1)
window['-InputMinC2-'].update(disabled = enable_t1)
window['-InputMaxC2-'].update(disabled = enable_t1)
window['-InputDeltaC2-'].update(disabled = enable_t1)
window['-EnableManualSettings-'].update(disabled = True)
elif enable_t2 and \
not enable_t1 and \
not enable_c1 and \
not enable_c2:
sending_param['TaskType'] = dev.cmd.TaskType.ChangeTemperatureLD2
sending_param['MinT2'] = get_float(values, '-InputMinT2-')
sending_param['MaxT2'] = get_float(values, '-InputMaxT2-')
sending_param['DeltaT2'] = get_float(values, '-InputDeltaT2-')
sending_param['I1'] = get_float(values, '-InputI1-')
sending_param['I2'] = get_float(values, '-InputI2-')
sending_param['T1'] = get_float(values, '-InputT1-')
sending_param['Dt'] = get_float(values ,'-InputDeltaTime-')
sending_param['Tau'] = get_float(values ,'-InputTau-')
disableStartButton = math.isnan(sending_param['MinT2']) or \
math.isnan(sending_param['MaxT2']) or \
math.isnan(sending_param['DeltaT2']) or \
math.isnan(sending_param['I1']) or \
math.isnan(sending_param['I2']) or \
math.isnan(sending_param['T1']) or \
math.isnan(sending_param['Dt']) or \
math.isnan(sending_param['Tau'])
window['-EnableT1-'].update(disabled = enable_t2)
window['-EnableC1-'].update(disabled = enable_t2)
window['-EnableC2-'].update(disabled = enable_t2)
enable_t1 = window['-EnableT1-'].get()
enable_c1 = window['-EnableC1-'].get()
enable_c2 = window['-EnableC2-'].get()
window['-InputMinT1-'].update(disabled = enable_t2)
window['-InputMaxT1-'].update(disabled = enable_t2)
window['-InputDeltaT1-'].update(disabled = enable_t2)
window['-InputT1-'].update(disabled = not enable_t2)
window['-InputI1-'].update(disabled = not enable_t2)
window['-InputI2-'].update(disabled = not enable_t2)
window['-InputMinT2-'].update(disabled = not enable_t2)
window['-InputMaxT2-'].update(disabled = not enable_t2)
window['-InputDeltaT2-'].update(disabled = not enable_t2)
window['-InputMinC1-'].update(disabled = enable_t2)
window['-InputMaxC1-'].update(disabled = enable_t2)
window['-InputDeltaC1-'].update(disabled = enable_t2)
window['-InputMinC2-'].update(disabled = enable_t2)
window['-InputMaxC2-'].update(disabled = enable_t2)
window['-InputDeltaC2-'].update(disabled = enable_t2)
window['-EnableManualSettings-'].update(disabled = True)
elif enable_c1 and \
not enable_c2 and \
not enable_t1 and \
not enable_t2:
sending_param['TaskType'] = dev.cmd.TaskType.ChangeCurrentLD1
sending_param['MinC1'] = get_float(values, '-InputMinC1-')
sending_param['MaxC1'] = get_float(values, '-InputMaxC1-')
sending_param['DeltaC1'] = get_float(values, '-InputDeltaC1-')
sending_param['T1'] = get_float(values, '-InputT1-')
sending_param['T2'] = get_float(values, '-InputT2-')
sending_param['I2'] = get_float(values, '-InputI2-')
sending_param['Dt'] = get_float(values ,'-InputDeltaTime-')
sending_param['Tau'] = get_float(values ,'-InputTau-')
disableStartButton = math.isnan(sending_param['MinC1']) or \
math.isnan(sending_param['MaxC1']) or \
math.isnan(sending_param['DeltaC1']) or \
math.isnan(sending_param['T1']) or \
math.isnan(sending_param['T2']) or \
math.isnan(sending_param['I2']) or \
math.isnan(sending_param['Dt']) or \
math.isnan(sending_param['Tau'])
window['-EnableT1-'].update(disabled = enable_c1)
window['-EnableT2-'].update(disabled = enable_c1)
window['-EnableC2-'].update(disabled = enable_c1)
enable_t1 = window['-EnableT1-'].get()
enable_t2 = window['-EnableT2-'].get()
enable_c2 = window['-EnableC2-'].get()
window['-InputMinT1-'].update(disabled = enable_c1)
window['-InputMaxT1-'].update(disabled = enable_c1)
window['-InputDeltaT1-'].update(disabled = enable_c1)
window['-InputT1-'].update(disabled = not enable_c1)
window['-InputT2-'].update(disabled = not enable_c1)
window['-InputI2-'].update(disabled = not enable_c1)
window['-InputMinT2-'].update(disabled = enable_c1)
window['-InputMaxT2-'].update(disabled = enable_c1)
window['-InputDeltaT2-'].update(disabled = enable_c1)
window['-InputMinC1-'].update(disabled = not enable_c1)
window['-InputMaxC1-'].update(disabled = not enable_c1)
window['-InputDeltaC1-'].update(disabled = not enable_c1)
window['-InputMinC2-'].update(disabled = enable_c1)
window['-InputMaxC2-'].update(disabled = enable_c1)
window['-InputDeltaC2-'].update(disabled = enable_c1)
window['-EnableManualSettings-'].update(disabled = True)
elif enable_c2 and \
not enable_c1 and \
not enable_t1 and \
not enable_t2:
sending_param['TaskType'] = dev.cmd.TaskType.ChangeCurrentLD2
sending_param['MinC2'] = get_float(values, '-InputMinC2-')
sending_param['MaxC2'] = get_float(values, '-InputMaxC2-')
sending_param['DeltaC2'] = get_float(values, '-InputDeltaC2-')
sending_param['T1'] = get_float(values, '-InputT1-')
sending_param['T2'] = get_float(values, '-InputT2-')
sending_param['I1'] = get_float(values, '-InputI1-')
sending_param['Dt'] = get_float(values ,'-InputDeltaTime-')
sending_param['Tau'] = get_float(values ,'-InputTau-')
disableStartButton = math.isnan(sending_param['MinC2']) or \
math.isnan(sending_param['MaxC2']) or \
math.isnan(sending_param['DeltaC2']) or \
math.isnan(sending_param['T1']) or \
math.isnan(sending_param['T2']) or \
math.isnan(sending_param['I1']) or \
math.isnan(sending_param['Dt']) or \
math.isnan(sending_param['Tau'])
window['-EnableT1-'].update(disabled = enable_c2)
window['-EnableT2-'].update(disabled = enable_c2)
window['-EnableC1-'].update(disabled = enable_c2)
enable_t1 = window['-EnableT1-'].get()
enable_t2 = window['-EnableT2-'].get()
enable_c1 = window['-EnableC1-'].get()
window['-InputMinT1-'].update(disabled = enable_c2)
window['-InputMaxT1-'].update(disabled = enable_c2)
window['-InputDeltaT1-'].update(disabled = enable_c2)
window['-InputI1-'].update(disabled = not enable_c2)
window['-InputT1-'].update(disabled = not enable_c2)
window['-InputT2-'].update(disabled = not enable_c2)
window['-InputMinT2-'].update(disabled = enable_c2)
window['-InputMaxT2-'].update(disabled = enable_c2)
window['-InputDeltaT2-'].update(disabled = enable_c2)
window['-InputMinC1-'].update(disabled = enable_c2)
window['-InputMaxC1-'].update(disabled = enable_c2)
window['-InputDeltaC1-'].update(disabled = enable_c2)
window['-InputMinC2-'].update(disabled = not enable_c2)
window['-InputMaxC2-'].update(disabled = not enable_c2)
window['-InputDeltaC2-'].update(disabled = not enable_c2)
window['-EnableManualSettings-'].update(disabled = True)
elif not enable_t1 and \
not enable_t2 and \
not enable_c1 and \
not enable_c2:
sending_param = {}
window['-EnableT1-'].update(disabled = False)
window['-EnableT2-'].update(disabled = False)
window['-EnableC1-'].update(disabled = False)
window['-EnableC2-'].update(disabled = False)
window['-InputMinT1-'].update(disabled = True)
window['-InputMaxT1-'].update(disabled = True)
window['-InputDeltaT1-'].update(disabled = True)
window['-InputMinT2-'].update(disabled = True)
window['-InputMaxT2-'].update(disabled = True)
window['-InputDeltaT2-'].update(disabled = True)
window['-InputMinC1-'].update(disabled = True)
window['-InputMaxC1-'].update(disabled = True)
window['-InputDeltaC1-'].update(disabled = True)
window['-InputMinC2-'].update(disabled = True)
window['-InputMaxC2-'].update(disabled = True)
window['-InputDeltaC2-'].update(disabled = True)
window['-InputT1-'].update(disabled = True)
window['-InputT2-'].update(disabled = True)
window['-InputI1-'].update(disabled = True)
window['-InputI2-'].update(disabled = True)
window['-EnableManualSettings-'].update(disabled = False)
window['-InputDeltaTime-'].update(disabled = not enable_c1 and not enable_t1 and not enable_c2 and not enable_t2)
window['-InputTau-'].update(disabled = not enable_c1 and not enable_t1 and not enable_c2 and not enable_t2)
window['-StartCycle-'].update(disabled = not enable_c1 and not enable_t1 and not enable_c2 and not enable_t2 or disableStartButton)
if event == WIN_CLOSED or event == '-EXIT-':
if use_client:
p.terminate()
conn.close()
sck.close()
dev.reset_port_settings(prt)
break
elif event == '-StartCycle-':
if not enable_manual_settings:
window['-StopCycle-'].update(disabled = False)
window['-StartCycle-'].update(disabled = True)
window['-EnableT1-'].update(disabled = True)
window['-EnableC1-'].update(disabled = True)
window['-EnableT1-'].update(False)
window['-EnableC1-'].update(False)
window['-EnableT2-'].update(disabled = True)
window['-EnableC2-'].update(disabled = True)
window['-EnableT2-'].update(False)
window['-EnableC2-'].update(False)
window['-InputMinT1-'].update(disabled = True)
window['-InputMaxT1-'].update(disabled = True)
window['-InputDeltaT1-'].update(disabled = True)
window['-InputMinT2-'].update(disabled = True)
window['-InputMaxT2-'].update(disabled = True)
window['-InputDeltaT2-'].update(disabled = True)
window['-InputMinC1-'].update(disabled = True)
window['-InputMaxC1-'].update(disabled = True)
window['-InputDeltaC1-'].update(disabled = True)
window['-InputMinC2-'].update(disabled = True)
window['-InputMaxC2-'].update(disabled = True)
window['-InputDeltaC2-'].update(disabled = True)
window['-InputDeltaTime-'].update(disabled = True)
window['-InputTau-'].update(disabled = True)
window['-InputT1-'].update(disabled = True)
window['-InputT2-'].update(disabled = True)
window['-InputI1-'].update(disabled = True)
window['-InputI2-'].update(disabled = True)
current_and_temperature_settings_available = False
# TODO get task parameters from gui and put its to params
if use_client:
jsondoc_str = json.dumps(sending_param)
jsondoc = bytearray()
jsondoc.extend(jsondoc_str.encode())
conn.sendall(jsondoc)
start_task(prt)
else:
params['Temp_1'] = float(values['-InputT1-'])
params['Temp_2'] = float(values['-InputT2-'])
params['Iset_1'] = float(values['-InputI1-'])
params['Iset_2'] = float(values['-InputI2-'])
dev.send_control_parameters(prt, params)
#print(sending_param)
elif event == '-StopCycle-':
window['-StopCycle-'].update(disabled = True)
current_and_temperature_settings_available = True
stop_task(prt)
elif event == TIMEOUT_KEY:
data = dev.request_data(prt)
update_data_lists()
window['-TOUT_1-'].update(gui.READ_TEMPERATURE_TEXT+' 1: '+shorten(data['Temp_1'])+' C')
window['-TOUT_2-'].update(gui.READ_TEMPERATURE_TEXT+' 2: '+shorten(data['Temp_2'])+' C')
window['-IOUT_1-'].update(gui.READ_CURRENT_TEXT+' 1: '+shorten(data['I1'])+' мА')
window['-IOUT_2-'].update(gui.READ_CURRENT_TEXT+' 2: '+shorten(data['I2'])+' мА')
window['-DateTime-'].update(data['datetime'].strftime('%d-%m-%Y %H:%M:%S:%f')[:-3])
window['-TTerm1-'].update('T терм 1: '+shorten(data['Temp_Ext_1'])+' C')
window['-TTerm2-'].update('T терм 2: '+shorten(data['Temp_Ext_2'])+' C')
window['-3V3-'].update('3V3: '+shorten(data['MON_3V3'])+' В')
window['-5V1-'].update('5V1: '+shorten(data['MON_5V1'])+' В')
window['-5V2-'].update('5V2: '+shorten(data['MON_5V2'])+' В')
window['-7V0-'].update('7V0: '+shorten(data['MON_7V0'])+' В')
window['-GraphT1-'].draw_line((len(draw_data)-1, draw_data[-2]['Temp_1']), (len(draw_data), draw_data[-1]['Temp_1']), color='yellow')
window['-GraphT2-'].draw_line((len(draw_data)-1, draw_data[-2]['Temp_2']), (len(draw_data), draw_data[-1]['Temp_2']), color='yellow')
window['-GraphI1-'].draw_line((len(draw_data)-1, draw_data[-2]['I1']), (len(draw_data), draw_data[-1]['I1']), color='yellow')
window['-GraphI2-'].draw_line((len(draw_data)-1, draw_data[-2]['I2']), (len(draw_data), draw_data[-1]['I2']), color='yellow')
# When graphs reach end of X scale, start scrolling
if len(draw_data)>=gui.GRAPH_POINTS_NUMBER:
# Scroll graphs
window['-GraphT1-'].move(-1, 0)
window['-GraphT2-'].move(-1, 0)
window['-GraphI1-'].move(-1, 0)
window['-GraphI2-'].move(-1, 0)
# Scroll back graphs' labels
for key, sgn in axes_signs.items():
window[key].MoveFigure(sgn[0], 1, 0)
window[key].MoveFigure(sgn[1], 1, 0)
window.close()
dev.close_connection(prt)
raise SystemExit(main())

6
deploy
View File

@ -1,6 +0,0 @@
#!/usr/bin/bash
sudo apt install python3-venv
python3 -m venv .venv
source .venv/bin/activate
pip install FreeSimpleGUI PySerial
deactivate

View File

@ -1,347 +0,0 @@
from enum import IntEnum
from serial import Serial
from serial.tools import list_ports
import device_conversion as cnv
from datetime import datetime
#### ---- Constants
GET_DATA_TOTAL_LENGTH = 30 # Total number of bytes when getting DATA
SEND_PARAMS_TOTAL_LENGTH = 30 # Total number of bytes when sending parameters
TASK_ENABLE_COMMAND_LENGTH = 32 # Total number of bytes when sending TASK_ENABLE command
class TaskType(IntEnum):
Manual = 0x00
ChangeCurrentLD1 = 0x01
ChangeCurrentLD2 = 0x02
ChangeTemperatureLD1 = 0x03
ChangeTemperatureLD2 = 0x04
#### ---- Auxiliary functions
def int_to_hex(inp):
if inp<0 or inp>65535:
print("Error. Input should be within [0, 65535]. Returning N=0.")
return "0000"
return f"{inp:#0{6}x}"[2:]
def crc(lst):
crc=int("0x"+lst[0],16)
for i in range(1,len(lst)):
crc=crc^int("0x"+lst[i],16)
return int_to_hex(crc)
def show_hex_string(string):
return "".join("\\x{}".format(char.encode()) for char in (string[i:i+2] for i in range(0, len(string), 2)))
def flipfour(s):
''' Changes "abcd" to "cdba"
'''
if len(s) != 4:
print("Error. Trying to flip string with length not equal to 4.")
return None
return s[2:4]+s[0:2]
#### ---- Port Operations
def setup_port_connection(baudrate: int, port: str, timeout_sec: float):
prt = Serial()
prt.baudrate = baudrate
prt.port = port
prt.timeout = timeout_sec
return prt
def open_port(prt):
prt.open()
if prt.is_open:
print("Connection succesful. Port is opened.")
print("Port parameters:", prt)
print("")
else:
print("Can't open port. Exiting program.")
exit()
def close_port(prt):
prt.close()
print("")
if prt.is_open:
print("Can't close port. Exiting program.")
exit()
else:
print("Port is closed. Exiting program.")
exit()
#### ---- Interacting with device: low-level
# ---- Sending commands
def send_TASK_ENABLE(prt, bytestring):
''' Set task parameters (x7777 + ...).
Expected device answer: STATE.
'''
if len(bytestring) != TASK_ENABLE_COMMAND_LENGTH:
print("Error. Wrong parameter string for TASK_ENABLE.")
return None
prt.write(bytestring)
print("Sent: Set control parameters (TASK_ENABLE).")
def send_DECODE_ENABLE(prt, bytestring):
''' Set control parameters (x1111 + ...).
Expected device answer: STATE.
'''
if len(bytestring) != SEND_PARAMS_TOTAL_LENGTH:
print("Error. Wrong parameter string for DECODE_ENABLE.")
return None
prt.write(bytestring)
print("Sent: Set control parameters (DECODE_ENABLE).")
def send_DEFAULT_ENABLE(prt):
''' Reset the device (x2222).
Expected device answer: STATE.
'''
input = bytearray.fromhex(flipfour("2222"))
prt.write(input)
print("Sent: Reset device (DEFAULT_ENABLE).")
def send_TRANSS_ENABLE(prt):
''' Request all saved data (x3333).
Expected device answer: SAVED_DATA.
'''
# TODO later.
pass
def send_TRANS_ENABLE(prt):
''' Request last piece of data (x4444).
Expected device answer: DATA.
'''
input = bytearray.fromhex(flipfour("4444"))
prt.write(input)
print("Sent: Request last data (TRANS_ENABLE).")
def send_REMOVE_FILE(prt):
''' Delete saved data (x5555).
Expected device answer: STATE.
'''
input = bytearray.fromhex(flipfour("5555"))
prt.write(input)
print("Sent: Delete saved data (REMOVE_FILE).")
pass
def send_STATE(prt):
''' Request state (x6666).
Expected device answer: STATE.
'''
input = bytearray.fromhex(flipfour("6666"))
prt.write(input)
print("Sent: Request state (STATE).")
pass
# ---- Getting data
def get_STATE(prt):
''' Get decoded state of the device in byte format (2 bytes).
'''
print("Received "+str(prt.inWaiting())+" bytes.")
if prt.inWaiting()!=2:
print("Error. Couldn't get STATE data. prt.inWaiting():", prt.inWaiting())
print("Flushing input data:", prt.read(prt.inWaiting()))
# print("Flushing input data:", prt.read(2), prt.read(2))
return None
out_bytes = prt.read(2)
return out_bytes
def get_DATA(prt):
''' Get decoded state of the device in byte format (426 bytes).
'''
print("Received "+str(prt.inWaiting())+" bytes.\n")
if prt.inWaiting()!=GET_DATA_TOTAL_LENGTH:
print("Error. Couldn't get DATA data.")
print("receiven data len:", prt.inWaiting())
return None
out_bytes = prt.read(GET_DATA_TOTAL_LENGTH)
return out_bytes
#### ---- Interacting with device: decode/encode messages
# ---- Encoding functions
def CalculateCRC(data):
CRC_input = []
for i in range(1,int(len(data)/4)):
CRC_input.append(data[4*i:4*i+4])
return crc(CRC_input)
def encode_Setup():
bits=['0']*16
bits[15] = "1" # enable work
bits[14] = "1" # enable 5v1
bits[13] = "1" # enable 5v2
bits[12] = "1" # enable LD1
bits[11] = "1" # enable LD2
bits[10] = "1" # enable REF1
bits[9] = "1" # enable REF2
bits[8] = "1" # enable TEC1
bits[7] = "1" # enable TEC2
bits[6] = "1" # enable temp stab 1
bits[5] = "1" # enable temp stab 2
bits[4] = "0" # enable sd save
bits[3] = "1" # enable PI1 coef read
bits[2] = "1" # enable PI2 coef read
bits[1] = "0" # reserved
bits[0] = "0" # reserved
s="".join([str(i) for i in bits])
return hex(int(s,2))[2:]
def create_TaskEnableCommand(sending_param):
data = flipfour("7777") # Word 0
data += flipfour(encode_Setup()) # Word 1
data += flipfour(int_to_hex(sending_param['TaskType'])) # Word 2
match sending_param['TaskType']:
case TaskType.ChangeCurrentLD1.value:
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC1']))) # Word 3
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC1']))) # Word 4
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['DeltaC1']))) # Word 5
data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 7
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9
case TaskType.ChangeCurrentLD2.value:
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinC2']))) # Word 3
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxC2']))) # Word 4
data += flipfour(int_to_hex(int(sending_param['DeltaC2']*100))) # Word 5
data += flipfour(int_to_hex(int(sending_param['Dt']*100))) # Word 6
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 7
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9
case TaskType.ChangeTemperatureLD1:
raise Exception("Temperature changing is not implemented yet")
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT1']))) # Word 3
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT1']))) # Word 4
data += flipfour(int_to_hex(sending_param['DeltaT1']*100)) # Word 5
data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I1']))) # Word 7
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I2']))) # Word 8
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T2']))) # Word 9
case TaskType.ChangeTemperatureLD2:
raise Exception("Temperature changing is not implemented yet")
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MinT2']))) # Word 3
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['MaxT2']))) # Word 4
data += flipfour(int_to_hex(sending_param['DeltaT2']*100)) # Word 5
data += flipfour(int_to_hex(sending_param['Dt']*100)) # Word 6
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['I2']))) # Word 7
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(sending_param['I1']))) # Word 8
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(sending_param['T1']))) # Word 9
case _:
raise Exception(f"Undefined TaskType:{sending_param['TaskType']}")
data += flipfour(int_to_hex(int(sending_param['Tau']))) # Word 10
data += flipfour(int_to_hex(sending_param['ProportionalCoeff_1'])) # Word 11
data += flipfour(int_to_hex(sending_param['IntegralCoeff_1'])) # Word 12
data += flipfour(int_to_hex(sending_param['ProportionalCoeff_2'])) # Word 13
data += flipfour(int_to_hex(sending_param['IntegralCoeff_2'])) # Word 14
data += CalculateCRC(data) # Word 15
return bytearray.fromhex(data)
def encode_Input(params):
if params is None:
return bytearray.fromhex("1111"+"00"*14)
data = flipfour("1111") # Word 0
data += flipfour(encode_Setup()) # Word 1
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_1']))) # Word 2
data += flipfour(int_to_hex(cnv.conv_T_C_to_N(params['Temp_2']))) # Word 3
data += flipfour("0000")*3 # Words 4-6
data += flipfour(int_to_hex(params['ProportionalCoeff_1'])) # Word 7
data += flipfour(int_to_hex(params['IntegralCoeff_1'])) # Word 8
data += flipfour(int_to_hex(params['ProportionalCoeff_2'])) # Word 9
data += flipfour(int_to_hex(params['IntegralCoeff_2'])) # Word 10
data += flipfour(params['Message_ID']) # Word 11
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_1']))) # Word 12
data += flipfour(int_to_hex(cnv.conv_I_mA_to_N(params['Iset_2']))) # Word 13
CRC_input = []
for i in range(1,int(len(data)/4)):
CRC_input.append(data[4*i:4*i+4])
CRC = crc(CRC_input)
data += CRC # Word 14
return bytearray.fromhex(data)
# ---- Decoding functions
def decode_STATE(state):
st = flipfour(state)
if st == '0000':
status = "All ok."
elif st == '0001':
status = "SD Card reading/writing error (SD_ERR)."
elif st == '0002':
status = "Command error (UART_ERR)."
elif st == '0004':
status = "Wrong parameter value error (UART_DECODE_ERR)."
elif st == '0008':
status = "Laser 1: TEC driver overheat (TEC1_ERR)."
elif st == '0010':
status = "Laser 2: TEC driver overheat (TEC2_ERR)."
elif st == '0020':
status = "Resetting system error (DEFAULT_ERR)."
elif st == '0040':
status = "File deletion error (REMOVE_ERR)."
else:
status = "Unknown or reserved error."
return status
def decode_DATA(dh):
def get_word(s,num):
return flipfour(s[num*2*2:num*2*2+4])
def get_int_word(s,num):
return int(get_word(s,num),16)
data = {}
data['datetime'] = datetime.now()
data['Header'] = get_word(dh, 0)
data['I1'] = cnv.conv_I_N_to_mA(get_int_word(dh, 1)) #LD1_param.POWER
data['I2'] = cnv.conv_I_N_to_mA(get_int_word(dh, 2)) #LD2_param.POWER
data['TO_LSB'] = get_int_word(dh, 3) #TO6_counter_LSB
data['TO_MSB'] = get_int_word(dh, 4) #TO6_counter_MSB
data['Temp_1'] = cnv.conv_T_N_to_C(get_int_word(dh, 5)) #LD1_param.LD_CURR_TEMP
data['Temp_2'] = cnv.conv_T_N_to_C(get_int_word(dh, 6)) #LD2_param.LD_CURR_TEMP
data['Temp_Ext_1'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 7)) #U_Rt1_ext_Gain
data['Temp_Ext_2'] = cnv.conv_TExt_N_to_C(get_int_word(dh, 8)) #U_Rt2_ext_Gain
data['MON_3V3'] = cnv.conv_U3V3_N_to_V(get_int_word(dh, 9)) #3V_monitor
data['MON_5V1'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 10)) #5V1_monitor
data['MON_5V2'] = cnv.conv_U5V_N_to_V(get_int_word(dh, 11)) #5V2_monitor
data['MON_7V0'] = cnv.conv_U7V_N_to_V(get_int_word(dh, 12)) #7V_monitor
data['Message_ID'] = get_word(dh, 13) # Last received command
data['CRC'] = get_word(dh, 14)
return data

View File

@ -1,68 +0,0 @@
import math
# ---- Conversion functions
VREF = 2.5 # Volts
R1 = 10000 # Ohm
R2 = 2200 # Ohm
R3 = 27000 # Ohm
R4 = 30000 # Ohm
R5 = 27000 # Ohm
R6 = 56000 # Ohm
RREF = 10 # Ohm (current-setting resistor) @1550 nm - 28.7 Ohm; @840 nm - 10 Ohm
R7 = 22000 # Ohm
R8 = 22000 # Ohm
R9 = 5100 # Ohm
R10 = 180000 # Ohm
class Task:
def __init__(self):
self.task_type = 0
# Here should be fields, contained task parameters
def conv_T_C_to_N(T):
Rt = 10000 * math.exp( 3900/(T+273) - 3900/298 )
U = VREF/(R5*(R3+R4)) * ( R1*R4*(R5+R6) - Rt*(R3*R6-R4*R5) ) / (Rt+R1)
N = int(U * 65535 / VREF)
if N<0 or N>65535:
print("Error converting T=" + str(T) + " to N=" + str(N) + ". N should be within [0, 65535]. Returning N=0.")
return N
def conv_T_N_to_C(N):
U = N*VREF/65535 # Volts
Rt = R1 * (VREF*R4*(R5+R6) - U*R5*(R3+R4)) / (U*R5*(R3+R4) + VREF*R3*R6 - VREF*R4*R5) # Ohm
T = 1 / (1/298 + 1/3900 * math.log(Rt/10000)) - 273 # In Celsius
return T
def conv_TExt_N_to_C(N):
U = N*VREF/4095*1/(1+100000/R10) + VREF*R9/(R8+R9) # Volts
Rt = R7*U/(VREF-U) # Ohm
T = 1 / (1/298 + 1/3455 * math.log(Rt/10000)) - 273 # In Celsius
return T
def conv_I_mA_to_N(I):
N = int(65535/2000 * RREF * I) # I in mA
if N<0 or N>65535:
print("Error converting I=" + str(I) + " to N=" + str(N) + ". N should be within [0, 65535]. Returning N=0.")
N=0
return N
def conv_I_N_to_mA(N):
return N*2.5/(65535*4.4) - 1/20.4 # I in mA
def conv_U3V3_N_to_V(u_int):
return u_int * 1.221 * 0.001 # Volts
def conv_U5V_N_to_V(u_int):
return u_int * 1.8315 * 0.001 # Volts
def conv_U7V_N_to_V(u_int):
return u_int * 6.72 * 0.001 # Volts

View File

@ -1,144 +0,0 @@
import time
from datetime import datetime
import device_commands as cmd
#### ---- Constants
WAIT_AFTER_SEND = 0.15 # Wait after sending command, before requesting input (in seconds).
#### ---- High-level port commands
'''
def create_port_connection():
prt = None
for port, _, _ in sorted(cmd.list_ports.comports()):
try:
prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
cmd.open_port(prt)
reset_port_settings(prt)
except:
prt.close()
continue
break
return prt
'''
def create_port_connection():
prt = None
print()
ports = []
for port, _,_ in sorted(cmd.list_ports.comports()):
ports.append(port)
#ONLY FOR LINUX!!!
have_ttyUSB = False
USB_ports = []
for port in ports:
if "USB" in port:
USB_ports.append(port)
if len(USB_ports):
ports = USB_ports
# print("ports:", ports)
# for port, _, _ in sorted(cmd.list_ports.comports()):
for port in ports:
try:
print("PORT:", port)
prt = cmd.setup_port_connection(port=port, baudrate=115200, timeout_sec=1)
cmd.open_port(prt)
reset_port_settings(prt)
except:
prt.close()
continue
break
return prt
# def setup_connection():
# prt = cmd.setup_port_connection()
# cmd.open_port(prt)
# return prt
def reset_port_settings(prt):
# Reset port settings and check status
cmd.send_DEFAULT_ENABLE(prt)
time.sleep(WAIT_AFTER_SEND)
status = cmd.get_STATE(prt).hex()
if status is not None:
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
print("")
def request_state(prt):
# Request data
cmd.send_STATE(prt)
time.sleep(WAIT_AFTER_SEND)
status = cmd.get_STATE(prt).hex()
if status is not None:
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
print("")
def send_control_parameters(prt, params):
# Send control parameters
hexstring = cmd.encode_Input(params)
cmd.send_DECODE_ENABLE(prt,hexstring)
time.sleep(WAIT_AFTER_SEND)
status = cmd.get_STATE(prt).hex()
if status is not None:
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
print("")
else:
print("")
def send_task_command(prt, sending_param):
# Send task command (TASK_ENABLE state in firmware)
hexstring = cmd.create_TaskEnableCommand(sending_param)
cmd.send_TASK_ENABLE(prt,hexstring)
time.sleep(WAIT_AFTER_SEND)
status = cmd.get_STATE(prt).hex()
if status is not None:
print("Received: STATE. State status:", cmd.decode_STATE(status), "("+cmd.flipfour(status)+")")
print("")
else:
print("")
def request_data(prt):
# Request data
cmd.send_TRANS_ENABLE(prt)
time.sleep(WAIT_AFTER_SEND)
data = cmd.get_DATA(prt).hex()
data_dict = []
if data is not None:
data_dict = cmd.decode_DATA(data)
return data_dict
def print_data(data):
def shorten(i):
return str(round(i, 2))
print("Data from device (time: "+datetime.now().strftime("%H:%M:%S:%f")+"):")
print("Message Header:", data['Header'], " Message ID:", data['Message_ID'])
print("Photodiode Current 1 ("+str(len(data['I1']))+" values):", \
shorten(data['I1']), shorten(data['I1'][1]), "...", \
shorten(data['I1']), shorten(data['I1'][-1]), "mA")
print("Photodiode Current 2 ("+str(len(data['I2']))+" values):", \
shorten(data['I2']), shorten(data['I2'][1]), "...", \
shorten(data['I2']), shorten(data['I2'][-1]), "mA")
print("Laser Temperature 1:", shorten(data['Temp_1']), "C")
print("Laser Temperature 2:", shorten(data['Temp_2']), "C")
print("Temperature of external thermistor 1:", shorten(data['Temp_Ext_1']), "C")
print("Temperature of external thermistor 2:", shorten(data['Temp_Ext_2']), "C")
print("Voltages 3V3: "+shorten(data['MON_3V3'])+"V 5V1: "+shorten(data['MON_5V1'])+ \
"V 5V2: "+shorten(data['MON_5V2'])+"V 7V0: "+shorten(data['MON_7V0'])+"V.")
def close_connection(prt):
cmd.close_port(prt)

235
gui.py
View File

@ -1,235 +0,0 @@
#from Tools.scripts.highlight import default_html
#default_html
import FreeSimpleGUI as sg
#### ---- GUI Constants
WINDOW_TITLE = 'Модуль управления лазерной схемой оптического смесителя (Отдел радиофотоники МФТИ)'
WINDOW_SIZE = [0, 0]
COMPACT_LAYOUT = False
SET_BUTTON_TEXT = 'Задать'
SET_TEMPERATURE_TEXT_1 = 'Установка температуры лазера 1 (C):'
SET_TEMPERATURE_TEXT_2 = 'Установка температуры лазера 2 (C):'
SET_CURRENT_TEXT_1 = 'Управляющий ток лазера 1 (15-60 мА):'
SET_CURRENT_TEXT_2 = 'Управляющий ток лазера 2 (15-60 мА):'
SET_MANUAL_MODE_TEXT = 'Ручной режим ввода'
SET_TEXT_WIDTH = 34
SET_INPUT_WIDTH = 5
SET_MIN_TEMPERATURE_TEXT_1 = 'Минимальная температура лазера 1 (C):'
SET_MAX_TEMPERATURE_TEXT_1 = 'Максимальная температура лазера 1 (C):'
SET_DELTA_TEMPERATURE_TEXT_1 = 'Шаг дискретизации температуры лазера 1 (0.05-1 С):'
SET_MIN_CURRENT_TEXT_1 = 'Мнимальный ток лазера 1 (мА):'
SET_MAX_CURRENT_TEXT_1 = 'Максимальный ток лазера 1 (мА):'
SET_DELTA_CURRENT_TEXT_1 = 'Шаг дискретизации тока лазера 1 (0.002-0.5 мА):'
SET_MIN_TEMPERATURE_TEXT_2 = 'Минимальная температура лазера 2 (C):'
SET_MAX_TEMPERATURE_TEXT_2 = 'Максимальная температура лазера 2 (C):'
SET_DELTA_TEMPERATURE_TEXT_2 = 'Шаг дискретизации температуры лазера 2 (0.05-1 С):'
SET_MIN_CURRENT_TEXT_2 = 'Мнимальный ток лазера 2 (мА):'
SET_MAX_CURRENT_TEXT_2 = 'Максимальный ток лазера 2 (мА):'
SET_DELTA_CURRENT_TEXT_2 = 'Шаг дискретизации тока лазера 2 (0.002-0.5 мА):'
SET_DELTA_T_TEXT = 'Шаг дискретизации времени (20-100 мкс, шаг 10 мкс):'
SET_TAU_T_TEXT = 'Время задержки (3-10мс):'
SET_TEXT_WIDTH_NEW = 40
SET_START_BUTTON_TEXT = 'Пуск'
SET_STOP_BUTTON_TEXT = 'Стоп'
GRAPH_POINTS_NUMBER = 100 # Number of most recent data points shown on charts
GRAPH_CANVAS_SIZE = (0, 0)
GRAPH_BG_COLOR = '#303030'
GRAPH_SIGN_AXES_COLOR = 'orange'
GRAPH_T_MIN = 0 # Celsius
GRAPH_T_MAX = 50 # Celsius
GRAPH_I_MIN = 0.0 # mA
GRAPH_I_MAX = 1.0 # mA
READ_TEMPERATURE_TEXT = 'Температура лазера'
READ_CURRENT_TEXT = 'Ток фотодиода'
VOLTAGE_TEXT_WIDTH = 15
H_SEPARATOR_PAD = (1, 20)
OUTPUT_TEXT_PAD = (5, (20, 5))
WINDOW_MARGIN = (60, 90)
MIN_WINDOW_SIZE = (880, 660)
#### ---- Setting GUI
def get_screen_size():
global WINDOW_SIZE, GRAPH_CANVAS_SIZE, COMPACT_LAYOUT, SET_TEXT_WIDTH, SET_TEXT_WIDTH_NEW
global H_SEPARATOR_PAD, OUTPUT_TEXT_PAD
window = sg.Window('Test')
screen_width, screen_height = window.get_screen_size()
window.close()
COMPACT_LAYOUT = screen_width <= 1280 or screen_height <= 800
margin_w, margin_h = WINDOW_MARGIN
min_w, min_h = MIN_WINDOW_SIZE
window_width = min(screen_width, max(min_w, screen_width - margin_w))
window_height = min(screen_height, max(min_h, screen_height - margin_h))
WINDOW_SIZE = (window_width, window_height)
if COMPACT_LAYOUT:
SET_TEXT_WIDTH = 30
SET_TEXT_WIDTH_NEW = 34
graph_width = min(int(screen_width / 3.6), int(window_width / 3.1))
graph_height = max(110, int(screen_height / 6.5))
H_SEPARATOR_PAD = (1, 12)
OUTPUT_TEXT_PAD = (5, (12, 5))
else:
SET_TEXT_WIDTH = 34
SET_TEXT_WIDTH_NEW = 40
graph_width = int(screen_width / 3.5)
graph_height = int(screen_width / (3 * 2.75))
H_SEPARATOR_PAD = (1, 20)
OUTPUT_TEXT_PAD = (5, (20, 5))
graph_width = max(220, graph_width)
GRAPH_CANVAS_SIZE = (graph_width, graph_height)
return WINDOW_SIZE
def setup_gui(params):
sg.theme("DarkBlue12")
window_size = get_screen_size()
layout_input_col1 = [[sg.Text(SET_TEMPERATURE_TEXT_1, size=(SET_TEXT_WIDTH, 1)), sg.Push(),
sg.Input(params['Temp_1'], disabled_readonly_background_color="Gray", size=(SET_INPUT_WIDTH,1), key='-InputT1-', disabled = True)],
[sg.Text(SET_CURRENT_TEXT_1, size=(SET_TEXT_WIDTH, 1)), sg.Push(),
sg.Input(params['Iset_1'], disabled_readonly_background_color="Gray", size=(SET_INPUT_WIDTH,1), key='-InputI1-', disabled = True)],
[sg.HSeparator(pad=H_SEPARATOR_PAD)],
[sg.Push(), sg.Text(READ_TEMPERATURE_TEXT+' 1: ', key='-TOUT_1-')],
[sg.Graph(canvas_size=GRAPH_CANVAS_SIZE, graph_bottom_left=(0, GRAPH_T_MIN), graph_top_right=(GRAPH_POINTS_NUMBER, GRAPH_T_MAX),
background_color=GRAPH_BG_COLOR, enable_events=False, drag_submits=False, key='-GraphT1-')],
# [sg.HSeparator(pad=(10,15), color=sg.theme_background_color())],
[sg.Push(), sg.Text(READ_CURRENT_TEXT+' 1: ', pad=OUTPUT_TEXT_PAD, key='-IOUT_1-')],
[sg.Graph(canvas_size=GRAPH_CANVAS_SIZE, graph_bottom_left=(0, GRAPH_I_MIN), graph_top_right=(GRAPH_POINTS_NUMBER, GRAPH_I_MAX),
background_color=GRAPH_BG_COLOR, enable_events=False, drag_submits=False, key='-GraphI1-')]]
layout_input_col2 = [[sg.Text(SET_TEMPERATURE_TEXT_2, size=(SET_TEXT_WIDTH, 1)), sg.Push(),
sg.Input(params['Temp_2'], disabled_readonly_background_color="Gray", size=(SET_INPUT_WIDTH,1), key='-InputT2-', disabled = True)],
[sg.Text(SET_CURRENT_TEXT_2, size=(SET_TEXT_WIDTH, 1)), sg.Push(),
sg.Input(params['Iset_2'], disabled_readonly_background_color="Gray", size=(SET_INPUT_WIDTH,1), key='-InputI2-', disabled = True)],
[sg.HSeparator(pad=H_SEPARATOR_PAD)],
[sg.Push(), sg.Text(READ_TEMPERATURE_TEXT+' 2: ', key='-TOUT_2-')],
[sg.Graph(canvas_size=GRAPH_CANVAS_SIZE, graph_bottom_left=(0, GRAPH_T_MIN), graph_top_right=(GRAPH_POINTS_NUMBER, GRAPH_T_MAX),
background_color=GRAPH_BG_COLOR, enable_events=False, drag_submits=False, key='-GraphT2-')],
# [sg.HSeparator(pad=(10,15), color=sg.theme_background_color())],
[sg.Push(), sg.Text(READ_CURRENT_TEXT+' 2: ', pad=OUTPUT_TEXT_PAD, key='-IOUT_2-')],
[sg.Graph(canvas_size=GRAPH_CANVAS_SIZE, graph_bottom_left=(0, GRAPH_I_MIN), graph_top_right=(GRAPH_POINTS_NUMBER, GRAPH_I_MAX),
background_color=GRAPH_BG_COLOR, enable_events=False, drag_submits=False, key='-GraphI2-')]]
layout_input_col3 = [
[sg.Text(SET_MANUAL_MODE_TEXT, size=(SET_TEXT_WIDTH_NEW, 1)), sg.Checkbox('', default=False, key='-EnableManualSettings-')],
[sg.Text(SET_MIN_TEMPERATURE_TEXT_1, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Min_Temp_1'], size=(SET_INPUT_WIDTH,1), key='-InputMinT1-', disabled=True, disabled_readonly_background_color="Gray"), sg.Checkbox('', default=False, key='-EnableT1-')],
[sg.Text(SET_MAX_TEMPERATURE_TEXT_1, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Max_Temp_1'], size=(SET_INPUT_WIDTH,1), key='-InputMaxT1-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.Text(SET_MIN_CURRENT_TEXT_1, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Min_Current_1'], size=(SET_INPUT_WIDTH,1), key='-InputMinC1-', disabled=True, disabled_readonly_background_color="Gray"), sg.Checkbox('', default=False, key='-EnableC1-')],
[sg.Text(SET_MAX_CURRENT_TEXT_1, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Max_Current_1'], size=(SET_INPUT_WIDTH,1), key='-InputMaxC1-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.Text(SET_DELTA_TEMPERATURE_TEXT_1, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Delta_Temp_1'], size=(SET_INPUT_WIDTH,1), key='-InputDeltaT1-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.Text(SET_DELTA_CURRENT_TEXT_1, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Delta_Current_1'], size=(SET_INPUT_WIDTH,1), key='-InputDeltaC1-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.HSeparator(pad=H_SEPARATOR_PAD)],
[sg.Text(SET_MIN_TEMPERATURE_TEXT_2, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Min_Temp_2'], size=(SET_INPUT_WIDTH,1), key='-InputMinT2-', disabled=True, disabled_readonly_background_color="Gray"), sg.Checkbox('', default=False, key='-EnableT2-')],
[sg.Text(SET_MAX_TEMPERATURE_TEXT_2, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Max_Temp_2'], size=(SET_INPUT_WIDTH,1), key='-InputMaxT2-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.Text(SET_MIN_CURRENT_TEXT_2, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Min_Current_2'], size=(SET_INPUT_WIDTH,1), key='-InputMinC2-', disabled=True, disabled_readonly_background_color="Gray"), sg.Checkbox('', default=False, key='-EnableC2-')],
[sg.Text(SET_MAX_CURRENT_TEXT_2, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Max_Current_2'], size=(SET_INPUT_WIDTH,1), key='-InputMaxC2-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.Text(SET_DELTA_TEMPERATURE_TEXT_2, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Delta_Temp_2'], size=(SET_INPUT_WIDTH,1), key='-InputDeltaT2-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.Text(SET_DELTA_CURRENT_TEXT_2, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Delta_Current_2'], size=(SET_INPUT_WIDTH,1), key='-InputDeltaC2-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.HSeparator(pad=H_SEPARATOR_PAD)],
[sg.Text(SET_DELTA_T_TEXT, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Delta_Time'], size=(SET_INPUT_WIDTH,1), key='-InputDeltaTime-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.Text(SET_TAU_T_TEXT, size=(SET_TEXT_WIDTH_NEW,1)),
sg.Input(params['Tau'], size=(SET_INPUT_WIDTH,1), key='-InputTau-', disabled=True, disabled_readonly_background_color="Gray")],
[sg.HSeparator(pad=H_SEPARATOR_PAD)],
[sg.Button(SET_START_BUTTON_TEXT, key='-StartCycle-', disabled_button_color=("Gray22", "Blue"), disabled=True), sg.Button(SET_STOP_BUTTON_TEXT, disabled_button_color=("Gray22", "Blue"), key='-StopCycle-', disabled=True)]]
layout = [[sg.Column(layout_input_col1, pad=(0,0)), sg.VSeparator(pad=(4,0)), sg.Column(layout_input_col2, pad=(0,0)), sg.VSeparator(pad=(4,0)), sg.Column(layout_input_col3, pad=(0,0))],
[sg.HSeparator(pad=(25,10))],
[sg.Text('', size=((3 if COMPACT_LAYOUT else 7),1)),
sg.Text('T терм 1:', size=(VOLTAGE_TEXT_WIDTH,1), key='-TTerm1-'), sg.Text('T терм 2:', size=(VOLTAGE_TEXT_WIDTH,1), key='-TTerm2-'),
sg.Text('3V3:', size=(VOLTAGE_TEXT_WIDTH,1), key='-3V3-'), sg.Text('5V1:', size=(VOLTAGE_TEXT_WIDTH,1), key='-5V1-'),
sg.Text('5V2:', size=(VOLTAGE_TEXT_WIDTH,1), key='-5V2-'), sg.Text('7V0:', size=(VOLTAGE_TEXT_WIDTH,1), key='-7V0-'),
sg.Push(), sg.Text('', key='-DateTime-', pad=(1,10)),
sg.Text('', size=(10,1))],
[sg.Exit('Выход', pad=(1,5), size=(10,1), key='-EXIT-')]]
window = sg.Window(WINDOW_TITLE, layout, finalize=True, element_justification='c', size=window_size, resizable=True)
window.bind('<Escape>', '-EXIT-')
return window
def sign_axes(window):
signs_dict = {}
signs_dict['-GraphT1-'] = \
(window['-GraphT1-'].draw_text(text=str(GRAPH_T_MIN)+' C', location=(3, GRAPH_T_MIN+(GRAPH_T_MAX-GRAPH_T_MIN)*0.05), color=GRAPH_SIGN_AXES_COLOR),
window['-GraphT1-'].draw_text(text=str(GRAPH_T_MAX)+' C', location=(3, GRAPH_T_MAX-(GRAPH_T_MAX-GRAPH_T_MIN)*0.05), color=GRAPH_SIGN_AXES_COLOR))
signs_dict['-GraphI1-'] = \
(window['-GraphI1-'].draw_text(text=str(GRAPH_I_MIN)+' мА', location=(4, GRAPH_I_MIN+(GRAPH_I_MAX-GRAPH_I_MIN)*0.05), color=GRAPH_SIGN_AXES_COLOR),
window['-GraphI1-'].draw_text(text=str(GRAPH_I_MAX)+' мА', location=(4, GRAPH_I_MAX-(GRAPH_I_MAX-GRAPH_I_MIN)*0.05), color=GRAPH_SIGN_AXES_COLOR))
signs_dict['-GraphT2-'] = \
(window['-GraphT2-'].draw_text(text=str(GRAPH_T_MIN)+' C', location=(3, GRAPH_T_MIN+(GRAPH_T_MAX-GRAPH_T_MIN)*0.05), color=GRAPH_SIGN_AXES_COLOR),
window['-GraphT2-'].draw_text(text=str(GRAPH_T_MAX)+' C', location=(3, GRAPH_T_MAX-(GRAPH_T_MAX-GRAPH_T_MIN)*0.05), color=GRAPH_SIGN_AXES_COLOR))
signs_dict['-GraphI2-'] = \
(window['-GraphI2-'].draw_text(text=str(GRAPH_I_MIN)+' мА', location=(4, GRAPH_I_MIN+(GRAPH_I_MAX-GRAPH_I_MIN)*0.05), color=GRAPH_SIGN_AXES_COLOR),
window['-GraphI2-'].draw_text(text=str(GRAPH_I_MAX)+' мА', location=(4, GRAPH_I_MAX-(GRAPH_I_MAX-GRAPH_I_MIN)*0.05), color=GRAPH_SIGN_AXES_COLOR))
return signs_dict

36
laser_control/__init__.py Normal file
View File

@ -0,0 +1,36 @@
"""Public package exports for the refactored laser-control application."""
from .controller import LaserController
from .models import DeviceState, DeviceStatus, Measurements
from .exceptions import (
LaserControlError,
ValidationError,
CommunicationError,
DeviceError,
CurrentOutOfRangeError,
DeviceNotRespondingError,
DeviceStateError,
InvalidParameterError,
PortNotFoundError,
ProtocolError,
TemperatureOutOfRangeError,
)
__version__ = "2.0.0"
__all__ = [
"LaserController",
"DeviceState",
"DeviceStatus",
"Measurements",
"LaserControlError",
"ValidationError",
"CommunicationError",
"CurrentOutOfRangeError",
"DeviceError",
"DeviceNotRespondingError",
"DeviceStateError",
"InvalidParameterError",
"PortNotFoundError",
"ProtocolError",
"TemperatureOutOfRangeError",
]

229
laser_control/constants.py Normal file
View File

@ -0,0 +1,229 @@
"""Shared constants for protocol, validation, transport, and GUI defaults."""
# ---- Transport / timing
BAUDRATE = 115200
SERIAL_TIMEOUT_SEC = 1.0
WAIT_AFTER_SEND_SEC = 0.15
GUI_POLL_INTERVAL_MS = 150
GUI_STATUS_INTERVAL_MS = 1000
# ---- Packet sizes
GET_DATA_TOTAL_LENGTH = 30
SEND_PARAMS_TOTAL_LENGTH = 30
SHORT_CONTROL_TOTAL_LENGTH = 10
WAVE_DATA_TOTAL_LENGTH = 30
PROFILE_SAVE_CONTROL_TOTAL_LENGTH = 30
PROFILE_SAVE_DATA_TOTAL_LENGTH = 30
SHORT_COMMAND_LENGTH = 2
STATUS_RESPONSE_LENGTH = 2
# ---- Supported firmware commands
CMD_DECODE_ENABLE = 0x1111
CMD_DEFAULT_ENABLE = 0x2222
CMD_TRANS_ENABLE = 0x4444
CMD_STATE = 0x6666
CMD_PROFILE_SAVE_CONTROL = 0x7777
CMD_AD9102_CONTROL = 0x8888
CMD_AD9833_CONTROL = 0x9999
CMD_DS1809_CONTROL = 0xAAAA
CMD_STM32_DAC_CONTROL = 0xBBBB
CMD_AD9102_WAVE_CONTROL = 0xCCCC
CMD_AD9102_WAVE_DATA = 0xDDDD
CMD_PROFILE_SAVE_DATA = 0xEEEE
# ---- Setup-word bit layout from firmware app_decode_work_packet()
SETUP_WORK_ENABLED = 1 << 0
SETUP_SUPPLY_5V1_ENABLED = 1 << 1
SETUP_SUPPLY_5V2_ENABLED = 1 << 2
SETUP_LASER1_ENABLED = 1 << 3
SETUP_LASER2_ENABLED = 1 << 4
SETUP_REFERENCE1_ENABLED = 1 << 5
SETUP_REFERENCE2_ENABLED = 1 << 6
SETUP_TEC1_ENABLED = 1 << 7
SETUP_TEC2_ENABLED = 1 << 8
SETUP_TEMP_SENSOR1_ENABLED = 1 << 9
SETUP_TEMP_SENSOR2_ENABLED = 1 << 10
SETUP_PID1_FROM_HOST = 1 << 12
SETUP_PID2_FROM_HOST = 1 << 13
DEFAULT_SETUP_WORD = (
SETUP_WORK_ENABLED
| SETUP_SUPPLY_5V1_ENABLED
| SETUP_SUPPLY_5V2_ENABLED
| SETUP_LASER1_ENABLED
| SETUP_LASER2_ENABLED
| SETUP_REFERENCE1_ENABLED
| SETUP_REFERENCE2_ENABLED
| SETUP_TEC1_ENABLED
| SETUP_TEC2_ENABLED
| SETUP_TEMP_SENSOR1_ENABLED
| SETUP_TEMP_SENSOR2_ENABLED
| SETUP_PID1_FROM_HOST
| SETUP_PID2_FROM_HOST
)
# ---- Status-byte flags from firmware app_types.h
STATUS_FLAG_SD_ERROR = 0x01
STATUS_FLAG_UART_ERROR = 0x02
STATUS_FLAG_UART_DECODE_ERROR = 0x04
STATUS_FLAG_TEC1_ERROR = 0x08
STATUS_FLAG_TEC2_ERROR = 0x10
STATUS_FLAG_DEFAULT_ERROR = 0x20
STATUS_FLAG_AD9102_ERROR = 0x80
STATUS_DESCRIPTIONS = {
STATUS_FLAG_SD_ERROR: "SD card read/write error.",
STATUS_FLAG_UART_ERROR: "UART framing or header error.",
STATUS_FLAG_UART_DECODE_ERROR: "Command payload validation error.",
STATUS_FLAG_TEC1_ERROR: "Laser 1 TEC driver overheat.",
STATUS_FLAG_TEC2_ERROR: "Laser 2 TEC driver overheat.",
STATUS_FLAG_DEFAULT_ERROR: "Device reset/default handling error.",
STATUS_FLAG_AD9102_ERROR: "AD9102 configuration or waveform error.",
}
# ---- Peripheral control flags from firmware app_types.h
AD9102_FLAG_ENABLE = 0x0001
AD9102_FLAG_TRIANGLE = 0x0002
AD9102_FLAG_SRAM = 0x0004
AD9102_FLAG_SRAM_FORMAT_ALT = 0x0008
AD9833_FLAG_ENABLE = 0x0001
AD9833_FLAG_TRIANGLE = 0x0002
DS1809_FLAG_INCREMENT = 0x0001
DS1809_FLAG_DECREMENT = 0x0002
STM32_DAC_FLAG_ENABLE = 0x0001
AD9102_WAVE_OPCODE_BEGIN = 0x0001
AD9102_WAVE_OPCODE_COMMIT = 0x0002
AD9102_WAVE_OPCODE_CANCEL = 0x0003
PROFILE_SAVE_OPCODE_BEGIN = 0x0001
PROFILE_SAVE_OPCODE_COMMIT = 0x0002
PROFILE_SAVE_OPCODE_CANCEL = 0x0003
PROFILE_SAVE_SECTION_PROFILE_TEXT = 0x0001
PROFILE_SAVE_SECTION_WAVEFORM_TEXT = 0x0002
# ---- Physical constants from the existing conversion formulas
VREF = 2.5
R1 = 10000
R2 = 2200
R3 = 27000
R4 = 30000
R5 = 27000
R6 = 56000
RREF = 30
R7 = 22000
R8 = 22000
R9 = 5100
R10 = 180000
BETA_INTERNAL = 3900
BETA_EXTERNAL = 3455
T0_K = 298
R0 = 10000
ADC_BITS_16 = 65535
ADC_BITS_12 = 4095
U3V3_COEFF = 1.221e-3
U5V_COEFF = 1.8315e-3
U7V_COEFF = 6.72e-3
# ---- Validation limits
TEMP_MIN_C = 15.0
TEMP_MAX_C = 40.0
CURRENT_MIN_MA = 15.0
CURRENT_MAX_MA = 60.0
AD9102_SAW_STEP_MIN = 1
AD9102_SAW_STEP_MAX = 63
AD9102_PAT_BASE_MIN = 0
AD9102_PAT_BASE_MAX = 15
AD9102_PAT_PERIOD_MIN = 0
AD9102_PAT_PERIOD_MAX = 65535
AD9102_SRAM_SAMPLE_MIN = 2
AD9102_SRAM_SAMPLE_MAX = 4096
AD9102_SRAM_HOLD_MIN = 0
AD9102_SRAM_HOLD_MAX = 15
AD9102_SRAM_AMPLITUDE_MIN = 0
AD9102_SRAM_AMPLITUDE_MAX = 8191
AD9102_WAVE_SAMPLE_MIN = -8192
AD9102_WAVE_SAMPLE_MAX = 8191
AD9102_WAVE_MAX_CHUNK_SAMPLES = 12
AD9102_CLOCK_HZ = 150_000_000
AD9833_FREQ_WORD_MIN = 0
AD9833_FREQ_WORD_MAX = 0x0FFFFFFF
AD9833_MCLK_HZ = 20_000_000
AD9833_OUTPUT_FREQ_MIN_HZ = 0
AD9833_OUTPUT_FREQ_MAX_HZ = AD9833_MCLK_HZ // 2
DS1809_COUNT_MIN = 1
DS1809_COUNT_MAX = 64
DS1809_PULSE_MS_MIN = 1
DS1809_PULSE_MS_MAX = 500
DS1809_PROFILE_POSITION_MIN = 0
DS1809_PROFILE_POSITION_MAX = 63
STM32_DAC_CODE_MIN = 0
STM32_DAC_CODE_MAX = 4095
# ---- Rail tolerances
VOLT_3V3_MIN = 3.1
VOLT_3V3_MAX = 3.5
VOLT_5V_MIN = 4.8
VOLT_5V_MAX = 5.3
VOLT_7V_MIN = 6.5
VOLT_7V_MAX = 7.5
# ---- UI / runtime defaults
DEFAULT_TEMP1_C = 28.0
DEFAULT_TEMP2_C = 29.2
DEFAULT_CURRENT1_MA = 33.0
DEFAULT_CURRENT2_MA = 60.0
DEFAULT_AD9102_SAW_STEP = 1
DEFAULT_AD9102_PAT_BASE = 2
DEFAULT_AD9102_PAT_PERIOD = 0xFFFF
DEFAULT_AD9102_SAMPLE_COUNT = 16
DEFAULT_AD9102_HOLD_CYCLES = 1
DEFAULT_AD9102_AMPLITUDE = 8191
DEFAULT_AD9102_SAW_FREQUENCY_HZ = 4577
DEFAULT_AD9102_SRAM_FREQUENCY_HZ = 9_375_000
DEFAULT_AD9833_FREQ_WORD = 0
DEFAULT_AD9833_FREQUENCY_HZ = 1_000_000
DEFAULT_DS1809_COUNT = 1
DEFAULT_DS1809_PULSE_MS = 2
DEFAULT_DS1809_PROFILE_POSITION = 39
DEFAULT_STM32_DAC_VOLT = 0.52
DEFAULT_STM32_DAC_VREF = 2.5
DEFAULT_STM32_DAC_CODE = round(
DEFAULT_STM32_DAC_VOLT / DEFAULT_STM32_DAC_VREF * STM32_DAC_CODE_MAX
)
DEFAULT_PI_P = 2560
DEFAULT_PI_I = 128
PROFILE_NAME_MAX_LENGTH = 16
PROFILE_NAME_ALLOWED_PATTERN = r"[A-Za-z0-9 _-]{1,16}"
PROFILE_SAVE_DATA_CHUNK_BYTES = 22
PLOT_POINTS = 100

596
laser_control/controller.py Normal file
View File

@ -0,0 +1,596 @@
"""High-level controller orchestrating protocol encoding and serial transport."""
from __future__ import annotations
import logging
import math
import time
from typing import Callable, Sequence
from .constants import (
AD9102_CLOCK_HZ,
AD9102_PAT_BASE_MAX,
AD9102_PAT_BASE_MIN,
AD9102_PAT_PERIOD_MAX,
AD9102_PAT_PERIOD_MIN,
AD9102_SAW_STEP_MAX,
AD9102_SAW_STEP_MIN,
AD9102_SRAM_AMPLITUDE_MAX,
AD9102_SRAM_AMPLITUDE_MIN,
AD9102_SRAM_HOLD_MAX,
AD9102_SRAM_HOLD_MIN,
AD9102_SRAM_SAMPLE_MAX,
AD9102_SRAM_SAMPLE_MIN,
AD9102_WAVE_MAX_CHUNK_SAMPLES,
AD9102_WAVE_SAMPLE_MAX,
AD9102_WAVE_SAMPLE_MIN,
AD9833_FREQ_WORD_MAX,
AD9833_FREQ_WORD_MIN,
AD9833_MCLK_HZ,
AD9833_OUTPUT_FREQ_MAX_HZ,
AD9833_OUTPUT_FREQ_MIN_HZ,
DEFAULT_CURRENT1_MA,
DEFAULT_AD9102_HOLD_CYCLES,
DEFAULT_AD9102_PAT_BASE,
DEFAULT_AD9102_PAT_PERIOD,
DEFAULT_CURRENT2_MA,
DEFAULT_PI_I,
DEFAULT_PI_P,
DEFAULT_TEMP1_C,
DEFAULT_TEMP2_C,
DS1809_COUNT_MAX,
DS1809_COUNT_MIN,
DS1809_PULSE_MS_MAX,
DS1809_PULSE_MS_MIN,
GET_DATA_TOTAL_LENGTH,
PROFILE_SAVE_DATA_CHUNK_BYTES,
PROFILE_SAVE_SECTION_PROFILE_TEXT,
PROFILE_SAVE_SECTION_WAVEFORM_TEXT,
STM32_DAC_CODE_MAX,
STM32_DAC_CODE_MIN,
STATUS_RESPONSE_LENGTH,
WAIT_AFTER_SEND_SEC,
)
from .exceptions import (
CommunicationError,
DeviceNotRespondingError,
DeviceStateError,
InvalidParameterError,
)
from .models import DeviceState, DeviceStatus, Measurements, ProfileSaveRequest
from .protocol import Protocol
from .transport import SerialTransport
from .validators import ParameterValidator
logger = logging.getLogger(__name__)
_AD9102_SAW_RAMP_STEPS = 1 << 14
def ad9102_saw_frequency_limits_hz(*, triangle: bool) -> tuple[int, int]:
"""Return the reachable frequency range for the built-in saw generator."""
factor = 2 if triangle else 1
minimum = math.ceil(AD9102_CLOCK_HZ / (_AD9102_SAW_RAMP_STEPS * factor * AD9102_SAW_STEP_MAX))
maximum = math.floor(AD9102_CLOCK_HZ / (_AD9102_SAW_RAMP_STEPS * factor * AD9102_SAW_STEP_MIN))
return minimum, maximum
def ad9102_saw_frequency_from_step_hz(*, triangle: bool, saw_step: int) -> float:
"""Calculate the actual built-in saw/triangle frequency for a given SAW_STEP."""
factor = 2 if triangle else 1
saw_step = max(AD9102_SAW_STEP_MIN, min(AD9102_SAW_STEP_MAX, int(saw_step)))
return AD9102_CLOCK_HZ / (_AD9102_SAW_RAMP_STEPS * factor * saw_step)
def ad9102_saw_step_from_frequency_hz(*, triangle: bool, frequency_hz: int) -> tuple[int, float]:
"""Map a desired built-in saw frequency to the closest supported SAW_STEP."""
min_hz, max_hz = ad9102_saw_frequency_limits_hz(triangle=triangle)
if frequency_hz < min_hz or frequency_hz > max_hz:
raise InvalidParameterError(
f"frequency_hz must be in range [{min_hz}, {max_hz}] for this AD9102 mode"
)
factor = 2 if triangle else 1
saw_step = round(AD9102_CLOCK_HZ / (_AD9102_SAW_RAMP_STEPS * factor * frequency_hz))
saw_step = max(AD9102_SAW_STEP_MIN, min(AD9102_SAW_STEP_MAX, saw_step))
return saw_step, ad9102_saw_frequency_from_step_hz(triangle=triangle, saw_step=saw_step)
def ad9102_sram_frequency_limits_hz(*, hold_cycles: int = DEFAULT_AD9102_HOLD_CYCLES) -> tuple[int, int]:
"""Return the reachable frequency range for SRAM playback for a fixed hold setting."""
hold = hold_cycles or DEFAULT_AD9102_HOLD_CYCLES
minimum = math.ceil(AD9102_CLOCK_HZ / (AD9102_SRAM_SAMPLE_MAX * hold))
maximum = math.floor(AD9102_CLOCK_HZ / (AD9102_SRAM_SAMPLE_MIN * hold))
return minimum, maximum
def ad9102_sram_frequency_from_playback_hz(*, sample_count: int, hold_cycles: int) -> float:
"""Calculate the actual SRAM playback frequency."""
sample_count = max(AD9102_SRAM_SAMPLE_MIN, min(AD9102_SRAM_SAMPLE_MAX, int(sample_count)))
hold = hold_cycles or DEFAULT_AD9102_HOLD_CYCLES
hold = max(DEFAULT_AD9102_HOLD_CYCLES, min(AD9102_SRAM_HOLD_MAX, int(hold)))
return AD9102_CLOCK_HZ / (sample_count * hold)
def ad9102_sram_sample_count_from_frequency_hz(
*,
frequency_hz: int,
hold_cycles: int = DEFAULT_AD9102_HOLD_CYCLES,
) -> tuple[int, float]:
"""Map a desired SRAM playback frequency to the closest supported sample count."""
min_hz, max_hz = ad9102_sram_frequency_limits_hz(hold_cycles=hold_cycles)
if frequency_hz < min_hz or frequency_hz > max_hz:
raise InvalidParameterError(
f"frequency_hz must be in range [{min_hz}, {max_hz}] for this AD9102 mode"
)
hold = hold_cycles or DEFAULT_AD9102_HOLD_CYCLES
sample_count = round(AD9102_CLOCK_HZ / (frequency_hz * hold))
sample_count = max(AD9102_SRAM_SAMPLE_MIN, min(AD9102_SRAM_SAMPLE_MAX, sample_count))
return sample_count, ad9102_sram_frequency_from_playback_hz(
sample_count=sample_count,
hold_cycles=hold,
)
class LaserController:
"""Public API for manual control, polling, and status queries."""
def __init__(
self,
port: str | None = None,
pi_coeff1_p: int = DEFAULT_PI_P,
pi_coeff1_i: int = DEFAULT_PI_I,
pi_coeff2_p: int = DEFAULT_PI_P,
pi_coeff2_i: int = DEFAULT_PI_I,
on_data: Callable[[Measurements], None] | None = None,
) -> None:
self._transport = SerialTransport(port=port)
self._pi1_p = pi_coeff1_p
self._pi1_i = pi_coeff1_i
self._pi2_p = pi_coeff2_p
self._pi2_i = pi_coeff2_i
self._on_data = on_data
self._message_id = 0
self._last_measurements: Measurements | None = None
self._last_temp1 = DEFAULT_TEMP1_C
self._last_temp2 = DEFAULT_TEMP2_C
self._last_current1 = DEFAULT_CURRENT1_MA
self._last_current2 = DEFAULT_CURRENT2_MA
@property
def is_connected(self) -> bool:
"""Return True when the serial port is connected."""
return self._transport.is_connected
@property
def port_name(self) -> str | None:
"""Return the active serial port name when available."""
return self._transport.port_name
def connect(self) -> bool:
"""Open the configured serial connection."""
self._transport.connect()
logger.info("Connected to laser controller on port %s", self.port_name)
return True
def disconnect(self) -> None:
"""Close the serial connection."""
self._transport.disconnect()
logger.info("Disconnected from laser controller")
def set_manual_mode(
self,
temp1: float,
temp2: float,
current1: float,
current2: float,
) -> None:
"""Send manual setpoints and remember them for post-reset restore."""
values = ParameterValidator.validate_manual_mode_params(
temp1=temp1,
temp2=temp2,
current1=current1,
current2=current2,
)
self._message_id = (self._message_id + 1) & 0xFFFF
command = Protocol.encode_decode_enable(
temp1=values["temp1"],
temp2=values["temp2"],
current1=values["current1"],
current2=values["current2"],
pi_coeff1_p=self._pi1_p,
pi_coeff1_i=self._pi1_i,
pi_coeff2_p=self._pi2_p,
pi_coeff2_i=self._pi2_i,
message_id=self._message_id,
)
self._send_and_expect_ok(command)
self._last_temp1 = values["temp1"]
self._last_temp2 = values["temp2"]
self._last_current1 = values["current1"]
self._last_current2 = values["current2"]
def reset(self) -> None:
"""Send DEFAULT_ENABLE and require an error-free acknowledgement."""
self._send_and_expect_ok(Protocol.encode_default_enable())
logger.info("Device reset command sent")
def configure_ad9102(
self,
*,
enabled: bool,
use_sram: bool,
triangle: bool = False,
saw_step: int = 1,
pat_period_base: int = 2,
pat_period: int = 0xFFFF,
sample_count: int = 16,
hold_cycles: int = 1,
amplitude: int = 8191,
use_amplitude_format: bool = False,
) -> int:
"""Configure the AD9102 signal generator in saw or generated-SRAM mode."""
if use_sram:
sample_count = self._validate_int_range(
sample_count,
"sample_count",
AD9102_SRAM_SAMPLE_MIN,
AD9102_SRAM_SAMPLE_MAX,
)
if use_amplitude_format:
amplitude = self._validate_int_range(
amplitude,
"amplitude",
AD9102_SRAM_AMPLITUDE_MIN,
AD9102_SRAM_AMPLITUDE_MAX,
)
command = Protocol.encode_ad9102_control(
enabled=enabled,
triangle=triangle,
sram_mode=True,
alt_format=True,
param0=amplitude,
param1=sample_count,
)
else:
hold_cycles = self._validate_int_range(
hold_cycles,
"hold_cycles",
AD9102_SRAM_HOLD_MIN,
AD9102_SRAM_HOLD_MAX,
)
command = Protocol.encode_ad9102_control(
enabled=enabled,
triangle=triangle,
sram_mode=True,
alt_format=False,
param0=sample_count,
param1=hold_cycles,
)
else:
saw_step = self._validate_int_range(
saw_step,
"saw_step",
AD9102_SAW_STEP_MIN,
AD9102_SAW_STEP_MAX,
)
pat_period_base = self._validate_int_range(
pat_period_base,
"pat_period_base",
AD9102_PAT_BASE_MIN,
AD9102_PAT_BASE_MAX,
)
pat_period = self._validate_int_range(
pat_period,
"pat_period",
AD9102_PAT_PERIOD_MIN,
AD9102_PAT_PERIOD_MAX,
)
param0 = ((pat_period_base & 0x0F) << 8) | (saw_step & 0xFF)
command = Protocol.encode_ad9102_control(
enabled=enabled,
triangle=triangle,
sram_mode=False,
alt_format=False,
param0=param0,
param1=pat_period,
)
detail = self._send_and_expect_ok(command)
logger.info("AD9102 configured: sram=%s triangle=%s enabled=%s", use_sram, triangle, enabled)
return detail
def configure_ad9102_simple(
self,
*,
enabled: bool,
use_sram: bool,
triangle: bool,
frequency_hz: int,
amplitude: int = 8191,
) -> dict[str, float | int | bool]:
"""Configure AD9102 using simplified frequency/shape controls."""
if use_sram:
amplitude = self._validate_int_range(
amplitude,
"amplitude",
AD9102_SRAM_AMPLITUDE_MIN,
AD9102_SRAM_AMPLITUDE_MAX,
)
sample_count, actual_frequency_hz = ad9102_sram_sample_count_from_frequency_hz(
frequency_hz=frequency_hz,
hold_cycles=DEFAULT_AD9102_HOLD_CYCLES,
)
detail = self.configure_ad9102(
enabled=enabled,
use_sram=True,
triangle=triangle,
sample_count=sample_count,
amplitude=amplitude,
use_amplitude_format=True,
)
return {
"detail": detail,
"actual_frequency_hz": actual_frequency_hz,
"sample_count": sample_count,
"hold_cycles": DEFAULT_AD9102_HOLD_CYCLES,
"amplitude_applied": True,
}
saw_step, actual_frequency_hz = ad9102_saw_step_from_frequency_hz(
triangle=triangle,
frequency_hz=frequency_hz,
)
detail = self.configure_ad9102(
enabled=enabled,
use_sram=False,
triangle=triangle,
saw_step=saw_step,
pat_period_base=DEFAULT_AD9102_PAT_BASE,
pat_period=DEFAULT_AD9102_PAT_PERIOD,
)
return {
"detail": detail,
"actual_frequency_hz": actual_frequency_hz,
"saw_step": saw_step,
"amplitude_applied": False,
}
def configure_ad9833(self, *, enabled: bool, triangle: bool, frequency_word: int) -> None:
"""Configure the AD9833 generator using its raw 28-bit frequency word."""
frequency_word = self._validate_int_range(
frequency_word,
"frequency_word",
AD9833_FREQ_WORD_MIN,
AD9833_FREQ_WORD_MAX,
)
self._send_and_expect_ok(
Protocol.encode_ad9833_control(
enabled=enabled,
triangle=triangle,
frequency_word=frequency_word,
)
)
logger.info("AD9833 configured: enabled=%s triangle=%s word=%d", enabled, triangle, frequency_word)
def configure_ad9833_frequency(self, *, enabled: bool, triangle: bool, frequency_hz: int) -> int:
"""Configure AD9833 using output frequency in hertz for a 20 MHz master clock."""
frequency_hz = self._validate_int_range(
frequency_hz,
"frequency_hz",
AD9833_OUTPUT_FREQ_MIN_HZ,
AD9833_OUTPUT_FREQ_MAX_HZ,
)
frequency_word = int(round(frequency_hz * (1 << 28) / AD9833_MCLK_HZ))
if frequency_word < AD9833_FREQ_WORD_MIN:
frequency_word = AD9833_FREQ_WORD_MIN
if frequency_word > AD9833_FREQ_WORD_MAX:
frequency_word = AD9833_FREQ_WORD_MAX
self.configure_ad9833(
enabled=enabled,
triangle=triangle,
frequency_word=frequency_word,
)
return frequency_word
def pulse_ds1809(self, *, increment: bool, count: int, pulse_ms: int) -> None:
"""Pulse the DS1809 digital potentiometer in one direction."""
if not increment and count:
decrement = True
else:
decrement = False
count = self._validate_int_range(count, "count", DS1809_COUNT_MIN, DS1809_COUNT_MAX)
pulse_ms = self._validate_int_range(
pulse_ms,
"pulse_ms",
DS1809_PULSE_MS_MIN,
DS1809_PULSE_MS_MAX,
)
self._send_and_expect_ok(
Protocol.encode_ds1809_control(
increment=increment,
decrement=decrement,
count=count,
pulse_ms=pulse_ms,
)
)
logger.info("DS1809 pulsed: increment=%s count=%d pulse_ms=%d", increment, count, pulse_ms)
def set_stm32_dac(self, *, enabled: bool, dac_code: int) -> None:
"""Set the STM32 on-chip DAC code and output-enable state."""
dac_code = self._validate_int_range(
dac_code,
"dac_code",
STM32_DAC_CODE_MIN,
STM32_DAC_CODE_MAX,
)
self._send_and_expect_ok(
Protocol.encode_stm32_dac_control(enabled=enabled, dac_code=dac_code)
)
logger.info("STM32 DAC configured: enabled=%s code=%d", enabled, dac_code)
def save_profile_to_sd(self, request: ProfileSaveRequest) -> None:
"""Stream a rendered profile INI and optional waveform CSV to the device SD card."""
if not isinstance(request, ProfileSaveRequest):
raise InvalidParameterError("request", "Value must be a ProfileSaveRequest instance")
profile_name = ParameterValidator.validate_profile_name(request.profile_name)
if not isinstance(request.profile_text, str) or not request.profile_text.strip():
raise InvalidParameterError("profile_text", "Value must not be empty")
if not isinstance(request.waveform_text, str):
raise InvalidParameterError("waveform_text", "Value must be a string")
try:
profile_bytes = request.profile_text.encode("ascii")
waveform_bytes = request.waveform_text.encode("ascii")
except UnicodeEncodeError as exc:
raise InvalidParameterError(
"profile_text",
"Profile payload must contain ASCII text only",
) from exc
begin_sent = False
try:
self._send_and_expect_ok(
Protocol.encode_profile_save_begin(
profile_name=profile_name,
profile_text_bytes=len(profile_bytes),
waveform_text_bytes=len(waveform_bytes),
)
)
begin_sent = True
for start in range(0, len(profile_bytes), PROFILE_SAVE_DATA_CHUNK_BYTES):
self._send_and_expect_ok(
Protocol.encode_profile_save_data(
section_id=PROFILE_SAVE_SECTION_PROFILE_TEXT,
chunk=profile_bytes[start:start + PROFILE_SAVE_DATA_CHUNK_BYTES],
)
)
for start in range(0, len(waveform_bytes), PROFILE_SAVE_DATA_CHUNK_BYTES):
self._send_and_expect_ok(
Protocol.encode_profile_save_data(
section_id=PROFILE_SAVE_SECTION_WAVEFORM_TEXT,
chunk=waveform_bytes[start:start + PROFILE_SAVE_DATA_CHUNK_BYTES],
)
)
self._send_and_expect_ok(Protocol.encode_profile_save_commit())
except Exception:
if begin_sent:
try:
self._send_and_expect_ok(Protocol.encode_profile_save_cancel())
except Exception as cancel_exc: # noqa: BLE001
logger.warning("Profile save cancel failed: %s", cancel_exc)
raise
logger.info(
"Profile saved to SD: name=%s waveform_bytes=%d",
profile_name,
len(waveform_bytes),
)
def upload_ad9102_waveform(self, samples: Sequence[int]) -> None:
"""Upload and commit a custom AD9102 waveform from signed 14-bit samples."""
if not samples:
raise InvalidParameterError("samples", "At least two samples are required")
sample_list = [self._validate_wave_sample(sample, index) for index, sample in enumerate(samples)]
sample_count = len(sample_list)
if not AD9102_SRAM_SAMPLE_MIN <= sample_count <= AD9102_SRAM_SAMPLE_MAX:
raise InvalidParameterError(
"samples",
f"Sample count must be in range [{AD9102_SRAM_SAMPLE_MIN}, {AD9102_SRAM_SAMPLE_MAX}]",
)
self._send_and_expect_ok(Protocol.encode_ad9102_wave_begin(sample_count))
for start in range(0, sample_count, AD9102_WAVE_MAX_CHUNK_SAMPLES):
chunk = sample_list[start:start + AD9102_WAVE_MAX_CHUNK_SAMPLES]
self._send_and_expect_ok(Protocol.encode_ad9102_wave_data(chunk))
self._send_and_expect_ok(Protocol.encode_ad9102_wave_commit())
logger.info("Uploaded AD9102 waveform with %d samples", sample_count)
def cancel_ad9102_waveform_upload(self) -> None:
"""Cancel an in-progress AD9102 custom waveform upload."""
self._send_and_expect_ok(Protocol.encode_ad9102_wave_cancel())
logger.info("Cancelled AD9102 waveform upload")
def get_measurements(self) -> Measurements | None:
"""Request one telemetry frame from the device."""
self._send(Protocol.encode_trans_enable())
raw = self._transport.read(GET_DATA_TOTAL_LENGTH)
if len(raw) != GET_DATA_TOTAL_LENGTH:
logger.warning("Expected %d telemetry bytes, got %d", GET_DATA_TOTAL_LENGTH, len(raw))
return None
measurements = Protocol.decode_response(raw)
self._last_measurements = measurements
if self._on_data is not None:
self._on_data(measurements)
return measurements
def get_status(self) -> DeviceStatus:
"""Query the current two-byte firmware status word."""
self._send(Protocol.encode_state())
raw = self._transport.read(STATUS_RESPONSE_LENGTH)
if len(raw) != STATUS_RESPONSE_LENGTH:
raise DeviceNotRespondingError()
state, detail = Protocol.decode_status(raw)
return DeviceStatus(
state=state,
detail=detail,
measurements=self._last_measurements,
is_connected=self.is_connected,
last_command_id=self._message_id,
error_message=Protocol.state_to_description(state),
)
def _send(self, data: bytes) -> None:
if not self.is_connected:
raise CommunicationError("Not connected to device. Call connect() first.")
self._transport.send(data)
time.sleep(WAIT_AFTER_SEND_SEC)
def _send_and_expect_ok(self, data: bytes) -> int:
self._send(data)
raw = self._transport.read(STATUS_RESPONSE_LENGTH)
if len(raw) != STATUS_RESPONSE_LENGTH:
raise DeviceNotRespondingError()
state, detail = Protocol.decode_status(raw)
if state != DeviceState.OK:
combined_code = int(state) | (detail << 8)
raise DeviceStateError(
combined_code,
Protocol.state_to_description(state),
)
return detail
@staticmethod
def _validate_int_range(value: int, name: str, minimum: int, maximum: int) -> int:
if isinstance(value, bool) or not isinstance(value, int):
raise InvalidParameterError(name, "Value must be an integer")
if not minimum <= value <= maximum:
raise InvalidParameterError(name, f"Value must be in range [{minimum}, {maximum}]")
return value
@staticmethod
def _validate_wave_sample(value: int, index: int) -> int:
if isinstance(value, bool) or not isinstance(value, int):
raise InvalidParameterError(f"samples[{index}]", "Value must be an integer")
if not AD9102_WAVE_SAMPLE_MIN <= value <= AD9102_WAVE_SAMPLE_MAX:
raise InvalidParameterError(
f"samples[{index}]",
f"Value must be in range [{AD9102_WAVE_SAMPLE_MIN}, {AD9102_WAVE_SAMPLE_MAX}]",
)
return value
def __enter__(self) -> "LaserController":
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
if self.is_connected:
self.disconnect()
return False

View File

@ -0,0 +1,114 @@
"""
Physical unit conversions for laser control module.
Converts between physical quantities (°C, mA, V) and
raw ADC/DAC integer values used by the device firmware.
All formulas are taken directly from the original device_conversion.py.
"""
import math
from .constants import (
VREF, R1, R3, R4, R5, R6,
R7, R8, R9, R10,
RREF,
BETA_INTERNAL, BETA_EXTERNAL, T0_K, R0,
ADC_BITS_16, ADC_BITS_12,
U3V3_COEFF, U5V_COEFF, U7V_COEFF,
)
def temp_c_to_n(temp_c: float) -> int:
"""
Convert temperature (°C) to 16-bit DAC integer (Wheatstone bridge setpoint).
Args:
temp_c: Temperature in degrees Celsius.
Returns:
Integer in [0, 65535] for the DAC.
"""
rt = R0 * math.exp(BETA_INTERNAL / (temp_c + 273) - BETA_INTERNAL / T0_K)
u = VREF / (R5 * (R3 + R4)) * (
R1 * R4 * (R5 + R6) - rt * (R3 * R6 - R4 * R5)
) / (rt + R1)
n = int(u * ADC_BITS_16 / VREF)
n = max(0, min(ADC_BITS_16, n))
return n
def temp_n_to_c(n: int) -> float:
"""
Convert 16-bit ADC integer to temperature (°C).
Args:
n: Raw ADC value in [0, 65535].
Returns:
Temperature in degrees Celsius.
"""
u = n * VREF / ADC_BITS_16
rt = R1 * (VREF * R4 * (R5 + R6) - u * R5 * (R3 + R4)) / (
u * R5 * (R3 + R4) + VREF * R3 * R6 - VREF * R4 * R5
)
t = 1 / (1 / T0_K + 1 / BETA_INTERNAL * math.log(rt / R0)) - 273
return t
def temp_ext_n_to_c(n: int) -> float:
"""
Convert 12-bit ADC integer to external thermistor temperature (°C).
Args:
n: Raw 12-bit ADC value in [0, 4095].
Returns:
Temperature in degrees Celsius.
"""
u = n * VREF / ADC_BITS_12 * 1 / (1 + 100000 / R10) + VREF * R9 / (R8 + R9)
rt = R7 * u / (VREF - u)
t = 1 / (1 / T0_K + 1 / BETA_EXTERNAL * math.log(rt / R0)) - 273
return t
def current_ma_to_n(current_ma: float) -> int:
"""
Convert laser drive current (mA) to 16-bit DAC integer.
Args:
current_ma: Current in milliamps.
Returns:
Integer in [0, 65535] for the DAC.
"""
n = int(ADC_BITS_16 / 2000 * RREF * current_ma)
n = max(0, min(ADC_BITS_16, n))
return n
def current_n_to_ma(n: int) -> float:
"""
Convert raw ADC integer to photodiode current (mA).
Args:
n: Raw ADC value in [0, 65535].
Returns:
Current in milliamps.
"""
return n * 2.5 / (ADC_BITS_16 * 4.4) - 1 / 20.4
def voltage_3v3_n_to_v(n: int) -> float:
"""Convert 3.3V rail ADC count to volts."""
return n * U3V3_COEFF
def voltage_5v_n_to_v(n: int) -> float:
"""Convert 5V rail ADC count to volts (both 5V1 and 5V2)."""
return n * U5V_COEFF
def voltage_7v_n_to_v(n: int) -> float:
"""Convert 7V rail ADC count to volts."""
return n * U7V_COEFF

View File

@ -0,0 +1,59 @@
"""Minimal examples for embedding laser_control into another Python app."""
import sys
from laser_control import (
LaserController,
ValidationError,
CommunicationError,
)
def example_manual_mode(port: str = None):
"""Manual mode: set fixed temperatures and currents."""
with LaserController(port=port) as ctrl:
try:
ctrl.set_manual_mode(
temp1=25.0,
temp2=30.0,
current1=40.0,
current2=35.0,
)
print("Manual parameters sent.")
data = ctrl.get_measurements()
if data:
print(f" Temp1: {data.temp1:.2f} °C")
print(f" Temp2: {data.temp2:.2f} °C")
print(f" I1: {data.current1:.3f} mA")
print(f" I2: {data.current2:.3f} mA")
print(f" 3.3V: {data.voltage_3v3:.3f} V")
print(f" 5V: {data.voltage_5v1:.3f} V")
print(f" 7V: {data.voltage_7v0:.3f} V")
except ValidationError as e:
print(f"Parameter validation error: {e}")
except CommunicationError as e:
print(f"Communication error: {e}")
def example_embed_in_app():
"""
Minimal embedding pattern for use inside another application.
The controller can be created once and kept alive for the lifetime
of the host application. No GUI dependency whatsoever.
"""
ctrl = LaserController(port=None) # auto-detect port
try:
ctrl.connect()
except CommunicationError as e:
print(f"Cannot connect: {e}")
return ctrl
return ctrl # caller owns the controller; call ctrl.disconnect() when done
if __name__ == '__main__':
port = sys.argv[1] if len(sys.argv) > 1 else None
print("=== Manual mode example ===")
example_manual_mode(port)

139
laser_control/exceptions.py Normal file
View File

@ -0,0 +1,139 @@
"""
Custom exceptions for laser control module.
Provides a hierarchy of exceptions for different error conditions
that may occur during laser control operations.
"""
class LaserControlError(Exception):
"""Base exception for all laser control errors."""
pass
class ValidationError(LaserControlError):
"""Base exception for validation errors."""
pass
class TemperatureOutOfRangeError(ValidationError):
"""Exception raised when temperature is outside valid range."""
def __init__(self, param_name: str, value: float, min_val: float, max_val: float):
self.param_name = param_name
self.value = value
self.min_val = min_val
self.max_val = max_val
super().__init__(
f"{param_name}: Temperature {value}°C is out of range "
f"[{min_val}°C - {max_val}°C]"
)
class CurrentOutOfRangeError(ValidationError):
"""Exception raised when current is outside valid range."""
def __init__(self, param_name: str, value: float, min_val: float, max_val: float):
self.param_name = param_name
self.value = value
self.min_val = min_val
self.max_val = max_val
super().__init__(
f"{param_name}: Current {value}mA is out of range "
f"[{min_val}mA - {max_val}mA]"
)
class InvalidParameterError(ValidationError):
"""Exception raised for invalid parameter types or values."""
def __init__(self, param_name: str, message: str):
self.param_name = param_name
super().__init__(f"{param_name}: {message}")
class CommunicationError(LaserControlError):
"""Base exception for communication errors."""
pass
class PortNotFoundError(CommunicationError):
"""Exception raised when serial port cannot be found."""
def __init__(self, port: str = None):
if port:
message = f"Serial port '{port}' not found"
else:
message = "No suitable serial port found for device connection"
super().__init__(message)
class DeviceNotRespondingError(CommunicationError):
"""Exception raised when device doesn't respond to commands."""
def __init__(self, timeout: float = None):
if timeout:
message = f"Device did not respond within {timeout} seconds"
else:
message = "Device is not responding to commands"
super().__init__(message)
class CRCError(CommunicationError):
"""Exception raised when CRC check fails."""
def __init__(self, expected: int = None, received: int = None):
if expected is not None and received is not None:
message = f"CRC check failed. Expected: 0x{expected:04X}, Received: 0x{received:04X}"
else:
message = "CRC check failed on received data"
super().__init__(message)
class ProtocolError(CommunicationError):
"""Exception raised for protocol-level errors."""
def __init__(self, message: str):
super().__init__(f"Protocol error: {message}")
class DeviceError(LaserControlError):
"""Base exception for device-level errors."""
pass
class DeviceOverheatingError(DeviceError):
"""Exception raised when device reports overheating."""
def __init__(self, laser_id: int = None, temperature: float = None):
if laser_id and temperature:
message = f"Laser {laser_id} overheating: {temperature}°C"
else:
message = "Device overheating detected"
super().__init__(message)
class PowerSupplyError(DeviceError):
"""Exception raised when power supply issues are detected."""
def __init__(self, rail: str = None, voltage: float = None, expected: float = None):
if rail and voltage is not None:
if expected:
message = f"Power supply {rail}: {voltage}V (expected ~{expected}V)"
else:
message = f"Power supply {rail}: abnormal voltage {voltage}V"
else:
message = "Power supply error detected"
super().__init__(message)
class DeviceStateError(DeviceError):
"""Exception raised when device is in an error state."""
def __init__(self, state_code: int, state_name: str = None):
self.state_code = state_code
if state_name:
message = f"Device error state: {state_name} (0x{state_code:04X})"
else:
message = f"Device error state: 0x{state_code:04X}"
super().__init__(message)

View File

@ -0,0 +1 @@
"""PyQt GUI package for the laser-control application."""

View File

@ -0,0 +1,74 @@
"""Small dialogs used by the main laser-control window."""
from __future__ import annotations
from PyQt6.QtCore import QRegularExpression
from PyQt6.QtGui import QRegularExpressionValidator
from PyQt6.QtWidgets import (
QCheckBox,
QDialog,
QDialogButtonBox,
QLabel,
QLineEdit,
QVBoxLayout,
)
from laser_control.constants import PROFILE_NAME_ALLOWED_PATTERN, PROFILE_NAME_MAX_LENGTH
class ProfileSaveDialog(QDialog):
"""Ask the user for a short profile name before saving it to the device SD card."""
def __init__(self, *, custom_waveform_available: bool, parent=None) -> None:
super().__init__(parent)
self.setWindowTitle("Сохранить профиль на SD")
self.setModal(True)
layout = QVBoxLayout(self)
note = QLabel(
"Введите короткое имя профиля для маленького LCD на устройстве. "
f"Допустимо до {PROFILE_NAME_MAX_LENGTH} ASCII-символов: буквы, цифры, пробел, '-' и '_'."
)
note.setWordWrap(True)
self._name_edit = QLineEdit(self)
self._name_edit.setPlaceholderText("Например: Factory Saw")
self._name_edit.setMaxLength(PROFILE_NAME_MAX_LENGTH)
self._name_edit.setValidator(
QRegularExpressionValidator(QRegularExpression(PROFILE_NAME_ALLOWED_PATTERN), self)
)
self._waveform_checkbox = QCheckBox(
"Сохранить и пользовательскую форму из вкладки «Своя форма»",
self,
)
self._waveform_checkbox.setVisible(custom_waveform_available)
self._buttons = QDialogButtonBox(
QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel,
parent=self,
)
self._buttons.accepted.connect(self.accept)
self._buttons.rejected.connect(self.reject)
self._buttons.button(QDialogButtonBox.StandardButton.Ok).setEnabled(False)
self._name_edit.textChanged.connect(self._update_accept_state)
layout.addWidget(note)
layout.addWidget(self._name_edit)
layout.addWidget(self._waveform_checkbox)
layout.addWidget(self._buttons)
def profile_name(self) -> str:
"""Return the trimmed display name entered by the user."""
return self._name_edit.text().strip()
def include_custom_waveform(self) -> bool:
"""Return True when a valid custom waveform should be saved with the profile."""
return self._waveform_checkbox.isVisible() and self._waveform_checkbox.isChecked()
def _update_accept_state(self) -> None:
self._buttons.button(QDialogButtonBox.StandardButton.Ok).setEnabled(
bool(self.profile_name())
)

32
laser_control/gui/main.py Normal file
View File

@ -0,0 +1,32 @@
"""Application entry point for the PyQt-based laser-control GUI."""
from __future__ import annotations
import os
import sys
from PyQt6.QtWidgets import QApplication
import pyqtgraph as pg
from .theme import apply_theme
from .window import MainWindow
def main() -> int:
"""Run the GUI event loop."""
os.environ.setdefault("PYQTGRAPH_QT_LIB", "PyQt6")
app = QApplication(sys.argv)
pg.setConfigOptions(antialias=True, background="#0f1720", foreground="#dce6f2")
apply_theme(app)
window = MainWindow(auto_connect=True)
screen = app.primaryScreen()
if screen is not None:
window.setGeometry(screen.availableGeometry())
window.show()
return app.exec()
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -0,0 +1,563 @@
"""Small UI builders used by the main laser-control window."""
from __future__ import annotations
from PyQt6.QtWidgets import (
QCheckBox,
QComboBox,
QDoubleSpinBox,
QFormLayout,
QGridLayout,
QGroupBox,
QHBoxLayout,
QLabel,
QPlainTextEdit,
QPushButton,
QSizePolicy,
QSpinBox,
QTabWidget,
QTextEdit,
QVBoxLayout,
QWidget,
)
from laser_control.constants import (
AD9102_PAT_BASE_MAX,
AD9102_PAT_BASE_MIN,
AD9102_PAT_PERIOD_MAX,
AD9102_PAT_PERIOD_MIN,
AD9102_SAW_STEP_MAX,
AD9102_SAW_STEP_MIN,
AD9102_SRAM_AMPLITUDE_MAX,
AD9102_SRAM_AMPLITUDE_MIN,
AD9102_SRAM_HOLD_MAX,
AD9102_SRAM_HOLD_MIN,
AD9102_SRAM_SAMPLE_MAX,
AD9102_SRAM_SAMPLE_MIN,
CURRENT_MAX_MA,
CURRENT_MIN_MA,
DEFAULT_AD9102_AMPLITUDE,
DEFAULT_AD9102_SAW_FREQUENCY_HZ,
DEFAULT_AD9102_SRAM_FREQUENCY_HZ,
DEFAULT_AD9102_HOLD_CYCLES,
DEFAULT_AD9102_PAT_BASE,
DEFAULT_AD9102_PAT_PERIOD,
DEFAULT_AD9102_SAMPLE_COUNT,
DEFAULT_AD9102_SAW_STEP,
DEFAULT_AD9833_FREQUENCY_HZ,
DEFAULT_CURRENT1_MA,
DEFAULT_CURRENT2_MA,
DEFAULT_DS1809_COUNT,
DEFAULT_DS1809_PROFILE_POSITION,
DEFAULT_DS1809_PULSE_MS,
DEFAULT_STM32_DAC_CODE,
DEFAULT_TEMP1_C,
DEFAULT_TEMP2_C,
DS1809_COUNT_MAX,
DS1809_COUNT_MIN,
DS1809_PROFILE_POSITION_MAX,
DS1809_PROFILE_POSITION_MIN,
DS1809_PULSE_MS_MAX,
DS1809_PULSE_MS_MIN,
AD9833_MCLK_HZ,
AD9833_OUTPUT_FREQ_MAX_HZ,
AD9833_OUTPUT_FREQ_MIN_HZ,
STM32_DAC_CODE_MAX,
STM32_DAC_CODE_MIN,
TEMP_MAX_C,
TEMP_MIN_C,
)
def _double_spinbox(
minimum: float,
maximum: float,
value: float,
*,
decimals: int = 2,
step: float = 0.1,
suffix: str = "",
) -> QDoubleSpinBox:
box = QDoubleSpinBox()
box.setRange(minimum, maximum)
box.setDecimals(decimals)
box.setSingleStep(step)
box.setValue(value)
if suffix:
box.setSuffix(suffix)
return box
def _int_spinbox(minimum: int, maximum: int, value: int, *, suffix: str = "") -> QSpinBox:
box = QSpinBox()
box.setRange(minimum, maximum)
box.setValue(value)
if suffix:
box.setSuffix(suffix)
return box
def _expanding_button(label: str, *, primary: bool = False) -> QPushButton:
button = QPushButton(label)
button.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed)
if primary:
button.setObjectName("primaryButton")
return button
def build_manual_group(owner) -> QGroupBox:
"""Create manual control inputs."""
group = QGroupBox("Ручной режим")
layout = QFormLayout(group)
layout.setHorizontalSpacing(12)
layout.setVerticalSpacing(8)
owner._manual_temp1 = _double_spinbox(TEMP_MIN_C, TEMP_MAX_C, DEFAULT_TEMP1_C, suffix=" °C")
owner._manual_temp2 = _double_spinbox(TEMP_MIN_C, TEMP_MAX_C, DEFAULT_TEMP2_C, suffix=" °C")
owner._manual_current1 = _double_spinbox(
CURRENT_MIN_MA,
CURRENT_MAX_MA,
DEFAULT_CURRENT1_MA,
decimals=3,
step=0.05,
suffix=" мА",
)
owner._manual_current2 = _double_spinbox(
CURRENT_MIN_MA,
CURRENT_MAX_MA,
DEFAULT_CURRENT2_MA,
decimals=3,
step=0.05,
suffix=" мА",
)
layout.addRow("Температура лазера 1", owner._manual_temp1)
layout.addRow("Температура лазера 2", owner._manual_temp2)
layout.addRow("Ток лазера 1", owner._manual_current1)
layout.addRow("Ток лазера 2", owner._manual_current2)
owner._apply_manual_button = _expanding_button("Применить", primary=True)
owner._apply_manual_button.clicked.connect(owner._on_apply_manual)
layout.addRow(owner._apply_manual_button)
return group
def build_device_group(owner) -> QGroupBox:
"""Create compact tabs for supported peripheral commands."""
group = QGroupBox("Периферия")
layout = QVBoxLayout(group)
tabs = QTabWidget(group)
tabs.addTab(_build_ad9102_tab(owner), "Генератор AD9102")
tabs.addTab(_build_ad9833_tab(owner), "Генератор AD9833")
tabs.addTab(_build_aux_tab(owner), "Выходы и DS1809")
tabs.addTab(_build_wave_tab(owner), "Своя форма")
layout.addWidget(tabs)
return group
def _build_ad9102_tab(owner) -> QWidget:
tab = QWidget()
layout = QVBoxLayout(tab)
layout.setSpacing(10)
note = QLabel(
"AD9102 в этой прошивке умеет два режима. "
"Первый: встроенная пила или треугольник самого AD9102. "
"Второй: STM32 сама строит пилу или треугольник из точек и записывает их в память AD9102. "
"Для упрощённого режима ниже задаются понятные величины, а регистровые поля спрятаны в расширенных настройках."
)
note.setWordWrap(True)
note.setObjectName("captionLabel")
basic_group = QGroupBox("Основные настройки")
basic_layout = QFormLayout(basic_group)
basic_layout.setHorizontalSpacing(12)
basic_layout.setVerticalSpacing(8)
owner._ad9102_enable = QCheckBox("Подать сигнал на выход")
owner._ad9102_enable.setChecked(True)
owner._ad9102_mode = QComboBox()
owner._ad9102_mode.addItem("Встроенная пила/треугольник AD9102", "saw")
owner._ad9102_mode.addItem("Пила/треугольник через память AD9102", "sram")
owner._ad9102_shape = QComboBox()
owner._ad9102_shape.addItem("Пила", "saw")
owner._ad9102_shape.addItem("Треугольник", "triangle")
owner._ad9102_shape.setCurrentIndex(1)
owner._ad9102_frequency_hz = _int_spinbox(
1,
DEFAULT_AD9102_SRAM_FREQUENCY_HZ,
DEFAULT_AD9102_SAW_FREQUENCY_HZ,
suffix=" Гц",
)
owner._ad9102_frequency_hz.setSingleStep(100)
owner._ad9102_frequency_hz.setGroupSeparatorShown(True)
owner._ad9102_basic_hint = QLabel()
owner._ad9102_basic_hint.setWordWrap(True)
owner._ad9102_basic_hint.setObjectName("captionLabel")
owner._ad9102_preview = QLabel("Реальная частота: —")
owner._ad9102_preview.setObjectName("valueLabel")
owner._ad9102_advanced_toggle = QCheckBox("Показать расширенные параметры AD9102")
owner._ad9102_saw_step = _int_spinbox(
AD9102_SAW_STEP_MIN,
AD9102_SAW_STEP_MAX,
DEFAULT_AD9102_SAW_STEP,
)
owner._ad9102_pat_base = _int_spinbox(
AD9102_PAT_BASE_MIN,
AD9102_PAT_BASE_MAX,
DEFAULT_AD9102_PAT_BASE,
)
owner._ad9102_pat_period = _int_spinbox(
AD9102_PAT_PERIOD_MIN,
AD9102_PAT_PERIOD_MAX,
DEFAULT_AD9102_PAT_PERIOD,
)
owner._ad9102_sample_count = _int_spinbox(
AD9102_SRAM_SAMPLE_MIN,
AD9102_SRAM_SAMPLE_MAX,
DEFAULT_AD9102_SAMPLE_COUNT,
)
owner._ad9102_hold_cycles = _int_spinbox(
AD9102_SRAM_HOLD_MIN,
AD9102_SRAM_HOLD_MAX,
DEFAULT_AD9102_HOLD_CYCLES,
)
owner._ad9102_amplitude = _int_spinbox(
AD9102_SRAM_AMPLITUDE_MIN,
AD9102_SRAM_AMPLITUDE_MAX,
DEFAULT_AD9102_AMPLITUDE,
)
owner._ad9102_use_amplitude = QCheckBox(
"Использовать формат \"размах + число точек\" вместо \"пауза + число точек\""
)
owner._ad9102_mode.currentIndexChanged.connect(owner._on_ad9102_mode_changed)
owner._ad9102_shape.currentIndexChanged.connect(owner._update_ad9102_form)
owner._ad9102_frequency_hz.valueChanged.connect(owner._update_ad9102_form)
owner._ad9102_advanced_toggle.toggled.connect(owner._update_ad9102_form)
owner._ad9102_use_amplitude.toggled.connect(owner._update_ad9102_form)
owner._ad9102_sample_count.valueChanged.connect(owner._update_ad9102_form)
owner._ad9102_hold_cycles.valueChanged.connect(owner._update_ad9102_form)
owner._ad9102_saw_step.valueChanged.connect(owner._update_ad9102_form)
owner._ad9102_amplitude.valueChanged.connect(owner._update_ad9102_form)
owner._ad9102_use_amplitude.setChecked(True)
basic_layout.addRow(owner._ad9102_enable)
basic_layout.addRow("Режим генерации", owner._ad9102_mode)
basic_layout.addRow("Форма сигнала", owner._ad9102_shape)
basic_layout.addRow("Частота сигнала", owner._ad9102_frequency_hz)
basic_layout.addRow("Размах сигнала в режиме памяти (0..8191)", owner._ad9102_amplitude)
basic_layout.addRow(owner._ad9102_preview)
basic_layout.addRow(owner._ad9102_basic_hint)
owner._ad9102_advanced_group = QGroupBox("Расширенные параметры")
advanced_layout = QFormLayout(owner._ad9102_advanced_group)
advanced_layout.setHorizontalSpacing(12)
advanced_layout.setVerticalSpacing(8)
advanced_layout.addRow("Скорость нарастания пилы (1..63)", owner._ad9102_saw_step)
advanced_layout.addRow("Масштаб периода (0..15)", owner._ad9102_pat_base)
advanced_layout.addRow("Длина периода (0..65535)", owner._ad9102_pat_period)
advanced_layout.addRow("Число точек формы (2..4096)", owner._ad9102_sample_count)
advanced_layout.addRow("Пауза на каждой точке (0..15)", owner._ad9102_hold_cycles)
advanced_layout.addRow(owner._ad9102_use_amplitude)
layout.addWidget(note)
layout.addWidget(basic_group)
layout.addWidget(owner._ad9102_advanced_toggle)
layout.addWidget(owner._ad9102_advanced_group)
owner._apply_ad9102_button = _expanding_button("Применить настройки генератора", primary=True)
owner._apply_ad9102_button.clicked.connect(owner._on_apply_ad9102)
layout.addWidget(owner._apply_ad9102_button)
layout.addStretch(1)
owner._ad9102_saw_step.setToolTip(
"Код шага нарастания пилообразного сигнала. Диапазон 1..63. "
"Чем больше значение, тем быстрее растёт сигнал внутри одного периода."
)
owner._ad9102_pat_base.setToolTip(
"Грубый масштаб периода повторения. Диапазон 0..15. "
"Используется вместе с длиной периода и задаёт базу времени генератора."
)
owner._ad9102_pat_period.setToolTip(
"Точная длина периода повторения. Диапазон 0..65535. "
"Совместно с масштабом периода определяет, как часто форма начинается заново."
)
owner._ad9102_sample_count.setToolTip(
"Количество отсчётов формы в памяти SRAM. Диапазон 2..4096. "
"Один период в режиме памяти состоит из этого числа точек."
)
owner._ad9102_hold_cycles.setToolTip(
"Количество внутренних циклов удержания одной точки формы. Диапазон 0..15. "
"Чем больше значение, тем дольше каждая точка удерживается перед переходом к следующей."
)
owner._ad9102_amplitude.setToolTip(
"Амплитудный коэффициент для режима, где STM32 сама строит пилу или треугольник "
"и записывает их в память AD9102. Диапазон 0..8191. Чем больше значение, тем больше размах сигнала."
)
owner._ad9102_frequency_hz.setToolTip(
"Желаемая частота сигнала в герцах. "
"Интерфейс автоматически подберёт ближайшие поддерживаемые параметры AD9102."
)
owner._ad9102_preview.setToolTip(
"Показывает, какая реальная частота получится после округления к поддерживаемым параметрам чипа."
)
owner._ad9102_use_amplitude.setToolTip(
"Ограничение текущей короткой STM-команды: в режиме памяти можно передать "
"либо \"размах + число точек\", либо \"пауза + число точек\"."
)
return tab
def _build_ad9833_tab(owner) -> QWidget:
tab = QWidget()
layout = QFormLayout(tab)
layout.setHorizontalSpacing(12)
layout.setVerticalSpacing(8)
owner._ad9833_enable = QCheckBox("Подать сигнал на выход")
owner._ad9833_enable.setChecked(True)
owner._ad9833_shape = QComboBox()
owner._ad9833_shape.addItem("Синус", "sine")
owner._ad9833_shape.addItem("Треугольник", "triangle")
owner._ad9833_shape.setCurrentIndex(1)
owner._ad9833_frequency_hz = _int_spinbox(
AD9833_OUTPUT_FREQ_MIN_HZ,
AD9833_OUTPUT_FREQ_MAX_HZ,
DEFAULT_AD9833_FREQUENCY_HZ,
suffix=" Гц",
)
owner._ad9833_frequency_hz.setSingleStep(1000)
owner._ad9833_frequency_hz.setGroupSeparatorShown(True)
owner._ad9833_frequency_hz.valueChanged.connect(owner._update_ad9833_preview)
owner._ad9833_word_preview = QLabel("Внутренний код: —")
owner._ad9833_word_preview.setObjectName("valueLabel")
note = QLabel(
f"AD9833 тактируется от {AD9833_MCLK_HZ:,} Гц. "
f"Поэтому здесь задаётся сразу частота сигнала в герцах. "
f"Рабочий диапазон интерфейса: {AD9833_OUTPUT_FREQ_MIN_HZ:,}..{AD9833_OUTPUT_FREQ_MAX_HZ:,} Гц "
f"(до половины тактовой частоты). Внутренний код рассчитывается автоматически."
)
note.setWordWrap(True)
note.setObjectName("captionLabel")
layout.addRow(owner._ad9833_enable)
layout.addRow("Форма сигнала", owner._ad9833_shape)
layout.addRow(
f"Частота сигнала ({AD9833_OUTPUT_FREQ_MIN_HZ:,}..{AD9833_OUTPUT_FREQ_MAX_HZ:,} Гц)",
owner._ad9833_frequency_hz,
)
layout.addRow("Внутренний код AD9833", owner._ad9833_word_preview)
layout.addRow(note)
owner._apply_ad9833_button = _expanding_button("Применить настройки генератора", primary=True)
owner._apply_ad9833_button.clicked.connect(owner._on_apply_ad9833)
layout.addRow(owner._apply_ad9833_button)
owner._ad9833_frequency_hz.setToolTip(
"Частота выходного сигнала в герцах. "
"Интерфейс ограничен диапазоном 0..10 МГц для тактовой частоты 20 МГц."
)
owner._ad9833_word_preview.setToolTip(
"28-битный frequency word, который будет автоматически передан в AD9833."
)
return tab
def _build_aux_tab(owner) -> QWidget:
tab = QWidget()
layout = QVBoxLayout(tab)
layout.setSpacing(10)
dac_group = QGroupBox("Аналоговый выход STM32 (PA4)")
dac_layout = QFormLayout(dac_group)
dac_layout.setHorizontalSpacing(12)
dac_layout.setVerticalSpacing(8)
dac_note = QLabel(
"Это встроенный 12-битный ЦАП микроконтроллера STM32. "
"Диапазон кода: 0..4095. Это соответствует примерно 0..Vref+, "
"то есть обычно около 0..3.3 В."
)
dac_note.setWordWrap(True)
dac_note.setObjectName("captionLabel")
owner._stm32_dac_enable = QCheckBox("Подать напряжение на выход")
owner._stm32_dac_enable.setChecked(True)
owner._stm32_dac_code = _int_spinbox(
STM32_DAC_CODE_MIN,
STM32_DAC_CODE_MAX,
DEFAULT_STM32_DAC_CODE,
)
owner._apply_stm32_dac_button = _expanding_button("Применить уровень выхода", primary=True)
owner._apply_stm32_dac_button.clicked.connect(owner._on_apply_stm32_dac)
dac_layout.addRow(dac_note)
dac_layout.addRow(owner._stm32_dac_enable)
dac_layout.addRow("Уровень выхода (0..4095)", owner._stm32_dac_code)
dac_layout.addRow(owner._apply_stm32_dac_button)
owner._stm32_dac_code.setToolTip(
"Код встроенного 12-битного ЦАП STM32. "
"0 = примерно 0 В, 4095 = примерно верхний предел питания ЦАП "
"(обычно около 3.3 В)."
)
ds_group = QGroupBox("Цифровой подстроечный резистор DS1809")
ds_layout = QFormLayout(ds_group)
ds_layout.setHorizontalSpacing(12)
ds_layout.setVerticalSpacing(8)
owner._ds1809_direction = QComboBox()
owner._ds1809_direction.addItem("Увеличить", "inc")
owner._ds1809_direction.addItem("Уменьшить", "dec")
owner._ds1809_count = _int_spinbox(DS1809_COUNT_MIN, DS1809_COUNT_MAX, DEFAULT_DS1809_COUNT)
owner._ds1809_pulse_ms = _int_spinbox(
DS1809_PULSE_MS_MIN,
DS1809_PULSE_MS_MAX,
DEFAULT_DS1809_PULSE_MS,
suffix=" мс",
)
owner._pulse_ds1809_button = _expanding_button("Сделать шаги резистора", primary=True)
owner._pulse_ds1809_button.clicked.connect(owner._on_pulse_ds1809)
owner._ds1809_profile_apply = QCheckBox("Сохранять абсолютную позицию в профиль")
owner._ds1809_profile_apply.setChecked(True)
owner._ds1809_profile_position = _int_spinbox(
DS1809_PROFILE_POSITION_MIN,
DS1809_PROFILE_POSITION_MAX,
DEFAULT_DS1809_PROFILE_POSITION,
)
ds_layout.addRow("Куда менять", owner._ds1809_direction)
ds_layout.addRow("Число шагов", owner._ds1809_count)
ds_layout.addRow("Длительность шага", owner._ds1809_pulse_ms)
ds_layout.addRow(owner._ds1809_profile_apply)
ds_layout.addRow("Позиция для профиля (от минимума)", owner._ds1809_profile_position)
ds_layout.addRow(owner._pulse_ds1809_button)
owner._ds1809_count.setToolTip("На сколько шагов изменить цифровой резистор.")
owner._ds1809_pulse_ms.setToolTip("Сколько миллисекунд длится один управляющий импульс.")
owner._ds1809_profile_apply.setToolTip(
"Если флажок включён, при сохранении профиля прошивка сначала загонит DS1809 в минимум, "
"а затем поднимет его до указанной позиции."
)
owner._ds1809_profile_position.setToolTip(
"Абсолютная позиция DS1809 относительно минимального положения. "
"Используется только при сохранении профиля на SD."
)
layout.addWidget(dac_group)
layout.addWidget(ds_group)
layout.addStretch(1)
return tab
def _build_wave_tab(owner) -> QWidget:
tab = QWidget()
layout = QVBoxLayout(tab)
layout.setSpacing(10)
note = QLabel(
"Здесь можно вручную загрузить свою форму сигнала для AD9102. "
"Каждое число - это одна точка формы. Допустимый диапазон: от -8192 до 8191."
)
note.setWordWrap(True)
note.setObjectName("captionLabel")
layout.addWidget(note)
owner._wave_info_label = QLabel("Отсчётов: 0")
owner._wave_info_label.setObjectName("valueLabel")
layout.addWidget(owner._wave_info_label)
owner._wave_samples_box = QPlainTextEdit()
owner._wave_samples_box.setPlaceholderText("0 1024 2048 1024 0 -1024 -2048 -1024")
owner._wave_samples_box.setMinimumHeight(180)
owner._wave_samples_box.textChanged.connect(owner._on_wave_text_changed)
layout.addWidget(owner._wave_samples_box)
buttons = QWidget()
buttons_layout = QHBoxLayout(buttons)
buttons_layout.setContentsMargins(0, 0, 0, 0)
buttons_layout.setSpacing(8)
owner._load_wave_file_button = _expanding_button("Открыть файл")
owner._upload_wave_button = _expanding_button("Загрузить форму", primary=True)
owner._cancel_wave_button = _expanding_button("Отменить загрузку")
owner._load_wave_file_button.clicked.connect(owner._on_load_wave_file)
owner._upload_wave_button.clicked.connect(owner._on_upload_waveform)
owner._cancel_wave_button.clicked.connect(owner._on_cancel_waveform)
buttons_layout.addWidget(owner._load_wave_file_button)
buttons_layout.addWidget(owner._upload_wave_button)
buttons_layout.addWidget(owner._cancel_wave_button)
layout.addWidget(buttons)
return tab
def build_status_group(owner) -> QGroupBox:
"""Create status and telemetry labels."""
group = QGroupBox("Телеметрия и статус")
layout = QVBoxLayout(group)
layout.setSpacing(10)
owner._status_header = QLabel("Отключено")
owner._status_header.setObjectName("statusError")
layout.addWidget(owner._status_header)
grid = QGridLayout()
grid.setHorizontalSpacing(12)
grid.setVerticalSpacing(6)
rows = [
("Порт", "_port_value"),
("Статус", "_state_value"),
("Доп. код", "_detail_value"),
("ID сообщения", "_message_id_value"),
("Температура 1", "_telemetry_temp1"),
("Температура 2", "_telemetry_temp2"),
("Фотодиод 1", "_telemetry_current1"),
("Фотодиод 2", "_telemetry_current2"),
("Внешняя температура 1", "_telemetry_temp_ext1"),
("Внешняя температура 2", "_telemetry_temp_ext2"),
("Питание 3.3 В", "_telemetry_3v3"),
("Питание 5V1", "_telemetry_5v1"),
("Питание 5V2", "_telemetry_5v2"),
("Питание 7V0", "_telemetry_7v0"),
]
for row_index, (label_text, attr_name) in enumerate(rows):
label = QLabel(label_text)
label.setObjectName("captionLabel")
value = QLabel("")
value.setObjectName("valueLabel")
setattr(owner, attr_name, value)
grid.addWidget(label, row_index, 0)
grid.addWidget(value, row_index, 1)
layout.addLayout(grid)
buttons = QWidget()
buttons_layout = QHBoxLayout(buttons)
buttons_layout.setContentsMargins(0, 0, 0, 0)
buttons_layout.setSpacing(8)
owner._reconnect_button = _expanding_button("Переподключить")
owner._reset_button = _expanding_button("Сброс")
owner._save_profile_button = _expanding_button("Сохранить профиль", primary=True)
owner._reconnect_button.clicked.connect(owner._on_reconnect)
owner._reset_button.clicked.connect(owner._on_reset_device)
owner._save_profile_button.clicked.connect(owner._on_save_profile)
buttons_layout.addWidget(owner._reconnect_button)
buttons_layout.addWidget(owner._reset_button)
layout.addWidget(buttons)
layout.addWidget(owner._save_profile_button)
return group
def build_log_group(owner) -> QGroupBox:
"""Create compact runtime log output."""
group = QGroupBox("Runtime Log")
layout = QVBoxLayout(group)
owner._log_box = QTextEdit()
owner._log_box.setObjectName("logBox")
owner._log_box.setReadOnly(True)
owner._log_box.setMinimumHeight(180)
layout.addWidget(owner._log_box)
return group

156
laser_control/gui/theme.py Normal file
View File

@ -0,0 +1,156 @@
"""Shared Qt theme for the laser-control desktop UI."""
from __future__ import annotations
from PyQt6.QtGui import QColor, QPalette
from PyQt6.QtWidgets import QApplication, QStyleFactory
_STYLESHEET = """
QMainWindow {
background-color: #edf2f7;
}
QWidget {
color: #17212b;
font-size: 13px;
}
QGroupBox {
background-color: #ffffff;
border: 1px solid #d8e0ea;
border-radius: 14px;
margin-top: 14px;
padding: 12px;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 12px;
padding: 0 6px;
color: #506274;
font-weight: 600;
}
QPushButton {
background-color: #f7fafc;
border: 1px solid #c8d3df;
border-radius: 8px;
padding: 7px 12px;
}
QPushButton:hover {
background-color: #edf3f9;
}
QPushButton:pressed {
background-color: #e2ebf4;
}
QPushButton#primaryButton {
background-color: #1f6feb;
border-color: #1f6feb;
color: #ffffff;
font-weight: 600;
}
QPushButton#primaryButton:hover {
background-color: #2b7bf7;
}
QPushButton:disabled {
color: #95a3b3;
background-color: #f3f6f9;
border-color: #dde5ee;
}
QLabel#captionLabel {
color: #6b7b8d;
}
QLabel#valueLabel {
color: #1f2937;
font-weight: 600;
}
QLabel#statusOk {
color: #156f3d;
font-weight: 700;
}
QLabel#statusError {
color: #b42318;
font-weight: 700;
}
QDoubleSpinBox,
QSpinBox,
QComboBox,
QLineEdit,
QTextEdit,
QPlainTextEdit {
background-color: #ffffff;
border: 1px solid #c8d3df;
border-radius: 8px;
padding: 5px 8px;
}
QDoubleSpinBox:focus,
QSpinBox:focus,
QComboBox:focus,
QLineEdit:focus,
QTextEdit:focus,
QPlainTextEdit:focus {
border: 1px solid #1f6feb;
}
QTextEdit#logBox,
QPlainTextEdit {
font-family: "DejaVu Sans Mono";
font-size: 12px;
background-color: #fbfdff;
}
QTabWidget::pane {
border: 1px solid #d8e0ea;
border-radius: 10px;
background-color: #f8fbfe;
top: -1px;
}
QTabBar::tab {
background-color: #eef4fa;
border: 1px solid #d8e0ea;
border-bottom: none;
padding: 7px 12px;
margin-right: 4px;
border-top-left-radius: 8px;
border-top-right-radius: 8px;
}
QTabBar::tab:selected {
background-color: #f8fbfe;
color: #1f2937;
font-weight: 600;
}
"""
def apply_theme(app: QApplication) -> None:
"""Apply a light desktop theme aligned with the radar_system UI style."""
app.setStyle(QStyleFactory.create("Fusion"))
palette = QPalette()
palette.setColor(QPalette.ColorRole.Window, QColor("#edf2f7"))
palette.setColor(QPalette.ColorRole.WindowText, QColor("#17212b"))
palette.setColor(QPalette.ColorRole.Base, QColor("#ffffff"))
palette.setColor(QPalette.ColorRole.AlternateBase, QColor("#f6f9fc"))
palette.setColor(QPalette.ColorRole.ToolTipBase, QColor("#ffffff"))
palette.setColor(QPalette.ColorRole.ToolTipText, QColor("#17212b"))
palette.setColor(QPalette.ColorRole.Text, QColor("#17212b"))
palette.setColor(QPalette.ColorRole.Button, QColor("#f7fafc"))
palette.setColor(QPalette.ColorRole.ButtonText, QColor("#17212b"))
palette.setColor(QPalette.ColorRole.Highlight, QColor("#1f6feb"))
palette.setColor(QPalette.ColorRole.HighlightedText, QColor("#ffffff"))
app.setPalette(palette)
app.setStyleSheet(_STYLESHEET)

752
laser_control/gui/window.py Normal file
View File

@ -0,0 +1,752 @@
"""Main PyQt window for the laser-controller desktop application."""
from __future__ import annotations
from collections import deque
from datetime import datetime
import re
from PyQt6.QtCore import Qt, QThread, QTimer, pyqtSignal
from PyQt6.QtGui import QTextCursor
from PyQt6.QtWidgets import (
QFileDialog,
QGridLayout,
QHBoxLayout,
QLabel,
QMainWindow,
QScrollArea,
QSizePolicy,
QVBoxLayout,
QWidget,
)
import pyqtgraph as pg
from laser_control.constants import (
AD9833_MCLK_HZ,
DEFAULT_AD9102_AMPLITUDE,
DEFAULT_AD9102_HOLD_CYCLES,
DEFAULT_AD9102_PAT_BASE,
DEFAULT_AD9102_PAT_PERIOD,
DEFAULT_AD9102_SAMPLE_COUNT,
DEFAULT_AD9102_SAW_FREQUENCY_HZ,
DEFAULT_AD9102_SRAM_FREQUENCY_HZ,
DEFAULT_PI_I,
DEFAULT_PI_P,
GUI_POLL_INTERVAL_MS,
PLOT_POINTS,
)
from laser_control.conversions import current_ma_to_n, temp_c_to_n
from laser_control.controller import (
ad9102_saw_frequency_from_step_hz,
ad9102_saw_frequency_limits_hz,
ad9102_saw_step_from_frequency_hz,
ad9102_sram_frequency_from_playback_hz,
ad9102_sram_frequency_limits_hz,
ad9102_sram_sample_count_from_frequency_hz,
)
from laser_control.models import DeviceStatus, Measurements, ProfileSaveRequest
from .dialogs import ProfileSaveDialog
from .sections import (
build_device_group,
build_log_group,
build_manual_group,
build_status_group,
)
from .worker import ControllerWorker
class MainWindow(QMainWindow):
"""Compact GUI composed around live plots and explicit control cards."""
request_connect = pyqtSignal()
request_apply_manual = pyqtSignal(float, float, float, float)
request_reset = pyqtSignal()
request_apply_ad9102 = pyqtSignal(dict)
request_apply_ad9833 = pyqtSignal(bool, bool, int)
request_pulse_ds1809 = pyqtSignal(bool, int, int)
request_set_stm32_dac = pyqtSignal(bool, int)
request_upload_wave = pyqtSignal(object)
request_cancel_wave = pyqtSignal()
request_save_profile = pyqtSignal(object)
request_poll = pyqtSignal()
request_shutdown = pyqtSignal()
def __init__(self, *, auto_connect: bool = True) -> None:
super().__init__()
self.setWindowTitle("Управление лазерной схемой")
self._connected = False
self._port_name = ""
self._poll_in_flight = False
self._command_in_flight = False
self._temp1_history = deque(maxlen=PLOT_POINTS)
self._temp2_history = deque(maxlen=PLOT_POINTS)
self._current1_history = deque(maxlen=PLOT_POINTS)
self._current2_history = deque(maxlen=PLOT_POINTS)
self._build_ui()
self._update_ad9102_form()
self._update_ad9833_preview()
self._on_wave_text_changed()
self._update_control_state()
self._build_worker()
self._poll_timer = QTimer(self)
self._poll_timer.setInterval(GUI_POLL_INTERVAL_MS)
self._poll_timer.timeout.connect(self._request_poll_if_idle)
self._poll_timer.start()
self._append_log("INFO", "GUI started")
if auto_connect:
self._emit_connect_request()
def _build_ui(self) -> None:
root = QWidget(self)
self.setCentralWidget(root)
layout = QHBoxLayout(root)
layout.setContentsMargins(14, 14, 14, 14)
layout.setSpacing(14)
layout.addWidget(self._build_plot_panel(), stretch=11)
layout.addWidget(self._build_side_panel(), stretch=5)
def _build_plot_panel(self) -> QWidget:
panel = QWidget(self)
grid = QGridLayout(panel)
grid.setContentsMargins(0, 0, 0, 0)
grid.setHorizontalSpacing(12)
grid.setVerticalSpacing(12)
self._plot_temp1, self._curve_temp1 = self._build_plot_card(
"Температура лазера 1",
"#ffb703",
0,
50,
)
self._plot_temp2, self._curve_temp2 = self._build_plot_card(
"Температура лазера 2",
"#fb8500",
0,
50,
)
self._plot_current1, self._curve_current1 = self._build_plot_card(
"Фотодиод 1",
"#219ebc",
0,
1.2,
)
self._plot_current2, self._curve_current2 = self._build_plot_card(
"Фотодиод 2",
"#2a9d8f",
0,
1.2,
)
grid.addWidget(self._plot_temp1, 0, 0)
grid.addWidget(self._plot_temp2, 0, 1)
grid.addWidget(self._plot_current1, 1, 0)
grid.addWidget(self._plot_current2, 1, 1)
grid.setColumnStretch(0, 1)
grid.setColumnStretch(1, 1)
grid.setRowStretch(0, 1)
grid.setRowStretch(1, 1)
return panel
def _build_plot_card(
self,
title: str,
color: str,
y_min: float,
y_max: float,
) -> tuple[QWidget, pg.PlotDataItem]:
container = QWidget(self)
container_layout = QVBoxLayout(container)
container_layout.setContentsMargins(0, 0, 0, 0)
container_layout.setSpacing(8)
label = QLabel(title, container)
label.setObjectName("valueLabel")
container_layout.addWidget(label)
plot = pg.PlotWidget(background="#0f1720", enableMenu=False)
plot.showGrid(x=True, y=True, alpha=0.16)
plot.setYRange(y_min, y_max)
plot.setXRange(0, max(1, PLOT_POINTS - 1))
plot.setMouseEnabled(x=False, y=False)
plot.getPlotItem().hideButtons()
plot.getPlotItem().setClipToView(True)
plot.getPlotItem().setDownsampling(mode="peak")
plot.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
curve = plot.plot(pen=pg.mkPen(color=color, width=2))
container_layout.addWidget(plot, stretch=1)
return container, curve
def _build_side_panel(self) -> QWidget:
panel = QWidget(self)
panel.setMinimumWidth(420)
layout = QVBoxLayout(panel)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(12)
self._subtitle = QLabel("Автоподключение при запуске без автоприменения параметров")
self._subtitle.setObjectName("captionLabel")
layout.addWidget(self._subtitle)
layout.addWidget(build_manual_group(self))
layout.addWidget(build_device_group(self))
layout.addWidget(build_status_group(self))
layout.addWidget(build_log_group(self), stretch=1)
layout.addStretch(1)
scroll = QScrollArea(self)
scroll.setWidgetResizable(True)
scroll.setFrameShape(QScrollArea.Shape.NoFrame)
scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
scroll.setWidget(panel)
return scroll
def _build_worker(self) -> None:
self._worker_thread = QThread(self)
self._worker = ControllerWorker()
self._worker.moveToThread(self._worker_thread)
self.request_connect.connect(self._worker.connect_device)
self.request_apply_manual.connect(self._worker.apply_manual)
self.request_reset.connect(self._worker.reset_device)
self.request_apply_ad9102.connect(self._worker.apply_ad9102)
self.request_apply_ad9833.connect(self._worker.apply_ad9833)
self.request_pulse_ds1809.connect(self._worker.pulse_ds1809)
self.request_set_stm32_dac.connect(self._worker.set_stm32_dac)
self.request_upload_wave.connect(self._worker.upload_ad9102_waveform)
self.request_cancel_wave.connect(self._worker.cancel_ad9102_waveform_upload)
self.request_save_profile.connect(self._worker.save_profile)
self.request_poll.connect(self._worker.poll)
self.request_shutdown.connect(self._worker.shutdown)
self._worker.connected_changed.connect(self._on_connected_changed)
self._worker.measurements_ready.connect(self._on_measurements_ready)
self._worker.status_ready.connect(self._on_status_ready)
self._worker.log_message.connect(self._append_log)
self._worker.command_finished.connect(self._on_command_finished)
self._worker.poll_finished.connect(self._on_poll_finished)
self._worker_thread.start()
def _emit_connect_request(self) -> None:
self._dispatch_command(self.request_connect.emit)
def _dispatch_command(self, emit_request) -> None:
if self._command_in_flight:
return
self._command_in_flight = True
self._poll_timer.stop()
self._update_control_state()
emit_request()
def _request_poll_if_idle(self) -> None:
if not self._connected or self._command_in_flight or self._poll_in_flight:
return
self._poll_in_flight = True
self.request_poll.emit()
def _on_command_finished(self) -> None:
self._command_in_flight = False
self._update_control_state()
if self._connected and not self._poll_timer.isActive():
self._poll_timer.start()
def _on_poll_finished(self) -> None:
self._poll_in_flight = False
def _on_apply_manual(self) -> None:
self._dispatch_command(
lambda: self.request_apply_manual.emit(
self._manual_temp1.value(),
self._manual_temp2.value(),
self._manual_current1.value(),
self._manual_current2.value(),
)
)
def _on_reset_device(self) -> None:
self._dispatch_command(self.request_reset.emit)
def _on_apply_ad9102(self) -> None:
use_sram = self._ad9102_mode.currentData() == "sram"
advanced = self._ad9102_advanced_toggle.isChecked()
config = {
"use_basic": not advanced,
"enabled": self._ad9102_enable.isChecked(),
"use_sram": use_sram,
"triangle": self._ad9102_shape.currentData() == "triangle",
"frequency_hz": self._ad9102_frequency_hz.value(),
"saw_step": self._ad9102_saw_step.value(),
"pat_period_base": self._ad9102_pat_base.value(),
"pat_period": self._ad9102_pat_period.value(),
"sample_count": self._ad9102_sample_count.value(),
"hold_cycles": self._ad9102_hold_cycles.value(),
"amplitude": self._ad9102_amplitude.value(),
"use_amplitude_format": use_sram and (not advanced or self._ad9102_use_amplitude.isChecked()),
}
self._dispatch_command(lambda: self.request_apply_ad9102.emit(config))
def _on_apply_ad9833(self) -> None:
self._dispatch_command(
lambda: self.request_apply_ad9833.emit(
self._ad9833_enable.isChecked(),
self._ad9833_shape.currentData() == "triangle",
self._ad9833_frequency_hz.value(),
)
)
def _on_pulse_ds1809(self) -> None:
self._dispatch_command(
lambda: self.request_pulse_ds1809.emit(
self._ds1809_direction.currentData() == "inc",
self._ds1809_count.value(),
self._ds1809_pulse_ms.value(),
)
)
def _on_apply_stm32_dac(self) -> None:
self._dispatch_command(
lambda: self.request_set_stm32_dac.emit(
self._stm32_dac_enable.isChecked(),
self._stm32_dac_code.value(),
)
)
def _on_save_profile(self) -> None:
dialog = ProfileSaveDialog(
custom_waveform_available=self._custom_waveform_is_available(),
parent=self,
)
if dialog.exec() != ProfileSaveDialog.DialogCode.Accepted:
return
try:
request = self._build_profile_save_request(
profile_name=dialog.profile_name(),
include_custom_waveform=dialog.include_custom_waveform(),
)
except Exception as exc: # noqa: BLE001
self._append_log("ERROR", str(exc))
return
self._dispatch_command(lambda: self.request_save_profile.emit(request))
def _on_upload_waveform(self) -> None:
try:
samples = self._parse_wave_samples(self._wave_samples_box.toPlainText())
if len(samples) < 2:
raise ValueError("Для загрузки waveform нужно минимум 2 отсчёта")
except Exception as exc: # noqa: BLE001
self._append_log("ERROR", str(exc))
return
self._dispatch_command(lambda: self.request_upload_wave.emit(samples))
def _on_cancel_waveform(self) -> None:
self._dispatch_command(self.request_cancel_wave.emit)
def _on_load_wave_file(self) -> None:
path, _ = QFileDialog.getOpenFileName(
self,
"Открыть файл waveform",
"",
"Text files (*.txt *.csv *.dat);;All files (*)",
)
if not path:
return
try:
with open(path, encoding="utf-8") as handle:
self._wave_samples_box.setPlainText(handle.read())
self._append_log("INFO", f"Waveform file loaded: {path}")
except Exception as exc: # noqa: BLE001
self._append_log("ERROR", f"Не удалось открыть файл: {exc}")
def _on_wave_text_changed(self) -> None:
text = self._wave_samples_box.toPlainText().strip()
if not text:
self._wave_info_label.setText("Отсчётов: 0")
return
try:
count = len(self._parse_wave_samples(text))
self._wave_info_label.setText(f"Отсчётов: {count}")
except Exception:
self._wave_info_label.setText("Отсчётов: ошибка формата")
def _on_reconnect(self) -> None:
self._append_log("INFO", "Reconnect requested from UI")
self._emit_connect_request()
def _on_connected_changed(self, connected: bool, port_name: str) -> None:
self._connected = connected
self._port_name = port_name
if not connected:
self._poll_in_flight = False
self._command_in_flight = False
if connected:
self._status_header.setText("Подключено")
self._status_header.setObjectName("statusOk")
self._subtitle.setText(f"Подключено к {port_name}")
else:
self._status_header.setText("Отключено")
self._status_header.setObjectName("statusError")
self._subtitle.setText("Автоподключение при запуске без автоприменения параметров")
self._status_header.style().unpolish(self._status_header)
self._status_header.style().polish(self._status_header)
self._update_control_state()
def _update_control_state(self) -> None:
connected = self._connected and not self._command_in_flight
self._apply_manual_button.setEnabled(connected)
self._apply_ad9102_button.setEnabled(connected)
self._apply_ad9833_button.setEnabled(connected)
self._apply_stm32_dac_button.setEnabled(connected)
self._pulse_ds1809_button.setEnabled(connected)
self._upload_wave_button.setEnabled(connected)
self._cancel_wave_button.setEnabled(connected)
self._save_profile_button.setEnabled(connected)
self._reset_button.setEnabled(connected)
self._reconnect_button.setEnabled(not self._command_in_flight)
self._load_wave_file_button.setEnabled(True)
def _on_ad9102_mode_changed(self) -> None:
if not hasattr(self, "_ad9102_frequency_hz"):
return
use_sram = self._ad9102_mode.currentData() == "sram"
if use_sram:
min_hz, max_hz = ad9102_sram_frequency_limits_hz()
frequency_hz = DEFAULT_AD9102_SRAM_FREQUENCY_HZ
else:
min_hz, max_hz = ad9102_saw_frequency_limits_hz(
triangle=self._ad9102_shape.currentData() == "triangle",
)
frequency_hz = DEFAULT_AD9102_SAW_FREQUENCY_HZ
frequency_hz = max(min_hz, min(max_hz, frequency_hz))
self._ad9102_frequency_hz.blockSignals(True)
self._ad9102_frequency_hz.setRange(min_hz, max_hz)
self._ad9102_frequency_hz.setValue(frequency_hz)
self._ad9102_frequency_hz.blockSignals(False)
if use_sram:
self._ad9102_sample_count.blockSignals(True)
self._ad9102_hold_cycles.blockSignals(True)
self._ad9102_amplitude.blockSignals(True)
self._ad9102_use_amplitude.blockSignals(True)
self._ad9102_sample_count.setValue(DEFAULT_AD9102_SAMPLE_COUNT)
self._ad9102_hold_cycles.setValue(DEFAULT_AD9102_HOLD_CYCLES)
self._ad9102_amplitude.setValue(DEFAULT_AD9102_AMPLITUDE)
self._ad9102_use_amplitude.setChecked(True)
self._ad9102_sample_count.blockSignals(False)
self._ad9102_hold_cycles.blockSignals(False)
self._ad9102_amplitude.blockSignals(False)
self._ad9102_use_amplitude.blockSignals(False)
self._update_ad9102_form()
def _update_ad9102_form(self) -> None:
if not hasattr(self, "_ad9102_advanced_group"):
return
use_sram = self._ad9102_mode.currentData() == "sram"
advanced = self._ad9102_advanced_toggle.isChecked()
triangle = self._ad9102_shape.currentData() == "triangle"
use_amplitude = use_sram and (not advanced or self._ad9102_use_amplitude.isChecked())
self._ad9102_advanced_group.setVisible(advanced)
self._ad9102_frequency_hz.setEnabled(not advanced)
self._ad9102_saw_step.setEnabled(advanced and not use_sram)
self._ad9102_pat_base.setEnabled(advanced and not use_sram)
self._ad9102_pat_period.setEnabled(advanced and not use_sram)
self._ad9102_sample_count.setEnabled(advanced and use_sram)
self._ad9102_use_amplitude.setEnabled(advanced and use_sram)
self._ad9102_hold_cycles.setEnabled(advanced and use_sram and not self._ad9102_use_amplitude.isChecked())
self._ad9102_amplitude.setEnabled(use_sram and use_amplitude)
if advanced:
if use_sram:
sample_count = self._ad9102_sample_count.value()
hold_cycles = (
1
if self._ad9102_use_amplitude.isChecked()
else (self._ad9102_hold_cycles.value() or 1)
)
actual_frequency = ad9102_sram_frequency_from_playback_hz(
sample_count=sample_count,
hold_cycles=hold_cycles,
)
self._ad9102_preview.setText(
"Реальная частота: "
f"{self._format_hz(actual_frequency)} "
f"(точек: {sample_count}, удержание: {hold_cycles})"
)
if self._ad9102_use_amplitude.isChecked():
hint = (
"Расширенный режим памяти. Сейчас задаются размах и число точек. "
"Удержание фиксировано значением из прошивки: 1 такт на точку."
)
else:
hint = (
"Расширенный режим памяти. Сейчас задаются число точек и удержание. "
"Размах при этом возьмётся из прошивочного значения по умолчанию."
)
else:
actual_frequency = ad9102_saw_frequency_from_step_hz(
triangle=triangle,
saw_step=self._ad9102_saw_step.value(),
)
self._ad9102_preview.setText(
"Реальная частота: "
f"{self._format_hz(actual_frequency)} "
f"(код шага: {self._ad9102_saw_step.value()})"
)
hint = (
"Расширенный встроенный режим AD9102. Частота определяется в основном кодом шага, "
)
else:
if use_sram:
min_hz, max_hz = ad9102_sram_frequency_limits_hz()
else:
min_hz, max_hz = ad9102_saw_frequency_limits_hz(triangle=triangle)
self._ad9102_frequency_hz.blockSignals(True)
self._ad9102_frequency_hz.setRange(min_hz, max_hz)
if self._ad9102_frequency_hz.value() < min_hz:
self._ad9102_frequency_hz.setValue(min_hz)
elif self._ad9102_frequency_hz.value() > max_hz:
self._ad9102_frequency_hz.setValue(max_hz)
self._ad9102_frequency_hz.blockSignals(False)
desired_frequency = self._ad9102_frequency_hz.value()
if use_sram:
sample_count, actual_frequency = ad9102_sram_sample_count_from_frequency_hz(
frequency_hz=desired_frequency,
)
self._ad9102_sample_count.blockSignals(True)
self._ad9102_hold_cycles.blockSignals(True)
self._ad9102_use_amplitude.blockSignals(True)
self._ad9102_sample_count.setValue(sample_count)
self._ad9102_hold_cycles.setValue(1)
self._ad9102_use_amplitude.setChecked(True)
self._ad9102_sample_count.blockSignals(False)
self._ad9102_hold_cycles.blockSignals(False)
self._ad9102_use_amplitude.blockSignals(False)
self._ad9102_preview.setText(
"Реальная частота: "
f"{self._format_hz(actual_frequency)} "
f"(точек: {sample_count}, удержание: 1)"
)
hint = (
f"Доступный диапазон: {min_hz:,}..{max_hz:,} Гц. "
).replace(",", " ")
else:
saw_step, actual_frequency = ad9102_saw_step_from_frequency_hz(
triangle=triangle,
frequency_hz=desired_frequency,
)
self._ad9102_saw_step.blockSignals(True)
self._ad9102_pat_base.blockSignals(True)
self._ad9102_pat_period.blockSignals(True)
self._ad9102_saw_step.setValue(saw_step)
self._ad9102_pat_base.setValue(DEFAULT_AD9102_PAT_BASE)
self._ad9102_pat_period.setValue(DEFAULT_AD9102_PAT_PERIOD)
self._ad9102_saw_step.blockSignals(False)
self._ad9102_pat_base.blockSignals(False)
self._ad9102_pat_period.blockSignals(False)
self._ad9102_preview.setText(
"Реальная частота: "
f"{self._format_hz(actual_frequency)} "
f"(код шага: {saw_step})"
)
hint = (
f"Доступный диапазон: {min_hz:,}..{max_hz:,} Гц. "
"Амплитуда этой STM-командой не управляется."
).replace(",", " ")
self._ad9102_basic_hint.setText(hint)
@staticmethod
def _format_hz(value: float) -> str:
return f"{value:,.1f} Гц".replace(",", " ")
def _update_ad9833_preview(self) -> None:
frequency_hz = self._ad9833_frequency_hz.value()
frequency_word = int(round(frequency_hz * (1 << 28) / AD9833_MCLK_HZ))
self._ad9833_word_preview.setText(
f"Внутренний код: {frequency_word:,}".replace(",", " ")
)
def _on_measurements_ready(self, measurements: Measurements) -> None:
self._telemetry_temp1.setText(f"{measurements.temp1:.2f} °C")
self._telemetry_temp2.setText(f"{measurements.temp2:.2f} °C")
self._telemetry_current1.setText(f"{measurements.current1:.3f} мА")
self._telemetry_current2.setText(f"{measurements.current2:.3f} мА")
self._telemetry_temp_ext1.setText(f"{measurements.temp_ext1:.2f} °C")
self._telemetry_temp_ext2.setText(f"{measurements.temp_ext2:.2f} °C")
self._telemetry_3v3.setText(f"{measurements.voltage_3v3:.3f} В")
self._telemetry_5v1.setText(f"{measurements.voltage_5v1:.3f} В")
self._telemetry_5v2.setText(f"{measurements.voltage_5v2:.3f} В")
self._telemetry_7v0.setText(f"{measurements.voltage_7v0:.3f} В")
self._message_id_value.setText(str(measurements.message_id))
self._temp1_history.append(measurements.temp1)
self._temp2_history.append(measurements.temp2)
self._current1_history.append(measurements.current1)
self._current2_history.append(measurements.current2)
self._refresh_plot_curves()
def _on_status_ready(self, status: DeviceStatus) -> None:
self._port_value.setText(self._port_name or "auto")
self._state_value.setText(status.error_message or "All ok.")
self._detail_value.setText(f"0x{status.detail:02X}")
self._message_id_value.setText(
str(status.last_command_id) if status.last_command_id is not None else ""
)
if status.has_error:
self._status_header.setText("Есть ошибки")
self._status_header.setObjectName("statusError")
elif self._connected:
self._status_header.setText("Подключено")
self._status_header.setObjectName("statusOk")
self._status_header.style().unpolish(self._status_header)
self._status_header.style().polish(self._status_header)
def _refresh_plot_curves(self) -> None:
x1 = list(range(len(self._temp1_history)))
x2 = list(range(len(self._temp2_history)))
x3 = list(range(len(self._current1_history)))
x4 = list(range(len(self._current2_history)))
self._curve_temp1.setData(x1, list(self._temp1_history))
self._curve_temp2.setData(x2, list(self._temp2_history))
self._curve_current1.setData(x3, list(self._current1_history))
self._curve_current2.setData(x4, list(self._current2_history))
def _append_log(self, level: str, message: str) -> None:
timestamp = datetime.now().strftime("%H:%M:%S")
self._log_box.append(f"[{timestamp}] {level:<5} {message}")
self._log_box.moveCursor(QTextCursor.MoveOperation.End)
def _custom_waveform_is_available(self) -> bool:
try:
return len(self._parse_wave_samples(self._wave_samples_box.toPlainText())) >= 2
except Exception:
return False
def _build_profile_save_request(
self,
*,
profile_name: str,
include_custom_waveform: bool,
) -> ProfileSaveRequest:
custom_wave_samples: list[int] = []
if include_custom_waveform:
custom_wave_samples = self._parse_wave_samples(self._wave_samples_box.toPlainText())
if len(custom_wave_samples) < 2:
raise ValueError("Для сохранения пользовательской формы нужно минимум 2 отсчёта")
return ProfileSaveRequest(
profile_name=profile_name.strip(),
profile_text=self._build_profile_text(
profile_name=profile_name.strip(),
custom_wave_samples=custom_wave_samples,
),
waveform_text=self._build_waveform_text(custom_wave_samples) if custom_wave_samples else "",
)
def _build_profile_text(self, *, profile_name: str, custom_wave_samples: list[int]) -> str:
waveform_mode = "custom_sram" if custom_wave_samples else (
"generated_sram" if self._ad9102_mode.currentData() == "sram" else "saw"
)
waveform_sample_count = len(custom_wave_samples) if custom_wave_samples else self._ad9102_sample_count.value()
waveform_hold_cycles = 1 if custom_wave_samples else self._ad9102_hold_cycles.value()
waveform_triangle = 1 if self._ad9102_shape.currentData() == "triangle" else 0
ad9833_frequency_word = int(round(self._ad9833_frequency_hz.value() * (1 << 28) / AD9833_MCLK_HZ))
pid_p = DEFAULT_PI_P / 256.0
pid_i = DEFAULT_PI_I / 256.0
lines = [
"# Saved from the desktop GUI.",
f"profile_name={profile_name}",
"boot_enabled=true",
"auto_run=true",
"",
"work_enable=1",
"u5v1_enable=1",
"u5v2_enable=1",
"ld1_enable=1",
"ld2_enable=1",
"ref1_enable=1",
"ref2_enable=1",
"tec1_enable=1",
"tec2_enable=1",
"ts1_enable=1",
"ts2_enable=1",
"",
"pid1_from_host=1",
"pid2_from_host=1",
"averages=0",
"message_id=0",
"",
f"laser1_target_temp={temp_c_to_n(self._manual_temp1.value())}",
f"laser2_target_temp={temp_c_to_n(self._manual_temp2.value())}",
f"laser1_current={current_ma_to_n(self._manual_current1.value())}",
f"laser2_current={current_ma_to_n(self._manual_current2.value())}",
f"laser1_pid_p={pid_p:.6g}",
f"laser1_pid_i={pid_i:.6g}",
f"laser2_pid_p={pid_p:.6g}",
f"laser2_pid_i={pid_i:.6g}",
"",
f"waveform_mode={waveform_mode}",
f"waveform_enable={1 if self._ad9102_enable.isChecked() else 0}",
f"waveform_triangle={waveform_triangle}",
f"waveform_saw_step={self._ad9102_saw_step.value()}",
f"waveform_pat_base={self._ad9102_pat_base.value()}",
f"waveform_pat_period={self._ad9102_pat_period.value()}",
f"waveform_sample_count={waveform_sample_count}",
f"waveform_hold_cycles={waveform_hold_cycles}",
f"waveform_amplitude={self._ad9102_amplitude.value()}",
"",
f"ad9833_enable={1 if self._ad9833_enable.isChecked() else 0}",
f"ad9833_triangle={1 if self._ad9833_shape.currentData() == 'triangle' else 0}",
f"ad9833_frequency_word={ad9833_frequency_word}",
"",
f"stm32_dac_enable={1 if self._stm32_dac_enable.isChecked() else 0}",
f"stm32_dac_code={self._stm32_dac_code.value()}",
"",
f"ds1809_apply={'true' if self._ds1809_profile_apply.isChecked() else 'false'}",
f"ds1809_position_from_min={self._ds1809_profile_position.value()}",
]
return "\n".join(lines) + "\n"
@staticmethod
def _build_waveform_text(samples: list[int]) -> str:
return "\n".join(str(sample) for sample in samples) + "\n"
@staticmethod
def _parse_wave_samples(text: str) -> list[int]:
cleaned = (
text.replace("[", " ")
.replace("]", " ")
.replace("(", " ")
.replace(")", " ")
)
tokens = [token for token in re.split(r"[\s,;]+", cleaned.strip()) if token]
return [int(token, 0) for token in tokens]
def closeEvent(self, event) -> None: # noqa: N802
self._poll_timer.stop()
self.request_shutdown.emit()
self._worker_thread.quit()
self._worker_thread.wait(3000)
super().closeEvent(event)

290
laser_control/gui/worker.py Normal file
View File

@ -0,0 +1,290 @@
"""Worker object hosting the controller in a dedicated QThread."""
from __future__ import annotations
from collections.abc import Callable
import time
from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
from laser_control import (
CommunicationError,
DeviceNotRespondingError,
LaserController,
)
class ControllerWorker(QObject):
"""Run blocking serial I/O away from the GUI thread."""
connected_changed = pyqtSignal(bool, str)
measurements_ready = pyqtSignal(object)
status_ready = pyqtSignal(object)
log_message = pyqtSignal(str, str)
command_finished = pyqtSignal()
poll_finished = pyqtSignal()
def __init__(self) -> None:
super().__init__()
self._controller = LaserController()
self._poll_in_progress = False
self._last_status_time = 0.0
@pyqtSlot()
def connect_device(self) -> None:
"""Connect to the board and query current status without changing setpoints."""
self._run_command(self._connect_device_impl)
@pyqtSlot(float, float, float, float)
def apply_manual(
self,
temp1: float,
temp2: float,
current1: float,
current2: float,
) -> None:
"""Apply manual setpoints on the device."""
self._run_command(
lambda: (
self._ensure_connected(),
self._apply_manual_impl(temp1, temp2, current1, current2),
)
)
@pyqtSlot()
def reset_device(self) -> None:
"""Send the firmware default command."""
self._run_command(
lambda: (
self._ensure_connected(),
self._reset_device_impl(),
)
)
@pyqtSlot(dict)
def apply_ad9102(self, config: dict) -> None:
"""Configure AD9102 generator state."""
self._run_command(
lambda: (
self._ensure_connected(),
self._apply_ad9102_impl(config),
)
)
@pyqtSlot(bool, bool, int)
def apply_ad9833(self, enabled: bool, triangle: bool, frequency_hz: int) -> None:
"""Configure AD9833 generator state."""
self._run_command(
lambda: (
self._ensure_connected(),
self._apply_ad9833_impl(enabled, triangle, frequency_hz),
)
)
@pyqtSlot(bool, int, int)
def pulse_ds1809(self, increment: bool, count: int, pulse_ms: int) -> None:
"""Pulse the DS1809 potentiometer."""
self._run_command(
lambda: (
self._ensure_connected(),
self._pulse_ds1809_impl(increment, count, pulse_ms),
)
)
@pyqtSlot(bool, int)
def set_stm32_dac(self, enabled: bool, dac_code: int) -> None:
"""Configure the STM32 DAC."""
self._run_command(
lambda: (
self._ensure_connected(),
self._set_stm32_dac_impl(enabled, dac_code),
)
)
@pyqtSlot(object)
def save_profile(self, request: object) -> None:
"""Save the current GUI configuration to the device SD card."""
self._run_command(
lambda: (
self._ensure_connected(),
self._save_profile_impl(request),
)
)
@pyqtSlot(object)
def upload_ad9102_waveform(self, samples: object) -> None:
"""Upload a custom waveform to AD9102 SRAM."""
self._run_command(
lambda: (
self._ensure_connected(),
self._upload_ad9102_waveform_impl(samples),
)
)
@pyqtSlot()
def cancel_ad9102_waveform_upload(self) -> None:
"""Cancel an in-progress waveform upload."""
self._run_command(
lambda: (
self._ensure_connected(),
self._cancel_ad9102_waveform_upload_impl(),
)
)
@pyqtSlot()
def poll(self) -> None:
"""Fetch measurements regularly and refresh status once per second."""
if self._poll_in_progress or not self._controller.is_connected:
return
self._poll_in_progress = True
try:
measurements = self._controller.get_measurements()
if measurements is not None:
self.measurements_ready.emit(measurements)
now = time.monotonic()
if now - self._last_status_time >= 1.0:
self._emit_status()
except (CommunicationError, DeviceNotRespondingError) as exc:
self.log_message.emit("ERROR", str(exc))
self._disconnect_silently()
except Exception as exc: # noqa: BLE001
self.log_message.emit("ERROR", str(exc))
finally:
self._poll_in_progress = False
self.poll_finished.emit()
@pyqtSlot()
def shutdown(self) -> None:
"""Disconnect gracefully when the GUI closes."""
self._disconnect_silently()
def _run_command(self, action: Callable[[], None]) -> None:
try:
action()
except Exception as exc: # noqa: BLE001
self.log_message.emit("ERROR", str(exc))
finally:
self.command_finished.emit()
def _connect_device_impl(self) -> None:
self._disconnect_silently()
try:
self._controller.connect()
self.connected_changed.emit(True, self._controller.port_name or "")
self.log_message.emit(
"INFO",
f"Connected to {self._controller.port_name or 'auto-detected port'}",
)
self._emit_status()
measurements = self._controller.get_measurements()
if measurements is not None:
self.measurements_ready.emit(measurements)
except Exception:
self._disconnect_silently()
raise
def _apply_manual_impl(self, temp1: float, temp2: float, current1: float, current2: float) -> None:
self._controller.set_manual_mode(temp1, temp2, current1, current2)
self.log_message.emit(
"INFO",
f"Manual mode applied: T1={temp1:.2f} T2={temp2:.2f} I1={current1:.3f} I2={current2:.3f}",
)
self._emit_status()
def _reset_device_impl(self) -> None:
self._controller.reset()
self.log_message.emit("INFO", "DEFAULT_ENABLE sent")
self._emit_status()
def _apply_ad9102_impl(self, config: dict) -> None:
if config.pop("use_basic", False):
simple_config = {
"enabled": config["enabled"],
"use_sram": config["use_sram"],
"triangle": config["triangle"],
"frequency_hz": config["frequency_hz"],
"amplitude": config["amplitude"],
}
result = self._controller.configure_ad9102_simple(**simple_config)
actual_frequency_hz = float(result["actual_frequency_hz"])
if simple_config["use_sram"]:
self.log_message.emit(
"INFO",
"AD9102 memory waveform applied: "
f"{actual_frequency_hz:.1f} Hz, "
f"samples={result['sample_count']}, "
f"amplitude={simple_config['amplitude']}",
)
else:
self.log_message.emit(
"INFO",
"AD9102 built-in waveform applied: "
f"{actual_frequency_hz:.1f} Hz, "
f"saw_step={result['saw_step']}",
)
else:
self._controller.configure_ad9102(**config)
self.log_message.emit("INFO", "AD9102 advanced settings applied")
self._emit_status()
def _apply_ad9833_impl(self, enabled: bool, triangle: bool, frequency_hz: int) -> None:
frequency_word = self._controller.configure_ad9833_frequency(
enabled=enabled,
triangle=triangle,
frequency_hz=frequency_hz,
)
self.log_message.emit(
"INFO",
f"AD9833 settings applied: {frequency_hz} Hz, code={frequency_word}",
)
self._emit_status()
def _pulse_ds1809_impl(self, increment: bool, count: int, pulse_ms: int) -> None:
self._controller.pulse_ds1809(
increment=increment,
count=count,
pulse_ms=pulse_ms,
)
direction = "increment" if increment else "decrement"
self.log_message.emit("INFO", f"DS1809 pulse: {direction}, count={count}, pulse={pulse_ms} ms")
self._emit_status()
def _set_stm32_dac_impl(self, enabled: bool, dac_code: int) -> None:
self._controller.set_stm32_dac(enabled=enabled, dac_code=dac_code)
self.log_message.emit("INFO", f"STM32 DAC set to code {dac_code}")
self._emit_status()
def _save_profile_impl(self, request: object) -> None:
self._controller.save_profile_to_sd(request)
profile_name = getattr(request, "profile_name", "<unnamed>")
self.log_message.emit("INFO", f"Profile saved to SD: {profile_name}")
self._emit_status()
def _upload_ad9102_waveform_impl(self, samples: object) -> None:
sample_list = list(samples)
self._controller.upload_ad9102_waveform(sample_list)
self.log_message.emit("INFO", f"AD9102 waveform uploaded ({len(sample_list)} samples)")
self._emit_status()
def _cancel_ad9102_waveform_upload_impl(self) -> None:
self._controller.cancel_ad9102_waveform_upload()
self.log_message.emit("INFO", "AD9102 waveform upload cancelled")
self._emit_status()
def _emit_status(self) -> None:
status = self._controller.get_status()
self._last_status_time = time.monotonic()
self.status_ready.emit(status)
def _ensure_connected(self) -> None:
if not self._controller.is_connected:
raise CommunicationError("Device is not connected")
def _disconnect_silently(self) -> None:
try:
if self._controller.is_connected:
self._controller.disconnect()
finally:
self.connected_changed.emit(False, "")

128
laser_control/models.py Normal file
View File

@ -0,0 +1,128 @@
"""Public domain models used by the controller and GUI layers."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import IntFlag
from typing import Any
from .constants import (
VOLT_3V3_MAX,
VOLT_3V3_MIN,
VOLT_5V_MAX,
VOLT_5V_MIN,
VOLT_7V_MAX,
VOLT_7V_MIN,
)
class DeviceState(IntFlag):
"""Bit-mask of device error flags returned by the firmware status packet."""
OK = 0x0000
SD_ERROR = 0x0001
UART_ERROR = 0x0002
UART_DECODE_ERROR = 0x0004
TEC1_ERROR = 0x0008
TEC2_ERROR = 0x0010
DEFAULT_ERROR = 0x0020
AD9102_ERROR = 0x0080
@dataclass(slots=True)
class Measurements:
"""Latest live telemetry frame decoded from the board."""
current1: float
current2: float
temp1: float
temp2: float
temp_ext1: float | None = None
temp_ext2: float | None = None
voltage_3v3: float = 0.0
voltage_5v1: float = 0.0
voltage_5v2: float = 0.0
voltage_7v0: float = 0.0
message_id: int | None = None
to6_counter_lsb: int | None = None
to6_counter_msb: int | None = None
timestamp: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-friendly representation."""
return {
"current1": self.current1,
"current2": self.current2,
"temp1": self.temp1,
"temp2": self.temp2,
"temp_ext1": self.temp_ext1,
"temp_ext2": self.temp_ext2,
"voltage_3v3": self.voltage_3v3,
"voltage_5v1": self.voltage_5v1,
"voltage_5v2": self.voltage_5v2,
"voltage_7v0": self.voltage_7v0,
"message_id": self.message_id,
"to6_counter_lsb": self.to6_counter_lsb,
"to6_counter_msb": self.to6_counter_msb,
"timestamp": self.timestamp.isoformat(),
}
def check_power_rails(self) -> dict[str, bool]:
"""Check nominal supply rails against static tolerances."""
return {
"3v3": VOLT_3V3_MIN <= self.voltage_3v3 <= VOLT_3V3_MAX,
"5v1": VOLT_5V_MIN <= self.voltage_5v1 <= VOLT_5V_MAX,
"5v2": VOLT_5V_MIN <= self.voltage_5v2 <= VOLT_5V_MAX,
"7v0": VOLT_7V_MIN <= self.voltage_7v0 <= VOLT_7V_MAX,
}
@dataclass(slots=True)
class DeviceStatus:
"""Decoded two-byte status response from the board."""
state: DeviceState = DeviceState.OK
detail: int = 0
measurements: Measurements | None = None
is_connected: bool = False
last_command_id: int | None = None
error_message: str | None = None
@property
def has_error(self) -> bool:
"""Return True when any firmware error bit is set."""
return self.state != DeviceState.OK
@property
def is_ok(self) -> bool:
"""Convenience alias for the common no-error case."""
return not self.has_error
@property
def active_errors(self) -> list[str]:
"""Return the names of all active error flags."""
return [
flag.name
for flag in DeviceState
if flag is not DeviceState.OK and (self.state & flag) == flag
]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-friendly representation."""
return {
"state_mask": int(self.state),
"state_names": self.active_errors,
"detail": self.detail,
"measurements": self.measurements.to_dict() if self.measurements else None,
"is_connected": self.is_connected,
"last_command_id": self.last_command_id,
"error_message": self.error_message,
"has_error": self.has_error,
}
@dataclass(slots=True)
class ProfileSaveRequest:
"""Rendered profile payload that should be persisted on the device SD card."""
profile_name: str
profile_text: str
waveform_text: str = ""

486
laser_control/protocol.py Normal file
View File

@ -0,0 +1,486 @@
"""Codec for the UART protocol implemented by the current firmware."""
from __future__ import annotations
from datetime import datetime
import struct
from .constants import (
AD9102_FLAG_ENABLE,
AD9102_FLAG_SRAM,
AD9102_FLAG_SRAM_FORMAT_ALT,
AD9102_FLAG_TRIANGLE,
AD9102_WAVE_MAX_CHUNK_SAMPLES,
AD9102_WAVE_OPCODE_BEGIN,
AD9102_WAVE_OPCODE_CANCEL,
AD9102_WAVE_OPCODE_COMMIT,
AD9102_WAVE_SAMPLE_MAX,
AD9102_WAVE_SAMPLE_MIN,
AD9833_FLAG_ENABLE,
AD9833_FLAG_TRIANGLE,
CMD_DECODE_ENABLE,
CMD_DEFAULT_ENABLE,
CMD_PROFILE_SAVE_CONTROL,
CMD_PROFILE_SAVE_DATA,
CMD_AD9102_CONTROL,
CMD_AD9102_WAVE_CONTROL,
CMD_AD9102_WAVE_DATA,
CMD_AD9833_CONTROL,
CMD_DS1809_CONTROL,
CMD_STATE,
CMD_STM32_DAC_CONTROL,
CMD_TRANS_ENABLE,
DEFAULT_SETUP_WORD,
DS1809_FLAG_DECREMENT,
DS1809_FLAG_INCREMENT,
GET_DATA_TOTAL_LENGTH,
PROFILE_NAME_MAX_LENGTH,
PROFILE_SAVE_CONTROL_TOTAL_LENGTH,
PROFILE_SAVE_DATA_CHUNK_BYTES,
PROFILE_SAVE_DATA_TOTAL_LENGTH,
PROFILE_SAVE_OPCODE_BEGIN,
PROFILE_SAVE_OPCODE_CANCEL,
PROFILE_SAVE_OPCODE_COMMIT,
PROFILE_SAVE_SECTION_PROFILE_TEXT,
PROFILE_SAVE_SECTION_WAVEFORM_TEXT,
SEND_PARAMS_TOTAL_LENGTH,
SHORT_CONTROL_TOTAL_LENGTH,
STM32_DAC_FLAG_ENABLE,
STATUS_DESCRIPTIONS,
STATUS_RESPONSE_LENGTH,
WAVE_DATA_TOTAL_LENGTH,
)
from .conversions import (
current_ma_to_n,
current_n_to_ma,
temp_c_to_n,
temp_ext_n_to_c,
temp_n_to_c,
voltage_3v3_n_to_v,
voltage_5v_n_to_v,
voltage_7v_n_to_v,
)
from .exceptions import CRCError, ProtocolError
from .models import DeviceState, Measurements
def _int_to_hex4(value: int) -> str:
"""Return a zero-padded four-digit lowercase hex string."""
if value < 0 or value > 0xFFFF:
raise ValueError(f"Value {value} out of uint16 range")
return f"{value:04x}"
def _flipfour(value: str) -> str:
"""Swap byte pairs in a four-character hex word."""
if len(value) != 4:
raise ValueError(f"Expected 4 hex chars, got {value!r}")
return value[2:4] + value[0:2]
def _build_crc(data_hex: str) -> str:
"""Return the checksum word for a wire-order hex packet without CRC."""
if len(data_hex) % 4 != 0:
raise ValueError("Packet hex string must contain complete 16-bit words")
words = [data_hex[index:index + 4] for index in range(0, len(data_hex), 4)]
checksum = 0
for word in words[1:]:
checksum ^= int(word, 16)
return _int_to_hex4(checksum)
def _pack_words(words: list[int]) -> bytes:
return struct.pack("<" + "H" * len(words), *words)
def _unpack_words(data: bytes) -> tuple[int, ...]:
if len(data) % 2 != 0:
raise ProtocolError(f"Packet length must be even, got {len(data)} bytes")
return struct.unpack("<" + "H" * (len(data) // 2), data)
def _payload_checksum(words: list[int]) -> int:
checksum = 0
for word in words:
checksum ^= word
return checksum & 0xFFFF
def _ensure_uint(value: int, name: str, minimum: int, maximum: int) -> int:
if not isinstance(value, int):
raise ValueError(f"{name} must be an integer")
if not minimum <= value <= maximum:
raise ValueError(f"{name} must be in range [{minimum}, {maximum}]")
return value
def _encode_ascii_name_words(profile_name: str) -> tuple[list[int], int]:
if not isinstance(profile_name, str):
raise ValueError("profile_name must be a string")
try:
encoded = profile_name.encode("ascii")
except UnicodeEncodeError as exc:
raise ValueError("profile_name must contain ASCII characters only") from exc
if not 1 <= len(encoded) <= PROFILE_NAME_MAX_LENGTH:
raise ValueError(
f"profile_name length must be in range [1, {PROFILE_NAME_MAX_LENGTH}]"
)
padded = encoded + (b"\x00" * (PROFILE_NAME_MAX_LENGTH - len(encoded)))
words = [
padded[index] | (padded[index + 1] << 8)
for index in range(0, PROFILE_NAME_MAX_LENGTH, 2)
]
return words, len(encoded)
class Protocol:
"""Static helpers for encoding commands and decoding responses."""
@staticmethod
def calculate_crc(data: bytes) -> int:
"""Calculate XOR checksum over all words except the first header word."""
words = _unpack_words(data)
if len(words) <= 1:
return 0
return _payload_checksum(list(words[1:]))
@staticmethod
def encode_decode_enable(
temp1: float,
temp2: float,
current1: float,
current2: float,
pi_coeff1_p: int,
pi_coeff1_i: int,
pi_coeff2_p: int,
pi_coeff2_i: int,
message_id: int,
) -> bytes:
"""Build the 30-byte DECODE_ENABLE command."""
words = [
CMD_DECODE_ENABLE,
DEFAULT_SETUP_WORD,
temp_c_to_n(temp1),
temp_c_to_n(temp2),
0,
0,
0,
pi_coeff1_p & 0xFFFF,
pi_coeff1_i & 0xFFFF,
pi_coeff2_p & 0xFFFF,
pi_coeff2_i & 0xFFFF,
message_id & 0xFFFF,
current_ma_to_n(current1),
current_ma_to_n(current2),
]
words.append(_payload_checksum(words[1:]))
packet = _pack_words(words)
if len(packet) != SEND_PARAMS_TOTAL_LENGTH:
raise ProtocolError(
f"DECODE_ENABLE length mismatch: {len(packet)} bytes"
)
return packet
@staticmethod
def encode_trans_enable() -> bytes:
"""Build the short TRANS_ENABLE command."""
return _pack_words([CMD_TRANS_ENABLE])
@staticmethod
def encode_state() -> bytes:
"""Build the short STATE command."""
return _pack_words([CMD_STATE])
@staticmethod
def encode_default_enable() -> bytes:
"""Build the short DEFAULT_ENABLE command."""
return _pack_words([CMD_DEFAULT_ENABLE])
@staticmethod
def encode_ad9102_control(
*,
enabled: bool,
triangle: bool,
sram_mode: bool,
param0: int,
param1: int,
alt_format: bool = False,
) -> bytes:
"""Build an AD9102 control packet."""
flags = 0
if enabled:
flags |= AD9102_FLAG_ENABLE
if triangle:
flags |= AD9102_FLAG_TRIANGLE
if sram_mode:
flags |= AD9102_FLAG_SRAM
if alt_format:
flags |= AD9102_FLAG_SRAM_FORMAT_ALT
return Protocol._encode_short_control(
CMD_AD9102_CONTROL,
flags,
_ensure_uint(param0, "param0", 0, 0xFFFF),
_ensure_uint(param1, "param1", 0, 0xFFFF),
)
@staticmethod
def encode_ad9833_control(*, enabled: bool, triangle: bool, frequency_word: int) -> bytes:
"""Build an AD9833 control packet."""
flags = 0
if enabled:
flags |= AD9833_FLAG_ENABLE
if triangle:
flags |= AD9833_FLAG_TRIANGLE
frequency_word = _ensure_uint(frequency_word, "frequency_word", 0, 0x0FFFFFFF)
return Protocol._encode_short_control(
CMD_AD9833_CONTROL,
flags,
frequency_word & 0x3FFF,
(frequency_word >> 14) & 0x3FFF,
)
@staticmethod
def encode_ds1809_control(*, increment: bool, decrement: bool, count: int, pulse_ms: int) -> bytes:
"""Build a DS1809 control packet."""
if increment and decrement:
raise ValueError("increment and decrement cannot both be true")
flags = 0
if increment:
flags |= DS1809_FLAG_INCREMENT
if decrement:
flags |= DS1809_FLAG_DECREMENT
return Protocol._encode_short_control(
CMD_DS1809_CONTROL,
flags,
_ensure_uint(count, "count", 0, 0xFFFF),
_ensure_uint(pulse_ms, "pulse_ms", 0, 0xFFFF),
)
@staticmethod
def encode_stm32_dac_control(*, enabled: bool, dac_code: int) -> bytes:
"""Build an STM32 DAC control packet."""
flags = STM32_DAC_FLAG_ENABLE if enabled else 0
return Protocol._encode_short_control(
CMD_STM32_DAC_CONTROL,
flags,
_ensure_uint(dac_code, "dac_code", 0, 0x0FFF),
0,
)
@staticmethod
def encode_ad9102_wave_begin(sample_count: int) -> bytes:
"""Build an AD9102 custom-wave upload BEGIN packet."""
return Protocol._encode_short_control(
CMD_AD9102_WAVE_CONTROL,
AD9102_WAVE_OPCODE_BEGIN,
_ensure_uint(sample_count, "sample_count", 0, 0xFFFF),
0,
)
@staticmethod
def encode_ad9102_wave_commit() -> bytes:
"""Build an AD9102 custom-wave upload COMMIT packet."""
return Protocol._encode_short_control(
CMD_AD9102_WAVE_CONTROL,
AD9102_WAVE_OPCODE_COMMIT,
0,
0,
)
@staticmethod
def encode_ad9102_wave_cancel() -> bytes:
"""Build an AD9102 custom-wave upload CANCEL packet."""
return Protocol._encode_short_control(
CMD_AD9102_WAVE_CONTROL,
AD9102_WAVE_OPCODE_CANCEL,
0,
0,
)
@staticmethod
def encode_ad9102_wave_data(samples: list[int]) -> bytes:
"""Build one fixed-size AD9102 custom-wave data chunk packet."""
if not samples:
raise ValueError("samples must not be empty")
if len(samples) > AD9102_WAVE_MAX_CHUNK_SAMPLES:
raise ValueError(
f"samples length must be <= {AD9102_WAVE_MAX_CHUNK_SAMPLES}"
)
encoded_samples = []
for index, sample in enumerate(samples):
if not isinstance(sample, int):
raise ValueError(f"sample[{index}] must be an integer")
if not AD9102_WAVE_SAMPLE_MIN <= sample <= AD9102_WAVE_SAMPLE_MAX:
raise ValueError(
f"sample[{index}] must be in range "
f"[{AD9102_WAVE_SAMPLE_MIN}, {AD9102_WAVE_SAMPLE_MAX}]"
)
encoded_samples.append(sample & 0xFFFF)
padded_samples = encoded_samples + [0] * (AD9102_WAVE_MAX_CHUNK_SAMPLES - len(samples))
words = [CMD_AD9102_WAVE_DATA, len(samples), *padded_samples]
words.append(_payload_checksum(words[1:]))
packet = _pack_words(words)
if len(packet) != WAVE_DATA_TOTAL_LENGTH:
raise ProtocolError(f"AD9102_WAVE_DATA length mismatch: {len(packet)} bytes")
return packet
@staticmethod
def encode_profile_save_begin(
*,
profile_name: str,
profile_text_bytes: int,
waveform_text_bytes: int,
) -> bytes:
"""Build the fixed-size BEGIN packet for a streamed SD profile save."""
name_words, name_length = _encode_ascii_name_words(profile_name)
payload_words = [
PROFILE_SAVE_OPCODE_BEGIN,
_ensure_uint(profile_text_bytes, "profile_text_bytes", 1, 0xFFFF),
_ensure_uint(waveform_text_bytes, "waveform_text_bytes", 0, 0xFFFF),
name_length,
*name_words,
0,
]
payload_words.append(_payload_checksum(payload_words))
packet = _pack_words([CMD_PROFILE_SAVE_CONTROL, *payload_words])
if len(packet) != PROFILE_SAVE_CONTROL_TOTAL_LENGTH:
raise ProtocolError(
f"PROFILE_SAVE_BEGIN length mismatch: {len(packet)} bytes"
)
return packet
@staticmethod
def encode_profile_save_commit() -> bytes:
"""Build the fixed-size COMMIT packet for a streamed SD profile save."""
payload_words = [PROFILE_SAVE_OPCODE_COMMIT] + ([0] * 12)
payload_words.append(_payload_checksum(payload_words))
packet = _pack_words([CMD_PROFILE_SAVE_CONTROL, *payload_words])
if len(packet) != PROFILE_SAVE_CONTROL_TOTAL_LENGTH:
raise ProtocolError(
f"PROFILE_SAVE_COMMIT length mismatch: {len(packet)} bytes"
)
return packet
@staticmethod
def encode_profile_save_cancel() -> bytes:
"""Build the fixed-size CANCEL packet for a streamed SD profile save."""
payload_words = [PROFILE_SAVE_OPCODE_CANCEL] + ([0] * 12)
payload_words.append(_payload_checksum(payload_words))
packet = _pack_words([CMD_PROFILE_SAVE_CONTROL, *payload_words])
if len(packet) != PROFILE_SAVE_CONTROL_TOTAL_LENGTH:
raise ProtocolError(
f"PROFILE_SAVE_CANCEL length mismatch: {len(packet)} bytes"
)
return packet
@staticmethod
def encode_profile_save_data(*, section_id: int, chunk: bytes) -> bytes:
"""Build one fixed-size data packet carrying profile or waveform text."""
if not isinstance(chunk, (bytes, bytearray)):
raise ValueError("chunk must be bytes")
if not chunk:
raise ValueError("chunk must not be empty")
if len(chunk) > PROFILE_SAVE_DATA_CHUNK_BYTES:
raise ValueError(
f"chunk length must be <= {PROFILE_SAVE_DATA_CHUNK_BYTES}"
)
if section_id not in (
PROFILE_SAVE_SECTION_PROFILE_TEXT,
PROFILE_SAVE_SECTION_WAVEFORM_TEXT,
):
raise ValueError("section_id is invalid")
padded = bytes(chunk) + (b"\x00" * (PROFILE_SAVE_DATA_CHUNK_BYTES - len(chunk)))
data_words = [
padded[index] | (padded[index + 1] << 8)
for index in range(0, PROFILE_SAVE_DATA_CHUNK_BYTES, 2)
]
payload_words = [section_id, len(chunk), *data_words]
payload_words.append(_payload_checksum(payload_words))
packet = _pack_words([CMD_PROFILE_SAVE_DATA, *payload_words])
if len(packet) != PROFILE_SAVE_DATA_TOTAL_LENGTH:
raise ProtocolError(
f"PROFILE_SAVE_DATA length mismatch: {len(packet)} bytes"
)
return packet
@staticmethod
def _encode_short_control(header: int, word0: int, word1: int, word2: int) -> bytes:
words = [header, word0 & 0xFFFF, word1 & 0xFFFF, word2 & 0xFFFF]
words.append(_payload_checksum(words[1:]))
packet = _pack_words(words)
if len(packet) != SHORT_CONTROL_TOTAL_LENGTH:
raise ProtocolError(f"Short control length mismatch: {len(packet)} bytes")
return packet
@staticmethod
def decode_response(data: bytes) -> Measurements:
"""Decode a 30-byte telemetry frame into a Measurements object."""
if len(data) != GET_DATA_TOTAL_LENGTH:
raise ProtocolError(
f"Expected {GET_DATA_TOTAL_LENGTH} bytes, got {len(data)} bytes"
)
words = _unpack_words(data)
expected_crc = _payload_checksum(list(words[1:14]))
if words[14] != expected_crc:
raise CRCError(expected=expected_crc, received=words[14])
return Measurements(
current1=current_n_to_ma(words[1]),
current2=current_n_to_ma(words[2]),
temp1=temp_n_to_c(words[5]),
temp2=temp_n_to_c(words[6]),
temp_ext1=temp_ext_n_to_c(words[7]),
temp_ext2=temp_ext_n_to_c(words[8]),
voltage_3v3=voltage_3v3_n_to_v(words[9]),
voltage_5v1=voltage_5v_n_to_v(words[10]),
voltage_5v2=voltage_5v_n_to_v(words[11]),
voltage_7v0=voltage_7v_n_to_v(words[12]),
message_id=words[13],
to6_counter_lsb=words[3],
to6_counter_msb=words[4],
timestamp=datetime.now(),
)
@staticmethod
def decode_status(data: bytes) -> tuple[DeviceState, int]:
"""Decode the two-byte firmware status response into flags and detail."""
if len(data) != STATUS_RESPONSE_LENGTH:
raise ProtocolError(
f"Expected {STATUS_RESPONSE_LENGTH} status bytes, got {len(data)}"
)
raw_word = _unpack_words(data)[0]
flags = DeviceState(raw_word & 0x00FF)
detail = (raw_word >> 8) & 0x00FF
return flags, detail
@staticmethod
def decode_state(data: bytes) -> int:
"""Compatibility helper returning only the low-byte status mask."""
flags, _detail = Protocol.decode_status(data)
return int(flags)
@staticmethod
def state_to_description(state: DeviceState | int) -> str:
"""Return a readable description for a status mask."""
state = DeviceState(int(state))
if state == DeviceState.OK:
return "All ok."
parts = [
text
for mask, text in STATUS_DESCRIPTIONS.items()
if (state & DeviceState(mask)) == DeviceState(mask)
]
if parts:
return "; ".join(parts)
return f"Unknown status mask: 0x{int(state):02X}"
__all__ = ["Protocol", "_build_crc", "_flipfour", "_int_to_hex4"]

View File

@ -0,0 +1,78 @@
"""Serial transport for the laser controller board."""
from __future__ import annotations
import serial
import serial.tools.list_ports
from .constants import BAUDRATE, SERIAL_TIMEOUT_SEC
from .exceptions import CommunicationError, PortNotFoundError
class SerialTransport:
"""Small serial wrapper with auto-detection and explicit lifecycle."""
def __init__(
self,
port: str | None = None,
baudrate: int = BAUDRATE,
timeout: float = SERIAL_TIMEOUT_SEC,
) -> None:
self._requested_port = port
self._active_port: str | None = None
self._baudrate = baudrate
self._timeout = timeout
self._serial: serial.Serial | None = None
@property
def port_name(self) -> str | None:
"""Return the connected port or the requested port when disconnected."""
return self._active_port or self._requested_port
@property
def is_connected(self) -> bool:
"""Return True when the serial port is currently open."""
return self._serial is not None and self._serial.is_open
def connect(self) -> None:
"""Open the serial port, auto-detecting the first USB port when needed."""
port = self._requested_port or self._detect_port()
try:
self._serial = serial.Serial(
port=port,
baudrate=self._baudrate,
timeout=self._timeout,
)
except Exception as exc: # noqa: BLE001
raise CommunicationError(f"Cannot connect to port '{port}': {exc}") from exc
self._active_port = port
def disconnect(self) -> None:
"""Close the serial port if it is open."""
if self._serial is not None and self._serial.is_open:
self._serial.close()
self._serial = None
def send(self, data: bytes) -> None:
"""Write raw bytes to the serial port."""
if not self.is_connected:
raise CommunicationError("Serial port is not connected")
assert self._serial is not None
self._serial.write(data)
def read(self, length: int) -> bytes:
"""Read a fixed number of bytes from the serial port."""
if not self.is_connected:
raise CommunicationError("Serial port is not connected")
assert self._serial is not None
return self._serial.read(length)
def _detect_port(self) -> str:
ports = sorted(serial.tools.list_ports.comports(), key=lambda port: port.device)
if not ports:
raise PortNotFoundError()
usb_ports = [port.device for port in ports if "USB" in port.device.upper()]
if usb_ports:
return usb_ports[0]
return ports[0].device

133
laser_control/validators.py Normal file
View File

@ -0,0 +1,133 @@
"""Validation helpers for controller inputs."""
import math
import re
from typing import Any
from .constants import (
TEMP_MIN_C, TEMP_MAX_C,
CURRENT_MIN_MA, CURRENT_MAX_MA,
PROFILE_NAME_ALLOWED_PATTERN,
PROFILE_NAME_MAX_LENGTH,
)
from .exceptions import (
ValidationError,
TemperatureOutOfRangeError,
CurrentOutOfRangeError,
InvalidParameterError,
)
class ParameterValidator:
"""Validates all input parameters for the laser controller."""
@staticmethod
def _check_numeric(value: Any, param_name: str) -> float:
"""Check that value is a valid finite number. Returns float."""
if value is None:
raise InvalidParameterError(param_name, "Value must not be None")
if not isinstance(value, (int, float)):
raise InvalidParameterError(param_name, "Value must be a number")
if math.isnan(value):
raise InvalidParameterError(param_name, "Value must not be NaN")
if math.isinf(value):
raise InvalidParameterError(param_name, "Value must not be infinite")
return float(value)
@staticmethod
def validate_temperature(value: Any, param_name: str) -> float:
"""
Validate a laser temperature value.
Args:
value: Temperature in °C.
param_name: Parameter name for error messages.
Returns:
Validated temperature as float.
Raises:
InvalidParameterError: If value is not a valid number.
TemperatureOutOfRangeError: If value is outside [TEMP_MIN_C, TEMP_MAX_C].
"""
value = ParameterValidator._check_numeric(value, param_name)
if value < TEMP_MIN_C or value > TEMP_MAX_C:
raise TemperatureOutOfRangeError(
param_name, value, TEMP_MIN_C, TEMP_MAX_C
)
return value
@staticmethod
def validate_current(value: Any, param_name: str) -> float:
"""
Validate a laser drive current value.
Args:
value: Current in mA.
param_name: Parameter name for error messages.
Returns:
Validated current as float.
Raises:
InvalidParameterError: If value is not a valid number.
CurrentOutOfRangeError: If value is outside [CURRENT_MIN_MA, CURRENT_MAX_MA].
"""
value = ParameterValidator._check_numeric(value, param_name)
if value < CURRENT_MIN_MA or value > CURRENT_MAX_MA:
raise CurrentOutOfRangeError(
param_name, value, CURRENT_MIN_MA, CURRENT_MAX_MA
)
return value
@staticmethod
def validate_manual_mode_params(
temp1: Any,
temp2: Any,
current1: Any,
current2: Any,
) -> dict[str, float]:
"""
Validate all four manual mode parameters.
Args:
temp1: Laser 1 temperature, °C.
temp2: Laser 2 temperature, °C.
current1: Laser 1 current, mA.
current2: Laser 2 current, mA.
Returns:
Dict with validated floats: temp1, temp2, current1, current2.
Raises:
ValidationError: For any out-of-range value.
InvalidParameterError: For wrong types.
"""
return {
'temp1': ParameterValidator.validate_temperature(temp1, 'temp1'),
'temp2': ParameterValidator.validate_temperature(temp2, 'temp2'),
'current1': ParameterValidator.validate_current(current1, 'current1'),
'current2': ParameterValidator.validate_current(current2, 'current2'),
}
@staticmethod
def validate_profile_name(value: Any) -> str:
"""Validate a short ASCII profile name suitable for the device LCD."""
if not isinstance(value, str):
raise InvalidParameterError("profile_name", "Value must be a string")
normalized = value.strip()
if not normalized:
raise InvalidParameterError("profile_name", "Value must not be empty")
if len(normalized) > PROFILE_NAME_MAX_LENGTH:
raise InvalidParameterError(
"profile_name",
f"Value must be at most {PROFILE_NAME_MAX_LENGTH} characters long",
)
if re.fullmatch(PROFILE_NAME_ALLOWED_PATTERN, normalized) is None:
raise InvalidParameterError(
"profile_name",
"Only ASCII letters, digits, spaces, '-' and '_' are allowed",
)
return normalized

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
PyQt6>=6.6
pyqtgraph>=0.13
pyserial==3.5

5
run
View File

@ -1,7 +1,4 @@
#!/usr/bin/bash
#reset generator PCB
pinctrl set 26 op dl # drive PCB NRST LOW -> reset stm32
pinctrl set 26 op dh # turn stm32 back ON
source .venv/bin/activate
python3 _device_main.py
python3 -m laser_control.gui.main

65
run_device_main.bat Normal file
View File

@ -0,0 +1,65 @@
@echo off
setlocal
set "PROJECT_DIR=%~dp0"
pushd "%PROJECT_DIR%" >nul || (
echo Failed to switch to project directory: %PROJECT_DIR%
exit /b 1
)
set "VENV_DIR=.venv"
set "VENV_PYTHON=%VENV_DIR%\Scripts\python.exe"
set "VENV_ACTIVATE=%VENV_DIR%\Scripts\activate.bat"
if exist "%VENV_PYTHON%" goto venv_ready
echo Creating virtual environment...
call :find_python
if errorlevel 1 goto :error
"%PYTHON_EXE%" %PYTHON_ARGS% -m venv "%VENV_DIR%"
if errorlevel 1 goto :error
:venv_ready
call "%VENV_ACTIVATE%"
if errorlevel 1 goto :error
if exist "requirements.txt" (
echo Installing dependencies...
python -m pip install --disable-pip-version-check -r requirements.txt
if errorlevel 1 goto :error
) else (
echo requirements.txt not found. Skipping dependency installation.
)
echo Starting laser_control.gui.main...
python -m laser_control.gui.main
set "EXIT_CODE=%ERRORLEVEL%"
popd >nul
exit /b %EXIT_CODE%
:find_python
where py >nul 2>&1
if not errorlevel 1 (
set "PYTHON_EXE=py"
set "PYTHON_ARGS=-3"
exit /b 0
)
where python >nul 2>&1
if not errorlevel 1 (
set "PYTHON_EXE=python"
set "PYTHON_ARGS="
exit /b 0
)
echo Python 3 was not found. Install Python and try again.
exit /b 1
:error
set "EXIT_CODE=%ERRORLEVEL%"
echo Script failed with exit code %EXIT_CODE%.
popd >nul
exit /b %EXIT_CODE%