107 lines
3.5 KiB
Python
107 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Эмулятор серийного порта: воспроизводит лог-файл в цикле через PTY.
|
||
|
||
Использование:
|
||
python3 replay_pty.py my_picocom_logfile.txt
|
||
python3 replay_pty.py my_picocom_logfile.txt --pty /tmp/ttyVIRT0
|
||
python3 replay_pty.py my_picocom_logfile.txt --speed 2.0 # в 2 раза быстрее реального
|
||
python3 replay_pty.py my_picocom_logfile.txt --speed 0 # максимально быстро
|
||
|
||
Затем в другом терминале:
|
||
python -m rfg_adc_plotter.main /tmp/ttyVIRT0
|
||
"""
|
||
|
||
import argparse
|
||
import os
|
||
import sys
|
||
import time
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(
|
||
description="Воспроизводит лог-файл через PTY как виртуальный серийный порт."
|
||
)
|
||
parser.add_argument("file", help="Путь к лог-файлу (например my_picocom_logfile.txt)")
|
||
parser.add_argument(
|
||
"--pty",
|
||
default="/tmp/ttyVIRT0",
|
||
help="Путь симлинка PTY (по умолчанию /tmp/ttyVIRT0)",
|
||
)
|
||
parser.add_argument(
|
||
"--speed",
|
||
type=float,
|
||
default=1.0,
|
||
help=(
|
||
"Множитель скорости воспроизведения: "
|
||
"1.0 = реальное время при --baud, "
|
||
"2.0 = вдвое быстрее, "
|
||
"0 = максимально быстро"
|
||
),
|
||
)
|
||
parser.add_argument(
|
||
"--baud",
|
||
type=int,
|
||
default=115200,
|
||
help="Скорость (бод) для расчёта задержек (по умолчанию 115200)",
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
if not os.path.isfile(args.file):
|
||
sys.stderr.write(f"[error] Файл не найден: {args.file}\n")
|
||
sys.exit(1)
|
||
|
||
# Открываем PTY-пару: master (мы пишем) / slave (GUI читает)
|
||
master_fd, slave_fd = os.openpty()
|
||
slave_path = os.ttyname(slave_fd)
|
||
os.close(slave_fd) # GUI откроет slave сам по симлинку
|
||
|
||
# Симлинк с удобным именем
|
||
try:
|
||
os.unlink(args.pty)
|
||
except FileNotFoundError:
|
||
pass
|
||
os.symlink(slave_path, args.pty)
|
||
|
||
print(f"PTY slave : {slave_path}")
|
||
print(f"Симлинк : {args.pty} → {slave_path}")
|
||
print(f"Запустите : python -m rfg_adc_plotter.main {args.pty}")
|
||
print("Ctrl+C для остановки.\n")
|
||
|
||
# Задержка на байт: 10 бит (8N1) / скорость / множитель
|
||
if args.speed > 0:
|
||
bytes_per_sec = args.baud / 10.0 * args.speed
|
||
delay_per_byte = 1.0 / bytes_per_sec
|
||
else:
|
||
delay_per_byte = 0.0
|
||
|
||
_CHUNK = 4096
|
||
loop = 0
|
||
try:
|
||
while True:
|
||
loop += 1
|
||
print(f"[loop {loop}] {args.file}")
|
||
with open(args.file, "rb") as f:
|
||
while True:
|
||
chunk = f.read(_CHUNK)
|
||
if not chunk:
|
||
break
|
||
os.write(master_fd, chunk)
|
||
if delay_per_byte > 0:
|
||
time.sleep(delay_per_byte * len(chunk))
|
||
except KeyboardInterrupt:
|
||
print("\nОстановлено.")
|
||
finally:
|
||
try:
|
||
os.unlink(args.pty)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
os.close(master_fd)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|