""" Shared Memory Frame Buffer Reader This module provides a Python interface to read JPEG frames from the SharedMemoryFrameBuffer created by the C++ beacon tracking application. """ import struct import mmap import posix_ipc from typing import Optional, Tuple from dataclasses import dataclass @dataclass class FrameHeader: """Header structure for a single frame""" frame_number: int data_size: int timestamp_us: int width: int height: int class SharedMemoryFrameReader: """ Reader for SharedMemoryFrameBuffer This class maps the shared memory segment and provides methods to read the latest JPEG frame data. """ MAGIC_VALUE = 0x4246424D # 'BFBM' (Beacon Frame Buffer Memory) CURRENT_VERSION = 1 def __init__(self, name: str = "BeaconFrameBuffer"): """ Initialize the shared memory reader Args: name: Name of the shared memory segment """ self.name = name self.shm = None self.mmap = None self.header_size = None # Will be read from shared memory self.frame_header_size = 24 # Size of FrameHeader (5 * uint32 + int64) self._connect() def _connect(self): """Connect to the shared memory segment""" try: # Open existing shared memory self.shm = posix_ipc.SharedMemory(self.name, flags=0, mode=0o600) # Map the shared memory self.mmap = mmap.mmap(self.shm.fd, 0, access=mmap.ACCESS_READ) # Validate the shared memory segment self._validate_segment() except posix_ipc.ExistentialError: raise RuntimeError(f"Shared memory segment '{self.name}' does not exist. " "Make sure the C++ application is running.") except Exception as e: self._cleanup() raise RuntimeError(f"Failed to connect to shared memory: {e}") def _validate_segment(self): """Validate the shared memory segment header""" min_header_size = 64 # Minimum expected header size if len(self.mmap) < min_header_size: raise RuntimeError("Shared memory segment too small") # Read magic value (offset 0, uint32) magic = struct.unpack_from('I', self.mmap, 0)[0] if magic != self.MAGIC_VALUE: raise RuntimeError(f"Invalid magic value: {hex(magic)} (expected {hex(self.MAGIC_VALUE)})") # Read version (offset 4, uint32) version = struct.unpack_from('I', self.mmap, 4)[0] if version != self.CURRENT_VERSION: raise RuntimeError(f"Version mismatch: {version} (expected {self.CURRENT_VERSION})") # Read headerSize (offset 8, uint64) self.header_size = struct.unpack_from('Q', self.mmap, 8)[0] if self.header_size < min_header_size or self.header_size > 4096: raise RuntimeError(f"Invalid header size: {self.header_size}") def read_frame(self) -> Optional[Tuple[FrameHeader, bytes]]: """ Read the latest frame from shared memory Returns: Tuple of (FrameHeader, jpeg_data) if a frame is available, None otherwise """ if not self.mmap: return None try: # Read hasFrame flag (offset 32, atomic uint32) has_frame = struct.unpack_from('I', self.mmap, 32)[0] if has_frame == 0: return None # Use seqlock-style reading for consistency max_retries = 100 for _ in range(max_retries): # Read sequence number (offset 24, atomic uint64) seq_begin = struct.unpack_from('Q', self.mmap, 24)[0] # Check if writer is in progress (odd sequence number) if seq_begin & 1: continue # Read frame header starting at offset header_size frame_header_offset = self.header_size frame_number, data_size, timestamp_us, width, height = struct.unpack_from( 'IIqII', self.mmap, frame_header_offset ) # Read JPEG data data_offset = self.header_size + self.frame_header_size jpeg_data = bytes(self.mmap[data_offset:data_offset + data_size]) # Read sequence number again seq_end = struct.unpack_from('Q', self.mmap, 24)[0] # Check if sequence didn't change during read if seq_begin == seq_end and not (seq_end & 1): header = FrameHeader( frame_number=frame_number, data_size=data_size, timestamp_us=timestamp_us, width=width, height=height ) return header, jpeg_data # Failed to read after max retries return None except Exception as e: print(f"Error reading frame: {e}") return None def _cleanup(self): """Clean up resources""" if self.mmap: self.mmap.close() self.mmap = None if self.shm: self.shm.close_fd() self.shm = None def close(self): """Close the shared memory connection""" self._cleanup() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False def main(): """Test the shared memory reader""" print("Connecting to shared memory...") try: with SharedMemoryFrameReader() as reader: print("Connected successfully!") print("\nReading frames (Ctrl+C to stop)...") import time frame_count = 0 while True: result = reader.read_frame() if result: header, jpeg_data = result frame_count += 1 print(f"\rFrame {frame_count}: #{header.frame_number}, " f"{len(jpeg_data)} bytes, {header.width}x{header.height}", end='', flush=True) else: print("\rWaiting for frames...", end='', flush=True) time.sleep(0.033) # ~30 FPS except KeyboardInterrupt: print("\n\nStopped by user") except Exception as e: print(f"\nError: {e}") if __name__ == '__main__': main()