66 lines
2.0 KiB
Python
Executable File
66 lines
2.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Test script to read radar data from named pipe (FIFO)
|
|
|
|
This script demonstrates how to read binary data from the named pipe
|
|
created by the radar acquisition program.
|
|
"""
|
|
|
|
import struct
|
|
import sys
|
|
import os
|
|
|
|
def read_from_pipe(pipe_path="/tmp/radar_data_pipe"):
    """
    Read uint32_t data from named pipe and print a sample of it.

    Each word is read as 4 bytes and decoded as a little-endian unsigned
    32-bit integer. The first 10 words and every 1000th word after that are
    printed; the total number of complete words is printed when the stream
    ends, when a trailing partial word is seen, or when the user interrupts.

    Errors are reported on stdout rather than raised: a missing path, a
    keyboard interrupt and any other exception each print a message and the
    function returns normally.

    Args:
        pipe_path: Path to the named pipe (FIFO)

    Returns:
        None
    """
    print(f"Opening pipe: {pipe_path}")
    print("Waiting for radar data...")

    # Pre-compiled format: one little-endian uint32_t per word.
    word = struct.Struct('<I')

    # Bound before the try block on purpose: open() blocks until a writer
    # connects, so Ctrl-C can arrive before the read loop starts, and the
    # KeyboardInterrupt handler below must still be able to report a count.
    word_count = 0

    try:
        # Open pipe for reading (this will block until writer connects)
        with open(pipe_path, 'rb') as pipe:
            print("Connected to pipe! Reading data...")

            while True:
                data = pipe.read(word.size)

                if not data:
                    print("\nEnd of stream or pipe closed")
                    break

                if len(data) < word.size:
                    # Stream ended in the middle of a word; the partial
                    # bytes are dropped and not counted.
                    print(f"\nWarning: incomplete data read ({len(data)} bytes)")
                    break

                value = word.unpack(data)[0]

                # Print first 10 values and then every 1000th value
                if word_count < 10 or word_count % 1000 == 0:
                    print(f"Word {word_count}: 0x{value:08X} ({value})")

                word_count += 1

                # Optional: process the data here
                # For example, convert to voltage, apply filters, etc.

            print(f"\nTotal words received: {word_count}")

    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        print(f"Total words received: {word_count}")
    except FileNotFoundError:
        print(f"Error: Pipe {pipe_path} does not exist")
        print("Make sure the radar acquisition program is running first")
    except Exception as e:
        # Deliberate catch-all at the script boundary: report and return.
        print(f"Error reading from pipe: {e}")
|
|
|
|
if __name__ == "__main__":
    # An optional first command-line argument overrides the default pipe path.
    if len(sys.argv) > 1:
        pipe_path = sys.argv[1]
    else:
        pipe_path = "/tmp/radar_data_pipe"

    read_from_pipe(pipe_path)