"""PyQt6 GUI for a UDP-controlled reflectometer.

The module contains three pieces:

* ``ReflectometerConfig`` - plain data describing one measurement.
* ``ReflectometerWorker`` - a ``QObject`` meant to run in a ``QThread``; it
  sends reset / parameter / start datagrams over UDP and collects the
  returned samples.
* ``MainWindow`` - the window loaded from ``reflectometer.ui`` that binds the
  widgets to settings, pings the device and plots the received samples.
"""

import math
import platform
import socket
import sys
from dataclasses import dataclass

# NOTE: the PyQt6 imports are kept ahead of pyqtgraph (same order as the
# original file) so the Qt binding is already loaded when pyqtgraph starts.
from PyQt6 import uic
from PyQt6.QtCore import QObject, QProcess, QThread, QTimer, pyqtSignal
from PyQt6.QtWidgets import QApplication, QMainWindow

import pyqtgraph as pg

# Largest values that fit the unsigned fields of the control packet
# (see ReflectometerWorker._format_ctrl_data).
_U16_MAX = 0xFFFF
_U32_MAX = 0xFFFFFFFF

# Two-byte big-endian command words sent before / after the parameters.
_CMD_SOFT_RESET = (0x0F00).to_bytes(2, "big")
_CMD_START = (0xF000).to_bytes(2, "big")

# First byte of the parameter datagram.
_CTRL_HEADER = 0b10001000


@dataclass
class ReflectometerConfig:
    """Parameters of a single measurement run."""

    ip: str  # device address the datagrams are sent to
    send_port: int  # destination UDP port on the device
    recv_port: int  # local UDP port the worker binds to
    dac_bits: int  # DAC resolution; bounds pulse_height
    data_width: int  # size of one received sample, in bytes
    window_size: int  # pulse_period must be a multiple of this
    packet_size: int  # used to estimate how many datagrams to expect
    pulse_width: int
    pulse_period: int
    pulse_height: int
    pulse_num: int
    dac_adc_ratio: float = 0.52
    socket_timeout_sec: float = 2.0


class ReflectometerWorker(QObject):
    """Runs one measurement over UDP and reports the result through signals.

    Signals:
        data_ready(list): the received samples.
        status(str): human-readable progress messages.
        error(str): text of the exception that aborted the run.
        finished(): always emitted at the end of ``run``.
    """

    data_ready = pyqtSignal(list)
    status = pyqtSignal(str)
    error = pyqtSignal(str)
    finished = pyqtSignal()

    def __init__(self, config: ReflectometerConfig):
        super().__init__()
        self.config = config
        self._stop_requested = False
        self._sock = None

    def stop(self):
        """Request cancellation and close the socket (best effort)."""
        self._stop_requested = True
        self._close_socket()

    def _close_socket(self):
        """Close the socket if one is open; errors from ``close`` are ignored."""
        if self._sock is None:
            return
        try:
            self._sock.close()
        except OSError:
            # Best effort: the socket may already be closed by stop().
            pass

    def run(self):
        """Validate the config, talk to the device and emit the samples.

        Any exception is converted into an ``error`` signal, unless a stop was
        requested, in which case the "stopped" status is reported instead.
        ``finished`` is emitted in every case.
        """
        try:
            self._validate_config()

            self.status.emit("Открытие UDP-сокета...")
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._sock.settimeout(self.config.socket_timeout_sec)
            self._sock.bind(("0.0.0.0", self.config.recv_port))

            dest = (self.config.ip, self.config.send_port)

            self.status.emit("Отправка soft reset...")
            self._sock.sendto(_CMD_SOFT_RESET, dest)

            self.status.emit("Отправка параметров...")
            self._sock.sendto(self._format_ctrl_data(), dest)

            self.status.emit("Отправка start...")
            self._sock.sendto(_CMD_START, dest)

            self.status.emit("Приём данных...")
            data = self._recv_data()

            if self._stop_requested:
                self.status.emit("Операция остановлена")
                return

            self.data_ready.emit(data)
            self.status.emit(f"Получено samples: {len(data)}")
        except Exception as exc:  # thread boundary: report, never propagate
            if self._stop_requested:
                # stop() closes the socket under us, which surfaces here as an
                # OSError; that is a cancellation, not a failure.
                self.status.emit("Операция остановлена")
            else:
                self.error.emit(str(exc))
        finally:
            self._close_socket()
            self.finished.emit()

    def _format_ctrl_data(self) -> bytes:
        """Build the 17-byte parameter datagram.

        Layout (all integers little-endian): 1 header byte, pulse_width (4),
        pulse_period (4), pulse_num (2), pulse_height (2) and the pulse period
        scaled by ``dac_adc_ratio`` and rounded down to a multiple of
        ``window_size`` (4).

        Raises:
            ValueError: if the assembled packet is not 17 bytes long.
            OverflowError: if a value does not fit its field.
        """
        cfg = self.config
        scaled_period = int(cfg.pulse_period * cfg.dac_adc_ratio)
        pulse_period_adc = (scaled_period // cfg.window_size) * cfg.window_size

        output = b"".join(
            (
                bytes([_CTRL_HEADER]),
                cfg.pulse_width.to_bytes(4, "little"),
                cfg.pulse_period.to_bytes(4, "little"),
                cfg.pulse_num.to_bytes(2, "little"),
                cfg.pulse_height.to_bytes(2, "little"),
                pulse_period_adc.to_bytes(4, "little"),
            )
        )
        if len(output) != 17:
            raise ValueError("Config data should be 128 bits + 8 bit header")
        return output

    def _recv_data(self) -> list[int]:
        """Receive the expected number of datagrams and decode the samples.

        Each datagram is split into ``data_width``-byte little-endian unsigned
        integers.

        Returns:
            The first ``expected_length`` samples, or whatever was collected so
            far when a stop was requested.

        Raises:
            TimeoutError: a datagram did not arrive within the socket timeout.
            ValueError: a datagram is not a whole number of samples, or fewer
                samples than expected were received.
        """
        cfg = self.config
        width = cfg.data_width
        windows = cfg.dac_adc_ratio * cfg.pulse_period / cfg.window_size
        packet_count = math.ceil(windows * width / cfg.packet_size)
        expected_length = math.ceil(windows)

        recv_buf = []
        for pkt_cnt in range(packet_count):
            if self._stop_requested:
                break
            try:
                packet, _ = self._sock.recvfrom(65536)
            except socket.timeout as exc:
                raise TimeoutError(
                    f"Таймаут приёма UDP-пакета #{pkt_cnt + 1}"
                ) from exc
            if len(packet) % width != 0:
                raise ValueError(
                    f"Некорректный размер UDP-пакета: {len(packet)} байт"
                )
            recv_buf.extend(
                int.from_bytes(packet[i:i + width], "little")
                for i in range(0, len(packet), width)
            )

        if self._stop_requested:
            # A cancelled run is not an underflow; run() reports the stop.
            return recv_buf

        if len(recv_buf) < expected_length:
            raise ValueError(
                f"Data underflow: получено {len(recv_buf)}, ожидалось {expected_length}"
            )
        # The original sliced to expected_length - 1, returning one sample
        # fewer than the count it had just verified.
        return recv_buf[:expected_length]

    def _validate_config(self):
        """Reject configurations that cannot be encoded or received.

        Raises:
            ValueError: describing the first offending field.
        """
        cfg = self.config

        strictly_positive = (
            "pulse_period",
            "pulse_num",
            "window_size",
            "packet_size",
            "data_width",
        )
        for field_name in strictly_positive:
            if getattr(cfg, field_name) <= 0:
                raise ValueError(f"{field_name} должен быть больше 0")

        # A non-positive ratio yields zero expected samples and an empty plot.
        if cfg.dac_adc_ratio <= 0:
            raise ValueError("dac_adc_ratio должен быть больше 0")

        if cfg.pulse_period % cfg.window_size != 0:
            raise ValueError("pulse_period должен быть кратен window_size")

        # Upper bounds are inclusive: the largest value that still fits the
        # field of the control packet (and, for the height, the DAC range).
        upper_bounds = (
            ("pulse_width", _U32_MAX),
            ("pulse_period", _U32_MAX),
            ("pulse_num", _U16_MAX),
            ("pulse_height", min(2 ** cfg.dac_bits - 1, _U16_MAX)),
        )
        for field_name, maximum in upper_bounds:
            value = getattr(cfg, field_name)
            if value < 0:
                raise ValueError(f"{field_name} не может быть отрицательным")
            if value > maximum:
                raise ValueError(f"{field_name} слишком большой")


class MainWindow(QMainWindow):
    """Main window: device ping, pulse/global settings, measurement and plot."""

    def __init__(self):
        super().__init__()
        uic.loadUi("reflectometer.ui", self)

        self.ping_process = None
        self.ping_timeout_timer = QTimer(self)
        self.ping_timeout_timer.setSingleShot(True)
        self.ping_timeout_timer.timeout.connect(self.on_ping_timeout)
        self.button_ping.clicked.connect(self.check_ping)

        # settings (defaults; the bound ones are overwritten from the widgets
        # by setup_pulse_controls / setup_global_settings below)
        self.pulse_period = 0
        self.pulse_height = 0
        self.pulse_width = 0
        self.pulse_num = 0
        self.dac_dw = 14
        self.adc_dw = 12
        self.nmax = 4096
        self.packet_size = 1024
        self.window_size = 65
        # Was misspelled "dac_adc_ration", and a later assignment overwrote
        # the value bound from box_dac_adc_ratio; both are fixed here.
        self.dac_adc_ratio = 0.52
        self.accum_width = 32

        # setup
        self.set_max_pulse_height(100)
        self.set_max_pulse_width(500)
        self.set_max_pulse_period(1000)
        self.set_max_pulse_num(20)
        self.setup_pulse_controls()
        self.setup_global_settings()
        self.update_pulse_limits()

        self.data = []
        self.measurement_thread = None
        self.measurement_worker = None

        self.setup_graph()
        self.setup_network_settings()
        self.button_start.clicked.connect(self.run_measurement)

    # ping utils
    def check_ping(self):
        """Start a single asynchronous ``ping`` of the address in ``line_ip``."""
        ip = self.line_ip.text().strip()
        if not ip:
            self.label_ping_status.setText("set ip!!")
            return
        # NOTE(review): presumably "_" is the input-mask placeholder of
        # line_ip, i.e. the address is incomplete - confirm against the .ui.
        if "_" in self.line_ip.displayText():
            self.label_ping_status.setText("IP invalid")
            return

        previous = self.ping_process
        if previous is not None:
            if previous.state() != QProcess.ProcessState.NotRunning:
                self.label_ping_status.setText("Ping inflight")
                return
            # Release the finished process instead of accumulating one
            # child QObject per click.
            previous.deleteLater()

        self.label_ping_status.setText("ping...")
        self.button_ping.setEnabled(False)

        self.ping_process = QProcess(self)
        self.ping_process.finished.connect(self.on_ping_finished)
        self.ping_process.errorOccurred.connect(self.on_ping_error)

        program = "ping"
        if platform.system().lower() == "windows":
            arguments = ["-n", "1", "-w", "2000", ip]
        else:
            arguments = ["-c", "1", "-W", "2", ip]
        self.ping_process.start(program, arguments)

        # fallback in case the process neither finishes nor reports an error
        self.ping_timeout_timer.start(2000)

    def on_ping_finished(self, exit_code, exit_status):
        """Show the ping result; exit code 0 is treated as success."""
        self.ping_timeout_timer.stop()
        self.button_ping.setEnabled(True)
        if exit_code == 0:
            self.label_ping_status.setText("алё✅")
        else:
            self.label_ping_status.setText("не алё❌")

    def on_ping_error(self):
        """Handle a QProcess error while running ``ping``."""
        self.ping_timeout_timer.stop()
        self.button_ping.setEnabled(True)
        self.label_ping_status.setText("ping unavail")

    def on_ping_timeout(self):
        """Kill a ping process that is still running when the timer fires."""
        process = self.ping_process
        if process is None:
            return
        if process.state() != QProcess.ProcessState.NotRunning:
            process.kill()
            self.button_ping.setEnabled(True)

    # pulse controls
    def setup_pulse_controls(self):
        """Pair every pulse slider with its spin box."""
        self._bind_slider_and_spinbox(
            name="pulse_period",
            slider=self.slider_pulse_period,
            box=self.box_pulse_period,
            normalize_value=self.normalize_pulse_period,
        )
        self._bind_slider_and_spinbox(
            name="pulse_height",
            slider=self.slider_pulse_height,
            box=self.box_pulse_height,
        )
        self._bind_slider_and_spinbox(
            name="pulse_width",
            slider=self.slider_pulse_width,
            box=self.box_pulse_width,
        )
        self._bind_slider_and_spinbox(
            name="pulse_num",
            slider=self.slider_pulse_num,
            box=self.box_pulse_num,
        )

    def _bind_slider_and_spinbox(self, name, slider, box, normalize_value=None):
        """Keep a slider and a spin box in sync and mirror the value.

        The current value is written to ``self.<name>`` on every change.
        ``normalize_value``, when given, maps each new value to the value
        that is actually applied (e.g. snapping to a step).
        """
        minimum = min(slider.minimum(), box.minimum())
        maximum = max(slider.maximum(), box.maximum())
        slider.setRange(minimum, maximum)
        box.setRange(minimum, maximum)

        def normalize(value):
            return value if normalize_value is None else normalize_value(value)

        value = normalize(box.value())
        slider.setValue(value)
        box.setValue(value)
        setattr(self, name, value)

        def update_value(new_value):
            new_value = normalize(new_value)
            # Only touch a widget that disagrees, to avoid signal ping-pong.
            if slider.value() != new_value:
                slider.setValue(new_value)
            if box.value() != new_value:
                box.setValue(new_value)
            setattr(self, name, new_value)

        slider.valueChanged.connect(update_value)
        box.valueChanged.connect(update_value)

    def normalize_pulse_period(self, value):
        """Snap ``value`` to a multiple of window_size within the box range."""
        step = max(1, getattr(self, "window_size", self.box_window_size.value()))
        snapped_value = round(value / step) * step
        minimum = self.box_pulse_period.minimum()
        maximum = self.box_pulse_period.maximum()
        return max(minimum, min(snapped_value, maximum))

    def _set_max_for_pair(self, slider, box, maximum):
        """Apply ``maximum`` to both widgets and clamp their current value."""
        slider.setMaximum(maximum)
        box.setMaximum(maximum)
        value = min(box.value(), maximum)
        box.setValue(value)
        slider.setValue(value)

    def set_max_pulse_period(self, maximum):
        """Set the pulse period maximum and refresh ``self.pulse_period``."""
        self._set_max_for_pair(
            slider=self.slider_pulse_period,
            box=self.box_pulse_period,
            maximum=maximum,
        )
        self.pulse_period = self.box_pulse_period.value()

    def set_max_pulse_height(self, maximum):
        """Set the pulse height maximum and refresh ``self.pulse_height``."""
        self._set_max_for_pair(
            slider=self.slider_pulse_height,
            box=self.box_pulse_height,
            maximum=maximum,
        )
        self.pulse_height = self.box_pulse_height.value()

    def set_max_pulse_width(self, maximum):
        """Set the pulse width maximum and refresh ``self.pulse_width``."""
        self._set_max_for_pair(
            slider=self.slider_pulse_width,
            box=self.box_pulse_width,
            maximum=maximum,
        )
        self.pulse_width = self.box_pulse_width.value()

    def set_max_pulse_num(self, maximum):
        """Set the pulse count maximum and refresh ``self.pulse_num``."""
        self._set_max_for_pair(
            slider=self.slider_pulse_num,
            box=self.box_pulse_num,
            maximum=maximum,
        )
        self.pulse_num = self.box_pulse_num.value()

    # settings
    def setup_global_settings(self):
        """Bind the global-setting spin boxes to the matching attributes.

        The port boxes are bound in ``setup_network_settings`` only; binding
        them here as well connected their ``valueChanged`` handlers twice.
        """
        for name, box in (
            ("dac_dw", self.box_dac_dw),
            ("adc_dw", self.box_adc_dw),
            ("nmax", self.box_nmax),
        ):
            self._bind_spinbox_setting(name=name, box=box)

        self._bind_spinbox_setting(
            name="window_size",
            box=self.box_window_size,
            after_change=self.on_window_size_changed,
        )

        for name, box in (
            ("packet_size", self.box_packet_size),
            ("dac_adc_ratio", self.box_dac_adc_ratio),
            ("accum_width", self.box_accum_width),
        ):
            self._bind_spinbox_setting(name=name, box=box)

        # apply the pulse_period step right at start-up
        self.update_pulse_period_step()

    def _bind_spinbox_setting(self, name, box, after_change=None):
        """Mirror a spin box into ``self.<name>``.

        For example ``box_dac_dw -> self.dac_dw`` and
        ``box_window_size -> self.window_size``. After every change the pulse
        limits are recomputed and ``after_change(new_value)`` is called when
        provided.
        """
        setattr(self, name, box.value())

        def on_value_changed(new_value):
            setattr(self, name, new_value)
            self.update_pulse_limits()
            if after_change is not None:
                after_change(new_value)

        box.valueChanged.connect(on_value_changed)

    def update_pulse_limits(self):
        """Recompute the pulse control ranges from the global settings.

        The limits are chosen so that every value the widgets allow also
        passes ``ReflectometerWorker._validate_config``.
        """
        # nmax -> pulse_period / pulse_width limit
        period_limit = self.nmax * self.window_size
        self.set_max_pulse_period(period_limit)
        self.set_max_pulse_width(period_limit)

        # accum_width + adc_width -> max pulse num. The exponent is clamped
        # at 0 (a negative one would yield a float maximum) and the result is
        # capped at what the 16-bit pulse_num packet field can carry.
        headroom_bits = max(0, self.accum_width - self.adc_dw)
        self.set_max_pulse_num(min(2 ** headroom_bits, _U16_MAX))

        # dac_width -> max pulse height: the largest dac_dw-bit value
        # (2 ** dac_dw itself does not fit and was always rejected later).
        self.set_max_pulse_height(2 ** self.dac_dw - 1)

        self.slider_pulse_period.setMinimum(self.window_size)
        self.box_pulse_period.setMinimum(self.window_size)

    def on_window_size_changed(self, new_value):
        """React to a new window size by updating the pulse period step."""
        self.update_pulse_period_step()

    def update_pulse_period_step(self):
        """Use window_size as the step of the pulse period widgets."""
        step = max(1, self.window_size)
        self.box_pulse_period.setSingleStep(step)
        self.slider_pulse_period.setSingleStep(step)
        self.slider_pulse_period.setPageStep(step)
        self.snap_pulse_period_to_step(step)

    def snap_pulse_period_to_step(self, step):
        """Round the current pulse period to the nearest multiple of ``step``.

        Needed because dragging the slider with the mouse can still produce
        any intermediate value.
        """
        current_value = self.box_pulse_period.value()
        snapped_value = round(current_value / step) * step
        minimum = self.box_pulse_period.minimum()
        maximum = self.box_pulse_period.maximum()
        snapped_value = max(minimum, min(snapped_value, maximum))

        self.box_pulse_period.setValue(snapped_value)
        self.slider_pulse_period.setValue(snapped_value)
        self.pulse_period = snapped_value

    # graph
    def setup_graph(self):
        """Create the plot widget and add it to ``graph_layout``."""
        self.graph_widget = pg.PlotWidget()
        self.graph_widget.setLabel("left", "ADC value")
        self.graph_widget.setLabel("bottom", "Sample")
        self.graph_widget.showGrid(x=True, y=True)
        self.graph_curve = self.graph_widget.plot([])
        self.graph_layout.addWidget(self.graph_widget)

    def setup_network_settings(self):
        """Bind the UDP port spin boxes to ``recv_port`` / ``send_port``."""
        self._bind_spinbox_setting(
            name="recv_port",
            box=self.box_recv_port,
        )
        self._bind_spinbox_setting(
            name="send_port",
            box=self.box_send_port,
        )

    def run_measurement(self):
        """Start a measurement in a worker thread unless one is running."""
        thread = self.measurement_thread
        if thread is not None and thread.isRunning():
            self.set_measurement_status("Измерение уже выполняется")
            return

        # An exception escaping a slot is unhandled; report bad input in the
        # status line instead of letting it propagate.
        try:
            config = self.build_reflectometer_config()
        except ValueError as exc:
            self.on_measurement_error(str(exc))
            return

        self.data = []
        self.graph_curve.setData([])

        self.measurement_thread = QThread(self)
        self.measurement_worker = ReflectometerWorker(config)
        self.measurement_worker.moveToThread(self.measurement_thread)

        self.measurement_thread.started.connect(self.measurement_worker.run)
        self.measurement_worker.status.connect(self.set_measurement_status)
        self.measurement_worker.error.connect(self.on_measurement_error)
        self.measurement_worker.data_ready.connect(self.on_data_received)

        self.measurement_worker.finished.connect(self.measurement_thread.quit)
        self.measurement_worker.finished.connect(
            self.measurement_worker.deleteLater)
        self.measurement_thread.finished.connect(
            self.measurement_thread.deleteLater)
        self.measurement_thread.finished.connect(self.on_measurement_finished)

        self.measurement_thread.start()

    def build_reflectometer_config(self) -> ReflectometerConfig:
        """Collect the current settings into a ``ReflectometerConfig``.

        Raises:
            ValueError: if the IP field is empty.
        """
        ip = self.line_ip.text().strip()
        if not ip:
            raise ValueError("IP адрес не задан")

        # IMPORTANT: in console.py data_width was expressed in bytes
        # (4 for int32), while adc_dw is converted here as a bit count.
        # NOTE(review): with the default adc_dw of 12 this yields 1 byte per
        # sample - confirm the unit box_adc_dw holds and the sample size the
        # device actually sends.
        data_width = self.adc_dw // 8

        return ReflectometerConfig(
            ip=ip,
            send_port=self.send_port,
            recv_port=self.recv_port,
            dac_bits=self.dac_dw,
            data_width=data_width,
            window_size=self.window_size,
            packet_size=self.packet_size,
            pulse_width=self.pulse_width,
            pulse_period=self.pulse_period,
            pulse_height=self.pulse_height,
            pulse_num=self.pulse_num,
            dac_adc_ratio=self.dac_adc_ratio,
        )

    def on_data_received(self, data: list[int]):
        """Store and plot the samples, then summarise them in the status."""
        self.data = data
        x = list(range(len(data)))
        self.graph_curve.setData(x, data)

        if data:
            self.set_measurement_status(
                f"Готово. Samples: {len(data)}, min: {min(data)}, max: {max(data)}"
            )
        else:
            self.set_measurement_status("Готово, но данные пустые")

    def on_measurement_error(self, message: str):
        """Show a measurement error in the status line."""
        self.set_measurement_status(f"Ошибка: {message}")

    def on_measurement_finished(self):
        """Drop the references to the finished worker and thread."""
        self.measurement_worker = None
        self.measurement_thread = None

    def stop_measurement(self):
        """Ask the running worker, if any, to stop."""
        if self.measurement_worker is not None:
            self.measurement_worker.stop()

    def set_measurement_status(self, text: str):
        """Print the status text and show it in the UI."""
        print(text)
        # There is no dedicated measurement status label yet, so
        # label_ping_status is reused for now.
        if hasattr(self, "label_ping_status"):
            self.label_ping_status.setText(text)


def main():
    """Create the application and window and run the Qt event loop."""
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()