fixed problems
This commit is contained in:
@ -1,4 +1,4 @@
|
||||
from dataclasses import dataclass, asdict
|
||||
from dataclasses import dataclass, asdict, is_dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@ -324,7 +324,7 @@ class BaseProcessor:
|
||||
{
|
||||
"sweep_data": sweep_data,
|
||||
"calibrated_data": calibrated_data,
|
||||
"vna_config": asdict(vna_config) if vna_config is not None else {},
|
||||
"vna_config": self._snapshot_vna_config(vna_config),
|
||||
"reference_data": reference_data,
|
||||
"timestamp": datetime.now().timestamp(),
|
||||
}
|
||||
@ -562,25 +562,40 @@ class BaseProcessor:
|
||||
# --------------------------------------------------------------------- #
|
||||
# History Export/Import
|
||||
# --------------------------------------------------------------------- #
|
||||
def _make_json_serializable(self, obj: Any) -> Any:
|
||||
"""
|
||||
Recursively convert non-serializable objects to JSON-compatible types.
|
||||
@staticmethod
|
||||
def _snapshot_vna_config(vna_config: ConfigPreset | dict[str, Any] | None) -> dict[str, Any]:
|
||||
"""Create a JSON-friendly copy of VNA preset data."""
|
||||
if vna_config is None:
|
||||
return {}
|
||||
|
||||
Handles Enums, custom objects, etc.
|
||||
"""
|
||||
from enum import Enum
|
||||
from enum import Enum # Local import to avoid module-level dependency
|
||||
|
||||
if isinstance(obj, Enum):
|
||||
return obj.value
|
||||
elif isinstance(obj, dict):
|
||||
return {key: self._make_json_serializable(value) for key, value in obj.items()}
|
||||
elif isinstance(obj, (list, tuple)):
|
||||
return [self._make_json_serializable(item) for item in obj]
|
||||
elif hasattr(obj, '__dict__'):
|
||||
# Custom object - try to serialize its dict
|
||||
return self._make_json_serializable(obj.__dict__)
|
||||
if isinstance(vna_config, dict):
|
||||
source_items = vna_config.items()
|
||||
elif is_dataclass(vna_config):
|
||||
source_items = asdict(vna_config).items()
|
||||
else:
|
||||
return obj
|
||||
try:
|
||||
source_items = vars(vna_config).items()
|
||||
except TypeError:
|
||||
return {}
|
||||
|
||||
sanitized: dict[str, Any] = {}
|
||||
for key, value in source_items:
|
||||
if isinstance(value, Enum):
|
||||
sanitized[key] = value.value
|
||||
elif hasattr(value, "item") and callable(value.item):
|
||||
try:
|
||||
sanitized[key] = value.item()
|
||||
continue
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
elif hasattr(value, "tolist") and callable(value.tolist):
|
||||
sanitized[key] = value.tolist()
|
||||
continue
|
||||
|
||||
sanitized[key] = value
|
||||
return sanitized
|
||||
|
||||
def export_history_data(self) -> list[dict[str, Any]]:
|
||||
"""
|
||||
@ -598,16 +613,13 @@ class BaseProcessor:
|
||||
calibrated_data = entry["calibrated_data"]
|
||||
reference_data = entry.get("reference_data")
|
||||
|
||||
# Convert vna_config to fully serializable format (handles Enums)
|
||||
vna_config = self._make_json_serializable(entry.get("vna_config", {}))
|
||||
|
||||
exported.append({
|
||||
"sweep_number": sweep_data.sweep_number if sweep_data else None,
|
||||
"timestamp": entry.get("timestamp"),
|
||||
"sweep_points": sweep_data.points if sweep_data else [],
|
||||
"calibrated_points": calibrated_data.points if calibrated_data else [],
|
||||
"reference_points": reference_data.points if reference_data else [],
|
||||
"vna_config": vna_config,
|
||||
"timestamp": float(entry.get("timestamp")) if entry.get("timestamp") is not None else None,
|
||||
"sweep_points": self._points_to_list(getattr(sweep_data, "points", [])),
|
||||
"calibrated_points": self._points_to_list(getattr(calibrated_data, "points", [])),
|
||||
"reference_points": self._points_to_list(getattr(reference_data, "points", [])),
|
||||
"vna_config": self._snapshot_vna_config(entry.get("vna_config")),
|
||||
})
|
||||
|
||||
return exported
|
||||
@ -672,6 +684,33 @@ class BaseProcessor:
|
||||
|
||||
logger.info("History imported", processor_id=self.processor_id, records=len(history_data))
|
||||
|
||||
@staticmethod
|
||||
def _points_to_list(points: Any) -> list[tuple[float, float]]:
|
||||
"""Convert sweep points into plain Python floats."""
|
||||
result: list[tuple[float, float]] = []
|
||||
if not points:
|
||||
return result
|
||||
|
||||
for point in points:
|
||||
if isinstance(point, dict):
|
||||
real = point.get("real")
|
||||
imag = point.get("imag")
|
||||
if real is None:
|
||||
real = point.get("r")
|
||||
if imag is None:
|
||||
imag = point.get("i")
|
||||
elif isinstance(point, (list, tuple)) and len(point) >= 2:
|
||||
real, imag = point[0], point[1]
|
||||
else:
|
||||
continue
|
||||
|
||||
try:
|
||||
result.append((float(real), float(imag)))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
|
||||
return result
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# Utilities
|
||||
# --------------------------------------------------------------------- #
|
||||
|
||||
@ -54,7 +54,7 @@ class CalibrationProcessor:
|
||||
else:
|
||||
raise ValueError(f"Unsupported measurement mode: {calibration_set.preset.mode}")
|
||||
|
||||
return [(z.real, z.imag) for z in calibrated]
|
||||
return [(float(z.real), float(z.imag)) for z in calibrated]
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# Helpers
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{
|
||||
"open_air": false,
|
||||
"axis": "abs",
|
||||
"cut": 0.292,
|
||||
"axis": "phase",
|
||||
"cut": 0.266,
|
||||
"max": 2.3,
|
||||
"gain": 0.3,
|
||||
"start_freq": 100.0,
|
||||
|
||||
@ -1,10 +1,11 @@
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from dataclasses import asdict
|
||||
|
||||
from vna_system.core.acquisition.sweep_buffer import SweepData
|
||||
from vna_system.core.logging.logger import get_component_logger
|
||||
from vna_system.core.processors.base_processor import BaseProcessor, UIParameter
|
||||
from vna_system.core.settings.preset_manager import VNAMode
|
||||
|
||||
logger = get_component_logger(__file__)
|
||||
|
||||
@ -112,7 +113,13 @@ class MagnitudeProcessor(BaseProcessor):
|
||||
autoscale: bool = processed_data["autoscale"]
|
||||
|
||||
# Determine the parameter type from preset mode
|
||||
parameter_type = vna_config["mode"].value.upper() # Convert "s11" -> "S11", "s21" -> "S21"
|
||||
mode = vna_config.get("mode")
|
||||
if isinstance(mode, VNAMode):
|
||||
parameter_type = mode.value.upper()
|
||||
elif isinstance(mode, str):
|
||||
parameter_type = mode.upper()
|
||||
else:
|
||||
parameter_type = "UNKNOWN"
|
||||
|
||||
# Convert Hz to GHz for x-axis
|
||||
freqs_ghz = [f / 1e9 for f in freqs]
|
||||
@ -280,4 +287,3 @@ class MagnitudeProcessor(BaseProcessor):
|
||||
logger.info("Adjusted y_max to maintain y_max > y_min",
|
||||
y_min=self._config["y_min"],
|
||||
y_max=self._config["y_max"])
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ from pathlib import Path
|
||||
from typing import Any, Callable
|
||||
|
||||
import threading
|
||||
from enum import Enum
|
||||
|
||||
from vna_system.core.logging.logger import get_component_logger
|
||||
from vna_system.core.processors.base_processor import BaseProcessor, ProcessedResult
|
||||
@ -196,6 +197,29 @@ class ProcessorManager:
|
||||
logger.error("History load error", processor_id=processor_id, error=repr(exc))
|
||||
raise
|
||||
|
||||
def build_processor_state(self, processor_id: str) -> dict[str, Any]:
|
||||
"""Return a JSON-ready snapshot of processor state and current result."""
|
||||
processor = self.get_processor(processor_id)
|
||||
if processor is None:
|
||||
raise ValueError(f"Processor {processor_id} not found")
|
||||
|
||||
state_payload = self._json_ready(processor.get_full_state())
|
||||
result = processor.recalculate()
|
||||
current_data = None
|
||||
if result is not None:
|
||||
current_data = {
|
||||
"data": self._json_ready(result.data),
|
||||
"plotly_config": self._json_ready(result.plotly_config),
|
||||
"timestamp": float(result.timestamp),
|
||||
}
|
||||
|
||||
return {
|
||||
"type": "processor_state",
|
||||
"processor_id": processor_id,
|
||||
"state": state_payload,
|
||||
"current_data": current_data,
|
||||
}
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# Runtime control
|
||||
# --------------------------------------------------------------------- #
|
||||
@ -268,12 +292,15 @@ class ProcessorManager:
|
||||
# --------------------------------------------------------------------- #
|
||||
# Calibration
|
||||
# --------------------------------------------------------------------- #
|
||||
def _apply_calibration(self, sweep_data: SweepData) -> SweepData:
|
||||
def _apply_calibration(self, sweep_data: SweepData | None) -> SweepData | None:
|
||||
"""
|
||||
Apply calibration to the sweep when a complete set is available.
|
||||
|
||||
Returns the original sweep on failure or when no calibration is present.
|
||||
"""
|
||||
if sweep_data is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
calib_set = self.settings_manager.get_current_calibration()
|
||||
if calib_set and calib_set.is_complete():
|
||||
@ -319,3 +346,37 @@ class ProcessorManager:
|
||||
logger.info("Default processors registered", count=len(self._processors))
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.error("Failed to register default processors", error=repr(exc))
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# JSON coercion helpers
|
||||
# --------------------------------------------------------------------- #
|
||||
@staticmethod
|
||||
def _json_ready(obj: Any) -> Any:
|
||||
"""Convert numpy-like objects to JSON primitives recursively."""
|
||||
if obj is None or isinstance(obj, (str, bool, int, float)):
|
||||
return obj
|
||||
|
||||
if isinstance(obj, Enum):
|
||||
return obj.value
|
||||
|
||||
if isinstance(obj, dict):
|
||||
return {key: ProcessorManager._json_ready(value) for key, value in obj.items()}
|
||||
|
||||
if isinstance(obj, (list, tuple, set)):
|
||||
return [ProcessorManager._json_ready(item) for item in obj]
|
||||
|
||||
item_method = getattr(obj, "item", None)
|
||||
if callable(item_method):
|
||||
try:
|
||||
return item_method()
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
tolist_method = getattr(obj, "tolist", None)
|
||||
if callable(tolist_method):
|
||||
try:
|
||||
return ProcessorManager._json_ready(tolist_method())
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
return obj
|
||||
|
||||
@ -213,58 +213,12 @@ class ProcessorWebSocketHandler:
|
||||
return
|
||||
|
||||
try:
|
||||
processor = self.processor_manager.get_processor(processor_id)
|
||||
if processor is None:
|
||||
await self._send_error(websocket, f"Processor {processor_id} not found")
|
||||
return
|
||||
|
||||
# Get full state with sweep history
|
||||
full_state = processor.get_full_state()
|
||||
|
||||
# Get current result to include data and plotly_config
|
||||
current_result = processor.recalculate()
|
||||
|
||||
response = {
|
||||
"type": "processor_state",
|
||||
"processor_id": processor_id,
|
||||
"state": full_state,
|
||||
"current_data": {
|
||||
"data": current_result.data if current_result else {},
|
||||
"plotly_config": current_result.plotly_config if current_result else {},
|
||||
"timestamp": current_result.timestamp if current_result else datetime.now().timestamp(),
|
||||
} if current_result else None,
|
||||
}
|
||||
response = self.processor_manager.build_processor_state(processor_id)
|
||||
await websocket.send_text(json.dumps(response))
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.error("Error getting processor state", processor_id=processor_id, error=repr(exc))
|
||||
await self._send_error(websocket, f"Error getting processor state: {exc}")
|
||||
|
||||
# --------------------------------------------------------------------- #
|
||||
# Outbound helpers
|
||||
# --------------------------------------------------------------------- #
|
||||
def _make_json_serializable(self, obj: Any) -> Any:
|
||||
"""
|
||||
Recursively convert non-serializable objects to JSON-compatible types.
|
||||
Handles numpy types, Enums, custom objects, etc.
|
||||
"""
|
||||
import numpy as np
|
||||
from enum import Enum
|
||||
|
||||
if isinstance(obj, (np.integer, np.floating)):
|
||||
return obj.item() # Convert numpy scalar to Python type
|
||||
elif isinstance(obj, np.ndarray):
|
||||
return obj.tolist() # Convert numpy array to list
|
||||
elif isinstance(obj, Enum):
|
||||
return obj.value
|
||||
elif isinstance(obj, dict):
|
||||
return {key: self._make_json_serializable(value) for key, value in obj.items()}
|
||||
elif isinstance(obj, (list, tuple)):
|
||||
return [self._make_json_serializable(item) for item in obj]
|
||||
elif hasattr(obj, '__dict__'):
|
||||
return self._make_json_serializable(obj.__dict__)
|
||||
else:
|
||||
return obj
|
||||
|
||||
def _result_to_message(self, processor_id: str, result: ProcessedResult) -> dict[str, Any]:
|
||||
"""
|
||||
Convert a `ProcessedResult` into a lightweight JSON-serializable message for broadcasting.
|
||||
@ -278,7 +232,7 @@ class ProcessorWebSocketHandler:
|
||||
"timestamp": result.timestamp,
|
||||
"plotly_config": result.plotly_config,
|
||||
"ui_parameters": [asdict(param) for param in result.ui_parameters],
|
||||
"metadata": self._make_json_serializable(result.metadata),
|
||||
"metadata": result.metadata,
|
||||
}
|
||||
|
||||
async def _send_error(
|
||||
|
||||
Reference in New Issue
Block a user