Files
RFG_Receiver_GUI/datagen.py

334 lines
13 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
import os
import time
import numpy as np
from datetime import datetime, timedelta
#from builtins import True
# ================================================================================
# ПАРАМЕТРЫ ЭМУЛЯЦИИ
# ================================================================================
#DATA_DIR = r"D:\data"
DATA_DIR = './data'
# ✓ ИСПРАВЛЕНИЕ: Выбор типа данных и количество
NUM_FILES = 1000 # Количесйтво файлов для генерации
# Размеры данных
RAW_SIZE = 64000
SYNC_DET_SIZE = 1000
FOURIER_SIZE = SYNC_DET_SIZE // 2 # 500 (положительные частоты)
# Интервал между файлами (в миллисекундах)
FILE_INTERVAL_MS = 300 # 300 мс между файлами
# ================================================================================
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
# ================================================================================
def create_raw_data(size=RAW_SIZE, index=0):
"""Генерирует RAW данные."""
# Синусоида + шум, зависит от индекса для разнообразия
t = np.linspace(0, 10 * np.pi, size)
freq_mult = 1.0 + 0.1 * np.sin(index / 100)
signal = np.sin(freq_mult * t) + 0.1 * np.random.randn(size)
return signal
def create_sync_det_data(size=SYNC_DET_SIZE, index=0):
"""Генерирует SYNC_DET данные."""
# Модулированная синусоида, зависит от индекса
t = np.linspace(0, 20 * np.pi, size)
damping = np.exp(-t / (20 * np.pi)) * (1 + 0.2 * np.sin(index / 50))
signal = np.sin(t) * damping
return signal
def create_fourier_data(sync_det_data=None, fft_size=SYNC_DET_SIZE, output_size=FOURIER_SIZE, index=0):
"""✓ Генерирует FOURIER = |FFT(SYNC_DET)|[:N/2].
FFT от сигнала размером N имеет только N/2 независимых значений.
Args:
sync_det_data: вектор SYNC_DET размером ~1000
fft_size: размер FFT (1000)
output_size: размер выходного спектра (500)
index: индекс файла для вариативности
Returns:
fourier_data: амплитудный спектр размером output_size (500)
"""
if sync_det_data is None:
sync_det_data = create_sync_det_data(index=index)
# ✓ Вычисляем FFT от SYNC_DET
fft_result = np.fft.fft(sync_det_data[:fft_size])
# ✓ Берём амплитудный спектр
amplitude_spectrum = np.abs(fft_result)
# ✓ Центрируем спектр
fft_shift = np.fft.fftshift(amplitude_spectrum)
# ✓ Берём только положительные частоты (N/2 = 500)
fft_positive = fft_shift[len(fft_shift) // 2:]
assert len(fft_positive) == output_size, \
f"FFT positive frequencies size {len(fft_positive)} != expected {output_size}"
fourier_data = fft_positive.astype(float)
# Нормализуем
if fourier_data.max() > 0:
fourier_data = fourier_data / fourier_data.max() * 100
return fourier_data
# ================================================================================
# ГЕНЕРАЦИЯ ФАЙЛОВ
# ================================================================================
def emit_raw_files(count=NUM_FILES, start_time=None, hex_mode=True):
"""Генерирует RAW файлы количеством count."""
if start_time is None:
start_time = datetime.now()
print(f"\n{'=' * 80}")
print(f"📝 ГЕНЕРИРОВАНИЕ {count} RAW ФАЙЛОВ")
print(f"{'=' * 80}")
print(f"Размер: {RAW_SIZE} точек")
print(f"Интервал: {FILE_INTERVAL_MS}мс\n")
for i in range(count):
# Вычисляем время файла
file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS)
# Форматируем имя с миллисекундами
time_str = file_time.strftime("%H_%M_%S")
ms = file_time.microsecond // 1000
filename = f"RAW_{time_str}_{ms:03d}.txt"
# Генерируем данные
data = create_raw_data(index=i)
# Добавляем заголовок
filepath = os.path.join(DATA_DIR, filename)
with open(filepath, 'w') as f:
if hex_mode:
#f.write("SYNC_DET_HEX\n")
#np.savetxt(f, np.uint32(data*1000), fmt='0xD0%06X')
np.savetxt(f, ((data * 1000).astype(np.int32) & 0xFFFFFF), fmt='0xD0%06X')
else:
f.write("RAW\n")
np.savetxt(f, data, fmt='%.6f')
# Устанавливаем время модификации
timestamp = file_time.timestamp()
os.utime(filepath, (timestamp, timestamp))
# Прогресс
if (i + 1) % 100 == 0 or (i + 1) == count:
print(f"{i + 1}/{count} файлов созданы", end='\r')
print(f"\n✅ Готово: {count} RAW файлов")
return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS)
def emit_sync_det_files(count=NUM_FILES, start_time=None, hex_mode=True):
"""Генерирует SYNC_DET файлы количеством count."""
if start_time is None:
start_time = datetime.now()
print(f"\n{'=' * 80}")
print(f"📝 ГЕНЕРИРОВАНИЕ {count} SYNC_DET ФАЙЛОВ")
print(f"{'=' * 80}")
print(f"Размер: {SYNC_DET_SIZE} точек")
print(f"Интервал: {FILE_INTERVAL_MS}мс\n")
for i in range(count):
# Вычисляем время файла
file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS)
# Форматируем имя с миллисекундами
time_str = file_time.strftime("%H_%M_%S")
ms = file_time.microsecond // 1000
filename = f"SYNC_DET_{time_str}_{ms:03d}.txt"
# Генерируем данные
data = create_sync_det_data(index=i)
#print("data:", data)
# Добавляем заголовок
filepath = os.path.join(DATA_DIR, filename)
with open(filepath, 'w') as f:
if hex_mode:
#f.write("SYNC_DET_HEX\n")
#np.savetxt(f, np.uint32(data*1000), fmt='0xF0%06X')
np.savetxt(f, ((data * 1000).astype(np.int32) & 0xFFFFFF), fmt='0xF0%06X')
else:
f.write("SYNC_DET\n")
np.savetxt(f, data, fmt='%.6f')
# Устанавливаем время модификации
timestamp = file_time.timestamp()
os.utime(filepath, (timestamp, timestamp))
# Прогресс
if (i + 1) % 100 == 0 or (i + 1) == count:
print(f"{i + 1}/{count} файлов созданы", end='\r')
print(f"\n✅ Готово: {count} SYNC_DET файлов")
return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS)
def emit_fourier_files(count=NUM_FILES, start_time=None, hex_mode=True):
"""✓ Генерирует FOURIER файлы количеством count.
Каждый файл содержит амплитудный спектр размером 500.
"""
if start_time is None:
start_time = datetime.now()
print(f"\n{'=' * 80}")
print(f"📝 ГЕНЕРИРОВАНИЕ {count} FOURIER ФАЙЛОВ")
print(f"{'=' * 80}")
print(f"Размер: {FOURIER_SIZE} точек (|FFT|[:N/2])")
print(f"Интервал: {FILE_INTERVAL_MS}мс\n")
for i in range(count):
# Вычисляем время файла
file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS)
# Форматируем имя с миллисекундами
time_str = file_time.strftime("%H_%M_%S")
ms = file_time.microsecond // 1000
filename = f"FOURIER_{time_str}_{ms:03d}.txt"
# Генерируем SYNC_DET
sync_det = create_sync_det_data(index=i)
# Вычисляем FOURIER как |FFT(SYNC_DET)|[:N/2]
data = create_fourier_data(
sync_det_data=sync_det,
fft_size=SYNC_DET_SIZE,
output_size=FOURIER_SIZE,
index=i
)
assert len(data) == FOURIER_SIZE, \
f"FOURIER size {len(data)} != expected {FOURIER_SIZE}"
# Добавляем заголовок
filepath = os.path.join(DATA_DIR, filename)
with open(filepath, 'w') as f:
if hex_mode:
#f.write("SYNC_DET_HEX\n")
#np.savetxt(f, np.uint32(data*1000), fmt='0xF4%06X')
np.savetxt(f, ((data * 1000).astype(np.int32) & 0xFFFFFF), fmt='0xF4%06X')
else:
f.write("FOURIER\n")
np.savetxt(f, data, fmt='%.6f')
# Устанавливаем время модификации
timestamp = file_time.timestamp()
os.utime(filepath, (timestamp, timestamp))
# Прогресс
if (i + 1) % 100 == 0 or (i + 1) == count:
print(f"{i + 1}/{count} файлов созданы", end='\r')
print(f"\n✅ Готово: {count} FOURIER файлов")
return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS)
# ================================================================================
# ОСНОВНАЯ ПРОГРАММА
# ================================================================================
def show_menu():
"""Показывает меню выбора типа данных."""
print("\n" + "=" * 80)
print(" РАДАР ЭМУЛЯТОР - ВЫБОР ТИПА ДАННЫХ")
print("=" * 80)
print()
print(" Выберите тип данных для генерирования:")
print()
print(" 1. RAW - Сырые данные с АЦП ({} точек)".format(RAW_SIZE))
print(" 2. SYNC_DET - Обработанные данные ({} точек)".format(SYNC_DET_SIZE))
print(" 3. FOURIER - Амплитудный спектр ({} точек)".format(FOURIER_SIZE))
print()
print("=" * 80)
print()
if __name__ == "__main__":
# Создаём директорию если её нет
os.makedirs(DATA_DIR, exist_ok=True)
print("\n" + "=" * 80)
print(" РАДАР ЭМУЛЯТОР - ГЕНЕРИРОВАНИЕ ТЕСТОВЫХ ДАННЫХ")
print("=" * 80)
print(f" Директория: {DATA_DIR}")
print(f" Количество файлов: {NUM_FILES}")
print(f" Интервал между файлами: {FILE_INTERVAL_MS}мс")
print(f" Формат времени: HH:MM:SS.mmm")
print("=" * 80)
show_menu()
while True:
try:
hex_choice = input(" Генерировать в HEX (h) или float (f)? 'q' для выхода: ").strip().lower()
if hex_choice == "h":
hex_mode = True
elif hex_choice == "f":
hex_mode = False
else:
print(" ❌ Неверный выбор. Введите h, f или q")
choice = input(" Введите номер (1/2/3) или 'q' для выхода: ").strip().lower()
if choice == 'q':
print("\n Выход.")
break
elif choice == '1':
start_time = datetime.now().replace(microsecond=0)
emit_raw_files(NUM_FILES, start_time, hex_mode=hex_mode)
break
elif choice == '2':
start_time = datetime.now().replace(microsecond=0)
emit_sync_det_files(NUM_FILES, start_time, hex_mode=hex_mode)
break
elif choice == '3':
start_time = datetime.now().replace(microsecond=0)
emit_fourier_files(NUM_FILES, start_time, hex_mode=hex_mode)
break
else:
print(" ❌ Неверный выбор. Введите 1, 2, 3 или q")
except KeyboardInterrupt:
print("\n\n Прервано пользователем.")
break
except Exception as e:
print(f" ❌ Ошибка: {e}")
continue
print(f"\n📂 Файлы сохранены в: {DATA_DIR}")
print(f"\n💡 Запустите main_analyzer.py и откройте директорию {DATA_DIR}")
print(f" Анализатор автоматически подхватит новые файлы\n")