131 lines
4.1 KiB
Python
Executable File
131 lines
4.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
Half-duplex serial bridge & logger.

- Open two fds:
    host_fd: readable/writable endpoint the app uses (VIRT_DEV)
    dev_fd: real serial device (REAL_DEV)
- Relay bytes between them.
- Build one log record per *directional burst*:
    - When bytes arrive from host: start/continue a TX frame.
    - When first byte arrives from device: flush TX (if any), start RX frame.
    - And vice versa.
- Binary log format:
    MAGIC "VNALOG1\n"
    Then repeated records: [1 byte dir][4 bytes len (big-endian)][payload]
    dir 0x01 = host→device ('>'), 0x00 = device→host ('<')
"""
|
|
|
|
import os, sys, fcntl, select, struct
|
|
|
|
# Log file signature and the per-record direction tags (see module docstring).
MAGIC = b"VNALOG1\n"
DIR_TX = 0x01  # host→device
DIR_RX = 0x00  # device→host

if len(sys.argv) != 4:
    print("usage: bridge_logger.py VIRT_DEV REAL_DEV LOGFILE", file=sys.stderr)
    sys.exit(2)
VIRT_DEV, REAL_DEV, LOGFILE = sys.argv[1:4]

# Open endpoints non-blocking, read+write.
# O_NOCTTY: opening a tty must never make it this process's controlling
# terminal (otherwise a hangup on the line could signal the bridge itself).
host_fd = os.open(VIRT_DEV, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY)
dev_fd = os.open(REAL_DEV, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY)

# Prepare log. O_APPEND makes every write land at the current end of file,
# so records are never written over existing data.
log_fd = os.open(LOGFILE, os.O_CREAT | os.O_RDWR | os.O_APPEND, 0o644)
if os.lseek(log_fd, 0, os.SEEK_END) == 0:
    # Brand-new (or empty) file: start it with the format signature.
    os.write(log_fd, MAGIC)
elif os.pread(log_fd, len(MAGIC), 0) != MAGIC:
    # Non-empty file that is not one of our logs: appending records to it
    # would corrupt it and produce an unreadable log, so refuse.
    print("error: %s exists but is not a VNALOG1 log; refusing to append"
          % LOGFILE, file=sys.stderr)
    sys.exit(1)
|
|
|
|
def set_nonblock(fd):
    """Put file descriptor *fd* into non-blocking mode.

    Idempotent: calling it on a descriptor that is already non-blocking
    leaves it non-blocking. Returns None. Raises OSError if *fd* is not a
    valid open descriptor.
    """
    # os.set_blocking is the stdlib equivalent of the manual
    # F_GETFL / F_SETFL | O_NONBLOCK sequence.
    os.set_blocking(fd, False)
|
|
# Largest number of bytes pulled from an endpoint per read.
READ_CHUNK = 64 * 1024

# Frame currently being accumulated for the log:
#   cur_dir -- None while no frame is open, otherwise DIR_TX or DIR_RX
#   buf     -- payload bytes collected so far for that frame
buf = bytearray()
cur_dir = None

# Make sure both endpoints are non-blocking before they are polled.
set_nonblock(host_fd)
set_nonblock(dev_fd)

# Watch both endpoints for readable data.
poll = select.poll()
poll.register(host_fd, select.POLLIN)
poll.register(dev_fd, select.POLLIN)
|
|
|
|
def write_all(fd, data: bytes):
    """Write all of *data* to the non-blocking descriptor *fd*.

    Keeps calling os.write until every byte has been accepted. When the
    descriptor is not ready (BlockingIOError), waits for it to become
    writable and retries.

    Args:
        fd: open file descriptor, normally in non-blocking mode.
        data: bytes-like payload to send; an empty payload is a no-op.

    Raises:
        OSError: any write error other than "would block".
    """
    view = memoryview(data)  # slice without copying the remaining tail
    total = len(view)
    sent = 0
    waiter = None  # poll object, created only if the fd ever pushes back
    while sent < total:
        try:
            sent += os.write(fd, view[sent:])
        except BlockingIOError:
            if waiter is None:
                waiter = select.poll()
                waiter.register(fd, select.POLLOUT)
            # Bounded wait; on timeout the write is simply attempted again.
            waiter.poll(1000)
|
|
|
|
def flush_record():
    """Append the open frame to the log as one binary record, then reset.

    Record layout: 1 byte direction, 4 bytes big-endian payload length,
    payload. Does nothing when no frame is open (cur_dir is None) or the
    buffer is empty. On success the buffer is emptied and cur_dir is set
    back to None.

    Raises:
        OSError: if writing to the log descriptor fails.
        struct.error: if the payload length does not fit in 32 bits.
    """
    global cur_dir, buf
    if cur_dir is None or not buf:
        return
    # ">BI" = direction byte + unsigned 32-bit length, big-endian, unpadded.
    record = memoryview(struct.pack(">BI", cur_dir, len(buf)) + bytes(buf))
    # os.write may accept fewer bytes than requested; a truncated record
    # would desynchronise every record after it, so write until done.
    done = 0
    while done < len(record):
        done += os.write(log_fd, record[done:])
    buf.clear()
    cur_dir = None
|
|
|
|
# Relay table: source fd -> (direction tag logged for its data, fd to forward to).
# Looked up by value; fds are plain ints and must not be compared with `is`.
_ROUTES = {
    host_fd: (DIR_TX, dev_fd),
    dev_fd: (DIR_RX, host_fd),
}
# Poll results meaning the endpoint is gone or broken rather than readable.
_ENDPOINT_GONE = select.POLLHUP | select.POLLERR | select.POLLNVAL

try:
    while True:
        # 1s tick; framing doesn't depend on idle. An idle tick yields no
        # events: the open frame is kept until the direction switches or
        # the bridge exits.
        for fd, ev in poll.poll(1000):
            if not ev & select.POLLIN:
                if ev & _ENDPOINT_GONE:
                    # Hangup/error with nothing left to read. Such events
                    # are reported on every poll, so ignoring them would
                    # spin forever; stop instead (the finally clause
                    # flushes the open frame).
                    print("bridge: endpoint fd %d hung up or failed" % fd,
                          file=sys.stderr)
                    sys.exit(1)
                continue
            try:
                chunk = os.read(fd, READ_CHUNK)
            except BlockingIOError:
                continue
            if not chunk:
                # EOF on one side: exit; the finally clause flushes.
                sys.exit(0)

            direction, peer_fd = _ROUTES[fd]
            if cur_dir != direction:
                # New burst or direction switch: close the previous frame
                # (no-op if none is open) and start one for this direction.
                flush_record()
                cur_dir = direction
            buf.extend(chunk)
            write_all(peer_fd, chunk)
except KeyboardInterrupt:
    pass
finally:
    # Final flush and cleanup. Descriptors are closed even if the flush
    # itself fails.
    try:
        flush_record()
    finally:
        for _open_fd in (host_fd, dev_fd, log_fd):
            try:
                os.close(_open_fd)
            except OSError:
                pass