"""
FastAPI Web Application for Beacon Tracker Video Streaming

This application reads JPEG frames from shared memory and streams them to
web browsers via Server-Sent Events (SSE) or WebSocket (H.264 stream).
"""

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse, StreamingResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from shared_memory_reader import SharedMemoryFrameReader
import asyncio
import time
import base64
import json
import os
import traceback
from typing import Set
from contextlib import asynccontextmanager

# Global state
reader = None  # SharedMemoryFrameReader instance, or None when not connected
reader_lock = asyncio.Lock()  # guards creation, use and teardown of `reader`
last_frame_data = None  # JPEG bytes of the most recent frame sent over SSE
last_frame_header = None  # header object that accompanied `last_frame_data`
frame_lock = asyncio.Lock()  # guards the two `last_frame_*` globals

# Video streaming state
active_websocket_clients: Set[WebSocket] = set()
streaming_task = None  # handle of the running TCP relay task, if any
streaming_active = False  # True while the relay task holds an open TCP stream
TCP_HOST = '127.0.0.1'
TCP_PORT = 8888


async def init_reader():
    """Initialize the shared memory reader.

    Does nothing when a reader already exists. A failed connection is
    reported on stdout and leaves ``reader`` as ``None``; no exception is
    propagated to the caller.
    """
    global reader
    async with reader_lock:
        if reader is None:
            try:
                reader = SharedMemoryFrameReader()
                print("Connected to shared memory successfully")
            except Exception as e:
                print(f"Failed to connect to shared memory: {e}")
                reader = None


async def cleanup_reader():
    """Close the shared memory reader, if one is open, and forget it."""
    global reader
    async with reader_lock:
        if reader:
            reader.close()
            reader = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager for startup and shutdown events."""
    # Startup
    await init_reader()
    print("FastAPI application started")

    yield

    # Shutdown
    await cleanup_reader()
    print("FastAPI application shutdown")


# Create FastAPI app
app = FastAPI(
    title="Beacon Tracker Video Stream",
    description="High-performance video streaming with WebSocket and SSE support",
    version="2.0",
    lifespan=lifespan
)

# Mount static files if directory exists
if os.path.exists("static"):
    app.mount("/static", StaticFiles(directory="static"), name="static")
else:
    print("Warning: 'static' directory not found, static files will not be served")

# Templates
templates = Jinja2Templates(directory="templates")


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the main page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/stream")
async def stream_sse():
    """
    Server-Sent Events stream endpoint.

    Continuously sends JPEG frames to the client as they become available.
    Each event is a JSON object carrying the frame header fields and the
    base64-encoded JPEG; problems are reported as ``{"error": ...}`` events.
    """
    async def generate():
        global last_frame_data, last_frame_header

        # Initialize reader for this stream
        await init_reader()

        # Check again after init
        async with reader_lock:
            local_reader = reader

        if local_reader is None:
            yield f"data: {json.dumps({'error': 'Failed to connect to shared memory'})}\n\n"
            return

        consecutive_failures = 0
        max_failures = 100

        while True:
            try:
                # The lock is held across the read so that cleanup_reader()
                # cannot close the reader while a worker thread is using it.
                async with reader_lock:
                    if reader is None:
                        yield f"data: {json.dumps({'error': 'Reader is None'})}\n\n"
                        return

                    # Run blocking read in thread pool
                    result = await asyncio.to_thread(reader.read_frame)

                if result:
                    header, jpeg_data = result
                    consecutive_failures = 0

                    # Encode JPEG as base64 for transmission
                    jpeg_base64 = base64.b64encode(jpeg_data).decode('utf-8')

                    # Create event data
                    event_data = {
                        'frame_number': header.frame_number,
                        'timestamp_us': header.timestamp_us,
                        'width': header.width,
                        'height': header.height,
                        'data_size': header.data_size,
                        'jpeg': jpeg_base64
                    }

                    # Send as SSE event
                    yield f"data: {json.dumps(event_data)}\n\n"

                    # Update global state
                    async with frame_lock:
                        last_frame_data = jpeg_data
                        last_frame_header = header
                else:
                    consecutive_failures += 1
                    if consecutive_failures >= max_failures:
                        yield f"data: {json.dumps({'error': 'No frames available'})}\n\n"
                        consecutive_failures = 0

                # Small delay to prevent busy waiting
                await asyncio.sleep(0.001)  # 1ms - fast updates

            except Exception as e:
                print(f"Error in stream: {e}")
                yield f"data: {json.dumps({'error': str(e)})}\n\n"
                await asyncio.sleep(1)

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )


@app.get("/status")
async def status():
    """Get the current status of the stream.

    Reports the header fields of the last frame sent over SSE when there is
    one; otherwise reports whether the shared memory reader is connected.
    """
    await init_reader()

    async with frame_lock:
        if last_frame_header:
            return {
                'connected': True,
                'last_frame': last_frame_header.frame_number,
                'timestamp_us': last_frame_header.timestamp_us,
                'resolution': f"{last_frame_header.width}x{last_frame_header.height}"
            }
        else:
            return {
                'connected': reader is not None,
                'last_frame': None,
                'message': 'Waiting for frames...'
            }


@app.get("/latest_frame")
async def latest_frame():
    """Get the latest frame as a JPEG image (404 when none has been seen)."""
    async with frame_lock:
        if last_frame_data:
            return Response(content=last_frame_data, media_type="image/jpeg")
        else:
            return Response(content="No frame available", status_code=404)


async def stream_video_from_tcp():
    """
    Read MPEG-TS video stream from TCP socket and broadcast via WebSocket.

    This runs as a background task. It owns ``streaming_active``: the flag is
    raised once the TCP connection is established and lowered when the task
    exits. The task stops when no WebSocket clients remain, when the TCP peer
    closes the stream, or on a read error.
    """
    global streaming_active, streaming_task

    print(f"Starting video stream reader from TCP: {TCP_HOST}:{TCP_PORT}")

    tcp_writer = None
    try:
        # Wait a bit for C++ application to start TCP server
        await asyncio.sleep(2)

        # Connect to TCP server
        print(f"Connecting to TCP server {TCP_HOST}:{TCP_PORT}...")
        # Named tcp_* so the module-level shared-memory `reader` is not shadowed.
        tcp_reader, tcp_writer = await asyncio.open_connection(TCP_HOST, TCP_PORT)

        print("Connected to TCP server, starting stream...")
        streaming_active = True

        chunk_size = 32768  # 32KB chunks for MPEG-TS

        while streaming_active and active_websocket_clients:
            try:
                # Read chunk from TCP
                data = await asyncio.wait_for(tcp_reader.read(chunk_size), timeout=5.0)

                if not data:
                    print("No data from TCP, stream may have ended")
                    break

                # Broadcast binary data to all connected WebSocket clients.
                # Iterate over a copy: clients may join or leave during awaits.
                disconnected_clients = set()
                for client in active_websocket_clients.copy():
                    try:
                        await client.send_bytes(data)
                    except Exception as e:
                        print(f"Error sending to client: {e}")
                        disconnected_clients.add(client)

                # Remove disconnected clients
                active_websocket_clients.difference_update(disconnected_clients)

            except asyncio.TimeoutError:
                # No data in 5 seconds, continue waiting
                continue
            except Exception as e:
                print(f"Error reading from TCP: {e}")
                break

    except ConnectionRefusedError:
        print(f"ERROR: Could not connect to TCP server {TCP_HOST}:{TCP_PORT}")
        print("Make sure C++ application is running first")
    except Exception as e:
        print(f"ERROR in video streaming: {e}")
        traceback.print_exc()
    finally:
        streaming_active = False
        # Release the task slot before the first await below, so a client
        # that connects while the socket is closing starts a fresh task
        # instead of relying on this one, which is already shutting down.
        if streaming_task is asyncio.current_task():
            streaming_task = None
        # Close in `finally` so the socket is released on cancellation too.
        if tcp_writer is not None:
            tcp_writer.close()
            try:
                await tcp_writer.wait_closed()
            except Exception as e:
                print(f"Error closing TCP connection: {e}")
        print("Video streaming stopped")


@app.websocket("/ws/video")
async def websocket_video_endpoint(websocket: WebSocket):
    """WebSocket endpoint for video streaming.

    Registers the client for broadcast, makes sure exactly one relay task is
    running, then answers ``"ping"`` text messages with ``"pong"`` until the
    client disconnects.
    """
    global streaming_task

    await websocket.accept()
    print(f"Client connected to video stream: {id(websocket)}")

    # Add client to active set
    active_websocket_clients.add(websocket)

    # Start streaming task if not already running. The task handle is checked
    # rather than `streaming_active`, because that flag stays False while the
    # task is still waiting for / connecting to the TCP server; checking the
    # flag would start a second task that sends every chunk twice.
    if streaming_task is None or streaming_task.done():
        streaming_task = asyncio.create_task(stream_video_from_tcp())
        print("Started video streaming task")

    try:
        # Keep connection alive and handle client messages
        while True:
            # Wait for client messages (like ping/pong or control messages)
            try:
                data = await asyncio.wait_for(websocket.receive_text(), timeout=1.0)
            except asyncio.TimeoutError:
                # No message received, continue
                continue

            # Handle client messages if needed
            if data == "ping":
                await websocket.send_text("pong")

    except WebSocketDisconnect:
        print(f"Client disconnected from video stream: {id(websocket)}")
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        # Remove client from active set
        active_websocket_clients.discard(websocket)

        # The relay task stops by itself once it sees the empty client set;
        # it lowers `streaming_active` when it actually exits.
        if not active_websocket_clients:
            print("No more clients, stopping stream")


@app.get("/health")
async def health():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "active_websocket_clients": len(active_websocket_clients),
        "streaming_active": streaming_active,
        "tcp_endpoint": f"{TCP_HOST}:{TCP_PORT}"
    }


if __name__ == '__main__':
    import uvicorn

    print("Starting FastAPI server on http://0.0.0.0:5000")
    print("Video stream will be available at ws://0.0.0.0:5000/ws/video")
    print(f"Expecting C++ TCP stream at {TCP_HOST}:{TCP_PORT}")

    uvicorn.run(
        app,
        host='0.0.0.0',
        port=5000,
        log_level='info',
        access_log=True
    )