# shitpost import sys import math import socket import platform from PyQt6 import uic from dataclasses import dataclass from PyQt6.QtCore import QProcess, QTimer from PyQt6.QtCore import QObject, QThread, pyqtSignal from PyQt6.QtCore import Qt import pyqtgraph as pg from PyQt6.QtWidgets import QApplication, QMainWindow @dataclass class ReflectometerConfig: ip: str send_port: int recv_port: int dac_bits: int data_width: int window_size: int packet_size: int pulse_width: int pulse_period: int pulse_height: int pulse_num: int adc_dac_ratio: float = 0.52 socket_timeout_sec: float = 2.0 class ReflectometerWorker(QObject): data_ready = pyqtSignal(list) status = pyqtSignal(str) error = pyqtSignal(str) finished = pyqtSignal() def __init__(self, config: ReflectometerConfig): super().__init__() self.config = config self._stop_requested = False self._sock = None def stop(self): self._stop_requested = True if self._sock is not None: try: self._sock.close() except OSError: pass def run(self): try: self._validate_config() self.status.emit("Открытие UDP-сокета...") self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._sock.settimeout(self.config.socket_timeout_sec) self._sock.bind(("0.0.0.0", self.config.recv_port)) dest = (self.config.ip, self.config.send_port) self.status.emit("Отправка soft reset...") self._sock.sendto((0x0F00).to_bytes(2, "big"), dest) self.status.emit("Отправка параметров...") ctrl_data = self._format_ctrl_data() self._sock.sendto(ctrl_data, dest) self.status.emit("Отправка start...") self._sock.sendto((0xF000).to_bytes(2, "big"), dest) self.status.emit("Приём данных...") data = self._recv_data() if self._stop_requested: self.status.emit("Операция остановлена") return self.data_ready.emit(data) self.status.emit(f"Получено samples: {len(data)}") except Exception as e: if not self._stop_requested: self.error.emit(str(e)) finally: if self._sock is not None: try: self._sock.close() except OSError: pass self.finished.emit() def _format_ctrl_data(self) -> bytes: output = bytearray() output += 0b10001000.to_bytes(1, "little") pulse_period_adc = ( int(self.config.pulse_period * self.config.adc_dac_ratio) // self.config.window_size ) * self.config.window_size output += self.config.pulse_width.to_bytes(4, "little") output += self.config.pulse_period.to_bytes(4, "little") output += self.config.pulse_num.to_bytes(2, "little") output += self.config.pulse_height.to_bytes(2, "little") output += pulse_period_adc.to_bytes(4, "little") if len(output) != 17: raise ValueError("Config data should be 128 bits + 8 bit header") return bytes(output) def _recv_data(self) -> list[int]: packet_count = math.ceil( ( self.config.adc_dac_ratio * self.config.pulse_period / self.config.window_size * self.config.data_width ) / self.config.packet_size ) expected_length = math.ceil( self.config.adc_dac_ratio * self.config.pulse_period / self.config.window_size ) recv_buf = [] for pkt_cnt in range(packet_count): if self._stop_requested: break try: packet, _ = self._sock.recvfrom(65536) except socket.timeout: raise TimeoutError(f"Таймаут приёма UDP-пакета #{pkt_cnt + 1}") if len(packet) % self.config.data_width != 0: raise ValueError( f"Некорректный размер UDP-пакета: {len(packet)} байт" ) for i in range(0, len(packet), self.config.data_width): sample = int.from_bytes( packet[i:i + self.config.data_width], "little", ) recv_buf.append(sample) if len(recv_buf) < expected_length: raise ValueError( f"Data underflow: получено {len(recv_buf)}, ожидалось {expected_length}" ) return recv_buf[:expected_length - 1] def _validate_config(self): if self.config.pulse_period <= 0: raise ValueError("pulse_period должен быть больше 0") if self.config.pulse_num <= 0: raise ValueError("pulse_num должен быть больше 0") if self.config.window_size <= 0: raise ValueError("window_size должен быть больше 0") if self.config.packet_size <= 0: raise ValueError("packet_size должен быть больше 0") if self.config.data_width <= 0: raise ValueError("data_width должен быть больше 0") if self.config.pulse_period % self.config.window_size != 0: raise ValueError("pulse_period должен быть кратен window_size") if self.config.pulse_width >= 2**32 - 1: raise ValueError("pulse_width слишком большой") if self.config.pulse_period >= 2**32 - 1: raise ValueError("pulse_period слишком большой") if self.config.pulse_num >= 2**16 - 1: raise ValueError("pulse_num слишком большой") if self.config.pulse_height > 2**self.config.dac_bits - 1: raise ValueError("pulse_height слишком большой") class MainWindow(QMainWindow): def __init__(self): super().__init__() uic.loadUi("reflectometer.ui", self) self.ping_process = None self.ping_timeout_timer = QTimer(self) self.ping_timeout_timer.setSingleShot(True) self.ping_timeout_timer.timeout.connect(self.on_ping_timeout) self.button_ping.clicked.connect(self.check_ping) # settings self.pulse_period = 0 self.pulse_height = 0 self.pulse_width = 0 self.pulse_num = 0 self.dac_dw = 14 self.adc_dw = 12 self.nmax = 4096 self.packet_size = 1024 self.window_size = 65 self.adc_dac_ration = 0.52 self.accum_width = 32 # setup self.setup_pulse_controls() self.setup_global_settings() self.update_pulse_limits() self.data = [] self.adc_dac_ratio = 0.52 self.measurement_thread = None self.measurement_worker = None self.setup_graph() self.setup_network_settings() self.button_start.clicked.connect(self.run_measurement) # ping utils def check_ping(self): ip = self.line_ip.text().strip() if not ip: self.label_ping_status.setText("set ip!!") return if "_" in self.line_ip.displayText(): self.label_ping_status.setText("IP invalid") return if self.ping_process is not None: if self.ping_process.state() != QProcess.ProcessState.NotRunning: self.label_ping_status.setText("Ping inflight") return self.label_ping_status.setText("ping...") self.button_ping.setEnabled(False) self.ping_process = QProcess(self) self.ping_process.finished.connect(self.on_ping_finished) self.ping_process.errorOccurred.connect(self.on_ping_error) system_name = platform.system().lower() if system_name == "windows": program = "ping" arguments = ["-n", "1", "-w", "2000", ip] else: program = "ping" arguments = ["-c", "1", "-W", "2", ip] self.ping_process.start(program, arguments) # fallback self.ping_timeout_timer.start(2000) def on_ping_finished(self, exit_code, exit_status): self.ping_timeout_timer.stop() self.button_ping.setEnabled(True) if exit_code == 0: self.label_ping_status.setText("алё✅") else: self.label_ping_status.setText("не алё❌") def on_ping_error(self): self.ping_timeout_timer.stop() self.button_ping.setEnabled(True) self.label_ping_status.setText("ping unavail") def on_ping_timeout(self): if self.ping_process is not None: if self.ping_process.state() != QProcess.ProcessState.NotRunning: self.ping_process.kill() self.button_ping.setEnabled(True) # pulse controls def setup_pulse_controls(self): self._bind_slider_and_spinbox( name="pulse_period", slider=self.slider_pulse_period, box=self.box_pulse_period, normalize_value=self.normalize_pulse_period, ) self._bind_slider_and_spinbox( name="pulse_height", slider=self.slider_pulse_height, box=self.box_pulse_height, ) self._bind_slider_and_spinbox( name="pulse_width", slider=self.slider_pulse_width, box=self.box_pulse_width, ) self._bind_slider_and_spinbox( name="pulse_num", slider=self.slider_pulse_num, box=self.box_pulse_num, ) def _bind_slider_and_spinbox(self, name, slider, box, normalize_value=None): """ Связывает QSlider и QSpinBox по значению. Значение автоматически записывается в self.. """ minimum = min(slider.minimum(), box.minimum()) maximum = max(slider.maximum(), box.maximum()) slider.setRange(minimum, maximum) box.setRange(minimum, maximum) def normalize(value): if normalize_value is None: return value return normalize_value(value) value = normalize(box.value()) slider.setValue(value) box.setValue(value) setattr(self, name, value) def update_value(new_value): new_value = normalize(new_value) if slider.value() != new_value: slider.setValue(new_value) if box.value() != new_value: box.setValue(new_value) setattr(self, name, new_value) slider.valueChanged.connect(update_value) box.valueChanged.connect(update_value) def normalize_pulse_period(self, value): step = max(1, getattr(self, "window_size", self.box_window_size.value())) snapped_value = round(value / step) * step minimum = self.box_pulse_period.minimum() maximum = self.box_pulse_period.maximum() return max(minimum, min(snapped_value, maximum)) def _set_max_for_pair(self, slider, box, maximum): slider.setMaximum(maximum) box.setMaximum(maximum) value = min(box.value(), maximum) box.setValue(value) slider.setValue(value) def set_max_pulse_period(self, maximum): self._set_max_for_pair( slider=self.slider_pulse_period, box=self.box_pulse_period, maximum=maximum, ) self.pulse_period = self.box_pulse_period.value() def set_max_pulse_height(self, maximum): self._set_max_for_pair( slider=self.slider_pulse_height, box=self.box_pulse_height, maximum=maximum, ) self.pulse_height = self.box_pulse_height.value() def set_max_pulse_width(self, maximum): self._set_max_for_pair( slider=self.slider_pulse_width, box=self.box_pulse_width, maximum=maximum, ) self.pulse_width = self.box_pulse_width.value() def set_max_pulse_num(self, maximum): self._set_max_for_pair( slider=self.slider_pulse_num, box=self.box_pulse_num, maximum=maximum, ) self.pulse_num = self.box_pulse_num.value() # settings def setup_global_settings(self): self._bind_spinbox_setting( name="dac_dw", box=self.box_dac_dw, ) self._bind_spinbox_setting( name="adc_dw", box=self.box_adc_dw, ) self._bind_spinbox_setting( name="nmax", box=self.box_nmax, ) self._bind_spinbox_setting( name="window_size", box=self.box_window_size, after_change=self.on_window_size_changed, ) self._bind_spinbox_setting( name="packet_size", box=self.box_packet_size, ) self._bind_spinbox_setting( name="adc_dac_ratio", box=self.box_adc_dac_ratio, ) self._bind_spinbox_setting( name="accum_width", box=self.box_accum_width, ) self._bind_spinbox_setting( name="recv_port", box=self.box_recv_port, ) self._bind_spinbox_setting( name="send_port", box=self.box_send_port, ) # применяем шаг для pulse_period сразу при старте self.update_pulse_period_step() def _bind_spinbox_setting(self, name, box, after_change=None): """ Связывает QSpinBox с полем self.. Например: box_dac_dw -> self.dac_dw box_window_size -> self.window_size """ value = box.value() setattr(self, name, value) def on_value_changed(new_value): setattr(self, name, new_value) self.update_pulse_limits() if after_change is not None: after_change(new_value) box.valueChanged.connect(on_value_changed) def update_pulse_limits(self): # re-calc limits # nmax -> pulse_period limit self.set_max_pulse_period(self.nmax * self.window_size) self.set_max_pulse_width(self.nmax * self.window_size) # accum_width + adc_width -> max pulse num self.set_max_pulse_num( 2 ** (self.accum_width - self.adc_dw - math.ceil(math.log2(self.window_size))) - 1) # dac_width -> max pulse height self.set_max_pulse_height(2 ** self.dac_dw - 1) self.slider_pulse_period.setMinimum(self.window_size) self.box_pulse_period.setMinimum(self.window_size) def on_window_size_changed(self, new_value): self.update_pulse_period_step() def update_pulse_period_step(self): # set window_size step step = max(1, self.window_size) self.box_pulse_period.setSingleStep(step) self.slider_pulse_period.setSingleStep(step) self.slider_pulse_period.setPageStep(step) self.snap_pulse_period_to_step(step) def snap_pulse_period_to_step(self, step): """ Подгоняет текущее значение pulse_period к ближайшему кратному window_size. Это нужно потому, что QSlider при перетаскивании мышкой всё равно может дать любое промежуточное значение. """ current_value = self.box_pulse_period.value() snapped_value = round(current_value / step) * step minimum = self.box_pulse_period.minimum() maximum = self.box_pulse_period.maximum() snapped_value = max(minimum, min(snapped_value, maximum)) self.box_pulse_period.setValue(snapped_value) self.slider_pulse_period.setValue(snapped_value) self.pulse_period = snapped_value # graph def setup_graph(self): self.graph_widget = pg.PlotWidget() self.graph_widget.setLabel("left", "ADC value") self.graph_widget.setLabel("bottom", "Sample") self.graph_widget.showGrid(x=True, y=True) self.graph_curve = self.graph_widget.plot( [], name="Data", ) self.reference_curve = self.graph_widget.plot( [], name="Reference", ) self.graph_layout.addWidget(self.graph_widget) self.graph_curve = self.graph_widget.plot( [], pen=pg.mkPen(width=2, color="b")) self.reference_curve = self.graph_widget.plot( [], pen=pg.mkPen(style=Qt.PenStyle.DashLine, color="g")) self.checkbox_draw_reference.stateChanged.connect( self.update_reference_graph) def setup_network_settings(self): self._bind_spinbox_setting( name="recv_port", box=self.box_recv_port, ) self._bind_spinbox_setting( name="send_port", box=self.box_send_port, ) def run_measurement(self): if self.measurement_thread is not None: if self.measurement_thread.isRunning(): self.set_measurement_status("Измерение выполняется") return config = self.build_reflectometer_config() self.data = [] self.graph_curve.setData([]) self.measurement_thread = QThread(self) self.measurement_worker = ReflectometerWorker(config) self.measurement_worker.moveToThread(self.measurement_thread) self.measurement_thread.started.connect(self.measurement_worker.run) self.measurement_worker.status.connect(self.set_measurement_status) self.measurement_worker.error.connect(self.on_measurement_error) self.measurement_worker.data_ready.connect(self.on_data_received) self.measurement_worker.finished.connect(self.measurement_thread.quit) self.measurement_worker.finished.connect( self.measurement_worker.deleteLater) self.measurement_thread.finished.connect( self.measurement_thread.deleteLater) self.measurement_thread.finished.connect(self.on_measurement_finished) self.measurement_thread.start() def build_reflectometer_config(self) -> ReflectometerConfig: ip = self.line_ip.text().strip() if not ip: raise ValueError("IP адрес не задан") data_width = self.accum_width // 8 return ReflectometerConfig( ip=ip, send_port=self.send_port, recv_port=self.recv_port, dac_bits=self.dac_dw, data_width=data_width, window_size=self.window_size, packet_size=self.packet_size, pulse_width=self.pulse_width, pulse_period=self.pulse_period, pulse_height=self.pulse_height, pulse_num=self.pulse_num, adc_dac_ratio=self.adc_dac_ratio, ) def on_data_received(self, data: list[int]): self.data = data # normalize for i in range(len(data)): self.data[i] /= (self.window_size * self.pulse_num) self.data[i] -= 2 ** (self.adc_dw - 1) + 1 self.draw_main_graph() self.update_reference_graph() if data: self.set_measurement_status( f"Готово. smp: {len(data)}, min: {min(data)}, max: {max(data)}" ) else: self.set_measurement_status("Данные пустые") def on_measurement_error(self, message: str): self.set_measurement_status(f"Ошибка: {message}") def on_measurement_finished(self): self.measurement_worker = None self.measurement_thread = None def stop_measurement(self): if self.measurement_worker is not None: self.measurement_worker.stop() def set_measurement_status(self, text: str): self.label_status.setText(text) def draw_main_graph(self): if not self.data: self.graph_curve.setData([]) return x = list(range(len(self.data))) self.graph_curve.setData(x, self.data) def update_reference_graph(self): """ Рисует или очищает эталонный график. Вызывается после получения данных и при переключении checkbox_draw_reference. """ if not self.checkbox_draw_reference.isChecked(): self.reference_curve.setData([]) return if not self.data: self.reference_curve.setData([]) return reference_data = self.build_reference_data(len(self.data)) if not reference_data: self.reference_curve.setData([]) return x = list(range(len(reference_data))) self.reference_curve.setData(x, reference_data) def build_reference_data(self, length: int) -> list[int]: reference = [0] * length actual_pulse_width = round( (self.pulse_width * self.adc_dac_ratio) / self.window_size) reference[0:actual_pulse_width] = [ self.pulse_height - 2 ** (self.adc_dw - 1) + 1, ] * (actual_pulse_width - 1) return reference def main(): app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec()) if __name__ == "__main__": main()