add stream_module

This commit is contained in:
awe
2025-11-18 12:26:48 +03:00
commit c67ac45617
1353 changed files with 253533 additions and 0 deletions

View File

@ -0,0 +1,199 @@
"""
Shared Memory Frame Buffer Reader
This module provides a Python interface to read JPEG frames from the
SharedMemoryFrameBuffer created by the C++ beacon tracking application.
"""
import struct
import mmap
import posix_ipc
from typing import Optional, Tuple
from dataclasses import dataclass
@dataclass
class FrameHeader:
"""Header structure for a single frame"""
frame_number: int
data_size: int
timestamp_us: int
width: int
height: int
class SharedMemoryFrameReader:
"""
Reader for SharedMemoryFrameBuffer
This class maps the shared memory segment and provides methods to read
the latest JPEG frame data.
"""
MAGIC_VALUE = 0x4246424D # 'BFBM' (Beacon Frame Buffer Memory)
CURRENT_VERSION = 1
def __init__(self, name: str = "BeaconFrameBuffer"):
"""
Initialize the shared memory reader
Args:
name: Name of the shared memory segment
"""
self.name = name
self.shm = None
self.mmap = None
self.header_size = None # Will be read from shared memory
self.frame_header_size = 24 # Size of FrameHeader (5 * uint32 + int64)
self._connect()
def _connect(self):
"""Connect to the shared memory segment"""
try:
# Open existing shared memory
self.shm = posix_ipc.SharedMemory(self.name, flags=0, mode=0o600)
# Map the shared memory
self.mmap = mmap.mmap(self.shm.fd, 0, access=mmap.ACCESS_READ)
# Validate the shared memory segment
self._validate_segment()
except posix_ipc.ExistentialError:
raise RuntimeError(f"Shared memory segment '{self.name}' does not exist. "
"Make sure the C++ application is running.")
except Exception as e:
self._cleanup()
raise RuntimeError(f"Failed to connect to shared memory: {e}")
def _validate_segment(self):
"""Validate the shared memory segment header"""
min_header_size = 64 # Minimum expected header size
if len(self.mmap) < min_header_size:
raise RuntimeError("Shared memory segment too small")
# Read magic value (offset 0, uint32)
magic = struct.unpack_from('I', self.mmap, 0)[0]
if magic != self.MAGIC_VALUE:
raise RuntimeError(f"Invalid magic value: {hex(magic)} (expected {hex(self.MAGIC_VALUE)})")
# Read version (offset 4, uint32)
version = struct.unpack_from('I', self.mmap, 4)[0]
if version != self.CURRENT_VERSION:
raise RuntimeError(f"Version mismatch: {version} (expected {self.CURRENT_VERSION})")
# Read headerSize (offset 8, uint64)
self.header_size = struct.unpack_from('Q', self.mmap, 8)[0]
if self.header_size < min_header_size or self.header_size > 4096:
raise RuntimeError(f"Invalid header size: {self.header_size}")
def read_frame(self) -> Optional[Tuple[FrameHeader, bytes]]:
"""
Read the latest frame from shared memory
Returns:
Tuple of (FrameHeader, jpeg_data) if a frame is available, None otherwise
"""
if not self.mmap:
return None
try:
# Read hasFrame flag (offset 32, atomic uint32)
has_frame = struct.unpack_from('I', self.mmap, 32)[0]
if has_frame == 0:
return None
# Use seqlock-style reading for consistency
max_retries = 100
for _ in range(max_retries):
# Read sequence number (offset 24, atomic uint64)
seq_begin = struct.unpack_from('Q', self.mmap, 24)[0]
# Check if writer is in progress (odd sequence number)
if seq_begin & 1:
continue
# Read frame header starting at offset header_size
frame_header_offset = self.header_size
frame_number, data_size, timestamp_us, width, height = struct.unpack_from(
'IIqII', self.mmap, frame_header_offset
)
# Read JPEG data
data_offset = self.header_size + self.frame_header_size
jpeg_data = bytes(self.mmap[data_offset:data_offset + data_size])
# Read sequence number again
seq_end = struct.unpack_from('Q', self.mmap, 24)[0]
# Check if sequence didn't change during read
if seq_begin == seq_end and not (seq_end & 1):
header = FrameHeader(
frame_number=frame_number,
data_size=data_size,
timestamp_us=timestamp_us,
width=width,
height=height
)
return header, jpeg_data
# Failed to read after max retries
return None
except Exception as e:
print(f"Error reading frame: {e}")
return None
def _cleanup(self):
"""Clean up resources"""
if self.mmap:
self.mmap.close()
self.mmap = None
if self.shm:
self.shm.close_fd()
self.shm = None
def close(self):
"""Close the shared memory connection"""
self._cleanup()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def main():
"""Test the shared memory reader"""
print("Connecting to shared memory...")
try:
with SharedMemoryFrameReader() as reader:
print("Connected successfully!")
print("\nReading frames (Ctrl+C to stop)...")
import time
frame_count = 0
while True:
result = reader.read_frame()
if result:
header, jpeg_data = result
frame_count += 1
print(f"\rFrame {frame_count}: #{header.frame_number}, "
f"{len(jpeg_data)} bytes, {header.width}x{header.height}",
end='', flush=True)
else:
print("\rWaiting for frames...", end='', flush=True)
time.sleep(0.033) # ~30 FPS
except KeyboardInterrupt:
print("\n\nStopped by user")
except Exception as e:
print(f"\nError: {e}")
if __name__ == '__main__':
main()