From ed84ebdfe89d2a92caca246642259682f000d275 Mon Sep 17 00:00:00 2001 From: awe Date: Tue, 2 Dec 2025 16:45:14 +0300 Subject: [PATCH] upd path --- .gitignore | 2 + main.py | 440 ++++++++++++++++++++++++++--------------------------- 2 files changed, 222 insertions(+), 220 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0e5ac79 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.venv +__pycache__ \ No newline at end of file diff --git a/main.py b/main.py index 82a3cbb..c05d740 100755 --- a/main.py +++ b/main.py @@ -17,7 +17,7 @@ import queue # ПАРАМЕТРЫ И КОНСТАНТЫ # ================================================================================ #data_dir = r"D:\data" -data_dir = "./data" +data_dir = "/home/awe/Documents/E502_ADC_BF_PC_companion/tmp" PeriodIntegrate = 2 pontInOneFqChange = 86 @@ -40,10 +40,10 @@ STANDARD_FOURIER_COLS = 100 MAX_PROCESSING_TIME_MS = 250 # ПЕРЕЧИСЛЕНИЕ ТИПОВ ДАННЫХ -DATA_TYPE_RAW = "RAW" -DATA_TYPE_SYNC_DET = "SYNC_DET" -DATA_TYPE_FOURIER = "FOURIER" -DATA_TYPE_HEX = "HEX" +DATA_TYPE_RAW = "RAW" +DATA_TYPE_SYNC_DET = "SYNC_DET" +DATA_TYPE_FOURIER = "FOURIER" +DATA_TYPE_HEX = "HEX" # Режим обработки FOURIER файлов FOURIER_MODE = 'collapse_mean' @@ -63,23 +63,23 @@ DEFAULT_FILE_POLL_INTERVAL_MS = 100 # 100 мс # ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ # ================================================================================ -def detect_data_type(first_line): - """Определяет тип данных по первой строке файла. - - Логика: если первая строка начинается с ключевого слова RAW/SYNC_DET/FOURIER/FFT, - считаем соответствующий тип. Иначе — HEX. - """ - try: - up = first_line.strip().upper() - if up.startswith('RAW'): - return DATA_TYPE_RAW - if up.startswith('SYNC_DET') or up.startswith('SYNC DET'): - return DATA_TYPE_SYNC_DET - if up.startswith('FOURIER') or up.startswith('FFT'): - return DATA_TYPE_FOURIER - return DATA_TYPE_HEX - except Exception: - return DATA_TYPE_HEX +def detect_data_type(first_line): + """Определяет тип данных по первой строке файла. + + Логика: если первая строка начинается с ключевого слова RAW/SYNC_DET/FOURIER/FFT, + считаем соответствующий тип. Иначе — HEX. + """ + try: + up = first_line.strip().upper() + if up.startswith('RAW'): + return DATA_TYPE_RAW + if up.startswith('SYNC_DET') or up.startswith('SYNC DET'): + return DATA_TYPE_SYNC_DET + if up.startswith('FOURIER') or up.startswith('FFT'): + return DATA_TYPE_FOURIER + return DATA_TYPE_HEX + except Exception: + return DATA_TYPE_HEX def resize_1d_interpolate(data, target_size): @@ -114,128 +114,128 @@ def resize_2d_interpolate(data, target_rows, target_cols): return data_resampled -def load_data_with_type(filename): - """Загружает данные и определяет их тип по первой строке.""" - with open(filename, 'r') as f: - first_line = f.readline() - - detected_type = detect_data_type(first_line) - - if detected_type != DATA_TYPE_HEX: - try: - data = np.loadtxt(filename, skiprows=1) - except: - data = np.loadtxt(filename) - return detected_type, data - - # HEX формат: строки вида 0xAABBBBBB, где AA — тип, BBBBBB — int24_t - return parse_hex_file(filename) - - -def parse_hex_file(filename): - """Парсит HEX формат с разделением по FE и мапит к RAW/SYNC_DET/FOURIER. - - Возвращает (data_type, data), где data может быть: - - numpy.ndarray (1D) для одного сегмента - - list[numpy.ndarray] для нескольких сегментов (используется для FOURIER, а также RAW/SYNC_DET) - """ - - def to_int24(v): - x = int(v, 16) - if x & 0x800000: - x -= 0x1000000 - return float(x) - - # Текущий накапливаемый сегмент - cur = {"D0": [], "F0": [], "F1": [], "F2": [], "F3": [], "F4": []} - # Списки сегментов по типам данных - seg_raw = [] - seg_sync = [] - seg_fourier = [] - - def finalize_segment(): - nonlocal cur - # Приоритет выбора, что считать сегментом - if cur["F4"]: - seg_fourier.append(np.asarray(cur["F4"], dtype=float)) - elif cur["F3"]: - arr = np.asarray(cur["F3"], dtype=float) - seg_fourier.append(np.sqrt(np.maximum(0.0, arr))) - elif cur["F1"] and cur["F2"] and len(cur["F1"]) == len(cur["F2"]): - re = np.asarray(cur["F1"], dtype=float) - im = np.asarray(cur["F2"], dtype=float) - seg_fourier.append(np.sqrt(re * re + im * im)) - elif cur["F0"]: - seg_sync.append(np.asarray(cur["F0"], dtype=float)) - elif cur["D0"]: - seg_raw.append(np.asarray(cur["D0"], dtype=float)) - # Сброс - cur = {"D0": [], "F0": [], "F1": [], "F2": [], "F3": [], "F4": []} - - with open(filename, 'r') as f: - for line in f: - s = line.strip() - if not s: - continue - # Требование: учитывать только строки, начинающиеся с 0x/0X - if not (s.startswith('0x') or s.startswith('0X')): - continue - h = s[2:] - h = ''.join(ch for ch in h if ch in '0123456789abcdefABCDEF') - if len(h) < 2: - continue - t_byte = h[:2].upper() - - # FE — завершить текущий сегмент - if t_byte == 'FE': - finalize_segment() - continue - - # E0..E9 — игнор - if t_byte.startswith('E') and len(t_byte) == 2 and t_byte[1] in '0123456789': - continue - - # 00 — цифровые биты, пока пропускаем - if t_byte == '00': - continue - - if len(h) < 8: - continue - # Значение 24 бита - val_hex = h[2:8] - try: - value = to_int24(val_hex) - except Exception: - continue - - if t_byte == 'D0': - cur['D0'].append(value) - elif t_byte == 'F0': - cur['F0'].append(value) - elif t_byte == 'F1': - cur['F1'].append(value) - elif t_byte == 'F2': - cur['F2'].append(value) - elif t_byte == 'F3': - cur['F3'].append(value) - elif t_byte == 'F4': - cur['F4'].append(value) - else: - # Неизвестные — пропускаем - continue - - # Финализируем хвост - finalize_segment() - - if seg_fourier: - return DATA_TYPE_FOURIER, seg_fourier - if seg_sync: - # Если несколько, вернём список сегментов - return DATA_TYPE_SYNC_DET, seg_sync if len(seg_sync) > 1 else seg_sync[0] - if seg_raw: - return DATA_TYPE_RAW, seg_raw if len(seg_raw) > 1 else seg_raw[0] - - return DATA_TYPE_RAW, np.asarray([], dtype=float) +def load_data_with_type(filename): + """Загружает данные и определяет их тип по первой строке.""" + with open(filename, 'r') as f: + first_line = f.readline() + + detected_type = detect_data_type(first_line) + + if detected_type != DATA_TYPE_HEX: + try: + data = np.loadtxt(filename, skiprows=1) + except: + data = np.loadtxt(filename) + return detected_type, data + + # HEX формат: строки вида 0xAABBBBBB, где AA — тип, BBBBBB — int24_t + return parse_hex_file(filename) + + +def parse_hex_file(filename): + """Парсит HEX формат с разделением по FE и мапит к RAW/SYNC_DET/FOURIER. + + Возвращает (data_type, data), где data может быть: + - numpy.ndarray (1D) для одного сегмента + - list[numpy.ndarray] для нескольких сегментов (используется для FOURIER, а также RAW/SYNC_DET) + """ + + def to_int24(v): + x = int(v, 16) + if x & 0x800000: + x -= 0x1000000 + return float(x) + + # Текущий накапливаемый сегмент + cur = {"D0": [], "F0": [], "F1": [], "F2": [], "F3": [], "F4": []} + # Списки сегментов по типам данных + seg_raw = [] + seg_sync = [] + seg_fourier = [] + + def finalize_segment(): + nonlocal cur + # Приоритет выбора, что считать сегментом + if cur["F4"]: + seg_fourier.append(np.asarray(cur["F4"], dtype=float)) + elif cur["F3"]: + arr = np.asarray(cur["F3"], dtype=float) + seg_fourier.append(np.sqrt(np.maximum(0.0, arr))) + elif cur["F1"] and cur["F2"] and len(cur["F1"]) == len(cur["F2"]): + re = np.asarray(cur["F1"], dtype=float) + im = np.asarray(cur["F2"], dtype=float) + seg_fourier.append(np.sqrt(re * re + im * im)) + elif cur["F0"]: + seg_sync.append(np.asarray(cur["F0"], dtype=float)) + elif cur["D0"]: + seg_raw.append(np.asarray(cur["D0"], dtype=float)) + # Сброс + cur = {"D0": [], "F0": [], "F1": [], "F2": [], "F3": [], "F4": []} + + with open(filename, 'r') as f: + for line in f: + s = line.strip() + if not s: + continue + # Требование: учитывать только строки, начинающиеся с 0x/0X + if not (s.startswith('0x') or s.startswith('0X')): + continue + h = s[2:] + h = ''.join(ch for ch in h if ch in '0123456789abcdefABCDEF') + if len(h) < 2: + continue + t_byte = h[:2].upper() + + # FE — завершить текущий сегмент + if t_byte == 'FE': + finalize_segment() + continue + + # E0..E9 — игнор + if t_byte.startswith('E') and len(t_byte) == 2 and t_byte[1] in '0123456789': + continue + + # 00 — цифровые биты, пока пропускаем + if t_byte == '00': + continue + + if len(h) < 8: + continue + # Значение 24 бита + val_hex = h[2:8] + try: + value = to_int24(val_hex) + except Exception: + continue + + if t_byte == 'D0': + cur['D0'].append(value) + elif t_byte == 'F0': + cur['F0'].append(value) + elif t_byte == 'F1': + cur['F1'].append(value) + elif t_byte == 'F2': + cur['F2'].append(value) + elif t_byte == 'F3': + cur['F3'].append(value) + elif t_byte == 'F4': + cur['F4'].append(value) + else: + # Неизвестные — пропускаем + continue + + # Финализируем хвост + finalize_segment() + + if seg_fourier: + return DATA_TYPE_FOURIER, seg_fourier + if seg_sync: + # Если несколько, вернём список сегментов + return DATA_TYPE_SYNC_DET, seg_sync if len(seg_sync) > 1 else seg_sync[0] + if seg_raw: + return DATA_TYPE_RAW, seg_raw if len(seg_raw) > 1 else seg_raw[0] + + return DATA_TYPE_RAW, np.asarray([], dtype=float) def get_file_time_with_milliseconds(filename): @@ -279,10 +279,10 @@ class DataAnalyzerApp: os.chdir(self.data_dir) # Инициализируем с существующими файлами - existing_files = sorted([ - f for f in os.listdir() - if f.lower().endswith(('.txt', '.txt1', '.txt2', '.csv')) - ]) + existing_files = sorted([ + f for f in os.listdir() + if f.lower().endswith(('.txt', '.txt1', '.txt2', '.csv')) + ]) self.processed_files = set(existing_files) if existing_files: @@ -916,33 +916,33 @@ class DataAnalyzerApp: return True, None - def process_fourier_data(self, A, original_size): - """Обработка FOURIER без интерполяции. Поддерживает несколько сегментов.""" - columns_to_add = [] - - # A может быть: list[np.ndarray] (из HEX) или numpy.ndarray - if isinstance(A, list): - for seg in A: - col = np.asarray(seg, dtype=float) - columns_to_add.append(col) - return True, columns_to_add - - if A.ndim == 1: - columns_to_add.append(A.astype(float)) - return True, columns_to_add - - # Если A двумерный: считаем колонками столбцы или строки — выбираем более длинное измерение как длину спектра - if A.ndim == 2: - rows, cols = A.shape - if rows >= cols: - for i in range(cols): - columns_to_add.append(A[:, i].astype(float)) - else: - for i in range(rows): - columns_to_add.append(A[i, :].astype(float)) - return True, columns_to_add - - return True, columns_to_add + def process_fourier_data(self, A, original_size): + """Обработка FOURIER без интерполяции. Поддерживает несколько сегментов.""" + columns_to_add = [] + + # A может быть: list[np.ndarray] (из HEX) или numpy.ndarray + if isinstance(A, list): + for seg in A: + col = np.asarray(seg, dtype=float) + columns_to_add.append(col) + return True, columns_to_add + + if A.ndim == 1: + columns_to_add.append(A.astype(float)) + return True, columns_to_add + + # Если A двумерный: считаем колонками столбцы или строки — выбираем более длинное измерение как длину спектра + if A.ndim == 2: + rows, cols = A.shape + if rows >= cols: + for i in range(cols): + columns_to_add.append(A[:, i].astype(float)) + else: + for i in range(rows): + columns_to_add.append(A[i, :].astype(float)) + return True, columns_to_add + + return True, columns_to_add def add_bscan_column(self, data_col, current_time, data_type): """Добавить колонку в B-скан (может быть разного размера).""" @@ -1105,34 +1105,34 @@ class DataAnalyzerApp: bscan_col = None add_to_bscan = False - if data_type == DATA_TYPE_RAW: - # Может прийти список сегментов (HEX с FE) - if isinstance(A, list): - for i, seg in enumerate(A): - add_to_bscan, bscan_col = self.process_raw_data(np.asarray(seg), len(seg)) - if add_to_bscan and bscan_col is not None: - col_time = file_time + timedelta(milliseconds=i * 10) - self.bscan_queue.put((bscan_col, col_time, DATA_TYPE_RAW)) - add_to_bscan, bscan_col = False, None - else: - add_to_bscan, bscan_col = self.process_raw_data(A, original_size) - elif data_type == DATA_TYPE_SYNC_DET: - if isinstance(A, list): - for i, seg in enumerate(A): - add_to_bscan, bscan_col = self.process_sync_det_data(np.asarray(seg), len(seg)) - if add_to_bscan and bscan_col is not None: - col_time = file_time + timedelta(milliseconds=i * 10) - self.bscan_queue.put((bscan_col, col_time, DATA_TYPE_SYNC_DET)) - add_to_bscan, bscan_col = False, None - else: - add_to_bscan, bscan_col = self.process_sync_det_data(A, original_size) - elif data_type == DATA_TYPE_FOURIER: - add_to_bscan, columns = self.process_fourier_data(A, original_size) - if add_to_bscan and columns: - for i, col in enumerate(columns): - col_time = file_time + timedelta(milliseconds=i * 10) - self.bscan_queue.put((col, col_time, DATA_TYPE_FOURIER)) - bscan_col = None + if data_type == DATA_TYPE_RAW: + # Может прийти список сегментов (HEX с FE) + if isinstance(A, list): + for i, seg in enumerate(A): + add_to_bscan, bscan_col = self.process_raw_data(np.asarray(seg), len(seg)) + if add_to_bscan and bscan_col is not None: + col_time = file_time + timedelta(milliseconds=i * 10) + self.bscan_queue.put((bscan_col, col_time, DATA_TYPE_RAW)) + add_to_bscan, bscan_col = False, None + else: + add_to_bscan, bscan_col = self.process_raw_data(A, original_size) + elif data_type == DATA_TYPE_SYNC_DET: + if isinstance(A, list): + for i, seg in enumerate(A): + add_to_bscan, bscan_col = self.process_sync_det_data(np.asarray(seg), len(seg)) + if add_to_bscan and bscan_col is not None: + col_time = file_time + timedelta(milliseconds=i * 10) + self.bscan_queue.put((bscan_col, col_time, DATA_TYPE_SYNC_DET)) + add_to_bscan, bscan_col = False, None + else: + add_to_bscan, bscan_col = self.process_sync_det_data(A, original_size) + elif data_type == DATA_TYPE_FOURIER: + add_to_bscan, columns = self.process_fourier_data(A, original_size) + if add_to_bscan and columns: + for i, col in enumerate(columns): + col_time = file_time + timedelta(milliseconds=i * 10) + self.bscan_queue.put((col, col_time, DATA_TYPE_FOURIER)) + bscan_col = None if add_to_bscan and bscan_col is not None and data_type != DATA_TYPE_FOURIER: self.bscan_queue.put((bscan_col, file_time, data_type)) @@ -1177,23 +1177,23 @@ class DataAnalyzerApp: for fname in new_files: time_start = time.perf_counter() - try: - data_type, A = load_data_with_type(fname) - # Поддержка списка сегментов (HEX с FE) - if isinstance(A, list): - original_size = len(A[0]) if len(A) > 0 else 0 - elif isinstance(A, np.ndarray): - original_size = A.shape[0] - else: - original_size = 0 - - # Если после парсинга данных нет — пропускаем файл - if (isinstance(A, list) and len(A) == 0) or (isinstance(A, np.ndarray) and A.size == 0): - timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] - print(f"[{timestamp}] ⏭️ SKIP {fname} (no data parsed)") - self.skipped_count += 1 - self.processed_files.add(fname) - continue + try: + data_type, A = load_data_with_type(fname) + # Поддержка списка сегментов (HEX с FE) + if isinstance(A, list): + original_size = len(A[0]) if len(A) > 0 else 0 + elif isinstance(A, np.ndarray): + original_size = A.shape[0] + else: + original_size = 0 + + # Если после парсинга данных нет — пропускаем файл + if (isinstance(A, list) and len(A) == 0) or (isinstance(A, np.ndarray) and A.size == 0): + timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] + print(f"[{timestamp}] ⏭️ SKIP {fname} (no data parsed)") + self.skipped_count += 1 + self.processed_files.add(fname) + continue elapsed_time_ms = (time.perf_counter() - time_start) * 1000