tcp update
This commit is contained in:
@ -29,7 +29,8 @@ frame_lock = asyncio.Lock()
|
|||||||
active_websocket_clients: Set[WebSocket] = set()
|
active_websocket_clients: Set[WebSocket] = set()
|
||||||
streaming_task = None
|
streaming_task = None
|
||||||
streaming_active = False
|
streaming_active = False
|
||||||
PIPE_PATH = '/tmp/beacon_video_stream'
|
TCP_HOST = '127.0.0.1'
|
||||||
|
TCP_PORT = 8888
|
||||||
|
|
||||||
|
|
||||||
async def init_reader():
|
async def init_reader():
|
||||||
@ -206,55 +207,36 @@ async def latest_frame():
|
|||||||
return Response(content="No frame available", status_code=404)
|
return Response(content="No frame available", status_code=404)
|
||||||
|
|
||||||
|
|
||||||
async def stream_video_from_pipe():
|
async def stream_video_from_tcp():
|
||||||
"""
|
"""
|
||||||
Read MPEG-TS video stream from named pipe and broadcast via WebSocket.
|
Read MPEG-TS video stream from TCP socket and broadcast via WebSocket.
|
||||||
This runs as a background task.
|
This runs as a background task.
|
||||||
"""
|
"""
|
||||||
global streaming_active
|
global streaming_active
|
||||||
|
|
||||||
print(f"Starting video stream reader from: {PIPE_PATH}")
|
print(f"Starting video stream reader from TCP: {TCP_HOST}:{TCP_PORT}")
|
||||||
|
|
||||||
# Wait for pipe to be created by C++ application
|
# Wait a bit for C++ application to start TCP server
|
||||||
max_wait_time = 30 # seconds
|
await asyncio.sleep(2)
|
||||||
start_time = time.time()
|
|
||||||
|
|
||||||
while not os.path.exists(PIPE_PATH):
|
|
||||||
if time.time() - start_time > max_wait_time:
|
|
||||||
print(f"ERROR: Pipe {PIPE_PATH} not found after {max_wait_time}s")
|
|
||||||
return
|
|
||||||
print(f"Waiting for pipe {PIPE_PATH}...")
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
|
|
||||||
print(f"Pipe found: {PIPE_PATH}")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Open the named pipe in binary read mode
|
# Connect to TCP server
|
||||||
print("Opening pipe for reading...")
|
print(f"Connecting to TCP server {TCP_HOST}:{TCP_PORT}...")
|
||||||
|
|
||||||
|
reader, writer = await asyncio.open_connection(TCP_HOST, TCP_PORT)
|
||||||
|
|
||||||
|
print("Connected to TCP server, starting stream...")
|
||||||
|
|
||||||
# Use asyncio to read from pipe
|
|
||||||
streaming_active = True
|
streaming_active = True
|
||||||
|
|
||||||
# Open pipe in non-blocking mode
|
|
||||||
import fcntl
|
|
||||||
pipe_fd = os.open(PIPE_PATH, os.O_RDONLY | os.O_NONBLOCK)
|
|
||||||
|
|
||||||
print("Pipe opened successfully, starting stream...")
|
|
||||||
|
|
||||||
chunk_size = 32768 # 32KB chunks for MPEG-TS
|
chunk_size = 32768 # 32KB chunks for MPEG-TS
|
||||||
|
|
||||||
while streaming_active and active_websocket_clients:
|
while streaming_active and active_websocket_clients:
|
||||||
try:
|
try:
|
||||||
# Read chunk from pipe
|
# Read chunk from TCP
|
||||||
try:
|
data = await asyncio.wait_for(reader.read(chunk_size), timeout=5.0)
|
||||||
data = os.read(pipe_fd, chunk_size)
|
|
||||||
except BlockingIOError:
|
|
||||||
# No data available, wait a bit
|
|
||||||
await asyncio.sleep(0.001)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not data:
|
if not data:
|
||||||
print("No data from pipe, stream may have ended")
|
print("No data from TCP, stream may have ended")
|
||||||
break
|
break
|
||||||
|
|
||||||
# Broadcast binary data to all connected WebSocket clients
|
# Broadcast binary data to all connected WebSocket clients
|
||||||
@ -269,17 +251,19 @@ async def stream_video_from_pipe():
|
|||||||
# Remove disconnected clients
|
# Remove disconnected clients
|
||||||
active_websocket_clients.difference_update(disconnected_clients)
|
active_websocket_clients.difference_update(disconnected_clients)
|
||||||
|
|
||||||
# Small delay to prevent overwhelming clients
|
except asyncio.TimeoutError:
|
||||||
await asyncio.sleep(0.001)
|
# No data in 5 seconds, continue waiting
|
||||||
|
continue
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error reading from pipe: {e}")
|
print(f"Error reading from TCP: {e}")
|
||||||
break
|
break
|
||||||
|
|
||||||
os.close(pipe_fd)
|
writer.close()
|
||||||
|
await writer.wait_closed()
|
||||||
|
|
||||||
except FileNotFoundError:
|
except ConnectionRefusedError:
|
||||||
print(f"ERROR: Pipe {PIPE_PATH} not found")
|
print(f"ERROR: Could not connect to TCP server {TCP_HOST}:{TCP_PORT}")
|
||||||
|
print("Make sure C++ application is running first")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"ERROR in video streaming: {e}")
|
print(f"ERROR in video streaming: {e}")
|
||||||
import traceback
|
import traceback
|
||||||
@ -302,7 +286,7 @@ async def websocket_video_endpoint(websocket: WebSocket):
|
|||||||
|
|
||||||
# Start streaming task if not already running
|
# Start streaming task if not already running
|
||||||
if not streaming_active:
|
if not streaming_active:
|
||||||
streaming_task = asyncio.create_task(stream_video_from_pipe())
|
streaming_task = asyncio.create_task(stream_video_from_tcp())
|
||||||
print("Started video streaming task")
|
print("Started video streaming task")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -339,7 +323,7 @@ async def health():
|
|||||||
"status": "healthy",
|
"status": "healthy",
|
||||||
"active_websocket_clients": len(active_websocket_clients),
|
"active_websocket_clients": len(active_websocket_clients),
|
||||||
"streaming_active": streaming_active,
|
"streaming_active": streaming_active,
|
||||||
"pipe_exists": os.path.exists(PIPE_PATH)
|
"tcp_endpoint": f"{TCP_HOST}:{TCP_PORT}"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -348,6 +332,7 @@ if __name__ == '__main__':
|
|||||||
|
|
||||||
print("Starting FastAPI server on http://0.0.0.0:5000")
|
print("Starting FastAPI server on http://0.0.0.0:5000")
|
||||||
print(f"Video stream will be available at ws://0.0.0.0:5000/ws/video")
|
print(f"Video stream will be available at ws://0.0.0.0:5000/ws/video")
|
||||||
|
print(f"Expecting C++ TCP stream at {TCP_HOST}:{TCP_PORT}")
|
||||||
|
|
||||||
uvicorn.run(
|
uvicorn.run(
|
||||||
app,
|
app,
|
||||||
|
|||||||
Reference in New Issue
Block a user