Files
vna_system/reference_data_acquisition/bridge_logger.py
2025-09-23 18:42:55 +03:00

131 lines
4.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Half-duplex serial bridge & logger.
- Open two fds:
host_fd: readable/writable endpoint the app uses (VIRT_DEV)
dev_fd: real serial device (REAL_DEV)
- Relay bytes between them.
- Build one log record per *directional burst*:
- When bytes arrive from host: start/continue a TX frame.
- When first byte arrives from device: flush TX (if any), start RX frame.
- And vice versa.
- Binary log format:
MAGIC "VNALOG1\n"
Then repeated records: [1 byte dir][4 bytes len (big-endian)][payload]
dir 0x01 = host→device ('>'), 0x00 = device→host ('<')
"""
import os, sys, fcntl, select, struct
MAGIC = b"VNALOG1\n"
DIR_TX = 0x01 # host→device
DIR_RX = 0x00 # device→host
if len(sys.argv) != 4:
print("usage: bridge_logger.py VIRT_DEV REAL_DEV LOGFILE", file=sys.stderr)
sys.exit(2)
VIRT_DEV, REAL_DEV, LOGFILE = sys.argv[1:4]
# Open endpoints non-blocking, read+write
host_fd = os.open(VIRT_DEV, os.O_RDWR | os.O_NONBLOCK)
dev_fd = os.open(REAL_DEV, os.O_RDWR | os.O_NONBLOCK)
# Prepare log (write header if empty)
log_fd = os.open(LOGFILE, os.O_CREAT | os.O_RDWR, 0o644)
if os.lseek(log_fd, 0, os.SEEK_END) == 0:
os.write(log_fd, MAGIC)
def set_nonblock(fd):
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
if not (fl & os.O_NONBLOCK):
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
set_nonblock(host_fd); set_nonblock(dev_fd)
poll = select.poll()
poll.register(host_fd, select.POLLIN)
poll.register(dev_fd, select.POLLIN)
# Current frame state
cur_dir = None # None / DIR_TX / DIR_RX
buf = bytearray()
READ_CHUNK = 65536
def write_all(fd, data: bytes):
"""Non-blocking write-all with POLLOUT assistance."""
view = memoryview(data)
off = 0
while off < len(view):
try:
n = os.write(fd, view[off:])
if n is None: n = 0
except BlockingIOError:
# Wait for fd to become writable
p = select.poll(); p.register(fd, select.POLLOUT)
p.poll(1000)
continue
off += n
def flush_record():
"""If a frame is open, append it as one binary record and reset buffer."""
global cur_dir, buf
if cur_dir is None or not buf:
return
# dir byte + 4-byte big-endian length + payload
rec = bytes([cur_dir]) + struct.pack(">I", len(buf)) + bytes(buf)
# Single write is fine (kernel will split if huge)
os.write(log_fd, rec)
buf.clear()
cur_dir = None
try:
while True:
events = poll.poll(1000) # 1s tick; framing doesn't depend on idle
if not events:
# If nothing is happening for a while, we could flush a tail,
# but with half-duplex the tail will be flushed on direction switch or exit.
continue
for (fd, ev) in events:
if not (ev & select.POLLIN):
continue
try:
chunk = os.read(fd, READ_CHUNK)
except BlockingIOError:
continue
if not chunk:
# EOF on one side: flush and exit
flush_record()
sys.exit(0)
if fd is host_fd:
# Incoming from host: forward to device
if cur_dir is None:
cur_dir = DIR_TX
elif cur_dir != DIR_TX:
# Direction switched: flush previous RX frame, start TX
flush_record()
cur_dir = DIR_TX
buf.extend(chunk)
write_all(dev_fd, chunk)
else:
# Incoming from device: forward to host
if cur_dir is None:
cur_dir = DIR_RX
elif cur_dir != DIR_RX:
# Direction switched: flush previous TX frame, start RX
flush_record()
cur_dir = DIR_RX
buf.extend(chunk)
write_all(host_fd, chunk)
except KeyboardInterrupt:
pass
finally:
# Final flush and cleanup
flush_record()
try: os.close(host_fd)
except: pass
try: os.close(dev_fd)
except: pass
try: os.close(log_fd)
except: pass