Files
E502_ADC_BF_PC_companion/pipe_reader_test.py
2025-12-04 16:21:24 +03:00

66 lines
2.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Test script that reads radar data from a named pipe (FIFO).

This script demonstrates how to read binary data from the named pipe
created by the radar acquisition program.
"""
import struct
import sys
import os
def read_from_pipe(pipe_path="/tmp/radar_data_pipe"):
    """
    Read little-endian uint32_t words from a named pipe and print a sample.

    Words are read one at a time until the stream ends. The first 10 words
    and every 1000th word after that are printed in hex and decimal. A
    trailing fragment shorter than 4 bytes is reported and discarded.
    Errors are reported on stdout rather than raised.

    Args:
        pipe_path: Path to the named pipe (FIFO)

    Returns:
        The number of complete 4-byte words read before the stream ended,
        the user interrupted, or an error occurred.
    """
    word_format = struct.Struct('<I')  # one little-endian uint32_t
    # Initialised before the try block so that the KeyboardInterrupt handler
    # can report it even when Ctrl+C arrives while open() is still waiting
    # for a writer (previously that raised UnboundLocalError).
    word_count = 0

    print(f"Opening pipe: {pipe_path}")
    print("Waiting for radar data...")
    try:
        # Open pipe for reading (this will block until writer connects)
        with open(pipe_path, 'rb') as pipe:
            print("Connected to pipe! Reading data...")
            while True:
                data = pipe.read(word_format.size)
                if not data:
                    print("\nEnd of stream or pipe closed")
                    break
                if len(data) < word_format.size:
                    print(f"\nWarning: incomplete data read ({len(data)} bytes)")
                    break
                (value,) = word_format.unpack(data)
                # Print first 10 values and then every 1000th value
                if word_count < 10 or word_count % 1000 == 0:
                    print(f"Word {word_count}: 0x{value:08X} ({value})")
                word_count += 1
                # Optional: process the data here
                # For example, convert to voltage, apply filters, etc.
            print(f"\nTotal words received: {word_count}")
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        print(f"Total words received: {word_count}")
    except FileNotFoundError:
        print(f"Error: Pipe {pipe_path} does not exist")
        print("Make sure the radar acquisition program is running first")
    except Exception as e:
        # Script-level boundary: report the problem instead of a traceback.
        print(f"Error reading from pipe: {e}")
    return word_count
if __name__ == "__main__":
    # Use the first command-line argument as the FIFO path when one is
    # supplied; otherwise fall back to the default location.
    if len(sys.argv) > 1:
        pipe_path = sys.argv[1]
    else:
        pipe_path = "/tmp/radar_data_pipe"
    read_from_pipe(pipe_path)