Files
RFG_Receiver_GUI/main.py

1240 lines
55 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
import os
import time
import numpy as np
import tkinter as tk
from tkinter import ttk
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from scipy.signal import square
from scipy.ndimage import gaussian_filter1d
from scipy.interpolate import interp1d
from datetime import datetime, timedelta
import threading
import queue
# ================================================================================
# ПАРАМЕТРЫ И КОНСТАНТЫ
# ================================================================================
#data_dir = r"D:\data"
data_dir = "./data"
PeriodIntegrate = 2
pontInOneFqChange = 86
# Высота B-скана для RAW/SYNC_DET обработки
height = 60
timePoint = 100
FQend = 512
FFT0_delta = 5
STANDARD_RAW_SIZE = 64000
STANDARD_SYNC_SIZE = 1000
# FOURIER размер = SYNC_DET_SIZE // 2 (положительные частоты)
STANDARD_FOURIER_SIZE = STANDARD_SYNC_SIZE // 2 # 1000 // 2 = 500
STANDARD_FOURIER_ROWS = 60
STANDARD_FOURIER_COLS = 100
MAX_PROCESSING_TIME_MS = 250
# ПЕРЕЧИСЛЕНИЕ ТИПОВ ДАННЫХ
DATA_TYPE_RAW = "RAW"
DATA_TYPE_SYNC_DET = "SYNC_DET"
DATA_TYPE_FOURIER = "FOURIER"
DATA_TYPE_HEX = "HEX"
# Режим обработки FOURIER файлов
FOURIER_MODE = 'collapse_mean'
FOURIER_SUBSAMPLE_K = 10
# Начальный интервал между файлами (в миллисекундах)
DEFAULT_FILE_INTERVAL_MS = 300 # 300 мс
# Коэффициент для определения разрыва (1.5x интервала)
GAP_THRESHOLD_MULTIPLIER = 1.5
# Начальное время опроса файлов (в миллисекундах)
DEFAULT_FILE_POLL_INTERVAL_MS = 100 # 100 мс
# ================================================================================
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
# ================================================================================
def detect_data_type(first_line):
"""Определяет тип данных по первой строке файла.
Логика: если первая строка начинается с ключевого слова RAW/SYNC_DET/FOURIER/FFT,
считаем соответствующий тип. Иначе — HEX.
"""
try:
up = first_line.strip().upper()
if up.startswith('RAW'):
return DATA_TYPE_RAW
if up.startswith('SYNC_DET') or up.startswith('SYNC DET'):
return DATA_TYPE_SYNC_DET
if up.startswith('FOURIER') or up.startswith('FFT'):
return DATA_TYPE_FOURIER
return DATA_TYPE_HEX
except Exception:
return DATA_TYPE_HEX
def resize_1d_interpolate(data, target_size):
"""Ресайзит одномерный массив с использованием линейной интерполяции."""
if len(data) == target_size:
return data
old_indices = np.linspace(0, 1, len(data))
new_indices = np.linspace(0, 1, target_size)
f = interp1d(old_indices, data, kind='linear', fill_value='extrapolate')
return f(new_indices)
def resize_2d_interpolate(data, target_rows, target_cols):
"""Ресайзит двумерный массив на target_rows x target_cols с интерполяцией."""
rows, cols = data.shape
if rows == target_rows and cols == target_cols:
return data
old_row_indices = np.linspace(0, 1, rows)
new_row_indices = np.linspace(0, 1, target_rows)
f_rows = interp1d(old_row_indices, data, axis=0, kind='linear', fill_value='extrapolate')
data_resampled_rows = f_rows(new_row_indices)
old_col_indices = np.linspace(0, 1, cols)
new_col_indices = np.linspace(0, 1, target_cols)
f_cols = interp1d(old_col_indices, data_resampled_rows, axis=1, kind='linear', fill_value='extrapolate')
data_resampled = f_cols(new_col_indices)
return data_resampled
def load_data_with_type(filename):
"""Загружает данные и определяет их тип по первой строке."""
with open(filename, 'r') as f:
first_line = f.readline()
detected_type = detect_data_type(first_line)
if detected_type != DATA_TYPE_HEX:
try:
data = np.loadtxt(filename, skiprows=1)
except:
data = np.loadtxt(filename)
return detected_type, data
# HEX формат: строки вида 0xAABBBBBB, где AA — тип, BBBBBB — int24_t
return parse_hex_file(filename)
def parse_hex_file(filename):
"""Парсит HEX формат с разделением по FE и мапит к RAW/SYNC_DET/FOURIER.
Возвращает (data_type, data), где data может быть:
- numpy.ndarray (1D) для одного сегмента
- list[numpy.ndarray] для нескольких сегментов (используется для FOURIER, а также RAW/SYNC_DET)
"""
def to_int24(v):
x = int(v, 16)
if x & 0x800000:
x -= 0x1000000
return float(x)
# Текущий накапливаемый сегмент
cur = {"D0": [], "F0": [], "F1": [], "F2": [], "F3": [], "F4": []}
# Списки сегментов по типам данных
seg_raw = []
seg_sync = []
seg_fourier = []
def finalize_segment():
nonlocal cur
# Приоритет выбора, что считать сегментом
if cur["F4"]:
seg_fourier.append(np.asarray(cur["F4"], dtype=float))
elif cur["F3"]:
arr = np.asarray(cur["F3"], dtype=float)
seg_fourier.append(np.sqrt(np.maximum(0.0, arr)))
elif cur["F1"] and cur["F2"] and len(cur["F1"]) == len(cur["F2"]):
re = np.asarray(cur["F1"], dtype=float)
im = np.asarray(cur["F2"], dtype=float)
seg_fourier.append(np.sqrt(re * re + im * im))
elif cur["F0"]:
seg_sync.append(np.asarray(cur["F0"], dtype=float))
elif cur["D0"]:
seg_raw.append(np.asarray(cur["D0"], dtype=float))
# Сброс
cur = {"D0": [], "F0": [], "F1": [], "F2": [], "F3": [], "F4": []}
with open(filename, 'r') as f:
for line in f:
s = line.strip()
if not s:
continue
# Требование: учитывать только строки, начинающиеся с 0x/0X
if not (s.startswith('0x') or s.startswith('0X')):
continue
h = s[2:]
h = ''.join(ch for ch in h if ch in '0123456789abcdefABCDEF')
if len(h) < 2:
continue
t_byte = h[:2].upper()
# FE — завершить текущий сегмент
if t_byte == 'FE':
finalize_segment()
continue
# E0..E9 — игнор
if t_byte.startswith('E') and len(t_byte) == 2 and t_byte[1] in '0123456789':
continue
# 00 — цифровые биты, пока пропускаем
if t_byte == '00':
continue
if len(h) < 8:
continue
# Значение 24 бита
val_hex = h[2:8]
try:
value = to_int24(val_hex)
except Exception:
continue
if t_byte == 'D0':
cur['D0'].append(value)
elif t_byte == 'F0':
cur['F0'].append(value)
elif t_byte == 'F1':
cur['F1'].append(value)
elif t_byte == 'F2':
cur['F2'].append(value)
elif t_byte == 'F3':
cur['F3'].append(value)
elif t_byte == 'F4':
cur['F4'].append(value)
else:
# Неизвестные — пропускаем
continue
# Финализируем хвост
finalize_segment()
if seg_fourier:
return DATA_TYPE_FOURIER, seg_fourier
if seg_sync:
# Если несколько, вернём список сегментов
return DATA_TYPE_SYNC_DET, seg_sync if len(seg_sync) > 1 else seg_sync[0]
if seg_raw:
return DATA_TYPE_RAW, seg_raw if len(seg_raw) > 1 else seg_raw[0]
return DATA_TYPE_RAW, np.asarray([], dtype=float)
def get_file_time_with_milliseconds(filename):
"""Получает время файла с миллисекундами из имени или mtime."""
full_path = os.path.join(os.getcwd(), filename)
milliseconds = 0
parts = filename.split('_')
if len(parts) >= 4:
try:
last_part = parts[-1].split('.')[0]
if last_part.isdigit() and len(last_part) <= 3:
milliseconds = int(last_part)
except:
pass
file_mtime = os.path.getmtime(full_path)
file_time = datetime.fromtimestamp(file_mtime)
if milliseconds > 0:
file_time = file_time.replace(microsecond=milliseconds * 1000)
else:
now = datetime.now()
file_time = file_time.replace(microsecond=now.microsecond)
return file_time
# ================================================================================
# КЛАСС ПРИЛОЖЕНИЯ
# ================================================================================
class DataAnalyzerApp:
def __init__(self, root):
self.root = root
self.root.title("Radar Data Analyzer (Time Synchronized - Queue Based)")
self.root.geometry("1500x850")
self.data_dir = data_dir
os.makedirs(self.data_dir, exist_ok=True)
os.chdir(self.data_dir)
# Инициализируем с существующими файлами
existing_files = sorted([
f for f in os.listdir()
if f.lower().endswith(('.txt', '.txt1', '.txt2', '.csv'))
])
self.processed_files = set(existing_files)
if existing_files:
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] Skipping {len(existing_files)} existing files\n")
# Инициализация состояния обработки
self.SizeFirst = STANDARD_RAW_SIZE
self.time_idx = np.arange(1, self.SizeFirst + 1)
self.k = 1
self.SUM = 0.0
self.B = None
self.meandr = None
self.signal = np.array([])
self.signalView = np.array([])
self.timeSignal = np.array([])
# История B_scan с привязкой к времени файла
self.B_scan_data = []
self.B_scan_times = []
self.B_scan_types = [] # RAW, SYNC_DET, FOURIER, GAP
self.last_file_time = None
# Интервал между файлами (в миллисекундах) - задаётся пользователем
self.file_interval_ms = DEFAULT_FILE_INTERVAL_MS
self.time_gap_threshold_ms = self.file_interval_ms * GAP_THRESHOLD_MULTIPLIER
# ✓ ИСПРАВЛЕНИЕ: Счётчик пропущенных кадров
self.gap_frames_count = 0
self.startPointTime = 0
self.timestart = time.perf_counter()
self.timestart2 = None
self.FshiftS = None
# Очередь для добавления колонок (без блокировок)
self.bscan_queue = queue.Queue()
# Для потоковой отрисовки
self.update_lock = threading.Lock()
self.pending_update = False
self.pending_original_size = None
self.pending_data_type = None
# Автопрокрутка по времени
self.auto_follow = tk.BooleanVar(value=True)
# Флаг для отключения автопрокрутки при ручном выборе
self.manual_range_selection = False
# Время опроса файлов (в миллисекундах)
self.file_poll_interval_ms = DEFAULT_FILE_POLL_INTERVAL_MS
# Параметры слайдеров для B_scan
self.bscan_row_min = 0
self.bscan_row_max = height - 1
self.bscan_col_min = 0
self.bscan_col_max = timePoint - 1
# Главный фрейм с двумя колонками
self.main_frame = ttk.Frame(self.root)
self.main_frame.pack(fill='both', expand=True, padx=3, pady=3)
# Левая колонка: Графики (65%)
self.left_frame = ttk.Frame(self.main_frame)
self.left_frame.pack(side='left', fill='both', expand=True, padx=(0, 3))
# Правая колонка: Настройки (35%)
self.right_frame = ttk.Frame(self.main_frame, width=210)
self.right_frame.pack(side='right', fill='both', padx=(3, 0))
self.right_frame.pack_propagate(False)
# Инициализация графиков и настроек
self.init_plots_panel()
self.init_settings_panel()
self.processed_count = 0
self.skipped_count = 0
# ============================================================================
# ИНИЦИАЛИЗАЦИЯ ИНТЕРФЕЙСА
# ============================================================================
def init_plots_panel(self):
"""Инициализация панели с графиками (слева)."""
title = ttk.Label(self.left_frame, text="Графики", font=('Arial', 11, 'bold'))
title.pack()
fig_frame = ttk.Frame(self.left_frame)
fig_frame.pack(fill='both', expand=True, padx=3, pady=3)
self.fig = plt.Figure(figsize=(9, 6.5), dpi=100)
gs = self.fig.add_gridspec(2, 2, hspace=0.4, wspace=0.35)
self.ax_raw = self.fig.add_subplot(gs[0, 0])
self.ax_processed = self.fig.add_subplot(gs[0, 1])
self.ax_fourier = self.fig.add_subplot(gs[1, 0])
self.ax_bscan = self.fig.add_subplot(gs[1, 1])
self.canvas = FigureCanvasTkAgg(self.fig, master=fig_frame)
self.canvas.draw()
self.canvas.get_tk_widget().pack(fill='both', expand=True)
self.draw_empty_plots()
def init_settings_panel(self):
"""Инициализация панели с настройками (справа)."""
title = ttk.Label(self.right_frame, text="⚙️ Настройки", font=('Arial', 10, 'bold'))
title.pack(pady=3, padx=3)
# Canvas с прокруткой
self.settings_canvas = tk.Canvas(self.right_frame, bg='white', highlightthickness=0)
scrollbar = ttk.Scrollbar(self.right_frame, orient='vertical', command=self.settings_canvas.yview)
scrollable_frame = ttk.Frame(self.settings_canvas)
scrollable_frame.bind(
"<Configure>",
lambda e: self.settings_canvas.configure(scrollregion=self.settings_canvas.bbox("all"))
)
self.settings_canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
self.settings_canvas.configure(yscrollcommand=scrollbar.set)
self.settings_canvas.pack(side='left', fill='both', expand=True)
scrollbar.pack(side='right', fill='y')
# Контроль интервала между файлами
interval_label = ttk.Label(scrollable_frame, text="📊 File Interval (ms):",
font=('Arial', 9, 'bold'), foreground='darkblue')
interval_label.pack(anchor='w', padx=3, pady=(5, 0))
interval_control_frame = ttk.Frame(scrollable_frame)
interval_control_frame.pack(fill='x', padx=3, pady=(0, 2))
self.interval_var = tk.IntVar(value=self.file_interval_ms)
interval_spinbox = ttk.Spinbox(interval_control_frame, from_=50, to=5000,
textvariable=self.interval_var,
width=6, command=self.on_file_interval_changed)
interval_spinbox.pack(side='left', padx=(0, 3))
self.interval_scale = ttk.Scale(interval_control_frame, from_=50, to=5000, orient='horizontal',
variable=self.interval_var,
command=self.on_interval_scale_changed)
self.interval_scale.pack(side='left', fill='x', expand=True, padx=(0, 3))
self.interval_label = ttk.Label(interval_control_frame, text=f"{self.file_interval_ms}ms",
font=('Arial', 8), foreground='darkblue', width=6,
relief='sunken', anchor='center')
self.interval_label.pack(side='left')
interval_hint = ttk.Label(scrollable_frame, text="Диапазон: 50-5000 мс",
font=('Arial', 7), foreground='gray')
interval_hint.pack(anchor='w', padx=3, pady=(0, 3))
# Контроль времени опроса файлов
poll_label = ttk.Label(scrollable_frame, text="⏱️ Poll Interval (ms):",
font=('Arial', 9, 'bold'), foreground='darkred')
poll_label.pack(anchor='w', padx=3, pady=(5, 0))
poll_control_frame = ttk.Frame(scrollable_frame)
poll_control_frame.pack(fill='x', padx=3, pady=(0, 2))
self.poll_interval_var = tk.IntVar(value=self.file_poll_interval_ms)
poll_spinbox = ttk.Spinbox(poll_control_frame, from_=10, to=5000,
textvariable=self.poll_interval_var,
width=6, command=self.on_poll_interval_changed)
poll_spinbox.pack(side='left', padx=(0, 3))
self.poll_scale = ttk.Scale(poll_control_frame, from_=10, to=5000, orient='horizontal',
variable=self.poll_interval_var,
command=self.on_poll_scale_changed)
self.poll_scale.pack(side='left', fill='x', expand=True, padx=(0, 3))
self.poll_label = ttk.Label(poll_control_frame, text=f"{self.file_poll_interval_ms}ms",
font=('Arial', 8), foreground='darkred', width=6,
relief='sunken', anchor='center')
self.poll_label.pack(side='left')
poll_hint = ttk.Label(scrollable_frame, text="Диапазон: 10-5000 мс",
font=('Arial', 7), foreground='gray')
poll_hint.pack(anchor='w', padx=3, pady=(0, 3))
ttk.Separator(scrollable_frame, orient='horizontal').pack(fill='x', padx=3, pady=3)
# Информация о кодировке кадров
legend_label = ttk.Label(scrollable_frame, text="B-Scan Legend:",
font=('Arial', 9, 'bold'), foreground='darkgreen')
legend_label.pack(anchor='w', padx=3, pady=3)
legend_text = ttk.Label(scrollable_frame,
text="■ Colors: DATA\n■ White: GAP (lost frame)\n■ Black: ZERO (missing)",
font=('Arial', 8), foreground='darkgreen', justify='left')
legend_text.pack(anchor='w', padx=3, pady=(0, 3))
ttk.Separator(scrollable_frame, orient='horizontal').pack(fill='x', padx=3, pady=3)
# Информация о типах данных и размерах
info_label = ttk.Label(scrollable_frame, text=f"B-scan types:",
font=('Arial', 9, 'bold'), foreground='darkblue')
info_label.pack(anchor='w', padx=3, pady=3)
info_text = ttk.Label(scrollable_frame,
text=f"RAW/SYNC_DET: {height}px\nFOURIER: {STANDARD_FOURIER_SIZE}px",
font=('Arial', 8), foreground='darkblue', justify='left')
info_text.pack(anchor='w', padx=3, pady=(0, 3))
ttk.Separator(scrollable_frame, orient='horizontal').pack(fill='x', padx=3, pady=2)
# Row Min/Max для выбора диапазона отображения
row_min_label = ttk.Label(scrollable_frame, text="Row Min (display range):", font=('Arial', 8))
row_min_label.pack(anchor='w', padx=3, pady=(3, 0))
self.slider_row_min_var = tk.IntVar(value=0)
self.scale_row_min = ttk.Scale(scrollable_frame, from_=0, to=STANDARD_FOURIER_SIZE - 1,
orient='horizontal', variable=self.slider_row_min_var,
command=self.on_slider_changed)
self.scale_row_min.pack(fill='x', expand=True, padx=3, pady=(0, 1))
self.label_row_min = ttk.Label(scrollable_frame, text="0", font=('Arial', 8), foreground='blue')
self.label_row_min.pack(anchor='e', padx=3, pady=(0, 3))
# Row Max
row_max_label = ttk.Label(scrollable_frame, text="Row Max (display range):", font=('Arial', 8))
row_max_label.pack(anchor='w', padx=3, pady=(3, 0))
self.slider_row_max_var = tk.IntVar(value=STANDARD_FOURIER_SIZE - 1)
self.scale_row_max = ttk.Scale(scrollable_frame, from_=0, to=STANDARD_FOURIER_SIZE - 1,
orient='horizontal', variable=self.slider_row_max_var,
command=self.on_slider_changed)
self.scale_row_max.pack(fill='x', expand=True, padx=3, pady=(0, 1))
self.label_row_max = ttk.Label(scrollable_frame, text=str(STANDARD_FOURIER_SIZE - 1), font=('Arial', 8),
foreground='blue')
self.label_row_max.pack(anchor='e', padx=3, pady=(0, 3))
# Col Min
col_min_label = ttk.Label(scrollable_frame, text="Col Min:", font=('Arial', 8))
col_min_label.pack(anchor='w', padx=3, pady=(3, 0))
self.slider_col_min_var = tk.IntVar(value=0)
self.scale_col_min = ttk.Scale(scrollable_frame, from_=0, to=1000,
orient='horizontal', variable=self.slider_col_min_var,
command=self.on_col_slider_changed)
self.scale_col_min.pack(fill='x', expand=True, padx=3, pady=(0, 1))
self.label_col_min = ttk.Label(scrollable_frame, text="0", font=('Arial', 8), foreground='blue')
self.label_col_min.pack(anchor='e', padx=3, pady=(0, 3))
# Col Max
col_max_label = ttk.Label(scrollable_frame, text="Col Max:", font=('Arial', 8))
col_max_label.pack(anchor='w', padx=3, pady=(3, 0))
self.slider_col_max_var = tk.IntVar(value=timePoint - 1)
self.scale_col_max = ttk.Scale(scrollable_frame, from_=0, to=1000,
orient='horizontal', variable=self.slider_col_max_var,
command=self.on_col_slider_changed)
self.scale_col_max.pack(fill='x', expand=True, padx=3, pady=(0, 1))
self.label_col_max = ttk.Label(scrollable_frame, text=str(timePoint - 1), font=('Arial', 8), foreground='blue')
self.label_col_max.pack(anchor='e', padx=3, pady=(0, 3))
# Разделитель
ttk.Separator(scrollable_frame, orient='horizontal').pack(fill='x', padx=3, pady=3)
# Кнопка автопрокрутки
follow_check = tk.Checkbutton(scrollable_frame, text="📌 Автопрокрутка",
variable=self.auto_follow, font=('Arial', 8),
bg='white', activebackground='white', activeforeground='black',
command=self.on_auto_follow_changed)
follow_check.pack(anchor='w', padx=3, pady=2)
# Кнопки
button_frame = ttk.Frame(scrollable_frame)
button_frame.pack(fill='x', padx=3, pady=2)
reset_btn = ttk.Button(button_frame, text='🔄 Сброс', command=self.on_reset_clicked)
reset_btn.pack(fill='x', pady=1)
export_btn = ttk.Button(button_frame, text='💾 Экспорт', command=self.on_export_clicked)
export_btn.pack(fill='x', pady=1)
# Разделитель
ttk.Separator(scrollable_frame, orient='horizontal').pack(fill='x', padx=3, pady=3)
# Статус
status_label = ttk.Label(scrollable_frame, text="Статус:", font=('Arial', 9, 'bold'))
status_label.pack(anchor='w', padx=3)
self.status_text = ttk.Label(scrollable_frame, text='Ожидание...',
font=('Arial', 7, 'italic'), foreground='blue',
wraplength=160, justify='left')
self.status_text.pack(anchor='w', padx=3, pady=2)
# Разделитель
ttk.Separator(scrollable_frame, orient='horizontal').pack(fill='x', padx=3, pady=3)
# Статистика
stats_label = ttk.Label(scrollable_frame, text="Статистика:", font=('Arial', 9, 'bold'))
stats_label.pack(anchor='w', padx=3)
self.skipped_text = ttk.Label(scrollable_frame, text='Пропущено: 0',
font=('Arial', 8), foreground='orange')
self.skipped_text.pack(anchor='w', padx=3, pady=1)
self.processed_text = ttk.Label(scrollable_frame, text='Обработано: 0',
font=('Arial', 8), foreground='green')
self.processed_text.pack(anchor='w', padx=3, pady=1)
# ✓ ИСПРАВЛЕНИЕ: Счётчик пропущенных кадров
self.gap_frames_text = ttk.Label(scrollable_frame, text='Пропущено кадров: 0',
font=('Arial', 8), foreground='red')
self.gap_frames_text.pack(anchor='w', padx=3, pady=1)
self.bscan_text = ttk.Label(scrollable_frame, text='B-scan: 0 columns',
font=('Arial', 8), foreground='purple')
self.bscan_text.pack(anchor='w', padx=3, pady=1)
self.time_text = ttk.Label(scrollable_frame, text='Время: --:--:--',
font=('Arial', 8), foreground='green')
self.time_text.pack(anchor='w', padx=3, pady=1)
def draw_empty_plots(self):
"""Рисует пустые графики с подписями."""
self.ax_raw.cla()
self.ax_raw.plot([], [])
self.ax_raw.set_xlabel('Такты', fontsize=9)
self.ax_raw.set_ylabel('Сигнал', fontsize=9)
self.ax_raw.set_title('Данные с АЦП', fontsize=10)
self.ax_raw.grid(True, alpha=0.3)
self.ax_raw.tick_params(labelsize=8)
self.ax_processed.cla()
self.ax_processed.plot([], [])
self.ax_processed.set_xlabel('Частота', fontsize=9)
self.ax_processed.set_ylabel('Сигнал', fontsize=9)
self.ax_processed.set_title('Обработанные данные', fontsize=10)
self.ax_processed.grid(True, alpha=0.3)
self.ax_processed.tick_params(labelsize=8)
self.ax_fourier.cla()
self.ax_fourier.plot([], [])
self.ax_fourier.set_title('Фурье образ', fontsize=10)
self.ax_fourier.grid(True, alpha=0.3)
self.ax_fourier.tick_params(labelsize=8)
self.ax_bscan.cla()
self.ax_bscan.set_title('B-Scan (динамический)', fontsize=10)
self.ax_bscan.set_xlabel('Время →', fontsize=9)
self.ax_bscan.set_ylabel('Частота/Дистанция', fontsize=9)
self.ax_bscan.tick_params(labelsize=8)
self.canvas.draw()
# ============================================================================
# CALLBACKS ИНТЕРФЕЙСА
# ============================================================================
def on_file_interval_changed(self):
"""Callback при изменении интервала между файлами через Spinbox."""
try:
new_interval = self.interval_var.get()
if 50 <= new_interval <= 5000:
self.file_interval_ms = new_interval
self.time_gap_threshold_ms = new_interval * GAP_THRESHOLD_MULTIPLIER
self.interval_label.config(text=f"{new_interval}ms")
self.interval_scale.set(new_interval)
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(
f"[{timestamp}] 📊 File interval changed to {new_interval}ms (gap threshold: {self.time_gap_threshold_ms:.0f}ms)")
else:
self.interval_var.set(self.file_interval_ms)
except:
self.interval_var.set(self.file_interval_ms)
def on_interval_scale_changed(self, val):
"""Callback при изменении интервала между файлами через Scale."""
try:
new_interval = int(float(val))
if 50 <= new_interval <= 5000:
self.file_interval_ms = new_interval
self.time_gap_threshold_ms = new_interval * GAP_THRESHOLD_MULTIPLIER
self.interval_var.set(new_interval)
self.interval_label.config(text=f"{new_interval}ms")
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(
f"[{timestamp}] 📊 File interval changed to {new_interval}ms (via slider, gap threshold: {self.time_gap_threshold_ms:.0f}ms)")
else:
self.interval_scale.set(self.file_interval_ms)
except:
self.interval_scale.set(self.file_interval_ms)
def on_poll_interval_changed(self):
"""Callback при изменении времени опроса файлов через Spinbox."""
try:
new_interval = self.poll_interval_var.get()
if 10 <= new_interval <= 5000:
self.file_poll_interval_ms = new_interval
self.poll_label.config(text=f"{new_interval}ms")
self.poll_scale.set(new_interval)
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] ⏱️ Poll interval changed to {new_interval}ms")
else:
self.poll_interval_var.set(self.file_poll_interval_ms)
except:
self.poll_interval_var.set(self.file_poll_interval_ms)
def on_poll_scale_changed(self, val):
"""Callback при изменении времени опроса файлов через Scale."""
try:
new_interval = int(float(val))
if 10 <= new_interval <= 5000:
self.file_poll_interval_ms = new_interval
self.poll_interval_var.set(new_interval)
self.poll_label.config(text=f"{new_interval}ms")
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] ⏱️ Poll interval changed to {new_interval}ms (via slider)")
else:
self.poll_scale.set(self.file_poll_interval_ms)
except:
self.poll_scale.set(self.file_poll_interval_ms)
def on_slider_changed(self, val=None):
"""Callback при изменении слайдеров Row Min/Max."""
self.bscan_row_min = self.slider_row_min_var.get()
self.bscan_row_max = self.slider_row_max_var.get()
self.label_row_min.config(text=str(self.bscan_row_min))
self.label_row_max.config(text=str(self.bscan_row_max))
self.redraw_bscan_immediate()
def on_col_slider_changed(self, val=None):
"""Callback для Col Min/Max - обновляет B_scan и перерисовывает."""
self.bscan_col_min = self.slider_col_min_var.get()
self.bscan_col_max = self.slider_col_max_var.get()
self.label_col_min.config(text=str(self.bscan_col_min))
self.label_col_max.config(text=str(self.bscan_col_max))
# Отключаем автопрокрутку при ручном выборе
self.manual_range_selection = True
self.auto_follow.set(False)
# СРАЗУ перерисовываем B_scan
self.redraw_bscan_immediate()
def on_auto_follow_changed(self):
"""Callback при изменении автопрокрутки."""
if self.auto_follow.get():
self.manual_range_selection = False
def on_reset_clicked(self):
"""Callback при нажатии кнопки Reset."""
self.slider_row_min_var.set(0)
self.slider_row_max_var.set(STANDARD_FOURIER_SIZE - 1)
self.slider_col_min_var.set(0)
self.slider_col_max_var.set(min(timePoint - 1, max(0, len(self.B_scan_data) - 1)))
self.manual_range_selection = False
self.auto_follow.set(True)
def on_export_clicked(self):
"""Callback для экспорта графиков."""
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
filename = f"export_{timestamp}.png"
self.fig.savefig(filename, dpi=150, bbox_inches='tight')
print(f"✓ Экспортировано: {filename}")
def redraw_bscan_immediate(self):
"""Немедленная перерисовка B_scan при изменении ползунков."""
if len(self.B_scan_data) == 0:
return
total_cols = len(self.B_scan_data)
col_min = min(self.bscan_col_min, total_cols - 1)
col_max = min(self.bscan_col_max + 1, total_cols)
# Проверяем корректность диапазона
if col_min >= col_max:
return
row_min = self.bscan_row_min
row_max = self.bscan_row_max
# Собираем данные для отображения
display_cols = []
display_types = [] # Тип каждого столбца
for i in range(col_min, col_max):
col_data = self.B_scan_data[i]
col_type = self.B_scan_types[i]
# Берём диапазон по высоте
if len(col_data) > row_min:
row_end = min(row_max + 1, len(col_data))
row_start = min(row_min, len(col_data) - 1)
if row_end > row_start:
display_cols.append(col_data[row_start:row_end])
display_types.append(col_type)
display_times = self.B_scan_times[col_min:col_max]
# Перерисовываем B_scan
self.ax_bscan.cla()
if display_cols:
# Находим максимальную длину для создания матрицы
max_len = max(len(col) for col in display_cols)
# Создаём матрицу, заполняя нулями если необходимо
display_data = np.zeros((max_len, len(display_cols)))
for i, col in enumerate(display_cols):
display_data[:len(col), i] = col
# Проверяем столбцы на GAP
gap_mask = np.array([col_type == "GAP" for col_type in display_types])
# Применяем маску: GAP столбцы становятся белыми (все = max_val * 1.5)
max_val = display_data.max()
if max_val > 0:
for i, is_gap in enumerate(gap_mask):
if is_gap:
display_data[:, i] = max_val * 1.5 # Белый (выше максимума)
im = self.ax_bscan.imshow(display_data, origin='lower', aspect='auto', cmap='viridis')
time_labels = [t.strftime("%H:%M:%S.%f")[:-3] if t else "?" for t in display_times]
if len(time_labels) > 1:
step = max(1, len(time_labels) // 10)
tick_positions = np.arange(0, len(time_labels), step)
tick_labels_subset = [time_labels[i] if i < len(time_labels) else "" for i in tick_positions]
self.ax_bscan.set_xticks(tick_positions)
self.ax_bscan.set_xticklabels(tick_labels_subset, rotation=45, ha='right', fontsize=7)
else:
self.ax_bscan.set_xticks([])
self.ax_bscan.set_title(f'B-Scan (Cols {col_min}-{col_max - 1})', fontsize=10)
self.ax_bscan.set_xlabel('Время →', fontsize=9)
self.ax_bscan.set_ylabel(f'Диапазон [{row_min}-{row_max}]', fontsize=9)
self.ax_bscan.tick_params(labelsize=8)
# Сразу рисуем
self.canvas.draw()
# ============================================================================
# ОБРАБОТКА ДАННЫХ
# ============================================================================
def process_raw_data(self, A, original_size):
"""Обработка сырых данных."""
A_1d = A[:, 0] if A.ndim > 1 else A
A_resized = resize_1d_interpolate(A_1d, STANDARD_RAW_SIZE)
if self.SizeFirst != STANDARD_RAW_SIZE:
self.SizeFirst = STANDARD_RAW_SIZE
self.time_idx = np.arange(1, self.SizeFirst + 1)
self.B = None
self.meandr = None
self.k = 1
self.SUM = 0.0
if self.k < PeriodIntegrate:
if isinstance(self.SUM, float) and self.SUM == 0.0:
self.SUM = A_resized.astype(float)
else:
self.SUM = self.SUM + A_resized
if self.k == 1:
self.B = np.zeros((self.SizeFirst, PeriodIntegrate + 2))
self.B[:, 0] = A_resized
self.meandr = square(self.time_idx * np.pi)
self.timestart2 = time.perf_counter()
else:
self.B[:, self.k] = A_resized
self.k += 1
return False, None
else:
last_col = PeriodIntegrate + 1
self.B[:, last_col] = A_resized
self.B = np.roll(self.B, 1, axis=1)
if PeriodIntegrate > 1:
meanB = np.sum(self.B[:, 1:PeriodIntegrate], axis=1) / PeriodIntegrate
else:
meanB = self.B[:, 0]
mwanBmeandr = meanB * self.meandr
signal_list = []
signalView_list = []
timeSignal_list = []
start = 0
numberOfFreqChangeStart = 0
for numberOfFreqChange in range(self.SizeFirst):
if (numberOfFreqChange - start) > pontInOneFqChange:
segment = mwanBmeandr[numberOfFreqChangeStart:numberOfFreqChange]
if segment.size > 0:
signal_list.append(np.sum(segment))
signalView_list.append(np.mean(segment))
timeSignal_list.append(numberOfFreqChange)
start = numberOfFreqChange
numberOfFreqChangeStart = numberOfFreqChange
self.signal = np.array(signal_list, dtype=float)
self.signalView = np.array(signalView_list, dtype=float)
self.timeSignal = np.array(timeSignal_list, dtype=int)
self.compute_fft()
if self.FshiftS is not None:
center = len(self.FshiftS) // 2
src_start = center + FFT0_delta
src_end = src_start + height
if src_end <= len(self.FshiftS):
bscan_col = self.FshiftS[src_start:src_end].copy()
return True, bscan_col
return True, None
def process_sync_det_data(self, A, original_size):
"""Обработка данных синхронного детектирования."""
A_1d = A[:, 0] if A.ndim > 1 else A
A_resized = resize_1d_interpolate(A_1d, STANDARD_SYNC_SIZE)
self.signal = A_resized
self.signalView = self.signal * 0.1
self.timeSignal = np.arange(len(self.signal))
self.compute_fft()
if self.FshiftS is not None:
center = len(self.FshiftS) // 2
src_start = center + FFT0_delta
src_end = src_start + height
if src_end <= len(self.FshiftS):
bscan_col = self.FshiftS[src_start:src_end].copy()
return True, bscan_col
return True, None
def process_fourier_data(self, A, original_size):
"""Обработка FOURIER без интерполяции. Поддерживает несколько сегментов."""
columns_to_add = []
# A может быть: list[np.ndarray] (из HEX) или numpy.ndarray
if isinstance(A, list):
for seg in A:
col = np.asarray(seg, dtype=float)
columns_to_add.append(col)
return True, columns_to_add
if A.ndim == 1:
columns_to_add.append(A.astype(float))
return True, columns_to_add
# Если A двумерный: считаем колонками столбцы или строки — выбираем более длинное измерение как длину спектра
if A.ndim == 2:
rows, cols = A.shape
if rows >= cols:
for i in range(cols):
columns_to_add.append(A[:, i].astype(float))
else:
for i in range(rows):
columns_to_add.append(A[i, :].astype(float))
return True, columns_to_add
return True, columns_to_add
def add_bscan_column(self, data_col, current_time, data_type):
"""Добавить колонку в B-скан (может быть разного размера)."""
if self.last_file_time is None:
self.B_scan_data.append(data_col)
self.B_scan_times.append(current_time)
self.B_scan_types.append(data_type)
self.last_file_time = current_time
return
# Используем переменный интервал между файлами
time_diff_ms = (current_time - self.last_file_time).total_seconds() * 1000
if time_diff_ms > self.time_gap_threshold_ms:
missing_count = int(round(time_diff_ms / self.file_interval_ms)) - 1
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(
f"[{timestamp}] ⚠️ Gap detected: {time_diff_ms:.1f}ms (missing ~{missing_count} columns, threshold: {self.time_gap_threshold_ms:.0f}ms)")
# ✓ ИСПРАВЛЕНИЕ: Увеличиваем счётчик пропущенных кадров
self.gap_frames_count += missing_count
last_size = len(self.B_scan_data[-1]) if self.B_scan_data else height
for i in range(missing_count):
zero_col = np.zeros(last_size)
self.B_scan_data.append(zero_col)
gap_time = self.last_file_time + timedelta(milliseconds=self.file_interval_ms * (i + 1))
self.B_scan_times.append(gap_time)
# Отмечаем как GAP (пропущенный кадр)
self.B_scan_types.append("GAP")
self.B_scan_data.append(data_col)
self.B_scan_times.append(current_time)
self.B_scan_types.append(data_type)
self.last_file_time = current_time
def compute_fft(self):
"""Вычисляем FFT спектр."""
if self.signal.size > 0:
sig_cut = self.signal[:FQend] if len(self.signal) >= FQend else self.signal
sig_cut = np.sqrt(np.abs(sig_cut))
F = np.fft.fft(sig_cut)
Fshift = np.abs(np.fft.fftshift(F))
center = len(sig_cut) // 2
if center < len(Fshift):
Fshift[max(center - 0, 0):min(center + 1, len(Fshift))] = 0
self.FshiftS = gaussian_filter1d(Fshift, 5)
# ============================================================================
# ОБНОВЛЕНИЕ GUI
# ============================================================================
def schedule_update(self, original_size, data_type):
"""Отложить обновление GUI."""
with self.update_lock:
self.pending_original_size = original_size
self.pending_data_type = data_type
self.pending_update = True
def do_pending_update(self):
"""Обновляем GUI."""
with self.update_lock:
if not self.pending_update:
return
original_size = self.pending_original_size
data_type = self.pending_data_type
self.pending_update = False
# Извлекаем все элементы из очереди
while not self.bscan_queue.empty():
try:
bscan_col, file_time, data_type_col = self.bscan_queue.get_nowait()
self.add_bscan_column(bscan_col, file_time, data_type_col)
except queue.Empty:
break
# Обновление графиков
self.ax_raw.cla()
if self.B is not None and self.meandr is not None:
self.ax_raw.plot(self.time_idx, np.abs(self.B[:, 0] if self.B.ndim > 1 else self.B),
label='B', linewidth=0.5, alpha=0.7)
if self.timeSignal.size > 0:
self.ax_raw.plot(self.timeSignal, np.abs(self.signalView), linewidth=0.5)
self.ax_raw.set_xlabel('Такты', fontsize=9)
self.ax_raw.set_ylabel('Сигнал', fontsize=9)
self.ax_raw.set_title('Данные с АЦП', fontsize=10)
self.ax_raw.grid(True, alpha=0.3)
self.ax_raw.tick_params(labelsize=8)
self.ax_processed.cla()
if self.signal.size > 0:
ssS = self.signal.size
perpointFq = 10.67 / ssS
XSignal = 3 + (np.arange(1, ssS + 1) * perpointFq)
self.ax_processed.plot(XSignal, np.abs(self.signal), linewidth=1)
self.ax_processed.set_xlabel('Частота', fontsize=9)
self.ax_processed.set_ylabel('Сигнал', fontsize=9)
self.ax_processed.set_title('Обработанные данные', fontsize=10)
self.ax_processed.grid(True, alpha=0.3)
self.ax_processed.tick_params(labelsize=8)
self.ax_fourier.cla()
if self.FshiftS is not None:
self.ax_fourier.plot(self.FshiftS, linewidth=1)
self.ax_fourier.set_xlim([0, min(FQend, len(self.FshiftS))])
self.ax_fourier.set_title('Фурье образ', fontsize=10)
self.ax_fourier.grid(True, alpha=0.3)
self.ax_fourier.tick_params(labelsize=8)
# Отрисовываем B_scan с автопрокруткой
if len(self.B_scan_data) > 0:
total_cols = len(self.B_scan_data)
if self.auto_follow.get() and not self.manual_range_selection:
self.bscan_col_min = max(0, total_cols - timePoint)
self.bscan_col_max = total_cols - 1
self.slider_col_min_var.set(self.bscan_col_min)
self.slider_col_max_var.set(self.bscan_col_max)
self.redraw_bscan_immediate()
total_cols = len(self.B_scan_data)
self.bscan_text.config(text=f'B-scan: {total_cols} columns')
# Обновляем статус
status_msg = f"Тип: {data_type}\nИсх: {original_size}"
self.status_text.config(text=status_msg)
self.skipped_text.config(text=f'Пропущено: {self.skipped_count}')
self.processed_text.config(text=f'Обработано: {self.processed_count}')
# ✓ ИСПРАВЛЕНИЕ: Обновляем счётчик пропущенных кадров
self.gap_frames_text.config(text=f'Пропущено кадров: {self.gap_frames_count}')
if self.B_scan_times:
last_time = self.B_scan_times[-1].strftime("%H:%M:%S.%f")[:-3]
self.time_text.config(text=f'Время: {last_time}')
# Обновляем диапазоны слайдеров
if len(self.B_scan_data) > 0:
self.scale_col_max.config(to=len(self.B_scan_data) - 1)
if not self.auto_follow.get():
self.slider_col_max_var.set(min(self.slider_col_max_var.get(), len(self.B_scan_data) - 1))
# Принудительный draw()
self.canvas.draw()
# ============================================================================
# ОСНОВНОЙ ЦИКЛ ОБРАБОТКИ ФАЙЛОВ
# ============================================================================
def process_file_thread(self, fname, data_type, A, original_size):
"""Обработка файла в отдельном потоке."""
try:
file_time = get_file_time_with_milliseconds(fname)
bscan_col = None
add_to_bscan = False
if data_type == DATA_TYPE_RAW:
# Может прийти список сегментов (HEX с FE)
if isinstance(A, list):
for i, seg in enumerate(A):
add_to_bscan, bscan_col = self.process_raw_data(np.asarray(seg), len(seg))
if add_to_bscan and bscan_col is not None:
col_time = file_time + timedelta(milliseconds=i * 10)
self.bscan_queue.put((bscan_col, col_time, DATA_TYPE_RAW))
add_to_bscan, bscan_col = False, None
else:
add_to_bscan, bscan_col = self.process_raw_data(A, original_size)
elif data_type == DATA_TYPE_SYNC_DET:
if isinstance(A, list):
for i, seg in enumerate(A):
add_to_bscan, bscan_col = self.process_sync_det_data(np.asarray(seg), len(seg))
if add_to_bscan and bscan_col is not None:
col_time = file_time + timedelta(milliseconds=i * 10)
self.bscan_queue.put((bscan_col, col_time, DATA_TYPE_SYNC_DET))
add_to_bscan, bscan_col = False, None
else:
add_to_bscan, bscan_col = self.process_sync_det_data(A, original_size)
elif data_type == DATA_TYPE_FOURIER:
add_to_bscan, columns = self.process_fourier_data(A, original_size)
if add_to_bscan and columns:
for i, col in enumerate(columns):
col_time = file_time + timedelta(milliseconds=i * 10)
self.bscan_queue.put((col, col_time, DATA_TYPE_FOURIER))
bscan_col = None
if add_to_bscan and bscan_col is not None and data_type != DATA_TYPE_FOURIER:
self.bscan_queue.put((bscan_col, file_time, data_type))
self.schedule_update(original_size, data_type)
self.processed_count += 1
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
file_time_str = file_time.strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] ✓ {fname} ({data_type}) [FileTime: {file_time_str}]")
except Exception as e:
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] ✗ Error: {str(e)[:50]}")
def run(self):
"""Основной цикл."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print("=" * 80)
print(f" Radar Data Analyzer - Started: {timestamp}")
print(f" Waiting for .txt files in: {self.data_dir}")
print(f" RAW/SYNC_DET A-scan height: {height} pixels")
print(f" FOURIER A-scan size: {STANDARD_FOURIER_SIZE} pixels (N/2)")
print(f" B-scan: Time-synchronized (milliseconds), display range selectable")
print(f" File interval: {self.file_interval_ms}ms (user adjustable)")
print(f" Gap threshold: {self.time_gap_threshold_ms:.0f}ms ({GAP_THRESHOLD_MULTIPLIER}x interval)")
print(f" Poll interval: {self.file_poll_interval_ms}ms (user adjustable)")
print(f" FOURIER_MODE: {FOURIER_MODE}")
print(f" B-Scan display: GAP (lost frames) shown as WHITE")
print("=" * 80 + "\n")
self.process_files()
def process_files(self):
"""Обработка файлов в цикле."""
files = sorted([f for f in os.listdir() if f.endswith('.csv') or
f.endswith('.txt1') or f.endswith('.txt2') or f.endswith('.csv')])
new_files = [f for f in files if f not in self.processed_files]
print("new files:", new_files, files)
for fname in new_files:
time_start = time.perf_counter()
try:
data_type, A = load_data_with_type(fname)
# Поддержка списка сегментов (HEX с FE)
if isinstance(A, list):
original_size = len(A[0]) if len(A) > 0 else 0
elif isinstance(A, np.ndarray):
original_size = A.shape[0]
else:
original_size = 0
# Если после парсинга данных нет — пропускаем файл
if (isinstance(A, list) and len(A) == 0) or (isinstance(A, np.ndarray) and A.size == 0):
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] ⏭️ SKIP {fname} (no data parsed)")
self.skipped_count += 1
self.processed_files.add(fname)
continue
elapsed_time_ms = (time.perf_counter() - time_start) * 1000
if elapsed_time_ms > MAX_PROCESSING_TIME_MS:
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] ⏭️ SKIP {fname} (load time: {elapsed_time_ms:.1f}ms)")
self.skipped_count += 1
else:
thread = threading.Thread(
target=self.process_file_thread,
args=(fname, data_type, A, original_size),
daemon=True
)
thread.start()
self.processed_files.add(fname)
except Exception as e:
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] ✗ Load error: {str(e)[:50]}")
self.processed_files.add(fname)
self.do_pending_update()
# Используем переменное время опроса
self.root.after(self.file_poll_interval_ms, self.process_files)
# ================================================================================
# ТОЧКА ВХОДА
# ================================================================================
if __name__ == "__main__":
root = tk.Tk()
app = DataAnalyzerApp(root)
app.run()
try:
root.mainloop()
except KeyboardInterrupt:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print("\n" + "=" * 80)
print(f" Program stopped by user: {timestamp}")
print("=" * 80)