This commit is contained in:
awe
2025-12-02 16:46:18 +03:00
3 changed files with 1953 additions and 1533 deletions

35
AGENTS.md Normal file
View File

@ -0,0 +1,35 @@
# Repository Guidelines
## Project Structure & Module Organization
- `main.py`: Tkinter GUI for radar data analysis; watches a data folder, parses RAW/SYNC_DET/FOURIER files, and renders Bscan/Fourier views.
- `datagen.py`: Test data generator for RAW, SYNC_DET, and FOURIER; produces timestamped files to feed the GUI.
- `testLadArrayGround.m`: MATLAB scratch for algorithm experiments.
- Tests: none yet. Add under `tests/` (e.g., `tests/test_io.py`). If processing grows, factor helpers into a `processing/` package (e.g., `processing/signal.py`).
## Build, Test, and Development Commands
- Create venv: `python3 -m venv .venv && source .venv/bin/activate` (Win: `.venv\Scripts\activate`).
- Install deps: `pip install numpy scipy matplotlib`. Linux may require Tk: `sudo apt-get install -y python3-tk`.
- Run GUI: `python main.py`.
- Generate sample data: `python datagen.py` and choose 1/2/3; files go to the configured data folder.
- Run tests (when added): `pytest -q`.
## Coding Style & Naming Conventions
- Follow PEP 8 with 4space indents; add type hints for new/edited functions.
- Naming: snake_case (functions/vars), PascalCase (classes), UPPER_SNAKE_CASE (constants).
- Keep GUI logic in `main.py`; move pure processing/IO into small, testable modules.
## Testing Guidelines
- Framework: pytest (recommended). No suite exists yet.
- Naming: `tests/test_*.py`. Use temp dirs for filebased tests; dont write to system paths.
- Aim for ≥70% coverage on new modules. Add smoke tests for file parsing and queue/processing logic; use a noninteractive Matplotlib backend for tests.
## Commit & Pull Request Guidelines
- Commits: imperative mood with scope, e.g., `gui: improve Bscan update`, `datagen: add FOURIER mode`.
- PRs: include description, linked issues, reproduction steps, and screenshots/GIFs for UI changes. Keep changes focused and update docs when constants/paths change.
## Configuration Tips
- Data paths are hardcoded; update before running on nonWindows systems:
- `main.py:18``data_dir = r"D:\\data"`
- `datagen.py:11``DATA_DIR = r"D:\\data"`
- Prefer a writable local path (e.g., `/tmp/data`) and do not commit generated data.

627
datagen.py Normal file → Executable file
View File

@ -1,294 +1,333 @@
import os #!/usr/bin/python3
import time import os
import numpy as np import time
from datetime import datetime, timedelta import numpy as np
from datetime import datetime, timedelta
# ================================================================================ #from builtins import True
# ПАРАМЕТРЫ ЭМУЛЯЦИИ
# ================================================================================ # ================================================================================
# ПАРАМЕТРЫ ЭМУЛЯЦИИ
DATA_DIR = r"D:\data" # ================================================================================
# ✓ ИСПРАВЛЕНИЕ: Выбор типа данных и количество #DATA_DIR = r"D:\data"
NUM_FILES = 1000 # Количесйтво файлов для генерации DATA_DIR = './data'
# Размеры данных # ✓ ИСПРАВЛЕНИЕ: Выбор типа данных и количество
RAW_SIZE = 64000 NUM_FILES = 1000 # Количесйтво файлов для генерации
SYNC_DET_SIZE = 1000
FOURIER_SIZE = SYNC_DET_SIZE // 2 # 500 (положительные частоты) # Размеры данных
RAW_SIZE = 64000
# Интервал между файлами (в миллисекундах) SYNC_DET_SIZE = 1000
FILE_INTERVAL_MS = 300 # 300 мс между файлами FOURIER_SIZE = SYNC_DET_SIZE // 2 # 500 (положительные частоты)
# Интервал между файлами (в миллисекундах)
# ================================================================================ FILE_INTERVAL_MS = 300 # 300 мс между файлами
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
# ================================================================================
# ================================================================================
def create_raw_data(size=RAW_SIZE, index=0): # ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
"""Генерирует RAW данные.""" # ================================================================================
# Синусоида + шум, зависит от индекса для разнообразия
t = np.linspace(0, 10 * np.pi, size) def create_raw_data(size=RAW_SIZE, index=0):
freq_mult = 1.0 + 0.1 * np.sin(index / 100) """Генерирует RAW данные."""
signal = np.sin(freq_mult * t) + 0.1 * np.random.randn(size) # Синусоида + шум, зависит от индекса для разнообразия
return signal t = np.linspace(0, 10 * np.pi, size)
freq_mult = 1.0 + 0.1 * np.sin(index / 100)
signal = np.sin(freq_mult * t) + 0.1 * np.random.randn(size)
def create_sync_det_data(size=SYNC_DET_SIZE, index=0): return signal
"""Генерирует SYNC_DET данные."""
# Модулированная синусоида, зависит от индекса
t = np.linspace(0, 20 * np.pi, size) def create_sync_det_data(size=SYNC_DET_SIZE, index=0):
damping = np.exp(-t / (20 * np.pi)) * (1 + 0.2 * np.sin(index / 50)) """Генерирует SYNC_DET данные."""
signal = np.sin(t) * damping # Модулированная синусоида, зависит от индекса
return signal t = np.linspace(0, 20 * np.pi, size)
damping = np.exp(-t / (20 * np.pi)) * (1 + 0.2 * np.sin(index / 50))
signal = np.sin(t) * damping
def create_fourier_data(sync_det_data=None, fft_size=SYNC_DET_SIZE, output_size=FOURIER_SIZE, index=0): return signal
"""✓ Генерирует FOURIER = |FFT(SYNC_DET)|[:N/2].
FFT от сигнала размером N имеет только N/2 независимых значений. def create_fourier_data(sync_det_data=None, fft_size=SYNC_DET_SIZE, output_size=FOURIER_SIZE, index=0):
"""✓ Генерирует FOURIER = |FFT(SYNC_DET)|[:N/2].
Args:
sync_det_data: вектор SYNC_DET размером ~1000 FFT от сигнала размером N имеет только N/2 независимых значений.
fft_size: размер FFT (1000)
output_size: размер выходного спектра (500) Args:
index: индекс файла для вариативности sync_det_data: вектор SYNC_DET размером ~1000
fft_size: размер FFT (1000)
Returns: output_size: размер выходного спектра (500)
fourier_data: амплитудный спектр размером output_size (500) index: индекс файла для вариативности
"""
Returns:
if sync_det_data is None: fourier_data: амплитудный спектр размером output_size (500)
sync_det_data = create_sync_det_data(index=index) """
# ✓ Вычисляем FFT от SYNC_DET if sync_det_data is None:
fft_result = np.fft.fft(sync_det_data[:fft_size]) sync_det_data = create_sync_det_data(index=index)
# ✓ Берём амплитудный спектр # ✓ Вычисляем FFT от SYNC_DET
amplitude_spectrum = np.abs(fft_result) fft_result = np.fft.fft(sync_det_data[:fft_size])
# ✓ Центрируем спектр # ✓ Берём амплитудный спектр
fft_shift = np.fft.fftshift(amplitude_spectrum) amplitude_spectrum = np.abs(fft_result)
# ✓ Берём только положительные частоты (N/2 = 500) # ✓ Центрируем спектр
fft_positive = fft_shift[len(fft_shift) // 2:] fft_shift = np.fft.fftshift(amplitude_spectrum)
assert len(fft_positive) == output_size, \ # ✓ Берём только положительные частоты (N/2 = 500)
f"FFT positive frequencies size {len(fft_positive)} != expected {output_size}" fft_positive = fft_shift[len(fft_shift) // 2:]
fourier_data = fft_positive.astype(float) assert len(fft_positive) == output_size, \
f"FFT positive frequencies size {len(fft_positive)} != expected {output_size}"
# Нормализуем
if fourier_data.max() > 0: fourier_data = fft_positive.astype(float)
fourier_data = fourier_data / fourier_data.max() * 100
# Нормализуем
return fourier_data if fourier_data.max() > 0:
fourier_data = fourier_data / fourier_data.max() * 100
# ================================================================================ return fourier_data
# ГЕНЕРАЦИЯ ФАЙЛОВ
# ================================================================================
# ================================================================================
def emit_raw_files(count=NUM_FILES, start_time=None): # ГЕНЕРАЦИЯ ФАЙЛОВ
"""Генерирует RAW файлы количеством count.""" # ================================================================================
if start_time is None:
start_time = datetime.now() def emit_raw_files(count=NUM_FILES, start_time=None, hex_mode=True):
"""Генерирует RAW файлы количеством count."""
print(f"\n{'=' * 80}") if start_time is None:
print(f"📝 ГЕНЕРИРОВАНИЕ {count} RAW ФАЙЛОВ") start_time = datetime.now()
print(f"{'=' * 80}")
print(f"Размер: {RAW_SIZE} точек") print(f"\n{'=' * 80}")
print(f"Интервал: {FILE_INTERVAL_MS}мс\n") print(f"📝 ГЕНЕРИРОВАНИЕ {count} RAW ФАЙЛОВ")
print(f"{'=' * 80}")
for i in range(count): print(f"Размер: {RAW_SIZE} точек")
# Вычисляем время файла print(f"Интервал: {FILE_INTERVAL_MS}мс\n")
file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS)
for i in range(count):
# Форматируем имя с миллисекундами # Вычисляем время файла
time_str = file_time.strftime("%H_%M_%S") file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS)
ms = file_time.microsecond // 1000
filename = f"RAW_{time_str}_{ms:03d}.txt" # Форматируем имя с миллисекундами
time_str = file_time.strftime("%H_%M_%S")
# Генерируем данные ms = file_time.microsecond // 1000
data = create_raw_data(index=i) filename = f"RAW_{time_str}_{ms:03d}.txt"
# Добавляем заголовок # Генерируем данные
filepath = os.path.join(DATA_DIR, filename) data = create_raw_data(index=i)
with open(filepath, 'w') as f:
f.write("RAW\n") # Добавляем заголовок
np.savetxt(f, data, fmt='%.6f') filepath = os.path.join(DATA_DIR, filename)
with open(filepath, 'w') as f:
# Устанавливаем время модификации
timestamp = file_time.timestamp()
os.utime(filepath, (timestamp, timestamp)) if hex_mode:
#f.write("SYNC_DET_HEX\n")
# Прогресс #np.savetxt(f, np.uint32(data*1000), fmt='0xD0%06X')
if (i + 1) % 100 == 0 or (i + 1) == count: np.savetxt(f, ((data * 1000).astype(np.int32) & 0xFFFFFF), fmt='0xD0%06X')
print(f"{i + 1}/{count} файлов созданы", end='\r')
else:
print(f"\n✅ Готово: {count} RAW файлов") f.write("RAW\n")
return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS) np.savetxt(f, data, fmt='%.6f')
def emit_sync_det_files(count=NUM_FILES, start_time=None): # Устанавливаем время модификации
"""Генерирует SYNC_DET файлы количеством count.""" timestamp = file_time.timestamp()
if start_time is None: os.utime(filepath, (timestamp, timestamp))
start_time = datetime.now()
# Прогресс
print(f"\n{'=' * 80}") if (i + 1) % 100 == 0 or (i + 1) == count:
print(f"📝 ГЕНЕРИРОВАНИЕ {count} SYNC_DET ФАЙЛОВ") print(f"{i + 1}/{count} файлов созданы", end='\r')
print(f"{'=' * 80}")
print(f"Размер: {SYNC_DET_SIZE} точек") print(f"\n✅ Готово: {count} RAW файлов")
print(f"Интервал: {FILE_INTERVAL_MS}мс\n") return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS)
for i in range(count):
# Вычисляем время файла def emit_sync_det_files(count=NUM_FILES, start_time=None, hex_mode=True):
file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS) """Генерирует SYNC_DET файлы количеством count."""
if start_time is None:
# Форматируем имя с миллисекундами start_time = datetime.now()
time_str = file_time.strftime("%H_%M_%S")
ms = file_time.microsecond // 1000 print(f"\n{'=' * 80}")
filename = f"SYNC_DET_{time_str}_{ms:03d}.txt" print(f"📝 ГЕНЕРИРОВАНИЕ {count} SYNC_DET ФАЙЛОВ")
print(f"{'=' * 80}")
# Генерируем данные print(f"Размер: {SYNC_DET_SIZE} точек")
data = create_sync_det_data(index=i) print(f"Интервал: {FILE_INTERVAL_MS}мс\n")
# Добавляем заголовок for i in range(count):
filepath = os.path.join(DATA_DIR, filename) # Вычисляем время файла
with open(filepath, 'w') as f: file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS)
f.write("SYNC_DET\n")
np.savetxt(f, data, fmt='%.6f') # Форматируем имя с миллисекундами
time_str = file_time.strftime("%H_%M_%S")
# Устанавливаем время модификации ms = file_time.microsecond // 1000
timestamp = file_time.timestamp() filename = f"SYNC_DET_{time_str}_{ms:03d}.txt"
os.utime(filepath, (timestamp, timestamp))
# Генерируем данные
# Прогресс data = create_sync_det_data(index=i)
if (i + 1) % 100 == 0 or (i + 1) == count: #print("data:", data)
print(f"{i + 1}/{count} файлов созданы", end='\r')
# Добавляем заголовок
print(f"\n✅ Готово: {count} SYNC_DET файлов") filepath = os.path.join(DATA_DIR, filename)
return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS) with open(filepath, 'w') as f:
if hex_mode:
def emit_fourier_files(count=NUM_FILES, start_time=None): #f.write("SYNC_DET_HEX\n")
"""✓ Генерирует FOURIER файлы количеством count. #np.savetxt(f, np.uint32(data*1000), fmt='0xF0%06X')
np.savetxt(f, ((data * 1000).astype(np.int32) & 0xFFFFFF), fmt='0xF0%06X')
Каждый файл содержит амплитудный спектр размером 500. else:
""" f.write("SYNC_DET\n")
if start_time is None: np.savetxt(f, data, fmt='%.6f')
start_time = datetime.now()
# Устанавливаем время модификации
print(f"\n{'=' * 80}") timestamp = file_time.timestamp()
print(f"📝 ГЕНЕРИРОВАНИЕ {count} FOURIER ФАЙЛОВ") os.utime(filepath, (timestamp, timestamp))
print(f"{'=' * 80}")
print(f"Размер: {FOURIER_SIZE} точек (|FFT|[:N/2])") # Прогресс
print(f"Интервал: {FILE_INTERVAL_MS}мс\n") if (i + 1) % 100 == 0 or (i + 1) == count:
print(f"{i + 1}/{count} файлов созданы", end='\r')
for i in range(count):
# Вычисляем время файла print(f"\n✅ Готово: {count} SYNC_DET файлов")
file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS) return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS)
# Форматируем имя с миллисекундами
time_str = file_time.strftime("%H_%M_%S") def emit_fourier_files(count=NUM_FILES, start_time=None, hex_mode=True):
ms = file_time.microsecond // 1000 """✓ Генерирует FOURIER файлы количеством count.
filename = f"FOURIER_{time_str}_{ms:03d}.txt"
Каждый файл содержит амплитудный спектр размером 500.
# Генерируем SYNC_DET """
sync_det = create_sync_det_data(index=i) if start_time is None:
start_time = datetime.now()
# Вычисляем FOURIER как |FFT(SYNC_DET)|[:N/2]
data = create_fourier_data( print(f"\n{'=' * 80}")
sync_det_data=sync_det, print(f"📝 ГЕНЕРИРОВАНИЕ {count} FOURIER ФАЙЛОВ")
fft_size=SYNC_DET_SIZE, print(f"{'=' * 80}")
output_size=FOURIER_SIZE, print(f"Размер: {FOURIER_SIZE} точек (|FFT|[:N/2])")
index=i print(f"Интервал: {FILE_INTERVAL_MS}мс\n")
)
for i in range(count):
assert len(data) == FOURIER_SIZE, \ # Вычисляем время файла
f"FOURIER size {len(data)} != expected {FOURIER_SIZE}" file_time = start_time + timedelta(milliseconds=i * FILE_INTERVAL_MS)
# Добавляем заголовок # Форматируем имя с миллисекундами
filepath = os.path.join(DATA_DIR, filename) time_str = file_time.strftime("%H_%M_%S")
with open(filepath, 'w') as f: ms = file_time.microsecond // 1000
f.write("FOURIER\n") filename = f"FOURIER_{time_str}_{ms:03d}.txt"
np.savetxt(f, data, fmt='%.6f')
# Генерируем SYNC_DET
# Устанавливаем время модификации sync_det = create_sync_det_data(index=i)
timestamp = file_time.timestamp()
os.utime(filepath, (timestamp, timestamp)) # Вычисляем FOURIER как |FFT(SYNC_DET)|[:N/2]
data = create_fourier_data(
# Прогресс sync_det_data=sync_det,
if (i + 1) % 100 == 0 or (i + 1) == count: fft_size=SYNC_DET_SIZE,
print(f"{i + 1}/{count} файлов созданы", end='\r') output_size=FOURIER_SIZE,
index=i
print(f"\n✅ Готово: {count} FOURIER файлов") )
return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS)
assert len(data) == FOURIER_SIZE, \
f"FOURIER size {len(data)} != expected {FOURIER_SIZE}"
# ================================================================================
# ОСНОВНАЯ ПРОГРАММА # Добавляем заголовок
# ================================================================================ filepath = os.path.join(DATA_DIR, filename)
with open(filepath, 'w') as f:
def show_menu():
"""Показывает меню выбора типа данных.""" if hex_mode:
print("\n" + "=" * 80) #f.write("SYNC_DET_HEX\n")
print(" РАДАР ЭМУЛЯТОР - ВЫБОР ТИПА ДАННЫХ") #np.savetxt(f, np.uint32(data*1000), fmt='0xF4%06X')
print("=" * 80) np.savetxt(f, ((data * 1000).astype(np.int32) & 0xFFFFFF), fmt='0xF4%06X')
print() else:
print(" Выберите тип данных для генерирования:") f.write("FOURIER\n")
print() np.savetxt(f, data, fmt='%.6f')
print(" 1. RAW - Сырые данные с АЦП ({} точек)".format(RAW_SIZE))
print(" 2. SYNC_DET - Обработанные данные ({} точек)".format(SYNC_DET_SIZE))
print(" 3. FOURIER - Амплитудный спектр ({} точек)".format(FOURIER_SIZE)) # Устанавливаем время модификации
print() timestamp = file_time.timestamp()
print("=" * 80) os.utime(filepath, (timestamp, timestamp))
print()
# Прогресс
if (i + 1) % 100 == 0 or (i + 1) == count:
if __name__ == "__main__": print(f"{i + 1}/{count} файлов созданы", end='\r')
# Создаём директорию если её нет
os.makedirs(DATA_DIR, exist_ok=True) print(f"\n✅ Готово: {count} FOURIER файлов")
return start_time + timedelta(milliseconds=count * FILE_INTERVAL_MS)
print("\n" + "=" * 80)
print(" РАДАР ЭМУЛЯТОР - ГЕНЕРИРОВАНИЕ ТЕСТОВЫХ ДАННЫХ")
print("=" * 80) # ================================================================================
print(f" Директория: {DATA_DIR}") # ОСНОВНАЯ ПРОГРАММА
print(f" Количество файлов: {NUM_FILES}") # ================================================================================
print(f" Интервал между файлами: {FILE_INTERVAL_MS}мс")
print(f" Формат времени: HH:MM:SS.mmm") def show_menu():
print("=" * 80) """Показывает меню выбора типа данных."""
print("\n" + "=" * 80)
show_menu() print(" РАДАР ЭМУЛЯТОР - ВЫБОР ТИПА ДАННЫХ")
print("=" * 80)
while True: print()
try: print(" Выберите тип данных для генерирования:")
choice = input(" Введите номер (1/2/3) или 'q' для выхода: ").strip().lower() print()
print(" 1. RAW - Сырые данные с АЦП ({} точек)".format(RAW_SIZE))
if choice == 'q': print(" 2. SYNC_DET - Обработанные данные ({} точек)".format(SYNC_DET_SIZE))
print("\n Выход.") print(" 3. FOURIER - Амплитудный спектр ({} точек)".format(FOURIER_SIZE))
break print()
elif choice == '1': print("=" * 80)
start_time = datetime.now().replace(microsecond=0) print()
emit_raw_files(NUM_FILES, start_time)
break
elif choice == '2': if __name__ == "__main__":
start_time = datetime.now().replace(microsecond=0) # Создаём директорию если её нет
emit_sync_det_files(NUM_FILES, start_time) os.makedirs(DATA_DIR, exist_ok=True)
break
elif choice == '3': print("\n" + "=" * 80)
start_time = datetime.now().replace(microsecond=0) print(" РАДАР ЭМУЛЯТОР - ГЕНЕРИРОВАНИЕ ТЕСТОВЫХ ДАННЫХ")
emit_fourier_files(NUM_FILES, start_time) print("=" * 80)
break print(f" Директория: {DATA_DIR}")
else: print(f" Количество файлов: {NUM_FILES}")
print(" ❌ Неверный выбор. Введите 1, 2, 3 или q") print(f" Интервал между файлами: {FILE_INTERVAL_MS}мс")
except KeyboardInterrupt: print(f" Формат времени: HH:MM:SS.mmm")
print("\n\n Прервано пользователем.") print("=" * 80)
break
except Exception as e: show_menu()
print(f" ❌ Ошибка: {e}")
continue
while True:
print(f"\n📂 Файлы сохранены в: {DATA_DIR}") try:
print(f"\n💡 Запустите main_analyzer.py и откройте директорию {DATA_DIR}") hex_choice = input(" Генерировать в HEX (h) или float (f)? 'q' для выхода: ").strip().lower()
print(f" Анализатор автоматически подхватит новые файлы\n") if hex_choice == "h":
hex_mode = True
elif hex_choice == "f":
hex_mode = False
else:
print(" ❌ Неверный выбор. Введите h, f или q")
choice = input(" Введите номер (1/2/3) или 'q' для выхода: ").strip().lower()
if choice == 'q':
print("\n Выход.")
break
elif choice == '1':
start_time = datetime.now().replace(microsecond=0)
emit_raw_files(NUM_FILES, start_time, hex_mode=hex_mode)
break
elif choice == '2':
start_time = datetime.now().replace(microsecond=0)
emit_sync_det_files(NUM_FILES, start_time, hex_mode=hex_mode)
break
elif choice == '3':
start_time = datetime.now().replace(microsecond=0)
emit_fourier_files(NUM_FILES, start_time, hex_mode=hex_mode)
break
else:
print(" ❌ Неверный выбор. Введите 1, 2, 3 или q")
except KeyboardInterrupt:
print("\n\n Прервано пользователем.")
break
except Exception as e:
print(f" ❌ Ошибка: {e}")
continue
print(f"\n📂 Файлы сохранены в: {DATA_DIR}")
print(f"\n💡 Запустите main_analyzer.py и откройте директорию {DATA_DIR}")
print(f" Анализатор автоматически подхватит новые файлы\n")

2824
main.py

File diff suppressed because it is too large Load Diff