import os import time import numpy as np from datetime import datetime, timedelta # ================================================================================ # ПАРАМЕТРЫ ЭМУЛЯЦИИ # ================================================================================ DATA_DIR = r"D:\data" # ✓ ИСПРАВЛЕНИЕ: Выбор типа данных и количество NUM_FILES = 1000 # Количесйтво файлов для генерации # Размеры данных RAW_SIZE = 64000 SYNC_DET_SIZE = 1000 FOURIER_SIZE = SYNC_DET_SIZE // 2 # 500 (положительные частоты) # Интервал между файлами (в миллисекундах) FILE_INTERVAL_MS = 300 # 300 мс между файлами # ================================================================================ # ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ # ================================================================================ def create_raw_data(size=RAW_SIZE, index=0): """Генерирует RAW данные.""" # Синусоида + шум, зависит от индекса для разнообразия t = np.linspace(0, 10 * np.pi, size) freq_mult = 1.0 + 0.1 * np.sin(index / 100) signal = np.sin(freq_mult * t) + 0.1 * np.random.randn(size) return signal def create_sync_det_data(size=SYNC_DET_SIZE, index=0): """Генерирует SYNC_DET данные.""" # Модулированная синусоида, зависит от индекса t = np.linspace(0, 20 * np.pi, size) damping = np.exp(-t / (20 * np.pi)) * (1 + 0.2 * np.sin(index / 50)) signal = np.sin(t) * damping return signal def create_fourier_data(sync_det_data=None, fft_size=SYNC_DET_SIZE, output_size=FOURIER_SIZE, index=0): """✓ Генерирует FOURIER = |FFT(SYNC_DET)|[:N/2]. FFT от сигнала размером N имеет только N/2 независимых значений. Args: sync_det_data: вектор SYNC_DET размером ~1000 fft_size: размер FFT (1000) output_size: размер выходного спектра (500) index: индекс файла для вариативности Returns: fourier_data: амплитудный спектр размером output_size (500) """ if sync_det_data is None: sync_det_data = create_sync_det_data(index=index) # ✓ Вычисляем FFT от SYNC_DET fft_result = np.fft.fft(sync_det_data[:fft_size]) # ✓ Берём амплитудный спектр amplitude_spectrum = np.abs(fft_result) # ✓ Центрируем спектр fft_shift = np.fft.fftshift(amplitude_spectrum) # ✓ Берём только положительные частоты (N/2 = 500) fft_positive = fft_shift[len(fft_shift) // 2:] assert len(fft_positive) == output_size, \ f"FFT positive frequencies size {len(fft_positive)} != expected {output_size}" fourier_data = fft_positive.astype(float) # Нормализуем if fourier_data.max() > 0: fourier_data = fourier_data / fourier_data.max() * 100 return fourier_data # ================================================================================ # ГЕНЕРАЦИЯ ФАЙЛОВ # ================================================================================ def emit_raw_files(count=NUM_FILES, start_time=None): """Генерирует RAW файлы количеством count.""" if start_time is None: start_time = datetime.now() print(f"\n{'=' * 80}") print(f"📝 ГЕНЕРИРОВАНИЕ {count} RAW ФАЙЛОВ") print(f"{'=' * 80}") print(f"Размер: {RAW_SIZE} точек") print(f"Интервал: {FILE_INTERVAL_MS}мс\n") for i in range(count): # Вычисляем время файла file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS) # Форматируем имя с миллисекундами time_str = file_time.strftime("%H_%M_%S") ms = file_time.microsecond // 1000 filename = f"RAW_{time_str}_{ms:03d}.txt" # Генерируем данные data = create_raw_data(index=i) # Добавляем заголовок filepath = os.path.join(DATA_DIR, filename) with open(filepath, 'w') as f: f.write("RAW\n") np.savetxt(f, data, fmt='%.6f') # Устанавливаем время модификации timestamp = file_time.timestamp() os.utime(filepath, (timestamp, timestamp)) # Прогресс if (i + 1) % 100 == 0 or (i + 1) == count: print(f" ✓ {i + 1}/{count} файлов созданы", end='\r') print(f"\n✅ Готово: {count} RAW файлов") return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS) def emit_sync_det_files(count=NUM_FILES, start_time=None): """Генерирует SYNC_DET файлы количеством count.""" if start_time is None: start_time = datetime.now() print(f"\n{'=' * 80}") print(f"📝 ГЕНЕРИРОВАНИЕ {count} SYNC_DET ФАЙЛОВ") print(f"{'=' * 80}") print(f"Размер: {SYNC_DET_SIZE} точек") print(f"Интервал: {FILE_INTERVAL_MS}мс\n") for i in range(count): # Вычисляем время файла file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS) # Форматируем имя с миллисекундами time_str = file_time.strftime("%H_%M_%S") ms = file_time.microsecond // 1000 filename = f"SYNC_DET_{time_str}_{ms:03d}.txt" # Генерируем данные data = create_sync_det_data(index=i) # Добавляем заголовок filepath = os.path.join(DATA_DIR, filename) with open(filepath, 'w') as f: f.write("SYNC_DET\n") np.savetxt(f, data, fmt='%.6f') # Устанавливаем время модификации timestamp = file_time.timestamp() os.utime(filepath, (timestamp, timestamp)) # Прогресс if (i + 1) % 100 == 0 or (i + 1) == count: print(f" ✓ {i + 1}/{count} файлов созданы", end='\r') print(f"\n✅ Готово: {count} SYNC_DET файлов") return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS) def emit_fourier_files(count=NUM_FILES, start_time=None): """✓ Генерирует FOURIER файлы количеством count. Каждый файл содержит амплитудный спектр размером 500. """ if start_time is None: start_time = datetime.now() print(f"\n{'=' * 80}") print(f"📝 ГЕНЕРИРОВАНИЕ {count} FOURIER ФАЙЛОВ") print(f"{'=' * 80}") print(f"Размер: {FOURIER_SIZE} точек (|FFT|[:N/2])") print(f"Интервал: {FILE_INTERVAL_MS}мс\n") for i in range(count): # Вычисляем время файла file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS) # Форматируем имя с миллисекундами time_str = file_time.strftime("%H_%M_%S") ms = file_time.microsecond // 1000 filename = f"FOURIER_{time_str}_{ms:03d}.txt" # Генерируем SYNC_DET sync_det = create_sync_det_data(index=i) # Вычисляем FOURIER как |FFT(SYNC_DET)|[:N/2] data = create_fourier_data( sync_det_data=sync_det, fft_size=SYNC_DET_SIZE, output_size=FOURIER_SIZE, index=i ) assert len(data) == FOURIER_SIZE, \ f"FOURIER size {len(data)} != expected {FOURIER_SIZE}" # Добавляем заголовок filepath = os.path.join(DATA_DIR, filename) with open(filepath, 'w') as f: f.write("FOURIER\n") np.savetxt(f, data, fmt='%.6f') # Устанавливаем время модификации timestamp = file_time.timestamp() os.utime(filepath, (timestamp, timestamp)) # Прогресс if (i + 1) % 100 == 0 or (i + 1) == count: print(f" ✓ {i + 1}/{count} файлов созданы", end='\r') print(f"\n✅ Готово: {count} FOURIER файлов") return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS) # ================================================================================ # ОСНОВНАЯ ПРОГРАММА # ================================================================================ def show_menu(): """Показывает меню выбора типа данных.""" print("\n" + "=" * 80) print(" РАДАР ЭМУЛЯТОР - ВЫБОР ТИПА ДАННЫХ") print("=" * 80) print() print(" Выберите тип данных для генерирования:") print() print(" 1. RAW - Сырые данные с АЦП ({} точек)".format(RAW_SIZE)) print(" 2. SYNC_DET - Обработанные данные ({} точек)".format(SYNC_DET_SIZE)) print(" 3. FOURIER - Амплитудный спектр ({} точек)".format(FOURIER_SIZE)) print() print("=" * 80) print() if __name__ == "__main__": # Создаём директорию если её нет os.makedirs(DATA_DIR, exist_ok=True) print("\n" + "=" * 80) print(" РАДАР ЭМУЛЯТОР - ГЕНЕРИРОВАНИЕ ТЕСТОВЫХ ДАННЫХ") print("=" * 80) print(f" Директория: {DATA_DIR}") print(f" Количество файлов: {NUM_FILES}") print(f" Интервал между файлами: {FILE_INTERVAL_MS}мс") print(f" Формат времени: HH:MM:SS.mmm") print("=" * 80) show_menu() while True: try: choice = input(" Введите номер (1/2/3) или 'q' для выхода: ").strip().lower() if choice == 'q': print("\n Выход.") break elif choice == '1': start_time = datetime.now().replace(microsecond=0) emit_raw_files(NUM_FILES, start_time) break elif choice == '2': start_time = datetime.now().replace(microsecond=0) emit_sync_det_files(NUM_FILES, start_time) break elif choice == '3': start_time = datetime.now().replace(microsecond=0) emit_fourier_files(NUM_FILES, start_time) break else: print(" ❌ Неверный выбор. Введите 1, 2, 3 или q") except KeyboardInterrupt: print("\n\n Прервано пользователем.") break except Exception as e: print(f" ❌ Ошибка: {e}") continue print(f"\n📂 Файлы сохранены в: {DATA_DIR}") print(f"\n💡 Запустите main_analyzer.py и откройте директорию {DATA_DIR}") print(f" Анализатор автоматически подхватит новые файлы\n")