""" FastAPI Web Application for Beacon Tracker Video Streaming This application reads JPEG frames from shared memory and streams them to web browsers via Server-Sent Events (SSE) or WebSocket (H.264 stream). """ from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, StreamingResponse, Response from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from shared_memory_reader import SharedMemoryFrameReader import asyncio import time import base64 import json import os from contextlib import asynccontextmanager # Global state reader = None reader_lock = asyncio.Lock() last_frame_data = None last_frame_header = None frame_lock = asyncio.Lock() # Video streaming state streaming_task = None streaming_active = False VIDEO_FIFO_PATH = '/tmp/beacon_video_stream' # Named pipe for H.264 stream video_fifo_file = None # Keep FIFO open to prevent C++ blocking async def init_reader(): """Initialize the shared memory reader""" global reader async with reader_lock: if reader is None: try: reader = SharedMemoryFrameReader() print("Connected to shared memory successfully") except Exception as e: print(f"Failed to connect to shared memory: {e}") reader = None async def init_video_fifo(): """Initialize video FIFO - open it to unblock C++ writer""" global video_fifo_file # Wait for C++ to create FIFO for i in range(30): if os.path.exists(VIDEO_FIFO_PATH): break print(f"Waiting for FIFO to be created by C++ ({i+1}/30)...") await asyncio.sleep(1) if not os.path.exists(VIDEO_FIFO_PATH): print(f"WARNING: Video FIFO not found at {VIDEO_FIFO_PATH}") return try: print(f"Opening FIFO {VIDEO_FIFO_PATH} for reading (will block until C++ opens for writing)...") loop = asyncio.get_event_loop() # Open file in BLOCKING mode - this will unblock C++ when it opens for writing def open_fifo(): return open(VIDEO_FIFO_PATH, 'rb', buffering=0) video_fifo_file = await loop.run_in_executor(None, open_fifo) print(f"FIFO opened successfully - C++ should now be writing") except Exception as e: print(f"Failed to open video FIFO: {e}") async def cleanup_reader(): """Clean up shared memory reader""" global reader, video_fifo_file async with reader_lock: if reader: reader.close() reader = None # Close video FIFO if video_fifo_file: try: video_fifo_file.close() print("Video FIFO closed") except: pass video_fifo_file = None @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for startup and shutdown events""" global streaming_task, streaming_active # Startup await init_reader() await init_video_fifo() # Start HLS streaming task streaming_task = asyncio.create_task(stream_video_from_fifo()) print("Started HLS video streaming task") print("FastAPI application started") yield # Shutdown streaming_active = False if streaming_task: try: await asyncio.wait_for(streaming_task, timeout=5.0) except asyncio.TimeoutError: print("HLS streaming task did not stop gracefully") await cleanup_reader() print("FastAPI application shutdown") # Create FastAPI app app = FastAPI( title="Beacon Tracker Video Stream", description="High-performance video streaming with WebSocket and SSE support", version="2.0", lifespan=lifespan ) # Mount static files if directory exists if os.path.exists("static"): app.mount("/static", StaticFiles(directory="static"), name="static") else: print("Warning: 'static' directory not found, static files will not be served") # Templates templates = Jinja2Templates(directory="templates") @app.get("/", response_class=HTMLResponse) async def index(request: Request): """Render the main page""" return templates.TemplateResponse("index.html", {"request": request}) @app.get("/stream") async def stream_sse(): """ Server-Sent Events stream endpoint Continuously sends JPEG frames to the client as they become available. """ async def generate(): global reader # Initialize reader for this stream await init_reader() # Check again after init async with reader_lock: local_reader = reader if local_reader is None: yield f"data: {json.dumps({'error': 'Failed to connect to shared memory'})}\n\n" return consecutive_failures = 0 max_failures = 100 while True: try: async with reader_lock: if reader is None: yield f"data: {json.dumps({'error': 'Reader is None'})}\n\n" return # Run blocking read in thread pool result = await asyncio.to_thread(reader.read_frame) if result: header, jpeg_data = result consecutive_failures = 0 # Encode JPEG as base64 for transmission jpeg_base64 = base64.b64encode(jpeg_data).decode('utf-8') # Create event data event_data = { 'frame_number': header.frame_number, 'timestamp_us': header.timestamp_us, 'width': header.width, 'height': header.height, 'data_size': header.data_size, 'jpeg': jpeg_base64 } # Send as SSE event yield f"data: {json.dumps(event_data)}\n\n" # Update global state async with frame_lock: global last_frame_data, last_frame_header last_frame_data = jpeg_data last_frame_header = header else: consecutive_failures += 1 if consecutive_failures >= max_failures: yield f"data: {json.dumps({'error': 'No frames available'})}\n\n" consecutive_failures = 0 # Small delay to prevent busy waiting await asyncio.sleep(0.001) # 1ms - fast updates except Exception as e: print(f"Error in stream: {e}") yield f"data: {json.dumps({'error': str(e)})}\n\n" await asyncio.sleep(1) return StreamingResponse( generate(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) @app.get("/status") async def status(): """Get the current status of the stream""" await init_reader() async with frame_lock: if last_frame_header: return { 'connected': True, 'last_frame': last_frame_header.frame_number, 'timestamp_us': last_frame_header.timestamp_us, 'resolution': f"{last_frame_header.width}x{last_frame_header.height}" } else: return { 'connected': reader is not None, 'last_frame': None, 'message': 'Waiting for frames...' } @app.get("/latest_frame") async def latest_frame(): """Get the latest frame as a JPEG image""" async with frame_lock: if last_frame_data: return Response(content=last_frame_data, media_type="image/jpeg") else: return Response(content="No frame available", status_code=404) async def stream_video_from_fifo(): """ Read raw H.264 from FIFO and convert to HLS format via ffmpeg. This runs as a background task and generates HLS playlist + segments. """ global streaming_active, video_fifo_file import subprocess print(f"Starting HLS video stream generator from FIFO: {VIDEO_FIFO_PATH}") ffmpeg_process = None hls_dir = "hls_output" # Create HLS output directory if it doesn't exist if not os.path.exists(hls_dir): os.makedirs(hls_dir) try: # Use the globally opened FIFO file if video_fifo_file is None: print("ERROR: FIFO not opened during startup") return fifo = video_fifo_file print("Connected to FIFO, starting ffmpeg H.264->HLS converter...") # Start ffmpeg to convert raw H.264 to HLS format # -probesize 32 -analyzeduration 0: minimal probing for low latency startup # -f h264: input format is raw H.264 byte-stream (Annex B from GStreamer) # -r 10: input framerate (from config.ini StreamFps) # -i pipe:0: read from stdin # -c:v copy: copy video codec without re-encoding (use hardware-encoded H.264) # -f hls: output HLS format # -hls_time 1: 1 second per segment (low latency) # -hls_list_size 5: keep 5 segments in playlist # -hls_flags delete_segments+append_list+omit_endlist: live streaming flags # -hls_segment_type mpegts: use MPEG-TS segments # -start_number 0: start segment numbering from 0 ffmpeg_process = subprocess.Popen( ['ffmpeg', '-probesize', '32', '-analyzeduration', '0', '-f', 'h264', '-r', '10', # Must match GStreamer StreamFps from config.ini '-i', 'pipe:0', '-c:v', 'copy', '-f', 'hls', '-hls_time', '1', '-hls_list_size', '5', '-hls_flags', 'delete_segments+append_list+omit_endlist', '-hls_segment_type', 'mpegts', '-hls_segment_filename', f'{hls_dir}/segment_%03d.ts', '-start_number', '0', f'{hls_dir}/playlist.m3u8'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) streaming_active = True chunk_size = 65536 # 64KB chunks loop = asyncio.get_event_loop() bytes_written = 0 print(f"Reading raw H.264 from FIFO and generating HLS segments...") async def read_from_fifo(): """Read from FIFO and pipe to ffmpeg stdin""" nonlocal bytes_written try: while streaming_active: def read_chunk(): return fifo.read(chunk_size) data = await loop.run_in_executor(None, read_chunk) if not data: print("FIFO EOF reached") break if ffmpeg_process and ffmpeg_process.stdin: try: ffmpeg_process.stdin.write(data) ffmpeg_process.stdin.flush() bytes_written += len(data) # Log progress every 1MB if bytes_written % (1024 * 1024) < chunk_size: print(f"Written {bytes_written / 1024 / 1024:.2f} MB to ffmpeg for HLS encoding") except Exception as e: print(f"Error writing to ffmpeg stdin: {e}") break finally: if ffmpeg_process and ffmpeg_process.stdin: try: ffmpeg_process.stdin.close() print("Closed ffmpeg stdin") except: pass async def read_ffmpeg_stderr(): """Read and log ffmpeg stderr for debugging""" line_count = 0 try: while streaming_active: def read_err(): line = ffmpeg_process.stderr.readline() return line.decode('utf-8', errors='ignore') if line else None line = await loop.run_in_executor(None, read_err) if not line: break line_count += 1 # Log first 30 lines to see HLS setup, then only errors/warnings if line_count <= 30: # Skip empty lines and progress updates if line.strip() and not line.startswith('frame='): print(f"FFmpeg[{line_count}]: {line.strip()}") elif 'error' in line.lower() or 'warning' in line.lower(): print(f"FFmpeg: {line.strip()}") except Exception as e: print(f"Error reading ffmpeg stderr: {e}") # Run both tasks concurrently await asyncio.gather( read_from_fifo(), read_ffmpeg_stderr(), return_exceptions=True ) except FileNotFoundError: print(f"ERROR: Could not open FIFO {VIDEO_FIFO_PATH}") print("Make sure C++ application is running first") except Exception as e: print(f"ERROR in video streaming: {e}") import traceback traceback.print_exc() finally: # Cleanup ffmpeg process if ffmpeg_process: try: ffmpeg_process.terminate() ffmpeg_process.wait(timeout=2) except: try: ffmpeg_process.kill() except: pass streaming_active = False print("Video streaming stopped") @app.get("/hls/playlist.m3u8") async def get_hls_playlist(): """Serve HLS playlist manifest""" playlist_path = "hls_output/playlist.m3u8" if not os.path.exists(playlist_path): return Response(content="HLS playlist not ready yet", status_code=404) try: with open(playlist_path, 'r') as f: content = f.read() return Response( content=content, media_type="application/vnd.apple.mpegurl", headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0", "Access-Control-Allow-Origin": "*" } ) except Exception as e: print(f"Error serving HLS playlist: {e}") return Response(content=f"Error: {e}", status_code=500) @app.get("/hls/{segment_name}") async def get_hls_segment(segment_name: str): """Serve HLS video segments""" # Security: only allow .ts files if not segment_name.endswith('.ts'): return Response(content="Invalid segment name", status_code=400) segment_path = f"hls_output/{segment_name}" if not os.path.exists(segment_path): return Response(content="Segment not found", status_code=404) try: with open(segment_path, 'rb') as f: content = f.read() return Response( content=content, media_type="video/MP2T", headers={ "Cache-Control": "public, max-age=3600", "Access-Control-Allow-Origin": "*" } ) except Exception as e: print(f"Error serving HLS segment {segment_name}: {e}") return Response(content=f"Error: {e}", status_code=500) @app.get("/health") async def health(): """Health check endpoint""" hls_playlist_exists = os.path.exists("hls_output/playlist.m3u8") return { "status": "healthy", "streaming_active": streaming_active, "hls_playlist_ready": hls_playlist_exists, "video_fifo": VIDEO_FIFO_PATH } if __name__ == '__main__': import uvicorn print("Starting FastAPI server on http://0.0.0.0:5000") print(f"Video stream will be available at ws://0.0.0.0:5000/ws/video") print(f"Expecting C++ H.264 stream via FIFO at {VIDEO_FIFO_PATH}") uvicorn.run( app, host='0.0.0.0', port=5000, log_level='info', access_log=True )