Files
reflectometer_fpga_project/software/gui.py
2026-05-26 16:46:51 +03:00

749 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# shitpost
import sys
import math
import socket
import platform
from PyQt6 import uic
from dataclasses import dataclass
from PyQt6.QtCore import QProcess, QTimer
from PyQt6.QtCore import QObject, QThread, pyqtSignal
from PyQt6.QtCore import Qt
import pyqtgraph as pg
from PyQt6.QtWidgets import QApplication, QMainWindow
@dataclass
class ReflectometerConfig:
    """Parameters of a single reflectometer measurement.

    The first eleven fields are required and positional; the last two
    have defaults. Physical units of the pulse fields are not visible in
    this file and are therefore not stated here.
    """

    # --- network ---------------------------------------------------------
    # Device address; commands are sent to (ip, send_port).
    ip: str
    send_port: int
    # Local UDP port the worker binds to in order to receive samples.
    recv_port: int

    # --- data format -----------------------------------------------------
    # DAC resolution in bits; bounds pulse_height by 2**dac_bits - 1.
    dac_bits: int
    # Number of bytes occupied by one sample inside a received packet.
    data_width: int
    # pulse_period must be a multiple of this value; also the divisor used
    # when computing the expected number of samples.
    window_size: int
    # Divisor used to compute how many UDP packets to wait for
    # (presumably the payload size in bytes - TODO confirm).
    packet_size: int

    # --- pulse parameters (serialised into the control packet) -----------
    pulse_width: int
    pulse_period: int
    pulse_height: int
    pulse_num: int

    # --- optional --------------------------------------------------------
    # Factor applied to pulse_period to obtain the ADC-side period.
    adc_dac_ratio: float = 0.52
    # Timeout applied to the UDP socket, in seconds.
    socket_timeout_sec: float = 2.0
class ReflectometerWorker(QObject):
    """Performs one reflectometer measurement over UDP.

    The object is meant to be moved to a worker thread; ``run`` does the
    blocking socket work and reports through signals.

    Signals:
        data_ready(list): decoded samples of a completed measurement.
        status(str): human-readable progress messages.
        error(str): text of the exception that aborted the measurement.
        finished(): emitted exactly once whenever ``run`` returns.
    """

    data_ready = pyqtSignal(list)
    status = pyqtSignal(str)
    error = pyqtSignal(str)
    finished = pyqtSignal()

    # Two-byte command words (sent big-endian) and the control packet header.
    _SOFT_RESET_CMD = 0x0F00
    _START_CMD = 0xF000
    _CTRL_HEADER = 0b10001000
    # 1 header byte + 4 + 4 + 2 + 2 + 4 payload bytes.
    _CTRL_PACKET_LEN = 17

    # Largest values that fit the unsigned fields of the control packet.
    _U16_MAX = 2**16 - 1
    _U32_MAX = 2**32 - 1

    def __init__(self, config: ReflectometerConfig):
        """Store the measurement parameters; no socket is opened yet."""
        super().__init__()
        self.config = config
        self._stop_requested = False
        self._sock = None

    def stop(self):
        """Request cancellation and close the socket, if one is open."""
        self._stop_requested = True
        self._close_socket()

    def _close_socket(self):
        """Close the socket if present; closing is best-effort."""
        sock = self._sock
        if sock is None:
            return
        try:
            sock.close()
        except OSError:
            # Nothing useful can be done if closing fails.
            pass

    def run(self):
        """Execute the whole measurement sequence.

        Validates the config, opens and binds the UDP socket, sends
        soft reset, parameters and start, then receives the samples.
        Emits ``data_ready`` on success, ``error`` on failure, a status
        message on cancellation, and always ``finished`` at the end.
        """
        try:
            self._validate_config()

            # A stop issued before the thread started must not touch the device.
            if self._stop_requested:
                self.status.emit("Операция остановлена")
                return

            self.status.emit("Открытие UDP-сокета...")
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._sock.settimeout(self.config.socket_timeout_sec)
            self._sock.bind(("0.0.0.0", self.config.recv_port))
            dest = (self.config.ip, self.config.send_port)

            self.status.emit("Отправка soft reset...")
            self._sock.sendto(self._SOFT_RESET_CMD.to_bytes(2, "big"), dest)

            self.status.emit("Отправка параметров...")
            self._sock.sendto(self._format_ctrl_data(), dest)

            self.status.emit("Отправка start...")
            self._sock.sendto(self._START_CMD.to_bytes(2, "big"), dest)

            self.status.emit("Приём данных...")
            data = self._recv_data()

            if self._stop_requested:
                self.status.emit("Операция остановлена")
                return

            self.data_ready.emit(data)
            self.status.emit(f"Получено samples: {len(data)}")
        except Exception as exc:
            # Top-level boundary of the worker: every failure becomes a signal.
            # Errors caused by stop() closing the socket are reported as a
            # cancellation rather than as an error.
            if self._stop_requested:
                self.status.emit("Операция остановлена")
            else:
                self.error.emit(str(exc))
        finally:
            self._close_socket()
            self.finished.emit()

    def _format_ctrl_data(self) -> bytes:
        """Serialise the pulse parameters into the 17-byte control packet.

        Layout (all little-endian): header (1), pulse_width (4),
        pulse_period (4), pulse_num (2), pulse_height (2),
        ADC-side pulse period (4).

        Raises:
            ValueError: if the resulting packet is not 17 bytes long.
            OverflowError: if a value does not fit its field.
        """
        cfg = self.config
        # ADC-side period, rounded down to a whole number of windows.
        pulse_period_adc = (
            int(cfg.pulse_period * cfg.adc_dac_ratio) // cfg.window_size
        ) * cfg.window_size

        packet = b"".join(
            (
                self._CTRL_HEADER.to_bytes(1, "little"),
                cfg.pulse_width.to_bytes(4, "little"),
                cfg.pulse_period.to_bytes(4, "little"),
                cfg.pulse_num.to_bytes(2, "little"),
                cfg.pulse_height.to_bytes(2, "little"),
                pulse_period_adc.to_bytes(4, "little"),
            )
        )
        if len(packet) != self._CTRL_PACKET_LEN:
            raise ValueError("Config data should be 128 bits + 8 bit header")
        return packet

    def _recv_data(self) -> list[int]:
        """Receive the expected number of packets and decode the samples.

        Each packet is split into ``data_width``-byte little-endian
        unsigned integers.

        Returns:
            The decoded samples. If a stop was requested while receiving,
            the partial buffer is returned; ``run`` discards it.

        Raises:
            TimeoutError: if a packet does not arrive within the timeout.
            ValueError: if a packet size is not a multiple of
                ``data_width`` or fewer samples than expected arrived.
        """
        cfg = self.config
        windows = cfg.adc_dac_ratio * cfg.pulse_period / cfg.window_size
        packet_count = math.ceil(windows * cfg.data_width / cfg.packet_size)
        expected_length = math.ceil(windows)

        samples = []
        for pkt_index in range(packet_count):
            if self._stop_requested:
                # Cancelled: do not report an underflow for a deliberate stop.
                return samples
            try:
                packet, _ = self._sock.recvfrom(65536)
            except socket.timeout as exc:
                raise TimeoutError(
                    f"Таймаут приёма UDP-пакета #{pkt_index + 1}"
                ) from exc
            if len(packet) % cfg.data_width != 0:
                raise ValueError(
                    f"Некорректный размер UDP-пакета: {len(packet)} байт"
                )
            samples.extend(
                int.from_bytes(packet[offset:offset + cfg.data_width], "little")
                for offset in range(0, len(packet), cfg.data_width)
            )

        if len(samples) < expected_length:
            raise ValueError(
                f"Data underflow: получено {len(samples)}, ожидалось {expected_length}"
            )
        # NOTE(review): one sample fewer than expected_length is returned.
        # This looks like compensation for the ceil() above (the control
        # packet uses a floor), but it also drops a sample when the ratio is
        # a whole number - confirm against the device before changing.
        return samples[:expected_length - 1]

    def _validate_config(self):
        """Check that the config can be serialised and measured.

        Upper bounds are inclusive: the largest value that fits a field
        (and the largest DAC code) is accepted.

        Raises:
            ValueError: describing the first violated constraint.
        """
        cfg = self.config

        for field_name in (
            "pulse_period",
            "pulse_num",
            "window_size",
            "packet_size",
            "data_width",
        ):
            if getattr(cfg, field_name) <= 0:
                raise ValueError(f"{field_name} должен быть больше 0")

        if cfg.pulse_period % cfg.window_size != 0:
            raise ValueError("pulse_period должен быть кратен window_size")

        # Negative values cannot be packed into unsigned fields.
        if cfg.pulse_width < 0:
            raise ValueError("pulse_width не может быть отрицательным")
        if cfg.pulse_height < 0:
            raise ValueError("pulse_height не может быть отрицательным")

        if cfg.pulse_width > self._U32_MAX:
            raise ValueError("pulse_width слишком большой")
        if cfg.pulse_period > self._U32_MAX:
            raise ValueError("pulse_period слишком большой")
        if cfg.pulse_num > self._U16_MAX:
            raise ValueError("pulse_num слишком большой")

        # pulse_height is limited both by the DAC resolution and by the
        # two-byte field it is serialised into.
        height_limit = min(2**cfg.dac_bits - 1, self._U16_MAX)
        if cfg.pulse_height > height_limit:
            raise ValueError("pulse_height слишком большой")
class MainWindow(QMainWindow):
    """Main window of the reflectometer GUI.

    Loads the layout from ``reflectometer.ui``, mirrors widget values into
    plain attributes (``self.pulse_period``, ``self.window_size``, ...),
    runs measurements in a worker thread and plots the result.
    """

    # Fallback timer for a hung ping process. It is deliberately longer than
    # the 2 s timeout passed to ping itself so that the two do not race.
    PING_FALLBACK_TIMEOUT_MS = 3000
    # How long closeEvent waits for a running measurement thread.
    SHUTDOWN_WAIT_MS = 3000
    # Largest value accepted by the int-based Qt range setters.
    _QT_INT_MAX = 2**31 - 1
    # pulse_num and pulse_height are serialised into two-byte fields.
    _U16_MAX = 2**16 - 1

    def __init__(self):
        super().__init__()
        uic.loadUi("reflectometer.ui", self)

        # ping
        self.ping_process = None
        self.ping_timeout_timer = QTimer(self)
        self.ping_timeout_timer.setSingleShot(True)
        self.ping_timeout_timer.timeout.connect(self.on_ping_timeout)
        self.button_ping.clicked.connect(self.check_ping)

        # Defaults; the setup_* calls below overwrite them with widget values.
        self.pulse_period = 0
        self.pulse_height = 0
        self.pulse_width = 0
        self.pulse_num = 0
        self.dac_dw = 14
        self.adc_dw = 12
        self.nmax = 4096
        self.packet_size = 1024
        self.window_size = 65
        self.adc_dac_ratio = 0.52
        self.accum_width = 32

        # measurement state
        self.data = []
        self.measurement_thread = None
        self.measurement_worker = None
        self._measurement_config = None

        # The spin boxes are the single source of truth from here on:
        # adc_dac_ratio is no longer reset to a constant after binding.
        self.setup_pulse_controls()
        self.setup_global_settings()
        self.update_pulse_limits()
        self.setup_graph()
        self.setup_network_settings()
        self.button_start.clicked.connect(self.run_measurement)

    # ------------------------------------------------------------------ ping
    def _ping_is_running(self) -> bool:
        """Return True while a previously started ping has not finished."""
        process = self.ping_process
        return (
            process is not None
            and process.state() != QProcess.ProcessState.NotRunning
        )

    def check_ping(self):
        """Start a single asynchronous ping of the address in ``line_ip``."""
        ip = self.line_ip.text().strip()
        if not ip:
            self.label_ping_status.setText("set ip!!")
            return
        # An input mask shows "_" for positions that are not filled in yet.
        if "_" in self.line_ip.displayText():
            self.label_ping_status.setText("IP invalid")
            return
        if self._ping_is_running():
            self.label_ping_status.setText("Ping inflight")
            return

        # Release the previous (already finished) process object instead of
        # accumulating one child QProcess per click.
        if self.ping_process is not None:
            self.ping_process.deleteLater()

        self.label_ping_status.setText("ping...")
        self.button_ping.setEnabled(False)

        self.ping_process = QProcess(self)
        self.ping_process.finished.connect(self.on_ping_finished)
        self.ping_process.errorOccurred.connect(self.on_ping_error)

        # One echo request with a 2 s timeout; the flags differ per platform.
        if platform.system().lower() == "windows":
            arguments = ["-n", "1", "-w", "2000", ip]
        else:
            arguments = ["-c", "1", "-W", "2", ip]
        self.ping_process.start("ping", arguments)

        self.ping_timeout_timer.start(self.PING_FALLBACK_TIMEOUT_MS)

    def on_ping_finished(self, exit_code, exit_status):
        """Show the ping result once the process has exited."""
        self.ping_timeout_timer.stop()
        self.button_ping.setEnabled(True)
        # The exit code is only meaningful after a normal exit; a killed or
        # crashed process must not be reported as reachable.
        reachable = (
            exit_status == QProcess.ExitStatus.NormalExit and exit_code == 0
        )
        self.label_ping_status.setText("алё✅" if reachable else "не алё❌")

    def on_ping_error(self, error=None):
        """Handle a QProcess error; ``error`` is accepted but not used."""
        self.ping_timeout_timer.stop()
        self.button_ping.setEnabled(True)
        self.label_ping_status.setText("ping unavail")

    def on_ping_timeout(self):
        """Kill a ping process that outlived the fallback timer."""
        if self._ping_is_running():
            self.ping_process.kill()
        self.button_ping.setEnabled(True)

    # -------------------------------------------------------- pulse controls
    def setup_pulse_controls(self):
        """Pair every pulse slider with its spin box."""
        pairs = (
            ("pulse_period", self.slider_pulse_period, self.box_pulse_period,
             self.normalize_pulse_period),
            ("pulse_height", self.slider_pulse_height, self.box_pulse_height,
             None),
            ("pulse_width", self.slider_pulse_width, self.box_pulse_width,
             None),
            ("pulse_num", self.slider_pulse_num, self.box_pulse_num, None),
        )
        for name, slider, box, normalizer in pairs:
            self._bind_slider_and_spinbox(
                name=name,
                slider=slider,
                box=box,
                normalize_value=normalizer,
            )

    def _bind_slider_and_spinbox(self, name, slider, box, normalize_value=None):
        """Keep a slider and a spin box in sync and mirror them to self.<name>.

        Both widgets get the union of their ranges. ``normalize_value``, if
        given, is applied to every new value before it is stored.
        """
        minimum = min(slider.minimum(), box.minimum())
        maximum = max(slider.maximum(), box.maximum())
        slider.setRange(minimum, maximum)
        box.setRange(minimum, maximum)

        def normalize(value):
            return value if normalize_value is None else normalize_value(value)

        initial = normalize(box.value())
        slider.setValue(initial)
        box.setValue(initial)
        setattr(self, name, initial)

        def update_value(new_value):
            new_value = normalize(new_value)
            # Only touch a widget whose value differs, so that the mutual
            # valueChanged notifications terminate.
            if slider.value() != new_value:
                slider.setValue(new_value)
            if box.value() != new_value:
                box.setValue(new_value)
            setattr(self, name, new_value)

        slider.valueChanged.connect(update_value)
        box.valueChanged.connect(update_value)

    def _snap_pulse_period(self, value, step):
        """Round ``value`` to a multiple of ``step`` within the box range."""
        snapped = round(value / step) * step
        lower = self.box_pulse_period.minimum()
        upper = self.box_pulse_period.maximum()
        return max(lower, min(snapped, upper))

    def normalize_pulse_period(self, value):
        """Return ``value`` snapped to the nearest multiple of window_size."""
        step = max(1, getattr(self, "window_size",
                              self.box_window_size.value()))
        return self._snap_pulse_period(value, step)

    def _set_max_for_pair(self, slider, box, maximum):
        """Apply a new maximum to both widgets and clamp their value."""
        slider.setMaximum(maximum)
        box.setMaximum(maximum)
        value = min(box.value(), maximum)
        box.setValue(value)
        slider.setValue(value)

    def set_max_pulse_period(self, maximum):
        """Set the pulse_period maximum and refresh self.pulse_period."""
        self._set_max_for_pair(
            slider=self.slider_pulse_period,
            box=self.box_pulse_period,
            maximum=maximum,
        )
        self.pulse_period = self.box_pulse_period.value()

    def set_max_pulse_height(self, maximum):
        """Set the pulse_height maximum and refresh self.pulse_height."""
        self._set_max_for_pair(
            slider=self.slider_pulse_height,
            box=self.box_pulse_height,
            maximum=maximum,
        )
        self.pulse_height = self.box_pulse_height.value()

    def set_max_pulse_width(self, maximum):
        """Set the pulse_width maximum and refresh self.pulse_width."""
        self._set_max_for_pair(
            slider=self.slider_pulse_width,
            box=self.box_pulse_width,
            maximum=maximum,
        )
        self.pulse_width = self.box_pulse_width.value()

    def set_max_pulse_num(self, maximum):
        """Set the pulse_num maximum and refresh self.pulse_num."""
        self._set_max_for_pair(
            slider=self.slider_pulse_num,
            box=self.box_pulse_num,
            maximum=maximum,
        )
        self.pulse_num = self.box_pulse_num.value()

    # -------------------------------------------------------------- settings
    def setup_global_settings(self):
        """Bind the hardware-related spin boxes to attributes.

        The network ports are bound in ``setup_network_settings`` only, so
        each change handler is connected exactly once.
        """
        for name, box in (
            ("dac_dw", self.box_dac_dw),
            ("adc_dw", self.box_adc_dw),
            ("nmax", self.box_nmax),
        ):
            self._bind_spinbox_setting(name=name, box=box)

        self._bind_spinbox_setting(
            name="window_size",
            box=self.box_window_size,
            after_change=self.on_window_size_changed,
        )

        for name, box in (
            ("packet_size", self.box_packet_size),
            ("adc_dac_ratio", self.box_adc_dac_ratio),
            ("accum_width", self.box_accum_width),
        ):
            self._bind_spinbox_setting(name=name, box=box)

        # Apply the pulse_period step right away at start-up.
        self.update_pulse_period_step()

    def _bind_spinbox_setting(self, name, box, after_change=None,
                              refresh_limits=True):
        """Mirror a spin box into self.<name>.

        For example ``box_dac_dw -> self.dac_dw``. On every change the
        attribute is updated, the pulse limits are recomputed (unless
        ``refresh_limits`` is False) and ``after_change(new_value)`` is
        called if given.
        """
        setattr(self, name, box.value())

        def on_value_changed(new_value):
            setattr(self, name, new_value)
            if refresh_limits:
                self.update_pulse_limits()
            if after_change is not None:
                after_change(new_value)

        box.valueChanged.connect(on_value_changed)

    def update_pulse_limits(self):
        """Recompute the maxima of the pulse controls from the settings.

        * pulse_period and pulse_width: ``nmax * window_size``
        * pulse_num: ``2 ** (accum_width - adc_dw) - 1``
        * pulse_height: ``2 ** dac_dw - 1``

        Results are clamped to what the Qt widgets accept and, for
        pulse_num and pulse_height, to the two-byte fields they are sent
        in. pulse_period additionally gets ``window_size`` as its minimum.
        """
        period_limit = min(self.nmax * self.window_size, self._QT_INT_MAX)
        self.set_max_pulse_period(period_limit)
        self.set_max_pulse_width(period_limit)

        # A negative exponent would produce a float, which the integer
        # range setters reject.
        headroom_bits = max(0, self.accum_width - self.adc_dw)
        self.set_max_pulse_num(min(2**headroom_bits - 1, self._U16_MAX))

        dac_bits = max(0, self.dac_dw)
        self.set_max_pulse_height(min(2**dac_bits - 1, self._U16_MAX))

        self.slider_pulse_period.setMinimum(self.window_size)
        self.box_pulse_period.setMinimum(self.window_size)

    def on_window_size_changed(self, new_value):
        """Re-apply the pulse_period step after window_size changed."""
        self.update_pulse_period_step()

    def update_pulse_period_step(self):
        """Make pulse_period move in steps of window_size."""
        step = max(1, self.window_size)
        self.box_pulse_period.setSingleStep(step)
        self.slider_pulse_period.setSingleStep(step)
        self.slider_pulse_period.setPageStep(step)
        self.snap_pulse_period_to_step(step)

    def snap_pulse_period_to_step(self, step):
        """Snap the current pulse_period to the nearest multiple of ``step``.

        Needed because dragging the slider with the mouse can still yield
        any intermediate value regardless of the configured single step.
        """
        snapped_value = self._snap_pulse_period(
            self.box_pulse_period.value(), step
        )
        self.box_pulse_period.setValue(snapped_value)
        self.slider_pulse_period.setValue(snapped_value)
        self.pulse_period = snapped_value

    # ----------------------------------------------------------------- graph
    def setup_graph(self):
        """Create the plot widget with one data and one reference curve."""
        self.graph_widget = pg.PlotWidget()
        self.graph_widget.setLabel("left", "ADC value")
        self.graph_widget.setLabel("bottom", "Sample")
        self.graph_widget.showGrid(x=True, y=True)
        self.graph_layout.addWidget(self.graph_widget)

        # Each curve is created exactly once; creating them twice would
        # leave unreachable plot items behind in the widget.
        self.graph_curve = self.graph_widget.plot(
            [],
            name="Data",
            pen=pg.mkPen(width=2),
        )
        self.reference_curve = self.graph_widget.plot(
            [],
            name="Reference",
            pen=pg.mkPen(style=Qt.PenStyle.DashLine),
        )
        self.checkbox_draw_reference.stateChanged.connect(
            self.update_reference_graph)

    def setup_network_settings(self):
        """Bind the UDP port spin boxes; ports do not affect pulse limits."""
        for name, box in (
            ("recv_port", self.box_recv_port),
            ("send_port", self.box_send_port),
        ):
            self._bind_spinbox_setting(
                name=name,
                box=box,
                refresh_limits=False,
            )

    # ----------------------------------------------------------- measurement
    def run_measurement(self):
        """Start a measurement in a worker thread unless one is running."""
        thread = self.measurement_thread
        if thread is not None and thread.isRunning():
            self.set_measurement_status("Измерение уже выполняется")
            return

        # An exception escaping a slot would abort the application, so an
        # invalid configuration is reported through the status line.
        try:
            config = self.build_reflectometer_config()
        except ValueError as exc:
            self.on_measurement_error(str(exc))
            return

        self._measurement_config = config
        self.data = []
        self.graph_curve.setData([])

        self.measurement_thread = QThread(self)
        self.measurement_worker = ReflectometerWorker(config)
        # Move first, connect afterwards, as in the original sequence.
        self.measurement_worker.moveToThread(self.measurement_thread)

        self.measurement_thread.started.connect(self.measurement_worker.run)
        self.measurement_worker.status.connect(self.set_measurement_status)
        self.measurement_worker.error.connect(self.on_measurement_error)
        self.measurement_worker.data_ready.connect(self.on_data_received)
        self.measurement_worker.finished.connect(self.measurement_thread.quit)
        self.measurement_worker.finished.connect(
            self.measurement_worker.deleteLater)
        self.measurement_thread.finished.connect(
            self.measurement_thread.deleteLater)
        self.measurement_thread.finished.connect(self.on_measurement_finished)

        self.measurement_thread.start()

    def build_reflectometer_config(self) -> ReflectometerConfig:
        """Collect the current GUI values into a ReflectometerConfig.

        Raises:
            ValueError: if no IP address is entered.
        """
        ip = self.line_ip.text().strip()
        if not ip:
            raise ValueError("IP адрес не задан")

        return ReflectometerConfig(
            ip=ip,
            send_port=self.send_port,
            recv_port=self.recv_port,
            dac_bits=self.dac_dw,
            # Samples are transferred at accumulator width, in whole bytes.
            data_width=self.accum_width // 8,
            window_size=self.window_size,
            packet_size=self.packet_size,
            pulse_width=self.pulse_width,
            pulse_period=self.pulse_period,
            pulse_height=self.pulse_height,
            pulse_num=self.pulse_num,
            adc_dac_ratio=self.adc_dac_ratio,
        )

    def _normalize_samples(self, samples):
        """Return the samples with the ADC offset removed and scaled.

        Each value becomes ``(x - 2**(adc_dw - 1)) / (window_size *
        pulse_num)``. window_size and pulse_num are taken from the config
        the measurement was started with, so changing the controls while a
        measurement runs does not distort the result.
        """
        config = self._measurement_config
        if config is not None:
            window_size, pulse_num = config.window_size, config.pulse_num
        else:
            window_size, pulse_num = self.window_size, self.pulse_num

        scale = window_size * pulse_num
        if scale <= 0:
            # Nothing sensible to divide by; leave the values untouched.
            return list(samples)

        # NOTE(review): the offset is subtracted once before dividing by the
        # accumulation count, as in the original code - confirm that this is
        # the intended formula for accumulated samples.
        offset = 2 ** (self.adc_dw - 1)
        return [(sample - offset) / scale for sample in samples]

    def on_data_received(self, data: list[int]):
        """Normalise, store and plot the samples of a finished measurement."""
        # Normalise before drawing so that the plot and the status line show
        # the same data that is kept in self.data.
        self.data = self._normalize_samples(data)
        self.draw_main_graph()
        self.update_reference_graph()

        if self.data:
            shown = self.data
            self.set_measurement_status(
                f"Готово. Samples: {len(shown)}, min: {min(shown)}, max: {max(shown)}"
            )
        else:
            self.set_measurement_status("Готово, но данные пустые")

    def on_measurement_error(self, message: str):
        """Show a measurement error in the status line."""
        self.set_measurement_status(f"Ошибка: {message}")

    def on_measurement_finished(self):
        """Forget the worker and thread once the thread has finished."""
        self.measurement_worker = None
        self.measurement_thread = None

    def stop_measurement(self):
        """Ask the running worker, if any, to stop."""
        if self.measurement_worker is not None:
            self.measurement_worker.stop()

    def set_measurement_status(self, text: str):
        """Print a status message and show it in the GUI."""
        print(text)
        # There is no dedicated label for the measurement status yet, so
        # label_ping_status is reused for the time being.
        if hasattr(self, "label_ping_status"):
            self.label_ping_status.setText(text)

    def draw_main_graph(self):
        """Plot self.data against the sample index."""
        if not self.data:
            self.graph_curve.setData([])
            return
        self.graph_curve.setData(list(range(len(self.data))), self.data)

    def update_reference_graph(self):
        """Draw or clear the reference curve.

        Called after data has been received and whenever
        checkbox_draw_reference is toggled.
        """
        reference_data = []
        if self.checkbox_draw_reference.isChecked() and self.data:
            reference_data = self.build_reference_data(len(self.data))

        if not reference_data:
            self.reference_curve.setData([])
            return
        self.reference_curve.setData(
            list(range(len(reference_data))), reference_data
        )

    def build_reference_data(self, length: int) -> list[int]:
        """Return an ideal rectangular pulse of exactly ``length`` samples.

        The first ``pulse_width // window_size`` samples (at most
        ``length``) equal ``pulse_height``; the rest are zero.
        """
        if length <= 0:
            return []
        if self.window_size <= 0:
            return [0] * length

        high_samples = max(0, self.pulse_width // self.window_size)
        high_samples = min(high_samples, length)
        return [self.pulse_height] * high_samples + [0] * (length - high_samples)

    # -------------------------------------------------------------- shutdown
    def closeEvent(self, event):
        """Stop background work before the window is destroyed.

        Destroying a QThread that is still running aborts the process, so
        a running measurement is cancelled and waited for first.
        """
        self.stop_measurement()
        thread = self.measurement_thread
        if thread is not None and thread.isRunning():
            thread.quit()
            thread.wait(self.SHUTDOWN_WAIT_MS)
        if self._ping_is_running():
            self.ping_process.kill()
        super().closeEvent(event)
def main():
    """Create the Qt application, show the main window and run the event loop.

    The process exits with the status returned by the event loop.
    """
    application = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    exit_status = application.exec()
    sys.exit(exit_status)


# Start the GUI only when the file is executed as a script.
if __name__ == "__main__":
    main()