#!/usr/bin/env python3
"""
Half-duplex serial bridge & logger.

- Open two fds:
    host_fd: readable/writable endpoint the app uses (VIRT_DEV)
    dev_fd:  real serial device (REAL_DEV)
- Relay bytes between them.
- Build one log record per *directional burst*:
    - When bytes arrive from host: start/continue a TX frame.
    - When first byte arrives from device: flush TX (if any), start RX frame.
    - And vice versa.
- Binary log format:
    MAGIC "VNALOG1\\n"
    Then repeated records:
      [1 byte dir][4 bytes len (big-endian)][payload]
      dir 0x01 = host→device ('>'), 0x00 = device→host ('<')

Usage: bridge_logger.py VIRT_DEV REAL_DEV LOGFILE
"""

import fcntl
import os
import select
import struct
import sys

MAGIC = b"VNALOG1\n"
DIR_TX = 0x01  # host→device
DIR_RX = 0x00  # device→host

READ_CHUNK = 65536

# poll() event bits meaning the endpoint hung up or is unusable.
_HANGUP_EVENTS = select.POLLHUP | select.POLLERR | select.POLLNVAL

# Record header: 1-byte direction + 4-byte big-endian payload length.
_RECORD_HEADER = struct.Struct(">BI")


def set_nonblock(fd):
    """Set O_NONBLOCK on *fd* if it is not already set."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    if not flags & os.O_NONBLOCK:
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def write_all(fd, data):
    """Write every byte of *data* to *fd*, retrying after short writes.

    When a non-blocking *fd* raises BlockingIOError, wait (up to 1 s per
    attempt) for POLLOUT and try again. Returns only once all of *data*
    has been written; any other OSError propagates to the caller.
    """
    view = memoryview(data)
    offset = 0
    waiter = None
    while offset < len(view):
        try:
            offset += os.write(fd, view[offset:])
        except BlockingIOError:
            if waiter is None:
                # Built lazily: most writes never block.
                waiter = select.poll()
                waiter.register(fd, select.POLLOUT)
            waiter.poll(1000)


def encode_record(direction, payload):
    """Return one log record: dir byte + 4-byte big-endian length + payload."""
    return _RECORD_HEADER.pack(direction, len(payload)) + bytes(payload)


def open_log(path):
    """Open (creating if needed) the log at *path*, positioned at its end.

    MAGIC is written first when the file is empty. Returns the raw fd.
    """
    fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        if os.lseek(fd, 0, os.SEEK_END) == 0:
            write_all(fd, MAGIC)
    except OSError:
        os.close(fd)
        raise
    return fd


class FrameLogger:
    """Accumulate one directional burst and write it as a single record."""

    def __init__(self, log_fd):
        self.log_fd = log_fd
        self.cur_dir = None  # None / DIR_TX / DIR_RX
        self.buf = bytearray()

    def append(self, direction, chunk):
        """Add *chunk* to the open frame, flushing first on a direction switch."""
        if self.cur_dir != direction:
            # flush_record() is a no-op when no frame is open yet.
            self.flush_record()
            self.cur_dir = direction
        self.buf.extend(chunk)

    def flush_record(self):
        """If a frame is open, append it as one binary record and reset."""
        if self.cur_dir is None or not self.buf:
            return
        # A single os.write() may be short; write_all guarantees the whole
        # record reaches the log so the stream stays parseable.
        write_all(self.log_fd, encode_record(self.cur_dir, self.buf))
        self.buf.clear()
        self.cur_dir = None


def relay(host_fd, dev_fd, logger):
    """Shuttle bytes between *host_fd* and *dev_fd*, feeding *logger*.

    Returns when either side reports EOF (empty read) or a hangup/error
    poll event. The caller is responsible for the final flush_record().
    """
    # Source fd -> (direction tag, destination fd). Keyed by value, so the
    # lookup does not depend on int object identity.
    routes = {
        host_fd: (DIR_TX, dev_fd),
        dev_fd: (DIR_RX, host_fd),
    }
    poller = select.poll()
    for fd in routes:
        poller.register(fd, select.POLLIN)

    while True:
        # 1s tick; framing doesn't depend on idle time. With half-duplex
        # traffic the tail frame is flushed on direction switch or exit.
        for fd, events in poller.poll(1000):
            if events & select.POLLIN:
                try:
                    chunk = os.read(fd, READ_CHUNK)
                except BlockingIOError:
                    continue
                if not chunk:
                    return  # EOF on one side
                direction, peer_fd = routes[fd]
                logger.append(direction, chunk)
                write_all(peer_fd, chunk)
            elif events & _HANGUP_EVENTS:
                # Nothing left to read and the endpoint is gone. poll()
                # keeps reporting this condition immediately, so ignoring
                # it would spin the loop at full CPU.
                return


def main(argv=None):
    """Run the bridge; return the process exit status.

    *argv* defaults to sys.argv[1:] and must be VIRT_DEV REAL_DEV LOGFILE.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) != 3:
        print("usage: bridge_logger.py VIRT_DEV REAL_DEV LOGFILE", file=sys.stderr)
        return 2
    virt_dev, real_dev, logfile = args

    opened = []
    logger = None
    try:
        # Open endpoints non-blocking, read+write
        host_fd = os.open(virt_dev, os.O_RDWR | os.O_NONBLOCK)
        opened.append(host_fd)
        dev_fd = os.open(real_dev, os.O_RDWR | os.O_NONBLOCK)
        opened.append(dev_fd)
        # Prepare log (write header if empty)
        log_fd = open_log(logfile)
        opened.append(log_fd)

        set_nonblock(host_fd)
        set_nonblock(dev_fd)

        logger = FrameLogger(log_fd)
        relay(host_fd, dev_fd, logger)
    except KeyboardInterrupt:
        pass
    finally:
        # Final flush and cleanup; close the fds even if the flush fails.
        try:
            if logger is not None:
                logger.flush_record()
        finally:
            for fd in opened:
                try:
                    os.close(fd)
                except OSError:
                    pass
    return 0


if __name__ == "__main__":
    sys.exit(main())