parser v0

This commit is contained in:
awe
2025-12-01 20:25:17 +03:00
parent bfc3949c4d
commit e43ce26fdf
7 changed files with 883 additions and 6 deletions

View File

@ -580,11 +580,27 @@ class VNADataAcquisition:
) )
# Still process the data if it has valid bits # Still process the data if it has valid bits
# Convert to dB if non-zero # Convert unsigned 24-bit to signed 24-bit (as in main.py to_int24)
# If MSB is set, treat as negative (two's complement)
if raw_value & 0x800000:
raw_value -= 0x1000000 # Convert to signed: -8388608 to +8388607
points.append((float(int(raw_value)), 0.)) points.append((float(raw_value), 0.))
if i == 0 or i == 100: if i == 0 or i == 100:
logger.debug(f"raw_value: {raw_value}, marker: {marker}, word= {word}" ) logger.debug(f"raw_value: {raw_value}, marker: {marker}, word= {word}" )
# Log statistics about parsed data
if points:
values = [p[0] for p in points]
logger.info(
"📊 Radar data parsed from pipe",
total_points=len(points),
min_value=min(values),
max_value=max(values),
mean_value=sum(values) / len(values) if values else 0,
non_zero=sum(1 for v in values if v != 0)
)
return points return points
def _parse_measurement_data(self, payload: bytes) -> list[tuple[float, float]]: def _parse_measurement_data(self, payload: bytes) -> list[tuple[float, float]]:

View File

@ -1,7 +1,7 @@
{ {
"y_min": -80, "y_min": -80,
"y_max": -15, "y_max": 40,
"autoscale": true, "autoscale": false,
"show_magnitude": true, "show_magnitude": true,
"show_phase": false, "show_phase": false,
"open_air": false "open_air": false

View File

@ -0,0 +1,19 @@
{
"data_type": "SYNC_DET",
"pont_in_one_fq_change": 86,
"gaussian_sigma": 5.0,
"fft0_delta": 5,
"standard_raw_size": 64000,
"standard_sync_size": 1000,
"freq_start_ghz": 3.0,
"freq_stop_ghz": 13.67,
"fq_end": 512,
"show_processed": true,
"show_fourier": true,
"normalize_signal": false,
"log_scale": false,
"disable_interpolation": false,
"y_max_processed": 900000.0,
"y_max_fourier": 1000.0,
"auto_scale": false
}

View File

@ -0,0 +1,197 @@
# RFG Processor - Радиофотонный радар
## Описание
RFGProcessor - новый процессор для обработки данных радиофотонного радара, основанный на алгоритмах из `RFG_Receiver_GUI/main.py`.
## Основные возможности
### Типы данных
- **SYNC_DET (0xF0)** - Данные синхронного детектирования (~1000 сэмплов)
- **RAW (0xD0)** - Сырые ADC данные с меандровой модуляцией (~64000 сэмплов)
### Обработка данных
#### Для SYNC_DET (по умолчанию):
1. Интерполяция до 1000 точек
2. Прямая FFT обработка (данные уже демодулированы)
3. Гауссово сглаживание
4. Обнуление центра FFT
#### Для RAW:
1. Интерполяция до 64000 точек
2. Генерация меандрового сигнала
3. Синхронное детектирование (умножение на меандр)
4. Частотная сегментация (по 86 точек)
5. FFT обработка с гауссовым сглаживанием
6. Обнуление центра FFT
### Выходные графики
1. **Processed Signal** - Обработанный сигнал в частотной области (3-13.67 ГГц)
2. **Fourier Transform** - Спектр FFT с гауссовым сглаживанием
## Конфигурация
### Файл: `configs/rfg_config.json`
```json
{
"data_type": "SYNC_DET", // Тип данных: "RAW" или "SYNC_DET"
"pont_in_one_fq_change": 86, // Размер сегмента частоты
"gaussian_sigma": 5.0, // Параметр сглаживания (σ)
"fft0_delta": 5, // Смещение обнуления центра FFT
"standard_raw_size": 64000, // Целевой размер для RAW
"standard_sync_size": 1000, // Целевой размер для SYNC_DET
"freq_start_ghz": 3.0, // Начало частотного диапазона
"freq_stop_ghz": 13.67, // Конец частотного диапазона
"fq_end": 512, // Точка отсечки FFT
"show_processed": true, // Показать Processed Signal
"show_fourier": true // Показать Fourier Transform
}
```
### UI параметры
Процессор предоставляет следующие настройки через веб-интерфейс:
- **Тип данных** - Выбор между RAW и SYNC_DET
- **Гауссово сглаживание (σ)** - Слайдер 0.1-20.0
- **Размер сегмента частоты** - Слайдер 50-200
- **Смещение центра FFT** - Слайдер 0-20
- **Точка отсечки FFT** - Слайдер 100-1000
- **Частота начала (ГГц)** - Слайдер 1.0-15.0
- **Частота конца (ГГц)** - Слайдер 1.0-15.0
- **Переключатели** - Показать/скрыть графики
## Использование
### Автоматическая регистрация
Процессор автоматически регистрируется в `ProcessorManager` при запуске системы.
### Входные данные
Процессор ожидает данные из pipe в формате `SweepData`:
- `points`: список пар `[(real, imag), ...]`
- Для ADC данных используется только `real` часть
- Данные должны соответствовать формату 0xF0 (SYNC_DET) или 0xD0 (RAW)
### Пример структуры данных
```python
sweep_data = SweepData(
sweep_number=1,
timestamp=1234567890.0,
points=[(adc_sample_1, 0), (adc_sample_2, 0), ...],
total_points=1000 # или 64000 для RAW
)
```
## Алгоритмы обработки
### 1. Интерполяция данных
```python
# Линейная интерполяция scipy.interpolate.interp1d
old_indices = np.linspace(0, 1, len(data))
new_indices = np.linspace(0, 1, target_size)
```
### 2. Меандровая демодуляция (только RAW)
```python
# Генерация квадратного сигнала
time_idx = np.arange(1, size + 1)
meander = square(time_idx * np.pi)
demodulated = data * meander
```
### 3. Частотная сегментация (только RAW)
```python
# Разделение на сегменты и суммирование
segment_size = 86
for segment in segments:
signal.append(np.sum(segment))
```
### 4. FFT обработка
```python
# Обрезка + FFT + сдвиг
sig_cut = np.sqrt(np.abs(signal[:512]))
F = np.fft.fft(sig_cut)
Fshift = np.abs(np.fft.fftshift(F))
# Обнуление центра
center = len(sig_cut) // 2
Fshift[center:center+1] = 0
# Гауссово сглаживание
FshiftS = gaussian_filter1d(Fshift, sigma=5.0)
```
## Отличия от оригинального main.py
### Изменено:
1. **Вход данных**: Вместо CSV файлов - pipe через `SweepData`
2. **Без накопления**: Обработка каждой развёртки отдельно (убран `PeriodIntegrate=2`)
3. **Без B-scan**: Только Processed Signal + Fourier Transform
4. **Plotly вместо Matplotlib**: Веб-визуализация
5. **JSON конфигурация**: Вместо хардкода констант
6. **Один sweep**: Нет потокового чтения файлов
### Сохранено:
- ✅ Алгоритмы обработки (интерполяция, меандр, сегментация, FFT)
- ✅ Гауссово сглаживание
- ✅ Обнуление центра FFT
- ✅ Частотный диапазон 3-13.67 ГГц
- ✅ Параметры обработки (sigma=5, segment=86, delta=5)
## Файлы
```
vna_system/core/processors/
├── implementations/
│ ├── rfg_processor.py # Основной класс
│ └── RFG_PROCESSOR_README.md # Эта документация
├── configs/
│ └── rfg_config.json # Конфигурация по умолчанию
└── manager.py # Регистрация процессора
```
## Разработка и отладка
### Логирование
Процессор использует стандартную систему логирования:
```python
from vna_system.core.logging.logger import get_component_logger
logger = get_component_logger(__file__)
```
### Тестирование
Для тестирования создайте `SweepData` с тестовыми ADC данными:
```python
# Тест SYNC_DET (1000 точек)
test_data = [(float(i), 0.0) for i in range(1000)]
# Тест RAW (64000 точек)
test_data = [(float(i), 0.0) for i in range(64000)]
```
## Зависимости
- `numpy` - Численные операции
- `scipy.signal.square` - Генерация меандра
- `scipy.ndimage.gaussian_filter1d` - Гауссово сглаживание
- `scipy.interpolate.interp1d` - Интерполяция
- `plotly` - Визуализация
## Авторство
Создан на основе алгоритмов из `/home/awe/Documents/RFG_Receiver_GUI/main.py`
Адаптирован для VNA System architecture.
## Версия
v1.0 - Первая реализация (2025-11-28)
- Поддержка SYNC_DET и RAW форматов
- Два графика: Processed Signal + Fourier Transform
- Полная интеграция с VNA System

View File

@ -94,7 +94,7 @@ class MagnitudeProcessor(BaseProcessor):
real_points.append(float(real)) real_points.append(float(real))
imag_points.append(float(imag)) imag_points.append(float(imag))
mag = abs(complex_val) mag = abs(complex_val)
mags_db.append(20.0 * np.log10(mag) if mag > 0.0 else -120.0) mags_db.append( 20*np.log10(mag) if mag > 0.0 else -120.0)
phases_deg.append(np.degrees(np.angle(complex_val))) phases_deg.append(np.degrees(np.angle(complex_val)))
result = { result = {

View File

@ -0,0 +1,643 @@
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
from numpy.typing import NDArray
from scipy.signal import square
from scipy.ndimage import gaussian_filter1d
from scipy.interpolate import interp1d
from vna_system.core.logging.logger import get_component_logger
from vna_system.core.processors.base_processor import BaseProcessor, UIParameter, ProcessedResult
from vna_system.core.acquisition.sweep_buffer import SweepData
logger = get_component_logger(__file__)
class RFGProcessor(BaseProcessor):
"""
Radiophotonic radar processor based on RFG_Receiver_GUI algorithm.
Processes ADC data with synchronous detection (0xF0 format) or RAW data (0xD0 format).
Outputs two graphs:
- Processed Signal: Frequency domain signal (3-13.67 GHz range)
- Fourier Transform: FFT magnitude spectrum with Gaussian smoothing
Features
--------
- Data interpolation to standard size
- Meander demodulation (for RAW data)
- Frequency segmentation
- FFT processing with Gaussian smoothing
- Center zeroing for artifact reduction
"""
def __init__(self, config_dir: Path) -> None:
super().__init__("rfg_radar", config_dir)
# No history accumulation needed (process single sweeps)
self._max_history = 1
# Pre-computed meander signal (will be initialized on first use)
self._meandr: NDArray[np.floating] | None = None
self._last_size: int = 0
logger.info("RFGProcessor initialized", processor_id=self.processor_id)
# -------------------------------------------------------------------------
# Configuration
# -------------------------------------------------------------------------
def _get_default_config(self) -> dict[str, Any]:
"""Return default configuration values."""
return {
"data_type": "SYNC_DET", # "RAW" or "SYNC_DET"
"pont_in_one_fq_change": 86, # Frequency segment size
"gaussian_sigma": 5.0, # FFT smoothing sigma
"fft0_delta": 5, # Center zero offset
"standard_raw_size": 64000, # Target size for RAW data
"standard_sync_size": 1000, # Target size for SYNC_DET data
"freq_start_ghz": 3.0, # Display frequency start (GHz)
"freq_stop_ghz": 13.67, # Display frequency stop (GHz)
"fq_end": 512, # FFT cutoff point
"show_processed": True, # Show processed signal graph
"show_fourier": True, # Show Fourier transform graph
"normalize_signal": False, # Normalize processed signal to [0, 1]
"log_scale": False, # Use logarithmic scale for signal
"disable_interpolation": False, # Skip interpolation (use raw data size)
"y_max_processed": 900000.0, # Max Y-axis value for processed signal
"y_max_fourier": 1000.0, # Max Y-axis value for Fourier spectrum
"auto_scale": False, # Auto-scale Y-axis (ignore y_max if True)
}
def get_ui_parameters(self) -> list[UIParameter]:
"""Return UI parameter schema for configuration."""
cfg = self._config
return [
UIParameter(
name="data_type",
label="Тип данных",
type="select",
value=cfg["data_type"],
options={"choices": ["RAW", "SYNC_DET"]},
),
UIParameter(
name="gaussian_sigma",
label="Гауссово сглаживание (σ)",
type="slider",
value=cfg["gaussian_sigma"],
options={"min": 0.1, "max": 20.0, "step": 0.1, "dtype": "float"},
),
UIParameter(
name="pont_in_one_fq_change",
label="Размер сегмента частоты",
type="slider",
value=cfg["pont_in_one_fq_change"],
options={"min": 50, "max": 200, "step": 1, "dtype": "int"},
),
UIParameter(
name="fft0_delta",
label="Смещение центра FFT",
type="slider",
value=cfg["fft0_delta"],
options={"min": 0, "max": 20, "step": 1, "dtype": "int"},
),
UIParameter(
name="fq_end",
label="Точка отсечки FFT",
type="slider",
value=cfg["fq_end"],
options={"min": 100, "max": 1000, "step": 10, "dtype": "int"},
),
UIParameter(
name="freq_start_ghz",
label="Частота начала (ГГц)",
type="slider",
value=cfg["freq_start_ghz"],
options={"min": 1.0, "max": 15.0, "step": 0.1, "dtype": "float"},
),
UIParameter(
name="freq_stop_ghz",
label="Частота конца (ГГц)",
type="slider",
value=cfg["freq_stop_ghz"],
options={"min": 1.0, "max": 15.0, "step": 0.1, "dtype": "float"},
),
UIParameter(
name="show_processed",
label="Показать обработанный сигнал",
type="toggle",
value=cfg["show_processed"],
),
UIParameter(
name="show_fourier",
label="Показать Фурье образ",
type="toggle",
value=cfg["show_fourier"],
),
UIParameter(
name="normalize_signal",
label="Нормализовать сигнал",
type="toggle",
value=cfg["normalize_signal"],
),
UIParameter(
name="log_scale",
label="Логарифмическая шкала",
type="toggle",
value=cfg["log_scale"],
),
UIParameter(
name="disable_interpolation",
label="Без интерполяции (как FOURIER)",
type="toggle",
value=cfg["disable_interpolation"],
),
UIParameter(
name="auto_scale",
label="Автоматический масштаб Y",
type="toggle",
value=cfg["auto_scale"],
),
UIParameter(
name="y_max_processed",
label="Макс. амплитуда (Processed)",
type="slider",
value=cfg["y_max_processed"],
options={"min": 1000.0, "max": 200000.0, "step": 1000.0, "dtype": "float"},
),
UIParameter(
name="y_max_fourier",
label="Макс. амплитуда (Fourier)",
type="slider",
value=cfg["y_max_fourier"],
options={"min": 1000.0, "max": 200000.0, "step": 1000.0, "dtype": "float"},
),
]
# -------------------------------------------------------------------------
# Processing
# -------------------------------------------------------------------------
def process_sweep(
self,
sweep_data: SweepData,
calibrated_data: SweepData | None,
vna_config: dict[str, Any],
) -> dict[str, Any]:
"""
Process a single sweep of ADC data.
Returns
-------
dict
Keys: processed_signal, fourier_spectrum, freq_axis, data_type, points_processed
Or: {"error": "..."} on failure.
"""
try:
# Extract raw ADC data from sweep (use real part only)
adc_data = self._extract_adc_data(sweep_data)
if adc_data is None or adc_data.size == 0:
logger.warning("No valid ADC data for RFG processing")
return {"error": "No valid ADC data"}
data_type = self._config["data_type"]
if data_type == "RAW":
# Full processing with meander demodulation
processed_signal, fourier_spectrum = self._process_raw_data(adc_data)
elif data_type == "SYNC_DET":
# Direct FFT processing (data already demodulated)
processed_signal, fourier_spectrum = self._process_sync_det_data(adc_data)
else:
return {"error": f"Unknown data type: {data_type}"}
# Generate frequency axis for visualization
freq_axis = self._generate_frequency_axis(processed_signal)
fourier_spectrum /= (2*3.14)
return {
"processed_signal": processed_signal.tolist(),
"fourier_spectrum": fourier_spectrum.tolist(),
"freq_axis": freq_axis.tolist(),
"data_type": data_type,
"points_processed": int(adc_data.size),
"original_size": int(adc_data.size),
}
except Exception as exc: # noqa: BLE001
logger.error("RFG processing failed", error=repr(exc))
return {"error": str(exc)}
# -------------------------------------------------------------------------
# Visualization
# -------------------------------------------------------------------------
def generate_plotly_config(
self,
processed_data: dict[str, Any],
vna_config: dict[str, Any], # noqa: ARG002
) -> dict[str, Any]:
"""
Produce Plotly configuration for two subplots: Processed Signal + Fourier Transform.
"""
if "error" in processed_data:
return {
"data": [],
"layout": {
"title": "RFG Радар - Ошибка",
"annotations": [
{
"text": f"Ошибка: {processed_data['error']}",
"x": 0.5,
"y": 0.5,
"xref": "paper",
"yref": "paper",
"showarrow": False,
}
],
"template": "plotly_dark",
},
}
processed_signal = processed_data.get("processed_signal", [])
fourier_spectrum = processed_data.get("fourier_spectrum", [])
freq_axis = processed_data.get("freq_axis", [])
# Create subplot configuration
traces = []
if self._config["show_processed"] and processed_signal:
# Processed Signal trace - use absolute value as in main.py line 1043
import numpy as np
processed_signal_abs = np.abs(np.array(processed_signal))
# Apply normalization if enabled
if self._config.get("normalize_signal", False):
max_val = np.max(processed_signal_abs)
if max_val > 0:
processed_signal_abs = processed_signal_abs / max_val
# Apply log scale if enabled
if self._config.get("log_scale", False):
processed_signal_abs = np.log10(processed_signal_abs + 1e-12) # Add small epsilon to avoid log(0)
logger.debug(
"Processed signal stats",
min=float(np.min(processed_signal_abs)),
max=float(np.max(processed_signal_abs)),
mean=float(np.mean(processed_signal_abs)),
size=len(processed_signal_abs)
)
traces.append({
"type": "scatter",
"mode": "lines",
"x": freq_axis,
"y": processed_signal_abs.tolist(),
"name": "Обработанный сигнал",
"line": {"width": 1, "color": "cyan"},
"xaxis": "x",
"yaxis": "y",
})
if self._config["show_fourier"] and fourier_spectrum:
# Fourier Transform trace
fft_x = list(range(len(fourier_spectrum)))
traces.append({
"type": "scatter",
"mode": "lines",
"x": fft_x,
"y": fourier_spectrum,
"name": "Фурье образ",
"line": {"width": 1, "color": "yellow"},
"xaxis": "x2",
"yaxis": "y2",
})
# Build layout with two subplots
layout = {
"title": (
f"RFG Радар - {processed_data.get('data_type', 'N/A')} | "
f"Точек: {processed_data.get('points_processed', 0)}"
),
"grid": {"rows": 2, "columns": 1, "pattern": "independent"},
"xaxis": {
"title": "Частота (ГГц)",
"domain": [0, 1],
"anchor": "y",
},
"yaxis": {
"title": "Амплитуда",
"domain": [0.55, 1],
"anchor": "x",
},
"xaxis2": {
"title": "Индекс",
"domain": [0, 1],
"anchor": "y2",
},
"yaxis2": {
"title": "Магнитуда FFT",
"domain": [0, 0.45],
"anchor": "x2",
},
"template": "plotly_dark",
"height": 700,
"showlegend": True,
"hovermode": "closest",
}
# Apply Y-axis limits if not auto-scaling
if not self._config.get("auto_scale", False):
y_max_processed = float(self._config.get("y_max_processed", 90000.0))
y_max_fourier = float(self._config.get("y_max_fourier", 60000.0))
layout["yaxis"]["range"] = [0, y_max_processed]
layout["yaxis2"]["range"] = [0, y_max_fourier]
logger.debug(
"Y-axis limits applied",
processed_max=y_max_processed,
fourier_max=y_max_fourier
)
return {"data": traces, "layout": layout}
# -------------------------------------------------------------------------
# Data Processing Helpers
# -------------------------------------------------------------------------
def _extract_adc_data(self, sweep_data: SweepData) -> NDArray[np.floating] | None:
"""
Extract ADC data from SweepData.
Assumes sweep_data.points contains [(real, imag), ...] pairs.
For ADC data, we take only the real part.
"""
try:
if not sweep_data.points:
return None
# Extract real part (ADC samples)
arr = np.asarray(sweep_data.points, dtype=float)
logger.info(
"🔍 RAW INPUT DATA SHAPE",
shape=arr.shape,
ndim=arr.ndim,
total_points=sweep_data.total_points,
dtype=arr.dtype
)
if arr.ndim == 2 and arr.shape[1] >= 1:
# Take first column (real part)
adc_data = arr[:, 0]
elif arr.ndim == 1:
# Already 1D array
adc_data = arr
else:
raise ValueError("Unexpected data shape for ADC extraction")
logger.info(
"📊 EXTRACTED ADC DATA",
size=adc_data.size,
min=float(np.min(adc_data)),
max=float(np.max(adc_data)),
mean=float(np.mean(adc_data)),
non_zero_count=int(np.count_nonzero(adc_data))
)
return adc_data.astype(float, copy=False)
except Exception as exc: # noqa: BLE001
logger.error("Failed to extract ADC data", error=repr(exc))
return None
def _resize_1d_interpolate(
self,
data: NDArray[np.floating],
target_size: int,
) -> NDArray[np.floating]:
"""
Resize 1D array using linear interpolation.
Based on main.py resize_1d_interpolate() function.
"""
if len(data) == target_size:
return data
old_indices = np.linspace(0, 1, len(data))
new_indices = np.linspace(0, 1, target_size)
f = interp1d(old_indices, data, kind='linear', fill_value='extrapolate')
return f(new_indices)
def _process_raw_data(
self,
adc_data: NDArray[np.floating],
) -> tuple[NDArray[np.floating], NDArray[np.floating]]:
"""
Process RAW ADC data (0xD0 format).
Pipeline (based on main.py lines 824-896):
1. Interpolate to standard size (64000)
2. Generate meander signal for demodulation
3. Synchronous detection (multiply by meander)
4. Frequency segmentation
5. FFT processing with Gaussian smoothing
Returns
-------
tuple
(processed_signal, fourier_spectrum)
"""
# Step 1: Resize to standard RAW size
target_size = int(self._config["standard_raw_size"])
data_resized = self._resize_1d_interpolate(adc_data, target_size)
# Step 2: Generate meander signal (square wave)
if self._meandr is None or self._last_size != target_size:
time_idx = np.arange(1, target_size + 1)
self._meandr = square(time_idx * np.pi)
self._last_size = target_size
logger.debug("Meander signal regenerated", size=target_size)
# Step 3: Meander demodulation (synchronous detection)
demodulated = data_resized * self._meandr
# Step 4: Frequency segmentation
processed_signal = self._frequency_segmentation(demodulated)
# Step 5: FFT processing
fourier_spectrum = self._compute_fft_spectrum(processed_signal) / (2*3.14)
return processed_signal, fourier_spectrum
def _process_sync_det_data(
self,
adc_data: NDArray[np.floating],
) -> tuple[NDArray[np.floating], NDArray[np.floating]]:
"""
Process SYNC_DET data (0xF0 format).
Pipeline (based on main.py lines 898-917):
1. Interpolate to standard size (1000)
2. Use data directly as signal (already demodulated)
3. FFT processing with Gaussian smoothing
Returns
-------
tuple
(processed_signal, fourier_spectrum)
"""
# Step 1: Optionally resize to standard SYNC_DET size
if self._config.get("disable_interpolation", False):
# FOURIER mode: no interpolation, use raw data size
data_resized = adc_data
logger.info("🚫 Interpolation disabled - using raw data size", size=adc_data.size)
else:
# SYNC_DET mode: interpolate to standard size
target_size = int(self._config["standard_sync_size"])
data_resized = self._resize_1d_interpolate(adc_data, target_size)
logger.debug(
"SYNC_DET data resized",
original_size=adc_data.size,
target_size=target_size,
min_val=float(np.min(data_resized)),
max_val=float(np.max(data_resized)),
mean_val=float(np.mean(data_resized))
)
# Step 2: Data is already demodulated, use directly
processed_signal = data_resized
# Step 3: FFT processing
fourier_spectrum = self._compute_fft_spectrum(processed_signal)
logger.debug(
"SYNC_DET processing complete",
signal_size=processed_signal.size,
fft_size=fourier_spectrum.size
)
return processed_signal, fourier_spectrum
def _frequency_segmentation(
self,
data: NDArray[np.floating],
) -> NDArray[np.floating]:
"""
Divide data into frequency segments and sum each segment.
Based on main.py lines 866-884.
Parameters
----------
data : ndarray
Demodulated signal
Returns
-------
ndarray
Segmented and summed signal
"""
pont_in_one_fq = int(self._config["pont_in_one_fq_change"])
signal_list = []
start = 0
segment_start_idx = 0
for idx in range(len(data)):
if (idx - start) > pont_in_one_fq:
segment = data[segment_start_idx:idx]
if segment.size > 0:
# Sum the segment to extract signal
signal_list.append(np.sum(segment))
start = idx
segment_start_idx = idx
return np.array(signal_list, dtype=float)
def _compute_fft_spectrum(
self,
signal: NDArray[np.floating],
) -> NDArray[np.floating]:
"""
Compute FFT spectrum with Gaussian smoothing.
Based on main.py lines 984-994.
Parameters
----------
signal : ndarray
Input signal (processed or raw)
Returns
-------
ndarray
Smoothed FFT magnitude spectrum
"""
fq_end = int(self._config["fq_end"])
gaussian_sigma = float(self._config["gaussian_sigma"])
fft0_delta = int(self._config["fft0_delta"])
# Cut signal to FFT length
sig_cut = signal[:fq_end] if len(signal) >= fq_end else signal
# Take square root of absolute value (as in main.py)
sig_cut = np.sqrt(np.abs(sig_cut))
# Compute FFT
F = np.fft.fft(sig_cut)
Fshift = np.abs(np.fft.fftshift(F))
# Zero out the center (remove DC component and nearby artifacts)
center = len(sig_cut) // 2
if center < len(Fshift):
zero_start = max(center - 0, 0)
zero_end = min(center + 1, len(Fshift))
Fshift[zero_start:zero_end] = 0
# Apply Gaussian smoothing
FshiftS = gaussian_filter1d(Fshift, gaussian_sigma)
return FshiftS
def _generate_frequency_axis(
self,
signal: NDArray[np.floating],
) -> NDArray[np.floating]:
"""
Generate frequency axis for visualization.
Based on main.py lines 1041-1042: maps signal indices to 3-13.67 GHz range.
Parameters
----------
signal : ndarray
Processed signal
Returns
-------
ndarray
Frequency axis in GHz
"""
freq_start = float(self._config["freq_start_ghz"])
freq_stop = float(self._config["freq_stop_ghz"])
signal_size = signal.size
if signal_size == 0:
return np.array([])
# Calculate frequency per point
freq_range = freq_stop - freq_start
per_point_fq = freq_range / signal_size
# Generate frequency axis: start + (index * per_point_fq)
freq_axis = freq_start + (np.arange(1, signal_size + 1) * per_point_fq)
return freq_axis

View File

@ -397,11 +397,13 @@ class ProcessorManager:
try: try:
from .implementations.magnitude_processor import MagnitudeProcessor from .implementations.magnitude_processor import MagnitudeProcessor
from .implementations.bscan_processor import BScanProcessor from .implementations.bscan_processor import BScanProcessor
from .implementations.rfg_processor import RFGProcessor
# self.register_processor(PhaseProcessor(self.config_dir)) # self.register_processor(PhaseProcessor(self.config_dir))
self.register_processor(BScanProcessor(self.config_dir)) self.register_processor(BScanProcessor(self.config_dir))
self.register_processor(MagnitudeProcessor(self.config_dir)) self.register_processor(MagnitudeProcessor(self.config_dir))
self.register_processor(RFGProcessor(self.config_dir))
# self.register_processor(SmithChartProcessor(self.config_dir)) # self.register_processor(SmithChartProcessor(self.config_dir))
logger.info("Default processors registered", count=len(self._processors)) logger.info("Default processors registered", count=len(self._processors))