Files
radar_frontend/web_viewer/app.py
2025-11-21 16:48:51 +03:00

530 lines
17 KiB
Python

"""
FastAPI Web Application for Beacon Tracker Video Streaming
This application reads JPEG frames from shared memory and streams them
to web browsers via Server-Sent Events (SSE) or WebSocket (H.264 stream).
"""
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from shared_memory_reader import SharedMemoryFrameReader
import asyncio
import time
import base64
import json
import os
from contextlib import asynccontextmanager
# Global state
reader = None
reader_lock = asyncio.Lock()
last_frame_data = None
last_frame_header = None
frame_lock = asyncio.Lock()
# Video streaming state
streaming_task = None
streaming_active = False
VIDEO_FIFO_PATH = '/tmp/beacon_video_stream' # Named pipe for H.264 stream
video_fifo_file = None # Keep FIFO open to prevent C++ blocking
async def init_reader():
"""Initialize the shared memory reader"""
global reader
async with reader_lock:
if reader is None:
try:
reader = SharedMemoryFrameReader()
print("Connected to shared memory successfully")
except Exception as e:
print(f"Failed to connect to shared memory: {e}")
reader = None
async def init_video_fifo():
"""Initialize video FIFO - open it to unblock C++ writer"""
global video_fifo_file
# Wait for C++ to create FIFO
for i in range(30):
if os.path.exists(VIDEO_FIFO_PATH):
break
print(f"Waiting for FIFO to be created by C++ ({i+1}/30)...")
await asyncio.sleep(1)
if not os.path.exists(VIDEO_FIFO_PATH):
print(f"WARNING: Video FIFO not found at {VIDEO_FIFO_PATH}")
return
try:
print(f"Opening FIFO {VIDEO_FIFO_PATH} for reading (non-blocking mode)...")
loop = asyncio.get_event_loop()
# Open file in NON-BLOCKING mode to prevent deadlock
# GStreamer filesink opens FIFO lazily (only when first frame arrives)
# Using O_NONBLOCK prevents Python from blocking while waiting for C++ writer
def open_fifo():
fd = os.open(VIDEO_FIFO_PATH, os.O_RDONLY | os.O_NONBLOCK)
return os.fdopen(fd, 'rb', buffering=0)
video_fifo_file = await loop.run_in_executor(None, open_fifo)
print(f"FIFO opened successfully in non-blocking mode")
except Exception as e:
print(f"Failed to open video FIFO: {e}")
async def cleanup_reader():
"""Clean up shared memory reader"""
global reader, video_fifo_file
async with reader_lock:
if reader:
reader.close()
reader = None
# Close video FIFO
if video_fifo_file:
try:
video_fifo_file.close()
print("Video FIFO closed")
except:
pass
video_fifo_file = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for startup and shutdown events"""
global streaming_task, streaming_active
# Startup
await init_reader()
await init_video_fifo()
# Start HLS streaming task
streaming_task = asyncio.create_task(stream_video_from_fifo())
print("Started HLS video streaming task")
print("FastAPI application started")
yield
# Shutdown
streaming_active = False
if streaming_task:
try:
await asyncio.wait_for(streaming_task, timeout=5.0)
except asyncio.TimeoutError:
print("HLS streaming task did not stop gracefully")
await cleanup_reader()
print("FastAPI application shutdown")
# Create FastAPI app
app = FastAPI(
title="Beacon Tracker Video Stream",
description="High-performance video streaming with WebSocket and SSE support",
version="2.0",
lifespan=lifespan
)
# Mount static files if directory exists
if os.path.exists("static"):
app.mount("/static", StaticFiles(directory="static"), name="static")
else:
print("Warning: 'static' directory not found, static files will not be served")
# Templates
templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
"""Render the main page"""
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/stream")
async def stream_sse():
"""
Server-Sent Events stream endpoint
Continuously sends JPEG frames to the client as they become available.
"""
async def generate():
global reader
# Initialize reader for this stream
await init_reader()
# Check again after init
async with reader_lock:
local_reader = reader
if local_reader is None:
yield f"data: {json.dumps({'error': 'Failed to connect to shared memory'})}\n\n"
return
consecutive_failures = 0
max_failures = 100
while True:
try:
async with reader_lock:
if reader is None:
yield f"data: {json.dumps({'error': 'Reader is None'})}\n\n"
return
# Run blocking read in thread pool
result = await asyncio.to_thread(reader.read_frame)
if result:
header, jpeg_data = result
consecutive_failures = 0
# Encode JPEG as base64 for transmission
jpeg_base64 = base64.b64encode(jpeg_data).decode('utf-8')
# Create event data
event_data = {
'frame_number': header.frame_number,
'timestamp_us': header.timestamp_us,
'width': header.width,
'height': header.height,
'data_size': header.data_size,
'jpeg': jpeg_base64
}
# Send as SSE event
yield f"data: {json.dumps(event_data)}\n\n"
# Update global state
async with frame_lock:
global last_frame_data, last_frame_header
last_frame_data = jpeg_data
last_frame_header = header
else:
consecutive_failures += 1
if consecutive_failures >= max_failures:
yield f"data: {json.dumps({'error': 'No frames available'})}\n\n"
consecutive_failures = 0
# Small delay to prevent busy waiting
await asyncio.sleep(0.001) # 1ms - fast updates
except Exception as e:
print(f"Error in stream: {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
@app.get("/status")
async def status():
"""Get the current status of the stream"""
await init_reader()
async with frame_lock:
if last_frame_header:
return {
'connected': True,
'last_frame': last_frame_header.frame_number,
'timestamp_us': last_frame_header.timestamp_us,
'resolution': f"{last_frame_header.width}x{last_frame_header.height}"
}
else:
return {
'connected': reader is not None,
'last_frame': None,
'message': 'Waiting for frames...'
}
@app.get("/latest_frame")
async def latest_frame():
"""Get the latest frame as a JPEG image"""
async with frame_lock:
if last_frame_data:
return Response(content=last_frame_data, media_type="image/jpeg")
else:
return Response(content="No frame available", status_code=404)
async def stream_video_from_fifo():
"""
Read raw H.264 from FIFO and convert to HLS format via ffmpeg.
This runs as a background task and generates HLS playlist + segments.
"""
global streaming_active, video_fifo_file
import subprocess
print(f"Starting HLS video stream generator from FIFO: {VIDEO_FIFO_PATH}")
ffmpeg_process = None
hls_dir = "hls_output"
# Create HLS output directory if it doesn't exist
if not os.path.exists(hls_dir):
os.makedirs(hls_dir)
try:
# Use the globally opened FIFO file
if video_fifo_file is None:
print("ERROR: FIFO not opened during startup")
return
fifo = video_fifo_file
print("Connected to FIFO, starting ffmpeg H.264->HLS converter...")
# Start ffmpeg to convert raw H.264 to HLS format
# -probesize 32 -analyzeduration 0: minimal probing for low latency startup
# -f h264: input format is raw H.264 byte-stream (Annex B from GStreamer)
# -r 30: input framerate (from config.ini StreamFps)
# -i pipe:0: read from stdin
# -c:v copy: copy video codec without re-encoding (use hardware-encoded H.264)
# -f hls: output HLS format
# -hls_time 1: 1 second per segment (low latency)
# -hls_list_size 10: keep 10 segments in playlist (more buffer for clients)
# -hls_delete_threshold 3: delete segments only after 3 new ones created
# -hls_flags delete_segments+append_list+omit_endlist: live streaming flags
# -hls_segment_type mpegts: use MPEG-TS segments
# -hls_segment_filename: use %d for unlimited numbering
# -start_number 0: start segment numbering from 0
ffmpeg_process = subprocess.Popen(
['ffmpeg',
'-probesize', '32',
'-analyzeduration', '0',
'-f', 'h264',
'-r', '30', # Must match GStreamer StreamFps from config.ini
'-i', 'pipe:0',
'-c:v', 'copy',
'-f', 'hls',
'-hls_time', '1',
'-hls_list_size', '10',
'-hls_delete_threshold', '3',
'-hls_flags', 'delete_segments+append_list+omit_endlist',
'-hls_segment_type', 'mpegts',
'-hls_segment_filename', f'{hls_dir}/segment_%d.ts',
'-start_number', '0',
f'{hls_dir}/playlist.m3u8'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
streaming_active = True
chunk_size = 65536 # 64KB chunks
loop = asyncio.get_event_loop()
bytes_written = 0
print(f"Reading raw H.264 from FIFO and generating HLS segments...")
async def read_from_fifo():
"""Read from FIFO and pipe to ffmpeg stdin"""
nonlocal bytes_written
writer_connected = False
try:
while streaming_active:
def read_chunk():
try:
return fifo.read(chunk_size)
except BlockingIOError:
# No data available yet (writer not connected or no data)
return None
data = await loop.run_in_executor(None, read_chunk)
if data is None:
# BlockingIOError: no data available, wait a bit
if not writer_connected:
print("Waiting for C++ to start writing to FIFO...")
await asyncio.sleep(0.1)
continue
if not data:
# Empty data: EOF (writer closed the FIFO)
if writer_connected:
print("FIFO EOF reached - C++ stopped writing")
else:
print("FIFO closed before C++ started writing")
break
# Got data! Mark writer as connected
if not writer_connected:
writer_connected = True
print("C++ started writing to FIFO successfully")
if ffmpeg_process and ffmpeg_process.stdin:
try:
ffmpeg_process.stdin.write(data)
ffmpeg_process.stdin.flush()
bytes_written += len(data)
# Log progress every 1MB
if bytes_written % (1024 * 1024) < chunk_size:
print(f"Written {bytes_written / 1024 / 1024:.2f} MB to ffmpeg for HLS encoding")
except Exception as e:
print(f"Error writing to ffmpeg stdin: {e}")
break
finally:
if ffmpeg_process and ffmpeg_process.stdin:
try:
ffmpeg_process.stdin.close()
print("Closed ffmpeg stdin")
except:
pass
async def read_ffmpeg_stderr():
"""Read and log ffmpeg stderr for debugging"""
line_count = 0
try:
while streaming_active:
def read_err():
line = ffmpeg_process.stderr.readline()
return line.decode('utf-8', errors='ignore') if line else None
line = await loop.run_in_executor(None, read_err)
if not line:
break
line_count += 1
# Log first 30 lines to see HLS setup, then only errors/warnings
if line_count <= 30:
# Skip empty lines and progress updates
if line.strip() and not line.startswith('frame='):
print(f"FFmpeg[{line_count}]: {line.strip()}")
elif 'error' in line.lower() or 'warning' in line.lower():
print(f"FFmpeg: {line.strip()}")
except Exception as e:
print(f"Error reading ffmpeg stderr: {e}")
# Run both tasks concurrently
await asyncio.gather(
read_from_fifo(),
read_ffmpeg_stderr(),
return_exceptions=True
)
except FileNotFoundError:
print(f"ERROR: Could not open FIFO {VIDEO_FIFO_PATH}")
print("Make sure C++ application is running first")
except Exception as e:
print(f"ERROR in video streaming: {e}")
import traceback
traceback.print_exc()
finally:
# Cleanup ffmpeg process
if ffmpeg_process:
try:
ffmpeg_process.terminate()
ffmpeg_process.wait(timeout=2)
except:
try:
ffmpeg_process.kill()
except:
pass
streaming_active = False
print("Video streaming stopped")
@app.get("/hls/playlist.m3u8")
async def get_hls_playlist():
"""Serve HLS playlist manifest"""
playlist_path = "hls_output/playlist.m3u8"
if not os.path.exists(playlist_path):
return Response(content="HLS playlist not ready yet", status_code=404)
try:
with open(playlist_path, 'r') as f:
content = f.read()
return Response(
content=content,
media_type="application/vnd.apple.mpegurl",
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
"Access-Control-Allow-Origin": "*"
}
)
except Exception as e:
print(f"Error serving HLS playlist: {e}")
return Response(content=f"Error: {e}", status_code=500)
@app.get("/hls/{segment_name}")
async def get_hls_segment(segment_name: str):
"""Serve HLS video segments"""
# Security: only allow .ts files
if not segment_name.endswith('.ts'):
return Response(content="Invalid segment name", status_code=400)
segment_path = f"hls_output/{segment_name}"
if not os.path.exists(segment_path):
return Response(content="Segment not found", status_code=404)
try:
with open(segment_path, 'rb') as f:
content = f.read()
return Response(
content=content,
media_type="video/MP2T",
headers={
"Cache-Control": "public, max-age=3600",
"Access-Control-Allow-Origin": "*"
}
)
except Exception as e:
print(f"Error serving HLS segment {segment_name}: {e}")
return Response(content=f"Error: {e}", status_code=500)
@app.get("/health")
async def health():
"""Health check endpoint"""
hls_playlist_exists = os.path.exists("hls_output/playlist.m3u8")
return {
"status": "healthy",
"streaming_active": streaming_active,
"hls_playlist_ready": hls_playlist_exists,
"video_fifo": VIDEO_FIFO_PATH
}
if __name__ == '__main__':
import uvicorn
print("Starting FastAPI server on http://0.0.0.0:5000")
print(f"Video stream will be available at ws://0.0.0.0:5000/ws/video")
print(f"Expecting C++ H.264 stream via FIFO at {VIDEO_FIFO_PATH}")
uvicorn.run(
app,
host='0.0.0.0',
port=5000,
log_level='info',
access_log=True
)