Files
reflectometer_fpga_project/software/console.py
2026-05-08 16:05:53 +03:00

192 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
import socket
import math
import matplotlib.pyplot as plt
adc_dac_ratio = 0.52
def run_debug(args, sock):
"""Debug run: send fixed values to test eth+ctrl on fpga."""
print(f"DEBUG MODE: ip={args.ip} send_port={args.send_port}")
dest = (args.ip, args.send_port)
# reset
sock.sendto(0x0f00.to_bytes(2), dest)
print("Sent soft_reset!")
# config data
sock.sendto(format_ctrl_data(0x12345678, 0x9abcdef0,
0x0bea, 0xdead, dac_bits=args.dac_bits), dest)
print("Config data sent!")
sock.sendto(0xf000.to_bytes(2), dest)
print("Sent start!")
def format_ctrl_data(pulse_width: int, pulse_period: int,
pulse_height: int, pulse_num: int, args, dac_bits: int = 16) -> bytes:
"""Format data packet for set_data command."""
output = bytearray()
output += 0b10001000.to_bytes(1, 'little')
pulse_period_adc = (int(pulse_period * adc_dac_ratio) //
args.window_size) * args.window_size
print(pulse_period_adc)
# no negative please
assert pulse_width > 0, "pulse_width should be positive"
assert pulse_period > 0, "pulse_period should be positive"
assert pulse_num > 0, "pulse_num should be positive"
assert pulse_height > 0, "pulse_height should be positive"
# overflow check
assert pulse_width < 2**32-1, "pulse_width too high"
assert pulse_period < 2**32-1, "pulse_period too high"
assert pulse_num < 2**16-1, "pulse_num too high"
assert pulse_height < 2**dac_bits-1, "pulse_height too high"
output += pulse_width.to_bytes(4, 'little')
output += pulse_period.to_bytes(4, 'little')
output += pulse_num.to_bytes(2, 'little')
output += pulse_height.to_bytes(2, 'little')
output += pulse_period_adc.to_bytes(4, 'little')
assert len(output) == 17, "Config data should be 128 bits + 8 bit header"
return output
def verify_args(args):
"""check args are non zero and in bound, request from user if needed"""
if args.pulse_width == 0:
args.pulse_width = int(input("pulse_width: "))
if args.pulse_period == 0:
args.pulse_period = int(input("pulse_period: "))
if args.pulse_num == 0:
args.pulse_num = int(input("pulse_num: "))
if args.pulse_height == 0:
args.pulse_height = int(input("pulse_height: "))
def recv_data(args, sock) -> list:
# calculate count & size
packet_count = math.ceil(
((adc_dac_ratio * args.pulse_period) / args.window_size * args.data_width) / args.packet_size)
print(packet_count)
recv_buf = []
try:
for pkt_cnt in range(packet_count):
try:
data, address = sock.recvfrom(65536)
if len(data) % args.data_width != 0:
print("invalid packet size!")
for i in range(0, len(data), args.data_width):
sample = int.from_bytes(
data[i:i+args.data_width], "little")
recv_buf.append(sample)
except socket.timeout:
print("socket timeout")
except KeyboardInterrupt:
print(f"recv: {pkt_cnt}")
break
except Exception as e:
print(f"err: {e}")
expected_length = math.ceil(
adc_dac_ratio * args.pulse_period / args.window_size)
if len(recv_buf) < expected_length:
print("data underflow")
return []
recv_buf = recv_buf[:expected_length-1]
print(f"collected {len(recv_buf)} samples")
# print(recv_buf)
return recv_buf
def run(args, sock):
dest = (args.ip, args.send_port)
if args.pulse_period % args.window_size != 0:
print("Invalid pulse period (should be divisable by WINDOW_SIZE)")
return
# reset
sock.sendto(0x0f00.to_bytes(2), dest)
# config data
sock.sendto(format_ctrl_data(args.pulse_width,
args.pulse_period,
args.pulse_height,
args.pulse_num, args,
dac_bits=args.dac_bits), dest)
sock.sendto(0xf000.to_bytes(2), dest)
print("Sent start!")
data = recv_data(args, sock)
print(min(data), max(data))
plt.plot(data)
plt.show()
def main():
parser = argparse.ArgumentParser(
description="Консоль для рефлектометра"
)
parser.add_argument("--debug", action='store_true',
help="отладочная отправка пакета soft_reset, пакета с данными и пакета start")
parser.add_argument("--ip", type=str, default="192.168.0.2",
help="IP рефлектометра, по умолчанию 192.168.0.2")
parser.add_argument("--send-port", type=int, default=8080,
help="Порт для отправки команд")
parser.add_argument("--recv-port", type=int,
default=8080, help="Порт для приема данных")
parser.add_argument("--dac-bits", type=int, default=12,
help="Битность ЦАП (влияет на максимальный pulse_height)")
parser.add_argument("--data-width", type=int,
default=4, help="Байтность получаемых данных, по умолчанию 4 (AKA int32)")
parser.add_argument("--window-size", type=int,
default=65, help="Размер окна для первого усреднения.")
parser.add_argument("--packet-size", type=int,
default=1024, help="Размер отправляемых пакетов.")
# передача параметров через аргументы
for arg in ("pulse_width", "pulse_period", "pulse_num", "pulse_height"):
parser.add_argument(f"--{arg}", type=int,
default=0, help=f"Задать {arg}")
args = parser.parse_args()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("0.0.0.0", args.recv_port))
if args.debug:
run_debug(args, sock)
else:
verify_args(args)
run(args, sock)
sock.close()
if __name__ == "__main__":
main()