Compare commits

...

4 Commits

Author SHA1 Message Date
awe
e43ce26fdf parser v0 2025-12-01 20:25:17 +03:00
awe
bfc3949c4d interface upd 2025-11-27 18:25:56 +03:00
awe
2742cfe856 working_laser control 2025-11-27 18:11:33 +03:00
awe
c64f2a4d6b acquire data from pipe 2025-11-27 15:57:49 +03:00
10 changed files with 1208 additions and 116 deletions

View File

@ -1,5 +1,6 @@
import io import io
import json import json
import math
import os import os
import struct import struct
import threading import threading
@ -192,6 +193,67 @@ class VNADataAcquisition:
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
# Acquisition loop # Acquisition loop
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
def _radar_pipe_acquisition_loop(self) -> None:
"""Acquisition loop for reading radar data from named pipe."""
logger.info("Starting radar pipe acquisition loop", pipe_path=cfg.RADAR_PIPE_PATH)
while self._running and not self._stop_event.is_set():
try:
# Honor pause
if self._paused:
time.sleep(0.1)
continue
# Open named pipe for reading
pipe_path = Path(cfg.RADAR_PIPE_PATH)
# Check if pipe exists
if not pipe_path.exists():
logger.warning("Radar pipe not found, waiting...", path=str(pipe_path))
time.sleep(1.0)
continue
# Open pipe in non-blocking mode to avoid hanging
with open(pipe_path, "rb") as pipe:
# Read data from pipe (adjust buffer size as needed)
data = pipe.read(cfg.EXPECTED_POINTS_PER_SWEEP * cfg.RADAR_BYTES_PER_SAMPLE)
if not data:
time.sleep(0.01)
continue
# Parse radar data
points = self._parse_radar_data(data)
if points:
timestamp = time.time()
sweep_number = self._sweep_buffer.add_sweep(points, timestamp=timestamp)
logger.info(
"Radar sweep collected from pipe",
sweep_number=sweep_number,
points=len(points),
timestamp=timestamp
)
# # Play sweep notification sound
# self._sound_player.play()
# Handle single-sweep mode transitions
if not self._continuous_mode:
if self._single_sweep_requested:
self._single_sweep_requested = False
logger.info("Single radar sweep completed; pausing acquisition")
self.pause()
else:
self.pause()
except FileNotFoundError:
logger.warning("Radar pipe not found, retrying...", pipe_path=cfg.RADAR_PIPE_PATH)
time.sleep(1.0)
except Exception as exc: # noqa: BLE001
logger.error("Radar pipe acquisition loop error", error=repr(exc))
time.sleep(1.0)
def _simulator_acquisition_loop(self) -> None: def _simulator_acquisition_loop(self) -> None:
"""Simplified acquisition loop for simulator mode.""" """Simplified acquisition loop for simulator mode."""
logger.info("Starting simulator acquisition loop") logger.info("Starting simulator acquisition loop")
@ -236,6 +298,11 @@ class VNADataAcquisition:
def _acquisition_loop(self) -> None: def _acquisition_loop(self) -> None:
"""Main acquisition loop executed by the background thread.""" """Main acquisition loop executed by the background thread."""
# Use radar pipe loop if enabled
if cfg.USE_RADAR_PIPE:
self._radar_pipe_acquisition_loop()
return
# Use simulator loop if simulator is enabled # Use simulator loop if simulator is enabled
if self._simulator is not None: if self._simulator is not None:
self._simulator_acquisition_loop() self._simulator_acquisition_loop()
@ -468,6 +535,74 @@ class VNADataAcquisition:
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
# Parsing & detection # Parsing & detection
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
def _parse_radar_data(self, data: bytes) -> list[tuple[float, float]]:
"""
Parse radar data from named pipe format.
Expected format: 4 bytes per 32-bit word (big-endian)
- Word format: 0xF0XXXXXX where:
- F0 is the marker byte (bits 31-24)
- XXXXXX is the 24-bit data value (bits 23-0)
Returns list of (real, imag) tuples where:
- real: dB value converted from raw data
- imag: always 0.0
"""
points: list[tuple[float, float]] = []
# Process data in 4-byte chunks as 32-bit words (big-endian)
num_words = len(data) // cfg.RADAR_BYTES_PER_SAMPLE
for i in range(num_words):
offset = i * cfg.RADAR_BYTES_PER_SAMPLE
chunk = data[offset : offset + cfg.RADAR_BYTES_PER_SAMPLE]
# Unpack as big-endian 32-bit unsigned integer
word = struct.unpack("<I", chunk)[0]
# Extract marker (top 8 bits)
marker = (word >> 24) & 0xFF
# Extract 24-bit data value (lower 24 bits)
raw_value = word & 0xFFFFFF
# Check marker byte - log warning but continue processing
if marker != cfg.RADAR_DATA_MARKER:
# Only log occasionally to avoid spam
if i % 100 == 0:
logger.debug(
"Non-F0 marker detected",
marker=hex(marker),
word=hex(word),
offset=offset
)
# Still process the data if it has valid bits
# Convert unsigned 24-bit to signed 24-bit (as in main.py to_int24)
# If MSB is set, treat as negative (two's complement)
if raw_value & 0x800000:
raw_value -= 0x1000000 # Convert to signed: -8388608 to +8388607
points.append((float(raw_value), 0.))
if i == 0 or i == 100:
logger.debug(f"raw_value: {raw_value}, marker: {marker}, word= {word}" )
# Log statistics about parsed data
if points:
values = [p[0] for p in points]
logger.info(
"📊 Radar data parsed from pipe",
total_points=len(points),
min_value=min(values),
max_value=max(values),
mean_value=sum(values) / len(values) if values else 0,
non_zero=sum(1 for v in values if v != 0)
)
return points
def _parse_measurement_data(self, payload: bytes) -> list[tuple[float, float]]: def _parse_measurement_data(self, payload: bytes) -> list[tuple[float, float]]:
"""Parse complex measurement samples (float32 pairs) from a payload.""" """Parse complex measurement samples (float32 pairs) from a payload."""
if len(payload) <= cfg.MEAS_HEADER_LEN: if len(payload) <= cfg.MEAS_HEADER_LEN:

View File

@ -35,10 +35,18 @@ VNA_PID = 0x5740 # STM32 Virtual ComPort
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Simulator mode settings # Simulator mode settings
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
USE_SIMULATOR = True # Set to True to use simulator instead of real device USE_SIMULATOR = False # Set to True to use simulator instead of real device
SIMULATOR_SWEEP_FILE = BASE_DIR / "binary_input" / "sweep_example" / "example.json" SIMULATOR_SWEEP_FILE = BASE_DIR / "binary_input" / "sweep_example" / "example.json"
SIMULATOR_NOISE_LEVEL = 100 # Standard deviation of Gaussian noise to add to real and imaginary parts SIMULATOR_NOISE_LEVEL = 100 # Standard deviation of Gaussian noise to add to real and imaginary parts
# -----------------------------------------------------------------------------
# Radar pipe mode settings
# -----------------------------------------------------------------------------
USE_RADAR_PIPE = True # Set to True to read radar data from named pipe
RADAR_PIPE_PATH = "/tmp/radar_data_pipe" # Path to the named pipe
RADAR_DATA_MARKER = 0xF0 # First byte marker for radar data packets
RADAR_BYTES_PER_SAMPLE = 4 # Total bytes per radar sample (1 marker + 3 data)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Sweep detection and parsing constants # Sweep detection and parsing constants
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@ -46,7 +54,7 @@ SWEEP_CMD_LEN = 515
SWEEP_CMD_PREFIX = bytes([0xAA, 0x00, 0xDA]) SWEEP_CMD_PREFIX = bytes([0xAA, 0x00, 0xDA])
MEAS_HEADER_LEN = 21 MEAS_HEADER_LEN = 21
MEAS_CMDS_PER_SWEEP = 17 MEAS_CMDS_PER_SWEEP = 17
EXPECTED_POINTS_PER_SWEEP = 1000 EXPECTED_POINTS_PER_SWEEP = 1024
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Buffer settings # Buffer settings

View File

@ -1,11 +1,13 @@
import logging import logging
import time import time
import json
from pathlib import Path
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from datetime import datetime from datetime import datetime
from ...api.models.laser import LaserParameters, LaserStatus from ...api.models.laser import LaserParameters, LaserStatus
from .RadioPhotonic_PCB_PC_software import device_interaction as dev from .RFG_PCB_PC_controller_supersimple import device_interaction as dev
from .RadioPhotonic_PCB_PC_software import device_commands as cmd from .RFG_PCB_PC_controller_supersimple import device_commands as cmd
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -14,11 +16,19 @@ class LaserController:
""" """
Controller for laser control system. Controller for laser control system.
Uses RFG_PCB_PC_controller_supersimple module for device communication.
Communicates with RadioPhotonic board via serial port (115200 baud). Communicates with RadioPhotonic board via serial port (115200 baud).
Supports both manual control and automated scanning modes. Supports both manual control and automated scanning modes.
Control Logic (RFG implementation):
- Manual mode: send_control_parameters() to set steady current/temperature
- Scan mode: send_control_parameters() THEN send_task_command() for scanning
- Stop: send_control_parameters() to return to steady state (not reset)
TaskType format: String-based ("TT_CHANGE_CURR_1", "TT_CHANGE_CURR_2")
""" """
def __init__(self): def __init__(self, config_path: Optional[str] = None):
self.prt = None # Serial port object self.prt = None # Serial port object
self.is_connected = False self.is_connected = False
self.is_running = False self.is_running = False
@ -26,14 +36,78 @@ class LaserController:
self.current_status = LaserStatus() self.current_status = LaserStatus()
self.last_data: Optional[Dict[str, Any]] = None self.last_data: Optional[Dict[str, Any]] = None
# Default PI coefficients (multiplied by 256 as per device protocol) # Load default parameters from JSON
self._load_default_parameters(config_path)
logger.info("LaserController initialized")
def _load_default_parameters(self, config_path: Optional[str] = None) -> None:
"""
Load default parameters from JSON configuration file.
Args:
config_path: Path to JSON config file. If None, uses default location.
"""
try:
# Determine config file path
if config_path is None:
# Default path relative to this file
current_dir = Path(__file__).parent
config_path = current_dir / "RadioPhotonic_PCB_PC_software" / "init_params.json"
else:
config_path = Path(config_path)
# Load JSON file
if not config_path.exists():
logger.warning(f"Config file not found: {config_path}, using hardcoded defaults")
self._set_hardcoded_defaults()
return
with open(config_path, 'r') as f:
config = json.load(f)
# Set parameters from config
self.initial_temperature_1 = config.get("INITIAL_TEMPERATURE_1", 28)
self.initial_temperature_2 = config.get("INITIAL_TEMPERATURE_2", 28.9)
self.initial_current_1 = config.get("INITIAL_CURRENT_1", 33)
self.initial_current_2 = config.get("INITIAL_CURRENT_2", 35)
# PI coefficients (multiplied by 256 as per device protocol)
self.proportional_coeff_1 = int(config.get("PROPORTIONAL_COEFF_1", 10) * 256)
self.proportional_coeff_2 = int(config.get("PROPORTIONAL_COEFF_2", 10) * 256)
self.integral_coeff_1 = int(config.get("INTEGRAL_COEFF_1", 0.5) * 256)
self.integral_coeff_2 = int(config.get("INTEGRAL_COEFF_2", 0.5) * 256)
self.message_id = config.get("MESSAGE_ID", "00FF")
# Additional parameters
self.gui_timeout_interval = config.get("GUI_TIMEOUT_INTERVAL", 5)
self.save_points_number = config.get("SAVE_POINTS_NUMBER", 1000)
logger.info(f"Default parameters loaded from {config_path}")
logger.info(f" Initial T1: {self.initial_temperature_1}°C, T2: {self.initial_temperature_2}°C")
logger.info(f" Initial I1: {self.initial_current_1} mA, I2: {self.initial_current_2} mA")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in config file: {e}")
self._set_hardcoded_defaults()
except Exception as e:
logger.error(f"Error loading config file: {e}")
self._set_hardcoded_defaults()
def _set_hardcoded_defaults(self) -> None:
"""Set hardcoded default parameters as fallback."""
self.initial_temperature_1 = 28
self.initial_temperature_2 = 28.9
self.initial_current_1 = 33
self.initial_current_2 = 35
self.proportional_coeff_1 = int(10 * 256) self.proportional_coeff_1 = int(10 * 256)
self.proportional_coeff_2 = int(10 * 256) self.proportional_coeff_2 = int(10 * 256)
self.integral_coeff_1 = int(0.5 * 256) self.integral_coeff_1 = int(0.5 * 256)
self.integral_coeff_2 = int(0.5 * 256) self.integral_coeff_2 = int(0.5 * 256)
self.message_id = "00FF" self.message_id = "00FF"
self.gui_timeout_interval = 5
logger.info("LaserController initialized") self.save_points_number = 1000
logger.info("Using hardcoded default parameters")
def start_cycle(self, parameters: LaserParameters) -> Dict[str, Any]: def start_cycle(self, parameters: LaserParameters) -> Dict[str, Any]:
""" """
@ -96,11 +170,11 @@ class LaserController:
def _start_manual_mode(self, parameters: LaserParameters) -> Dict[str, Any]: def _start_manual_mode(self, parameters: LaserParameters) -> Dict[str, Any]:
""" """
Start manual control mode with fixed T1, T2, I1, I2 values. Start manual control mode with fixed T1, T2, I1, I2 values.
Uses DECODE_ENABLE (0x1111) command. Uses DECODE_ENABLE (0x1111) command - simplified version from RFG example.
""" """
try: try:
# Prepare control parameters # Prepare control parameters (simplified - direct send)
params = { ctrl_params = {
'Temp_1': parameters.min_temp_1, 'Temp_1': parameters.min_temp_1,
'Temp_2': parameters.min_temp_2, 'Temp_2': parameters.min_temp_2,
'Iset_1': parameters.min_current_1, 'Iset_1': parameters.min_current_1,
@ -113,12 +187,12 @@ class LaserController:
} }
logger.info(f"Sending manual control parameters:") logger.info(f"Sending manual control parameters:")
logger.info(f" T1: {params['Temp_1']}°C, T2: {params['Temp_2']}°C") logger.info(f" T1: {ctrl_params['Temp_1']}°C, T2: {ctrl_params['Temp_2']}°C")
logger.info(f" I1: {params['Iset_1']} mA, I2: {params['Iset_2']} mA") logger.info(f" I1: {ctrl_params['Iset_1']} mA, I2: {ctrl_params['Iset_2']} mA")
# Send control parameters to device
dev.send_control_parameters(self.prt, params)
# Send control parameters to device (steady current mode)
dev.send_control_parameters(self.prt, ctrl_params)
logger.info(dev.request_data(self.prt))
return { return {
"success": True, "success": True,
"message": "Ручное управление запущено", "message": "Ручное управление запущено",
@ -131,13 +205,14 @@ class LaserController:
def _start_scan_mode(self, parameters: LaserParameters) -> Dict[str, Any]: def _start_scan_mode(self, parameters: LaserParameters) -> Dict[str, Any]:
""" """
Start automated scan mode. Start automated scan mode - simplified version from RFG example.
Uses TASK_ENABLE (0x7777) command with TaskType. RFG flow: send_control_parameters() THEN send_task_command()
This initializes the device before starting the scan.
""" """
try: try:
# First, send initial control parameters to set the device to starting values # Step 1: Start lasers with initial control parameters (from RFG example lines 72-75)
logger.info("Setting initial control parameters before scan...") logger.info("Step 1: Starting lasers with control parameters...")
initial_params = { ctrl_params = {
'Temp_1': parameters.min_temp_1, 'Temp_1': parameters.min_temp_1,
'Temp_2': parameters.min_temp_2, 'Temp_2': parameters.min_temp_2,
'Iset_1': parameters.min_current_1, 'Iset_1': parameters.min_current_1,
@ -149,24 +224,20 @@ class LaserController:
'Message_ID': self.message_id 'Message_ID': self.message_id
} }
logger.info(f"Initial parameters: T1={initial_params['Temp_1']}°C, T2={initial_params['Temp_2']}°C, " dev.send_control_parameters(self.prt, ctrl_params)
f"I1={initial_params['Iset_1']} mA, I2={initial_params['Iset_2']} mA") logger.info(dev.request_data(self.prt))
logger.info("Control parameters sent - lasers started")
# Send initial control parameters # Small delay as in RFG example (line 75: sleep(2))
dev.send_control_parameters(self.prt, initial_params) time.sleep(2)
logger.info("Initial control parameters sent successfully")
# Small delay to allow device to process # Step 2: Determine which parameter to scan
import time
time.sleep(0.2)
# Determine which parameter to scan
if parameters.enable_c1: if parameters.enable_c1:
task_type = cmd.TaskType.ChangeCurrentLD1.value task_type = "TT_CHANGE_CURR_1" # RFG uses string format
scan_param = "Current Laser 1" scan_param = "Current Laser 1"
logger.info(f"Scanning Current Laser 1: {parameters.min_current_1} to {parameters.max_current_1} mA") logger.info(f"Scanning Current Laser 1: {parameters.min_current_1} to {parameters.max_current_1} mA")
elif parameters.enable_c2: elif parameters.enable_c2:
task_type = cmd.TaskType.ChangeCurrentLD2.value task_type = "TT_CHANGE_CURR_2" # RFG uses string format
scan_param = "Current Laser 2" scan_param = "Current Laser 2"
logger.info(f"Scanning Current Laser 2: {parameters.min_current_2} to {parameters.max_current_2} mA") logger.info(f"Scanning Current Laser 2: {parameters.min_current_2} to {parameters.max_current_2} mA")
elif parameters.enable_t1: elif parameters.enable_t1:
@ -188,10 +259,11 @@ class LaserController:
"parameters": None "parameters": None
} }
# Build task parameters based on task type # Step 3: Build task parameters for scan (from RFG example lines 78-82)
sending_param = { logger.info("Step 2: Switching to current sweep mode...")
task_params = {
'TaskType': task_type, 'TaskType': task_type,
'Dt': parameters.delta_time / 1000.0, # Convert μs to ms 'Dt': parameters.delta_time ,
'Tau': parameters.tau, 'Tau': parameters.tau,
'ProportionalCoeff_1': self.proportional_coeff_1, 'ProportionalCoeff_1': self.proportional_coeff_1,
'ProportionalCoeff_2': self.proportional_coeff_2, 'ProportionalCoeff_2': self.proportional_coeff_2,
@ -200,8 +272,8 @@ class LaserController:
} }
# Add scan-specific parameters # Add scan-specific parameters
if task_type == cmd.TaskType.ChangeCurrentLD1.value: if task_type == "TT_CHANGE_CURR_1":
sending_param.update({ task_params.update({
'MinC1': parameters.min_current_1, 'MinC1': parameters.min_current_1,
'MaxC1': parameters.max_current_1, 'MaxC1': parameters.max_current_1,
'DeltaC1': parameters.delta_current_1, 'DeltaC1': parameters.delta_current_1,
@ -209,8 +281,8 @@ class LaserController:
'I2': parameters.min_current_2, # Fixed 'I2': parameters.min_current_2, # Fixed
'T2': parameters.min_temp_2 # Fixed 'T2': parameters.min_temp_2 # Fixed
}) })
elif task_type == cmd.TaskType.ChangeCurrentLD2.value: elif task_type == "TT_CHANGE_CURR_2":
sending_param.update({ task_params.update({
'MinC2': parameters.min_current_2, 'MinC2': parameters.min_current_2,
'MaxC2': parameters.max_current_2, 'MaxC2': parameters.max_current_2,
'DeltaC2': parameters.delta_current_2, 'DeltaC2': parameters.delta_current_2,
@ -219,8 +291,10 @@ class LaserController:
'T1': parameters.min_temp_1 # Fixed 'T1': parameters.min_temp_1 # Fixed
}) })
# Send task command to device # Send task command to device (start current variation)
dev.send_task_command(self.prt, sending_param) dev.send_task_command(self.prt, task_params)
# print(dev.request_data(self.prt))
logger.info("Task command sent successfully - scan started")
return { return {
"success": True, "success": True,
@ -235,7 +309,8 @@ class LaserController:
def stop_cycle(self) -> Dict[str, Any]: def stop_cycle(self) -> Dict[str, Any]:
""" """
Stop current laser control cycle. Stop current laser control cycle.
Sends reset command to device. Uses send_control_parameters to return to steady current mode (RFG example logic).
This stops any ongoing scan and maintains the last known parameters.
Returns: Returns:
Dictionary with success status and message Dictionary with success status and message
@ -247,20 +322,40 @@ class LaserController:
logger.info("=" * 60) logger.info("=" * 60)
if self.is_connected and self.prt is not None: if self.is_connected and self.prt is not None:
# Send reset command to device # Stop current variation - go to steady current mode
# Use last known parameters or defaults
if self.current_parameters:
ctrl_params = {
'Temp_1': self.current_parameters.min_temp_1,
'Temp_2': self.current_parameters.min_temp_2,
'Iset_1': self.current_parameters.min_current_1,
'Iset_2': self.current_parameters.min_current_2,
'ProportionalCoeff_1': self.proportional_coeff_1,
'ProportionalCoeff_2': self.proportional_coeff_2,
'IntegralCoeff_1': self.integral_coeff_1,
'IntegralCoeff_2': self.integral_coeff_2,
'Message_ID': self.message_id
}
logger.info(f"Stopping scan - returning to steady current mode:")
logger.info(f" T1: {ctrl_params['Temp_1']}°C, T2: {ctrl_params['Temp_2']}°C")
logger.info(f" I1: {ctrl_params['Iset_1']} mA, I2: {ctrl_params['Iset_2']} mA")
try: try:
dev.reset_port_settings(self.prt) dev.send_control_parameters(self.prt, ctrl_params)
logger.info("Device reset command sent") logger.info(dev.request_data(self.prt))
logger.info("Control parameters sent - laser in steady state")
except Exception as e: except Exception as e:
logger.warning(f"Failed to send reset command: {e}") logger.warning(f"Failed to send control parameters: {e}")
else:
logger.warning("No current parameters stored, cannot send steady state command")
self.is_running = False self.is_running = False
self.current_status.is_running = False self.current_status.is_running = False
self.current_parameters = None
return { return {
"success": True, "success": True,
"message": "Цикл управления лазером остановлен" "message": "Цикл управления лазером остановлен (steady current mode)"
} }
except Exception as e: except Exception as e:
@ -480,14 +575,16 @@ class LaserController:
if self.is_running: if self.is_running:
self.stop_cycle() self.stop_cycle()
# Close serial port # Close serial port using RFG close_connection
if self.prt is not None: if self.prt is not None:
try: try:
cmd.close_port(self.prt) dev.close_connection(self.prt)
logger.info("Serial port closed") logger.info("Serial port closed")
except Exception as e: except Exception as e:
logger.warning(f"Error closing serial port: {e}") logger.warning(f"Error closing serial port: {e}")
# Fallback: just set to None
self.prt = None
else:
self.prt = None self.prt = None
self.is_connected = False self.is_connected = False

View File

@ -1,7 +1,7 @@
{ {
"y_min": -50, "y_min": -80,
"y_max": 40, "y_max": 40,
"autoscale": true, "autoscale": false,
"show_magnitude": true, "show_magnitude": true,
"show_phase": false, "show_phase": false,
"open_air": false "open_air": false

View File

@ -0,0 +1,19 @@
{
"data_type": "SYNC_DET",
"pont_in_one_fq_change": 86,
"gaussian_sigma": 5.0,
"fft0_delta": 5,
"standard_raw_size": 64000,
"standard_sync_size": 1000,
"freq_start_ghz": 3.0,
"freq_stop_ghz": 13.67,
"fq_end": 512,
"show_processed": true,
"show_fourier": true,
"normalize_signal": false,
"log_scale": false,
"disable_interpolation": false,
"y_max_processed": 900000.0,
"y_max_fourier": 1000.0,
"auto_scale": false
}

View File

@ -0,0 +1,197 @@
# RFG Processor - Радиофотонный радар
## Описание
RFGProcessor - новый процессор для обработки данных радиофотонного радара, основанный на алгоритмах из `RFG_Receiver_GUI/main.py`.
## Основные возможности
### Типы данных
- **SYNC_DET (0xF0)** - Данные синхронного детектирования (~1000 сэмплов)
- **RAW (0xD0)** - Сырые ADC данные с меандровой модуляцией (~64000 сэмплов)
### Обработка данных
#### Для SYNC_DET (по умолчанию):
1. Интерполяция до 1000 точек
2. Прямая FFT обработка (данные уже демодулированы)
3. Гауссово сглаживание
4. Обнуление центра FFT
#### Для RAW:
1. Интерполяция до 64000 точек
2. Генерация меандрового сигнала
3. Синхронное детектирование (умножение на меандр)
4. Частотная сегментация (по 86 точек)
5. FFT обработка с гауссовым сглаживанием
6. Обнуление центра FFT
### Выходные графики
1. **Processed Signal** - Обработанный сигнал в частотной области (3-13.67 ГГц)
2. **Fourier Transform** - Спектр FFT с гауссовым сглаживанием
## Конфигурация
### Файл: `configs/rfg_config.json`
```json
{
"data_type": "SYNC_DET", // Тип данных: "RAW" или "SYNC_DET"
"pont_in_one_fq_change": 86, // Размер сегмента частоты
"gaussian_sigma": 5.0, // Параметр сглаживания (σ)
"fft0_delta": 5, // Смещение обнуления центра FFT
"standard_raw_size": 64000, // Целевой размер для RAW
"standard_sync_size": 1000, // Целевой размер для SYNC_DET
"freq_start_ghz": 3.0, // Начало частотного диапазона
"freq_stop_ghz": 13.67, // Конец частотного диапазона
"fq_end": 512, // Точка отсечки FFT
"show_processed": true, // Показать Processed Signal
"show_fourier": true // Показать Fourier Transform
}
```
### UI параметры
Процессор предоставляет следующие настройки через веб-интерфейс:
- **Тип данных** - Выбор между RAW и SYNC_DET
- **Гауссово сглаживание (σ)** - Слайдер 0.1-20.0
- **Размер сегмента частоты** - Слайдер 50-200
- **Смещение центра FFT** - Слайдер 0-20
- **Точка отсечки FFT** - Слайдер 100-1000
- **Частота начала (ГГц)** - Слайдер 1.0-15.0
- **Частота конца (ГГц)** - Слайдер 1.0-15.0
- **Переключатели** - Показать/скрыть графики
## Использование
### Автоматическая регистрация
Процессор автоматически регистрируется в `ProcessorManager` при запуске системы.
### Входные данные
Процессор ожидает данные из pipe в формате `SweepData`:
- `points`: список пар `[(real, imag), ...]`
- Для ADC данных используется только `real` часть
- Данные должны соответствовать формату 0xF0 (SYNC_DET) или 0xD0 (RAW)
### Пример структуры данных
```python
sweep_data = SweepData(
sweep_number=1,
timestamp=1234567890.0,
points=[(adc_sample_1, 0), (adc_sample_2, 0), ...],
total_points=1000 # или 64000 для RAW
)
```
## Алгоритмы обработки
### 1. Интерполяция данных
```python
# Линейная интерполяция scipy.interpolate.interp1d
old_indices = np.linspace(0, 1, len(data))
new_indices = np.linspace(0, 1, target_size)
```
### 2. Меандровая демодуляция (только RAW)
```python
# Генерация квадратного сигнала
time_idx = np.arange(1, size + 1)
meander = square(time_idx * np.pi)
demodulated = data * meander
```
### 3. Частотная сегментация (только RAW)
```python
# Разделение на сегменты и суммирование
segment_size = 86
for segment in segments:
signal.append(np.sum(segment))
```
### 4. FFT обработка
```python
# Обрезка + FFT + сдвиг
sig_cut = np.sqrt(np.abs(signal[:512]))
F = np.fft.fft(sig_cut)
Fshift = np.abs(np.fft.fftshift(F))
# Обнуление центра
center = len(sig_cut) // 2
Fshift[center:center+1] = 0
# Гауссово сглаживание
FshiftS = gaussian_filter1d(Fshift, sigma=5.0)
```
## Отличия от оригинального main.py
### Изменено:
1. **Вход данных**: Вместо CSV файлов - pipe через `SweepData`
2. **Без накопления**: Обработка каждой развёртки отдельно (убран `PeriodIntegrate=2`)
3. **Без B-scan**: Только Processed Signal + Fourier Transform
4. **Plotly вместо Matplotlib**: Веб-визуализация
5. **JSON конфигурация**: Вместо хардкода констант
6. **Один sweep**: Нет потокового чтения файлов
### Сохранено:
- ✅ Алгоритмы обработки (интерполяция, меандр, сегментация, FFT)
- ✅ Гауссово сглаживание
- ✅ Обнуление центра FFT
- ✅ Частотный диапазон 3-13.67 ГГц
- ✅ Параметры обработки (sigma=5, segment=86, delta=5)
## Файлы
```
vna_system/core/processors/
├── implementations/
│ ├── rfg_processor.py # Основной класс
│ └── RFG_PROCESSOR_README.md # Эта документация
├── configs/
│ └── rfg_config.json # Конфигурация по умолчанию
└── manager.py # Регистрация процессора
```
## Разработка и отладка
### Логирование
Процессор использует стандартную систему логирования:
```python
from vna_system.core.logging.logger import get_component_logger
logger = get_component_logger(__file__)
```
### Тестирование
Для тестирования создайте `SweepData` с тестовыми ADC данными:
```python
# Тест SYNC_DET (1000 точек)
test_data = [(float(i), 0.0) for i in range(1000)]
# Тест RAW (64000 точек)
test_data = [(float(i), 0.0) for i in range(64000)]
```
## Зависимости
- `numpy` - Численные операции
- `scipy.signal.square` - Генерация меандра
- `scipy.ndimage.gaussian_filter1d` - Гауссово сглаживание
- `scipy.interpolate.interp1d` - Интерполяция
- `plotly` - Визуализация
## Авторство
Создан на основе алгоритмов из `/home/awe/Documents/RFG_Receiver_GUI/main.py`
Адаптирован для VNA System architecture.
## Версия
v1.0 - Первая реализация (2025-11-28)
- Поддержка SYNC_DET и RAW форматов
- Два графика: Processed Signal + Fourier Transform
- Полная интеграция с VNA System

View File

@ -94,7 +94,7 @@ class MagnitudeProcessor(BaseProcessor):
real_points.append(float(real)) real_points.append(float(real))
imag_points.append(float(imag)) imag_points.append(float(imag))
mag = abs(complex_val) mag = abs(complex_val)
mags_db.append(20.0 * np.log10(mag) if mag > 0.0 else -120.0) mags_db.append( 20*np.log10(mag) if mag > 0.0 else -120.0)
phases_deg.append(np.degrees(np.angle(complex_val))) phases_deg.append(np.degrees(np.angle(complex_val)))
result = { result = {

View File

@ -0,0 +1,643 @@
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
from numpy.typing import NDArray
from scipy.signal import square
from scipy.ndimage import gaussian_filter1d
from scipy.interpolate import interp1d
from vna_system.core.logging.logger import get_component_logger
from vna_system.core.processors.base_processor import BaseProcessor, UIParameter, ProcessedResult
from vna_system.core.acquisition.sweep_buffer import SweepData
logger = get_component_logger(__file__)
class RFGProcessor(BaseProcessor):
"""
Radiophotonic radar processor based on RFG_Receiver_GUI algorithm.
Processes ADC data with synchronous detection (0xF0 format) or RAW data (0xD0 format).
Outputs two graphs:
- Processed Signal: Frequency domain signal (3-13.67 GHz range)
- Fourier Transform: FFT magnitude spectrum with Gaussian smoothing
Features
--------
- Data interpolation to standard size
- Meander demodulation (for RAW data)
- Frequency segmentation
- FFT processing with Gaussian smoothing
- Center zeroing for artifact reduction
"""
def __init__(self, config_dir: Path) -> None:
super().__init__("rfg_radar", config_dir)
# No history accumulation needed (process single sweeps)
self._max_history = 1
# Pre-computed meander signal (will be initialized on first use)
self._meandr: NDArray[np.floating] | None = None
self._last_size: int = 0
logger.info("RFGProcessor initialized", processor_id=self.processor_id)
# -------------------------------------------------------------------------
# Configuration
# -------------------------------------------------------------------------
def _get_default_config(self) -> dict[str, Any]:
"""Return default configuration values."""
return {
"data_type": "SYNC_DET", # "RAW" or "SYNC_DET"
"pont_in_one_fq_change": 86, # Frequency segment size
"gaussian_sigma": 5.0, # FFT smoothing sigma
"fft0_delta": 5, # Center zero offset
"standard_raw_size": 64000, # Target size for RAW data
"standard_sync_size": 1000, # Target size for SYNC_DET data
"freq_start_ghz": 3.0, # Display frequency start (GHz)
"freq_stop_ghz": 13.67, # Display frequency stop (GHz)
"fq_end": 512, # FFT cutoff point
"show_processed": True, # Show processed signal graph
"show_fourier": True, # Show Fourier transform graph
"normalize_signal": False, # Normalize processed signal to [0, 1]
"log_scale": False, # Use logarithmic scale for signal
"disable_interpolation": False, # Skip interpolation (use raw data size)
"y_max_processed": 900000.0, # Max Y-axis value for processed signal
"y_max_fourier": 1000.0, # Max Y-axis value for Fourier spectrum
"auto_scale": False, # Auto-scale Y-axis (ignore y_max if True)
}
def get_ui_parameters(self) -> list[UIParameter]:
"""Return UI parameter schema for configuration."""
cfg = self._config
return [
UIParameter(
name="data_type",
label="Тип данных",
type="select",
value=cfg["data_type"],
options={"choices": ["RAW", "SYNC_DET"]},
),
UIParameter(
name="gaussian_sigma",
label="Гауссово сглаживание (σ)",
type="slider",
value=cfg["gaussian_sigma"],
options={"min": 0.1, "max": 20.0, "step": 0.1, "dtype": "float"},
),
UIParameter(
name="pont_in_one_fq_change",
label="Размер сегмента частоты",
type="slider",
value=cfg["pont_in_one_fq_change"],
options={"min": 50, "max": 200, "step": 1, "dtype": "int"},
),
UIParameter(
name="fft0_delta",
label="Смещение центра FFT",
type="slider",
value=cfg["fft0_delta"],
options={"min": 0, "max": 20, "step": 1, "dtype": "int"},
),
UIParameter(
name="fq_end",
label="Точка отсечки FFT",
type="slider",
value=cfg["fq_end"],
options={"min": 100, "max": 1000, "step": 10, "dtype": "int"},
),
UIParameter(
name="freq_start_ghz",
label="Частота начала (ГГц)",
type="slider",
value=cfg["freq_start_ghz"],
options={"min": 1.0, "max": 15.0, "step": 0.1, "dtype": "float"},
),
UIParameter(
name="freq_stop_ghz",
label="Частота конца (ГГц)",
type="slider",
value=cfg["freq_stop_ghz"],
options={"min": 1.0, "max": 15.0, "step": 0.1, "dtype": "float"},
),
UIParameter(
name="show_processed",
label="Показать обработанный сигнал",
type="toggle",
value=cfg["show_processed"],
),
UIParameter(
name="show_fourier",
label="Показать Фурье образ",
type="toggle",
value=cfg["show_fourier"],
),
UIParameter(
name="normalize_signal",
label="Нормализовать сигнал",
type="toggle",
value=cfg["normalize_signal"],
),
UIParameter(
name="log_scale",
label="Логарифмическая шкала",
type="toggle",
value=cfg["log_scale"],
),
UIParameter(
name="disable_interpolation",
label="Без интерполяции (как FOURIER)",
type="toggle",
value=cfg["disable_interpolation"],
),
UIParameter(
name="auto_scale",
label="Автоматический масштаб Y",
type="toggle",
value=cfg["auto_scale"],
),
UIParameter(
name="y_max_processed",
label="Макс. амплитуда (Processed)",
type="slider",
value=cfg["y_max_processed"],
options={"min": 1000.0, "max": 200000.0, "step": 1000.0, "dtype": "float"},
),
UIParameter(
name="y_max_fourier",
label="Макс. амплитуда (Fourier)",
type="slider",
value=cfg["y_max_fourier"],
options={"min": 1000.0, "max": 200000.0, "step": 1000.0, "dtype": "float"},
),
]
# -------------------------------------------------------------------------
# Processing
# -------------------------------------------------------------------------
def process_sweep(
self,
sweep_data: SweepData,
calibrated_data: SweepData | None,
vna_config: dict[str, Any],
) -> dict[str, Any]:
"""
Process a single sweep of ADC data.
Returns
-------
dict
Keys: processed_signal, fourier_spectrum, freq_axis, data_type, points_processed
Or: {"error": "..."} on failure.
"""
try:
# Extract raw ADC data from sweep (use real part only)
adc_data = self._extract_adc_data(sweep_data)
if adc_data is None or adc_data.size == 0:
logger.warning("No valid ADC data for RFG processing")
return {"error": "No valid ADC data"}
data_type = self._config["data_type"]
if data_type == "RAW":
# Full processing with meander demodulation
processed_signal, fourier_spectrum = self._process_raw_data(adc_data)
elif data_type == "SYNC_DET":
# Direct FFT processing (data already demodulated)
processed_signal, fourier_spectrum = self._process_sync_det_data(adc_data)
else:
return {"error": f"Unknown data type: {data_type}"}
# Generate frequency axis for visualization
freq_axis = self._generate_frequency_axis(processed_signal)
fourier_spectrum /= (2*3.14)
return {
"processed_signal": processed_signal.tolist(),
"fourier_spectrum": fourier_spectrum.tolist(),
"freq_axis": freq_axis.tolist(),
"data_type": data_type,
"points_processed": int(adc_data.size),
"original_size": int(adc_data.size),
}
except Exception as exc: # noqa: BLE001
logger.error("RFG processing failed", error=repr(exc))
return {"error": str(exc)}
# -------------------------------------------------------------------------
# Visualization
# -------------------------------------------------------------------------
def generate_plotly_config(
self,
processed_data: dict[str, Any],
vna_config: dict[str, Any], # noqa: ARG002
) -> dict[str, Any]:
"""
Produce Plotly configuration for two subplots: Processed Signal + Fourier Transform.
"""
if "error" in processed_data:
return {
"data": [],
"layout": {
"title": "RFG Радар - Ошибка",
"annotations": [
{
"text": f"Ошибка: {processed_data['error']}",
"x": 0.5,
"y": 0.5,
"xref": "paper",
"yref": "paper",
"showarrow": False,
}
],
"template": "plotly_dark",
},
}
processed_signal = processed_data.get("processed_signal", [])
fourier_spectrum = processed_data.get("fourier_spectrum", [])
freq_axis = processed_data.get("freq_axis", [])
# Create subplot configuration
traces = []
if self._config["show_processed"] and processed_signal:
# Processed Signal trace - use absolute value as in main.py line 1043
import numpy as np
processed_signal_abs = np.abs(np.array(processed_signal))
# Apply normalization if enabled
if self._config.get("normalize_signal", False):
max_val = np.max(processed_signal_abs)
if max_val > 0:
processed_signal_abs = processed_signal_abs / max_val
# Apply log scale if enabled
if self._config.get("log_scale", False):
processed_signal_abs = np.log10(processed_signal_abs + 1e-12) # Add small epsilon to avoid log(0)
logger.debug(
"Processed signal stats",
min=float(np.min(processed_signal_abs)),
max=float(np.max(processed_signal_abs)),
mean=float(np.mean(processed_signal_abs)),
size=len(processed_signal_abs)
)
traces.append({
"type": "scatter",
"mode": "lines",
"x": freq_axis,
"y": processed_signal_abs.tolist(),
"name": "Обработанный сигнал",
"line": {"width": 1, "color": "cyan"},
"xaxis": "x",
"yaxis": "y",
})
if self._config["show_fourier"] and fourier_spectrum:
# Fourier Transform trace
fft_x = list(range(len(fourier_spectrum)))
traces.append({
"type": "scatter",
"mode": "lines",
"x": fft_x,
"y": fourier_spectrum,
"name": "Фурье образ",
"line": {"width": 1, "color": "yellow"},
"xaxis": "x2",
"yaxis": "y2",
})
# Build layout with two subplots
layout = {
"title": (
f"RFG Радар - {processed_data.get('data_type', 'N/A')} | "
f"Точек: {processed_data.get('points_processed', 0)}"
),
"grid": {"rows": 2, "columns": 1, "pattern": "independent"},
"xaxis": {
"title": "Частота (ГГц)",
"domain": [0, 1],
"anchor": "y",
},
"yaxis": {
"title": "Амплитуда",
"domain": [0.55, 1],
"anchor": "x",
},
"xaxis2": {
"title": "Индекс",
"domain": [0, 1],
"anchor": "y2",
},
"yaxis2": {
"title": "Магнитуда FFT",
"domain": [0, 0.45],
"anchor": "x2",
},
"template": "plotly_dark",
"height": 700,
"showlegend": True,
"hovermode": "closest",
}
# Apply Y-axis limits if not auto-scaling
if not self._config.get("auto_scale", False):
y_max_processed = float(self._config.get("y_max_processed", 90000.0))
y_max_fourier = float(self._config.get("y_max_fourier", 60000.0))
layout["yaxis"]["range"] = [0, y_max_processed]
layout["yaxis2"]["range"] = [0, y_max_fourier]
logger.debug(
"Y-axis limits applied",
processed_max=y_max_processed,
fourier_max=y_max_fourier
)
return {"data": traces, "layout": layout}
# -------------------------------------------------------------------------
# Data Processing Helpers
# -------------------------------------------------------------------------
def _extract_adc_data(self, sweep_data: SweepData) -> NDArray[np.floating] | None:
"""
Extract ADC data from SweepData.
Assumes sweep_data.points contains [(real, imag), ...] pairs.
For ADC data, we take only the real part.
"""
try:
if not sweep_data.points:
return None
# Extract real part (ADC samples)
arr = np.asarray(sweep_data.points, dtype=float)
logger.info(
"🔍 RAW INPUT DATA SHAPE",
shape=arr.shape,
ndim=arr.ndim,
total_points=sweep_data.total_points,
dtype=arr.dtype
)
if arr.ndim == 2 and arr.shape[1] >= 1:
# Take first column (real part)
adc_data = arr[:, 0]
elif arr.ndim == 1:
# Already 1D array
adc_data = arr
else:
raise ValueError("Unexpected data shape for ADC extraction")
logger.info(
"📊 EXTRACTED ADC DATA",
size=adc_data.size,
min=float(np.min(adc_data)),
max=float(np.max(adc_data)),
mean=float(np.mean(adc_data)),
non_zero_count=int(np.count_nonzero(adc_data))
)
return adc_data.astype(float, copy=False)
except Exception as exc: # noqa: BLE001
logger.error("Failed to extract ADC data", error=repr(exc))
return None
def _resize_1d_interpolate(
self,
data: NDArray[np.floating],
target_size: int,
) -> NDArray[np.floating]:
"""
Resize 1D array using linear interpolation.
Based on main.py resize_1d_interpolate() function.
"""
if len(data) == target_size:
return data
old_indices = np.linspace(0, 1, len(data))
new_indices = np.linspace(0, 1, target_size)
f = interp1d(old_indices, data, kind='linear', fill_value='extrapolate')
return f(new_indices)
def _process_raw_data(
self,
adc_data: NDArray[np.floating],
) -> tuple[NDArray[np.floating], NDArray[np.floating]]:
"""
Process RAW ADC data (0xD0 format).
Pipeline (based on main.py lines 824-896):
1. Interpolate to standard size (64000)
2. Generate meander signal for demodulation
3. Synchronous detection (multiply by meander)
4. Frequency segmentation
5. FFT processing with Gaussian smoothing
Returns
-------
tuple
(processed_signal, fourier_spectrum)
"""
# Step 1: Resize to standard RAW size
target_size = int(self._config["standard_raw_size"])
data_resized = self._resize_1d_interpolate(adc_data, target_size)
# Step 2: Generate meander signal (square wave)
if self._meandr is None or self._last_size != target_size:
time_idx = np.arange(1, target_size + 1)
self._meandr = square(time_idx * np.pi)
self._last_size = target_size
logger.debug("Meander signal regenerated", size=target_size)
# Step 3: Meander demodulation (synchronous detection)
demodulated = data_resized * self._meandr
# Step 4: Frequency segmentation
processed_signal = self._frequency_segmentation(demodulated)
# Step 5: FFT processing
fourier_spectrum = self._compute_fft_spectrum(processed_signal) / (2*3.14)
return processed_signal, fourier_spectrum
def _process_sync_det_data(
self,
adc_data: NDArray[np.floating],
) -> tuple[NDArray[np.floating], NDArray[np.floating]]:
"""
Process SYNC_DET data (0xF0 format).
Pipeline (based on main.py lines 898-917):
1. Interpolate to standard size (1000)
2. Use data directly as signal (already demodulated)
3. FFT processing with Gaussian smoothing
Returns
-------
tuple
(processed_signal, fourier_spectrum)
"""
# Step 1: Optionally resize to standard SYNC_DET size
if self._config.get("disable_interpolation", False):
# FOURIER mode: no interpolation, use raw data size
data_resized = adc_data
logger.info("🚫 Interpolation disabled - using raw data size", size=adc_data.size)
else:
# SYNC_DET mode: interpolate to standard size
target_size = int(self._config["standard_sync_size"])
data_resized = self._resize_1d_interpolate(adc_data, target_size)
logger.debug(
"SYNC_DET data resized",
original_size=adc_data.size,
target_size=target_size,
min_val=float(np.min(data_resized)),
max_val=float(np.max(data_resized)),
mean_val=float(np.mean(data_resized))
)
# Step 2: Data is already demodulated, use directly
processed_signal = data_resized
# Step 3: FFT processing
fourier_spectrum = self._compute_fft_spectrum(processed_signal)
logger.debug(
"SYNC_DET processing complete",
signal_size=processed_signal.size,
fft_size=fourier_spectrum.size
)
return processed_signal, fourier_spectrum
def _frequency_segmentation(
self,
data: NDArray[np.floating],
) -> NDArray[np.floating]:
"""
Divide data into frequency segments and sum each segment.
Based on main.py lines 866-884.
Parameters
----------
data : ndarray
Demodulated signal
Returns
-------
ndarray
Segmented and summed signal
"""
pont_in_one_fq = int(self._config["pont_in_one_fq_change"])
signal_list = []
start = 0
segment_start_idx = 0
for idx in range(len(data)):
if (idx - start) > pont_in_one_fq:
segment = data[segment_start_idx:idx]
if segment.size > 0:
# Sum the segment to extract signal
signal_list.append(np.sum(segment))
start = idx
segment_start_idx = idx
return np.array(signal_list, dtype=float)
def _compute_fft_spectrum(
self,
signal: NDArray[np.floating],
) -> NDArray[np.floating]:
"""
Compute FFT spectrum with Gaussian smoothing.
Based on main.py lines 984-994.
Parameters
----------
signal : ndarray
Input signal (processed or raw)
Returns
-------
ndarray
Smoothed FFT magnitude spectrum
"""
fq_end = int(self._config["fq_end"])
gaussian_sigma = float(self._config["gaussian_sigma"])
fft0_delta = int(self._config["fft0_delta"])
# Cut signal to FFT length
sig_cut = signal[:fq_end] if len(signal) >= fq_end else signal
# Take square root of absolute value (as in main.py)
sig_cut = np.sqrt(np.abs(sig_cut))
# Compute FFT
F = np.fft.fft(sig_cut)
Fshift = np.abs(np.fft.fftshift(F))
# Zero out the center (remove DC component and nearby artifacts)
center = len(sig_cut) // 2
if center < len(Fshift):
zero_start = max(center - 0, 0)
zero_end = min(center + 1, len(Fshift))
Fshift[zero_start:zero_end] = 0
# Apply Gaussian smoothing
FshiftS = gaussian_filter1d(Fshift, gaussian_sigma)
return FshiftS
def _generate_frequency_axis(
self,
signal: NDArray[np.floating],
) -> NDArray[np.floating]:
"""
Generate frequency axis for visualization.
Based on main.py lines 1041-1042: maps signal indices to 3-13.67 GHz range.
Parameters
----------
signal : ndarray
Processed signal
Returns
-------
ndarray
Frequency axis in GHz
"""
freq_start = float(self._config["freq_start_ghz"])
freq_stop = float(self._config["freq_stop_ghz"])
signal_size = signal.size
if signal_size == 0:
return np.array([])
# Calculate frequency per point
freq_range = freq_stop - freq_start
per_point_fq = freq_range / signal_size
# Generate frequency axis: start + (index * per_point_fq)
freq_axis = freq_start + (np.arange(1, signal_size + 1) * per_point_fq)
return freq_axis

View File

@ -397,11 +397,13 @@ class ProcessorManager:
try: try:
from .implementations.magnitude_processor import MagnitudeProcessor from .implementations.magnitude_processor import MagnitudeProcessor
from .implementations.bscan_processor import BScanProcessor from .implementations.bscan_processor import BScanProcessor
from .implementations.rfg_processor import RFGProcessor
# self.register_processor(PhaseProcessor(self.config_dir)) # self.register_processor(PhaseProcessor(self.config_dir))
self.register_processor(BScanProcessor(self.config_dir)) self.register_processor(BScanProcessor(self.config_dir))
self.register_processor(MagnitudeProcessor(self.config_dir)) self.register_processor(MagnitudeProcessor(self.config_dir))
self.register_processor(RFGProcessor(self.config_dir))
# self.register_processor(SmithChartProcessor(self.config_dir)) # self.register_processor(SmithChartProcessor(self.config_dir))
logger.info("Default processors registered", count=len(self._processors)) logger.info("Default processors registered", count=len(self._processors))

View File

@ -348,10 +348,10 @@
<div class="control-section" id="laserManualSection"> <div class="control-section" id="laserManualSection">
<h4 class="control-section-title">Параметры ручного режима</h4> <h4 class="control-section-title">Параметры ручного режима</h4>
<div class="laser-params-table"> <div class="laser-params-table">
<div class="laser-param-cell laser-param-header">Температура лазера 1 (°C)</div> <div class="laser-param-cell laser-param-header">T1 (°C)</div>
<div class="laser-param-cell laser-param-header">Температура лазера 2 (°C)</div> <div class="laser-param-cell laser-param-header">T2 (°C)</div>
<div class="laser-param-cell laser-param-header">Управляющий ток лазера 1 (15-60 мА)</div> <div class="laser-param-cell laser-param-header">Ток L1 (мА)</div>
<div class="laser-param-cell laser-param-header">Управляющий ток лазера 2 (15-60 мА)</div> <div class="laser-param-cell laser-param-header">Ток L2 (мА)</div>
<div class="laser-param-cell laser-param-input"> <div class="laser-param-cell laser-param-input">
<input type="number" class="settings-input" id="laserTemp1" <input type="number" class="settings-input" id="laserTemp1"
@ -374,66 +374,57 @@
<!-- Scan Mode Controls --> <!-- Scan Mode Controls -->
<div class="control-section" id="laserScanSection" style="display: none;"> <div class="control-section" id="laserScanSection" style="display: none;">
<h4 class="control-section-title">Режим сканирования тока лазера 1</h4> <h4 class="control-section-title">Параметры сканирования тока лазера 1</h4>
<div class="laser-params-table">
<!-- Row 1: Headers -->
<div class="laser-param-cell laser-param-header">Мин ток L1 (мА)</div>
<div class="laser-param-cell laser-param-header">Макс ток L1 (мА)</div>
<div class="laser-param-cell laser-param-header">Шаг тока L1 (мА)</div>
<div class="laser-param-cell laser-param-header">Δt (мкс)</div>
<div class="control-subsection"> <!-- Row 2: Inputs -->
<h5 class="control-subsection-title">Параметры сканирования</h5> <div class="laser-param-cell laser-param-input">
<div class="control-grid">
<div class="control-group">
<label class="control-label">Минимальный ток лазера 1 (мА):</label>
<input type="number" class="settings-input" id="laserMinCurrent1" <input type="number" class="settings-input" id="laserMinCurrent1"
min="15" max="70" step="0.1" value="33" disabled> min="15" max="70" step="0.1" value="33" disabled>
</div> </div>
<div class="control-group"> <div class="laser-param-cell laser-param-input">
<label class="control-label">Максимальный ток лазера 1 (мА):</label>
<input type="number" class="settings-input" id="laserMaxCurrent1" <input type="number" class="settings-input" id="laserMaxCurrent1"
min="15" max="70" step="0.1" value="70" disabled> min="15" max="70" step="0.1" value="70" disabled>
</div> </div>
<div class="control-group"> <div class="laser-param-cell laser-param-input">
<label class="control-label">Шаг дискретизации тока лазера 1 (0.002-0.5 мА):</label>
<input type="number" class="settings-input" id="laserDeltaCurrent1" <input type="number" class="settings-input" id="laserDeltaCurrent1"
min="0.002" max="0.5" step="0.001" value="0.05" disabled> min="0.002" max="0.5" step="0.001" value="0.05" disabled>
</div> </div>
</div> <div class="laser-param-cell laser-param-input">
</div>
<div class="control-subsection">
<h5 class="control-subsection-title">Фиксированные параметры</h5>
<div class="control-grid">
<div class="control-group">
<label class="control-label">Температура лазера 1 (°C):</label>
<input type="number" class="settings-input" id="laserScanTemp1"
min="-1" max="45" step="0.1" value="28" disabled>
</div>
<div class="control-group">
<label class="control-label">Температура лазера 2 (°C):</label>
<input type="number" class="settings-input" id="laserScanTemp2"
min="-1" max="45" step="0.1" value="28.9" disabled>
</div>
<div class="control-group">
<label class="control-label">Управляющий ток лазера 2 (мА):</label>
<input type="number" class="settings-input" id="laserScanCurrent2"
min="15" max="60" step="0.1" value="35" disabled>
</div>
</div>
</div>
<div class="control-subsection">
<h5 class="control-subsection-title">Параметры времени</h5>
<div class="control-grid">
<div class="control-group">
<label class="control-label">Шаг дискретизации времени (20-100 мкс, шаг 10):</label>
<input type="number" class="settings-input" id="laserDeltaTime" <input type="number" class="settings-input" id="laserDeltaTime"
min="20" max="100" step="10" value="50" disabled> min="20" max="100" step="10" value="50" disabled>
</div> </div>
<div class="control-group">
<label class="control-label">Время задержки (3-10 мс):</label> <!-- Row 3: Fixed params headers -->
<div class="laser-param-cell laser-param-header">T1 (°C)</div>
<div class="laser-param-cell laser-param-header">T2 (°C)</div>
<div class="laser-param-cell laser-param-header">Ток L2 (мА)</div>
<div class="laser-param-cell laser-param-header">Tau (мс)</div>
<!-- Row 4: Fixed params inputs -->
<div class="laser-param-cell laser-param-input">
<input type="number" class="settings-input" id="laserScanTemp1"
min="-1" max="45" step="0.1" value="28" disabled>
</div>
<div class="laser-param-cell laser-param-input">
<input type="number" class="settings-input" id="laserScanTemp2"
min="-1" max="45" step="0.1" value="28.9" disabled>
</div>
<div class="laser-param-cell laser-param-input">
<input type="number" class="settings-input" id="laserScanCurrent2"
min="15" max="60" step="0.1" value="35" disabled>
</div>
<div class="laser-param-cell laser-param-input">
<input type="number" class="settings-input" id="laserTau" <input type="number" class="settings-input" id="laserTau"
min="3" max="10" step="1" value="10" disabled> min="3" max="10" step="1" value="10" disabled>
</div> </div>
</div> </div>
</div> </div>
</div>
<!-- Control Buttons --> <!-- Control Buttons -->
<div class="control-section"> <div class="control-section">