200 lines
6.4 KiB
Python
200 lines
6.4 KiB
Python
"""
|
|
Shared Memory Frame Buffer Reader
|
|
|
|
This module provides a Python interface to read JPEG frames from the
|
|
SharedMemoryFrameBuffer created by the C++ beacon tracking application.
|
|
"""
|
|
|
|
import mmap
import struct
import time
from dataclasses import dataclass
from typing import Optional, Tuple

import posix_ipc
|
|
|
|
|
|
@dataclass
class FrameHeader:
    """Header structure for a single frame"""

    # Frame counter value stored by the writer in the frame header.
    frame_number: int
    # Length in bytes of the JPEG payload that follows the frame header.
    data_size: int
    # Frame timestamp; presumably microseconds, judging by the name (confirm
    # the epoch/clock against the C++ writer).
    timestamp_us: int
    # Frame dimensions; presumably in pixels (confirm against the C++ writer).
    width: int
    height: int
|
|
|
|
|
|
class SharedMemoryFrameReader:
    """
    Reader for SharedMemoryFrameBuffer

    This class maps the shared memory segment read-only and provides methods
    to read the latest JPEG frame data.

    Segment layout as read by this class (native byte order and alignment):

        offset 0     uint32  magic value (MAGIC_VALUE)
        offset 4     uint32  layout version (CURRENT_VERSION)
        offset 8     uint64  size of the segment header in bytes
        offset 24    uint64  sequence counter (odd while a write is in progress)
        offset 32    uint32  hasFrame flag (0 means no frame is available)
        header_size          frame header: frame_number, data_size (uint32),
                             timestamp_us (int64), width, height (uint32)
        header_size + frame_header_size
                             JPEG payload of data_size bytes

    Instances can be used as context managers; leaving the ``with`` block
    closes the mapping and the file descriptor.
    """

    MAGIC_VALUE = 0x4246424D  # 'BFBM' (Beacon Frame Buffer Memory)
    CURRENT_VERSION = 1

    # Accepted range for the headerSize field stored in the segment.
    _MIN_HEADER_SIZE = 64
    _MAX_HEADER_SIZE = 4096

    # Byte offsets of the fields inside the segment header.
    _MAGIC_OFFSET = 0
    _VERSION_OFFSET = 4
    _HEADER_SIZE_OFFSET = 8
    _SEQUENCE_OFFSET = 24
    _HAS_FRAME_OFFSET = 32

    # frame_number, data_size (uint32), timestamp_us (int64), width, height
    # (uint32), unpacked with native alignment.
    _FRAME_HEADER_FORMAT = 'IIqII'

    # How many times read_frame() retries before giving up on a frame.
    _MAX_READ_RETRIES = 100

    def __init__(self, name: str = "BeaconFrameBuffer"):
        """
        Initialize the shared memory reader and connect to the segment

        Args:
            name: Name of the shared memory segment

        Raises:
            RuntimeError: If the segment does not exist, cannot be mapped,
                or fails header validation
        """
        self.name = name
        self.shm = None
        self.mmap = None
        self.header_size = None  # Will be read from shared memory
        # Size of the per-frame header (4 * uint32 + int64 = 24 bytes),
        # derived from the format so the two can never disagree.
        self.frame_header_size = struct.calcsize(self._FRAME_HEADER_FORMAT)

        self._connect()

    def _connect(self):
        """
        Connect to the shared memory segment

        Opens the existing segment, maps it read-only and validates its
        header. Anything acquired along the way is released on failure.

        Raises:
            RuntimeError: If the segment does not exist or any other step
                fails; the original exception is chained as the cause
        """
        try:
            # Open existing shared memory
            self.shm = posix_ipc.SharedMemory(self.name, flags=0, mode=0o600)

            # Map the shared memory (length 0 maps the whole segment)
            self.mmap = mmap.mmap(self.shm.fd, 0, access=mmap.ACCESS_READ)

            # Validate the shared memory segment
            self._validate_segment()

        except posix_ipc.ExistentialError as e:
            self._cleanup()
            raise RuntimeError(f"Shared memory segment '{self.name}' does not exist. "
                               "Make sure the C++ application is running.") from e
        except Exception as e:
            self._cleanup()
            raise RuntimeError(f"Failed to connect to shared memory: {e}") from e

    def _validate_segment(self):
        """
        Validate the shared memory segment header

        Checks the magic value, the version and the stored header size, and
        stores the latter in ``self.header_size``.

        Raises:
            RuntimeError: If the segment is too small or any field has an
                unexpected value
        """
        segment_size = len(self.mmap)
        if segment_size < self._MIN_HEADER_SIZE:
            raise RuntimeError("Shared memory segment too small")

        # Read magic value (offset 0, uint32)
        magic = struct.unpack_from('I', self.mmap, self._MAGIC_OFFSET)[0]
        if magic != self.MAGIC_VALUE:
            raise RuntimeError(f"Invalid magic value: {hex(magic)} (expected {hex(self.MAGIC_VALUE)})")

        # Read version (offset 4, uint32)
        version = struct.unpack_from('I', self.mmap, self._VERSION_OFFSET)[0]
        if version != self.CURRENT_VERSION:
            raise RuntimeError(f"Version mismatch: {version} (expected {self.CURRENT_VERSION})")

        # Read headerSize (offset 8, uint64)
        self.header_size = struct.unpack_from('Q', self.mmap, self._HEADER_SIZE_OFFSET)[0]
        if not self._MIN_HEADER_SIZE <= self.header_size <= self._MAX_HEADER_SIZE:
            raise RuntimeError(f"Invalid header size: {self.header_size}")

        # The frame header is read at header_size on every read_frame() call,
        # so a segment that cannot contain it is rejected up front.
        if self.header_size + self.frame_header_size > segment_size:
            raise RuntimeError("Shared memory segment too small")

    def read_frame(self) -> Optional[Tuple["FrameHeader", bytes]]:
        """
        Read the latest frame from shared memory

        The frame header and payload are copied between two reads of the
        sequence counter; the copy is accepted only when both reads return
        the same even value. No memory barriers are issued from Python, so
        this is a best-effort consistency check.

        Returns:
            Tuple of (FrameHeader, jpeg_data) if a frame is available, None
            otherwise (reader closed, no frame available, no consistent copy
            within the retry budget, or an error while reading)
        """
        if self.mmap is None:
            return None

        try:
            # Read hasFrame flag (offset 32, atomic uint32)
            has_frame = struct.unpack_from('I', self.mmap, self._HAS_FRAME_OFFSET)[0]
            if has_frame == 0:
                return None

            frame_header_offset = self.header_size
            data_offset = frame_header_offset + self.frame_header_size
            # Largest payload that fits between data_offset and the end of
            # the mapping.
            max_data_size = len(self.mmap) - data_offset

            # Use seqlock-style reading for consistency
            for _ in range(self._MAX_READ_RETRIES):
                # Read sequence number (offset 24, atomic uint64)
                seq_begin = struct.unpack_from('Q', self.mmap, self._SEQUENCE_OFFSET)[0]

                # Check if writer is in progress (odd sequence number)
                if seq_begin & 1:
                    continue

                # Read frame header starting at offset header_size
                frame_number, data_size, timestamp_us, width, height = struct.unpack_from(
                    self._FRAME_HEADER_FORMAT, self.mmap, frame_header_offset
                )

                # A size that overruns the mapping is a torn or corrupt
                # header. Slicing would silently truncate the payload, so
                # retry instead of returning data that does not match
                # data_size.
                if data_size > max_data_size:
                    continue

                # Read JPEG data
                jpeg_data = bytes(self.mmap[data_offset:data_offset + data_size])

                # Read sequence number again; seq_begin is known to be even,
                # so equality means no write started during the copy.
                seq_end = struct.unpack_from('Q', self.mmap, self._SEQUENCE_OFFSET)[0]
                if seq_begin == seq_end:
                    header = FrameHeader(
                        frame_number=frame_number,
                        data_size=data_size,
                        timestamp_us=timestamp_us,
                        width=width,
                        height=height,
                    )
                    return header, jpeg_data

            # Failed to read after max retries
            return None

        except Exception as e:
            # Deliberately best-effort: report the problem and tell the
            # caller that no frame is available.
            print(f"Error reading frame: {e}")
            return None

    def _cleanup(self):
        """Clean up resources; safe to call more than once"""
        try:
            if self.mmap is not None:
                self.mmap.close()
        finally:
            # Drop the references even if closing the mapping failed, and
            # still release the file descriptor.
            self.mmap = None
            shm, self.shm = self.shm, None
            if shm is not None:
                shm.close_fd()

    def close(self):
        """Close the shared memory connection"""
        self._cleanup()

    def __enter__(self):
        """Return the reader itself for use in a ``with`` statement"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection; exceptions from the ``with`` body propagate"""
        self.close()
        return False
|
|
|
|
|
|
def main():
    """
    Test the shared memory reader

    Connects to the default segment and polls it roughly 30 times per
    second, printing a one-line status that is rewritten in place, until
    interrupted with Ctrl+C. Errors are reported on stdout, not raised.
    """
    print("Connecting to shared memory...")
    try:
        with SharedMemoryFrameReader() as reader:
            print("Connected successfully!")
            print("\nReading frames (Ctrl+C to stop)...")

            frame_count = 0
            while True:
                result = reader.read_frame()
                if result:
                    header, jpeg_data = result
                    frame_count += 1
                    # "\r" with end='' overwrites the previous status line.
                    print(f"\rFrame {frame_count}: #{header.frame_number}, "
                          f"{len(jpeg_data)} bytes, {header.width}x{header.height}",
                          end='', flush=True)
                else:
                    print("\rWaiting for frames...", end='', flush=True)

                time.sleep(0.033)  # ~30 FPS

    except KeyboardInterrupt:
        print("\n\nStopped by user")
    except Exception as e:
        # Top-level boundary of the test script: report and exit normally.
        print(f"\nError: {e}")
|
|
|
|
|
|
# Run the interactive test loop only when executed as a script, not on import.
if __name__ == '__main__':
    main()
|