#!/usr/bin/env python3 """ Эмулятор серийного порта: воспроизводит лог-файл в цикле через PTY. Использование: python3 replay_pty.py my_picocom_logfile.txt python3 replay_pty.py my_picocom_logfile.txt --pty /tmp/ttyVIRT0 python3 replay_pty.py my_picocom_logfile.txt --speed 2.0 # в 2 раза быстрее реального python3 replay_pty.py my_picocom_logfile.txt --speed 0 # максимально быстро Затем в другом терминале: python -m rfg_adc_plotter.main /tmp/ttyVIRT0 """ import argparse import os import sys import time def main(): parser = argparse.ArgumentParser( description="Воспроизводит лог-файл через PTY как виртуальный серийный порт." ) parser.add_argument("file", help="Путь к лог-файлу (например my_picocom_logfile.txt)") parser.add_argument( "--pty", default="/tmp/ttyVIRT0", help="Путь симлинка PTY (по умолчанию /tmp/ttyVIRT0)", ) parser.add_argument( "--speed", type=float, default=1.0, help=( "Множитель скорости воспроизведения: " "1.0 = реальное время при --baud, " "2.0 = вдвое быстрее, " "0 = максимально быстро" ), ) parser.add_argument( "--baud", type=int, default=115200, help="Скорость (бод) для расчёта задержек (по умолчанию 115200)", ) args = parser.parse_args() if not os.path.isfile(args.file): sys.stderr.write(f"[error] Файл не найден: {args.file}\n") sys.exit(1) # Открываем PTY-пару: master (мы пишем) / slave (GUI читает) master_fd, slave_fd = os.openpty() slave_path = os.ttyname(slave_fd) os.close(slave_fd) # GUI откроет slave сам по симлинку # Симлинк с удобным именем try: os.unlink(args.pty) except FileNotFoundError: pass os.symlink(slave_path, args.pty) print(f"PTY slave : {slave_path}") print(f"Симлинк : {args.pty} → {slave_path}") print(f"Запустите : python -m rfg_adc_plotter.main {args.pty}") print("Ctrl+C для остановки.\n") # Задержка на байт: 10 бит (8N1) / скорость / множитель if args.speed > 0: bytes_per_sec = args.baud / 10.0 * args.speed delay_per_byte = 1.0 / bytes_per_sec else: delay_per_byte = 0.0 loop = 0 try: while True: loop += 1 print(f"[loop {loop}] {args.file}") with open(args.file, "rb") as f: for line in f: os.write(master_fd, line) if delay_per_byte > 0: time.sleep(delay_per_byte * len(line)) except KeyboardInterrupt: print("\nОстановлено.") finally: try: os.unlink(args.pty) except Exception: pass try: os.close(master_fd) except Exception: pass if __name__ == "__main__": main()