change flask to fastapi

This commit is contained in:
awe
2025-11-20 15:35:36 +03:00
parent 0e1b3a2916
commit a85368fdfd
9 changed files with 1452 additions and 121 deletions

View File

@ -1,31 +1,41 @@
"""
Flask Web Application for Beacon Tracker Video Streaming
FastAPI Web Application for Beacon Tracker Video Streaming
This application reads JPEG frames from shared memory and streams them
to web browsers via Server-Sent Events (SSE).
to web browsers via Server-Sent Events (SSE) or WebSocket (H.264 stream).
"""
from flask import Flask, render_template, Response, jsonify
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse, StreamingResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from shared_memory_reader import SharedMemoryFrameReader
import asyncio
import time
import base64
import json
from threading import Lock
app = Flask(__name__)
import os
from typing import Set
from contextlib import asynccontextmanager
# Global state
reader = None
reader_lock = Lock()
reader_lock = asyncio.Lock()
last_frame_data = None
last_frame_header = None
frame_lock = Lock()
frame_lock = asyncio.Lock()
# Video streaming state
active_websocket_clients: Set[WebSocket] = set()
streaming_task = None
streaming_active = False
PIPE_PATH = '/tmp/beacon_video_stream'
def init_reader():
async def init_reader():
"""Initialize the shared memory reader"""
global reader
with reader_lock:
async with reader_lock:
if reader is None:
try:
reader = SharedMemoryFrameReader()
@ -35,27 +45,63 @@ def init_reader():
reader = None
@app.route('/')
def index():
async def cleanup_reader():
"""Clean up shared memory reader"""
global reader
async with reader_lock:
if reader:
reader.close()
reader = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan context manager for startup and shutdown events"""
# Startup
await init_reader()
print("FastAPI application started")
yield
# Shutdown
await cleanup_reader()
print("FastAPI application shutdown")
# Create FastAPI app
app = FastAPI(
title="Beacon Tracker Video Stream",
description="High-performance video streaming with WebSocket and SSE support",
version="2.0",
lifespan=lifespan
)
# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Templates
templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
"""Render the main page"""
return render_template('index.html')
return templates.TemplateResponse("index.html", {"request": request})
@app.route('/stream')
def stream():
@app.get("/stream")
async def stream_sse():
"""
Server-Sent Events stream endpoint
Continuously sends JPEG frames to the client as they become available.
"""
def generate():
async def generate():
global reader
# Initialize reader for this stream
init_reader()
await init_reader()
# Check again after init
with reader_lock:
async with reader_lock:
local_reader = reader
if local_reader is None:
@ -67,11 +113,13 @@ def stream():
while True:
try:
with reader_lock:
async with reader_lock:
if reader is None:
yield f"data: {json.dumps({'error': 'Reader is None'})}\n\n"
return
result = reader.read_frame()
# Run blocking read in thread pool
result = await asyncio.to_thread(reader.read_frame)
if result:
header, jpeg_data = result
@ -94,7 +142,7 @@ def stream():
yield f"data: {json.dumps(event_data)}\n\n"
# Update global state
with frame_lock:
async with frame_lock:
global last_frame_data, last_frame_header
last_frame_data = jpeg_data
last_frame_header = header
@ -106,64 +154,202 @@ def stream():
consecutive_failures = 0
# Small delay to prevent busy waiting
time.sleep(0.001) # 1ms - fast updates
await asyncio.sleep(0.001) # 1ms - fast updates
except GeneratorExit:
# Client disconnected
break
except Exception as e:
print(f"Error in stream: {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
time.sleep(1)
await asyncio.sleep(1)
return Response(generate(), mimetype='text/event-stream')
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
@app.route('/status')
def status():
@app.get("/status")
async def status():
"""Get the current status of the stream"""
init_reader()
await init_reader()
with frame_lock:
async with frame_lock:
if last_frame_header:
return jsonify({
return {
'connected': True,
'last_frame': last_frame_header.frame_number,
'timestamp_us': last_frame_header.timestamp_us,
'resolution': f"{last_frame_header.width}x{last_frame_header.height}"
})
}
else:
return jsonify({
return {
'connected': reader is not None,
'last_frame': None,
'message': 'Waiting for frames...'
})
}
@app.route('/latest_frame')
def latest_frame():
@app.get("/latest_frame")
async def latest_frame():
"""Get the latest frame as a JPEG image"""
with frame_lock:
async with frame_lock:
if last_frame_data:
return Response(last_frame_data, mimetype='image/jpeg')
return Response(content=last_frame_data, media_type="image/jpeg")
else:
return "No frame available", 404
return Response(content="No frame available", status_code=404)
@app.teardown_appcontext
def cleanup(exception=None):
"""Clean up resources on shutdown"""
global reader
with reader_lock:
if reader:
reader.close()
reader = None
async def stream_video_from_pipe():
"""
Read MPEG-TS video stream from named pipe and broadcast via WebSocket.
This runs as a background task.
"""
global streaming_active
print(f"Starting video stream reader from: {PIPE_PATH}")
# Wait for pipe to be created by C++ application
max_wait_time = 30 # seconds
start_time = time.time()
while not os.path.exists(PIPE_PATH):
if time.time() - start_time > max_wait_time:
print(f"ERROR: Pipe {PIPE_PATH} not found after {max_wait_time}s")
return
print(f"Waiting for pipe {PIPE_PATH}...")
await asyncio.sleep(1)
print(f"Pipe found: {PIPE_PATH}")
try:
# Open the named pipe in binary read mode
print("Opening pipe for reading...")
# Use asyncio to read from pipe
streaming_active = True
# Open pipe in non-blocking mode
import fcntl
pipe_fd = os.open(PIPE_PATH, os.O_RDONLY | os.O_NONBLOCK)
print("Pipe opened successfully, starting stream...")
chunk_size = 32768 # 32KB chunks for MPEG-TS
while streaming_active and active_websocket_clients:
try:
# Read chunk from pipe
try:
data = os.read(pipe_fd, chunk_size)
except BlockingIOError:
# No data available, wait a bit
await asyncio.sleep(0.001)
continue
if not data:
print("No data from pipe, stream may have ended")
break
# Broadcast binary data to all connected WebSocket clients
disconnected_clients = set()
for client in active_websocket_clients.copy():
try:
await client.send_bytes(data)
except Exception as e:
print(f"Error sending to client: {e}")
disconnected_clients.add(client)
# Remove disconnected clients
active_websocket_clients.difference_update(disconnected_clients)
# Small delay to prevent overwhelming clients
await asyncio.sleep(0.001)
except Exception as e:
print(f"Error reading from pipe: {e}")
break
os.close(pipe_fd)
except FileNotFoundError:
print(f"ERROR: Pipe {PIPE_PATH} not found")
except Exception as e:
print(f"ERROR in video streaming: {e}")
import traceback
traceback.print_exc()
finally:
streaming_active = False
print("Video streaming stopped")
@app.websocket("/ws/video")
async def websocket_video_endpoint(websocket: WebSocket):
"""WebSocket endpoint for video streaming"""
global streaming_task, streaming_active
await websocket.accept()
print(f"Client connected to video stream: {id(websocket)}")
# Add client to active set
active_websocket_clients.add(websocket)
# Start streaming task if not already running
if not streaming_active:
streaming_task = asyncio.create_task(stream_video_from_pipe())
print("Started video streaming task")
try:
# Keep connection alive and handle client messages
while True:
# Wait for client messages (like ping/pong or control messages)
try:
data = await asyncio.wait_for(websocket.receive_text(), timeout=1.0)
# Handle client messages if needed
if data == "ping":
await websocket.send_text("pong")
except asyncio.TimeoutError:
# No message received, continue
continue
except WebSocketDisconnect:
print(f"Client disconnected from video stream: {id(websocket)}")
except Exception as e:
print(f"WebSocket error: {e}")
finally:
# Remove client from active set
active_websocket_clients.discard(websocket)
# Stop streaming if no more clients
if not active_websocket_clients:
streaming_active = False
print("No more clients, stopping stream")
@app.get("/health")
async def health():
"""Health check endpoint"""
return {
"status": "healthy",
"active_websocket_clients": len(active_websocket_clients),
"streaming_active": streaming_active,
"pipe_exists": os.path.exists(PIPE_PATH)
}
if __name__ == '__main__':
# Initialize reader on startup
init_reader()
import uvicorn
# Run the Flask app
# Use 0.0.0.0 to make it accessible from other machines on the network
app.run(host='0.0.0.0', port=5000, debug=True, threaded=True)
print("Starting FastAPI server on http://0.0.0.0:5000")
print(f"Video stream will be available at ws://0.0.0.0:5000/ws/video")
uvicorn.run(
app,
host='0.0.0.0',
port=5000,
log_level='info',
access_log=True
)