fixing proc load in progress
This commit is contained in:
@ -678,6 +678,24 @@ class BaseProcessor:
|
|||||||
def _get_metadata(self) -> dict[str, Any]:
|
def _get_metadata(self) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Return diagnostic metadata bundled with each `ProcessedResult`.
|
Return diagnostic metadata bundled with each `ProcessedResult`.
|
||||||
|
|
||||||
|
Note: This is sent with every websocket broadcast, so it should be lightweight.
|
||||||
|
For full state including sweep_history, use get_full_state().
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
return {
|
||||||
|
"processor_id": self.processor_id,
|
||||||
|
"config": self._config.copy(),
|
||||||
|
"history_count": len(self._sweep_history),
|
||||||
|
"max_history": self._max_history,
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_full_state(self) -> dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Return complete processor state including sweep history.
|
||||||
|
|
||||||
|
This should be called explicitly when needed (e.g., for export/download),
|
||||||
|
not on every broadcast.
|
||||||
"""
|
"""
|
||||||
with self._lock:
|
with self._lock:
|
||||||
return {
|
return {
|
||||||
|
|||||||
@ -102,6 +102,8 @@ class ProcessorWebSocketHandler:
|
|||||||
await self._handle_get_history(websocket, message)
|
await self._handle_get_history(websocket, message)
|
||||||
elif mtype == "load_history":
|
elif mtype == "load_history":
|
||||||
await self._handle_load_history(websocket, message)
|
await self._handle_load_history(websocket, message)
|
||||||
|
elif mtype == "get_processor_state":
|
||||||
|
await self._handle_get_processor_state(websocket, message)
|
||||||
else:
|
else:
|
||||||
await self._send_error(websocket, f"Unknown message type: {mtype!r}")
|
await self._send_error(websocket, f"Unknown message type: {mtype!r}")
|
||||||
except json.JSONDecodeError as json_error:
|
except json.JSONDecodeError as json_error:
|
||||||
@ -197,6 +199,46 @@ class ProcessorWebSocketHandler:
|
|||||||
logger.error("History load failed", processor_id=processor_id, error=repr(exc))
|
logger.error("History load failed", processor_id=processor_id, error=repr(exc))
|
||||||
await self._send_error(websocket, f"History load failed: {exc}")
|
await self._send_error(websocket, f"History load failed: {exc}")
|
||||||
|
|
||||||
|
async def _handle_get_processor_state(self, websocket: WebSocket, message: dict[str, Any]) -> None:
|
||||||
|
"""
|
||||||
|
Fetch complete processor state including sweep history and current data.
|
||||||
|
|
||||||
|
This should be called explicitly by the UI (e.g., via a download/export button)
|
||||||
|
rather than being sent automatically with every result.
|
||||||
|
"""
|
||||||
|
processor_id = message.get("processor_id")
|
||||||
|
|
||||||
|
if not processor_id:
|
||||||
|
await self._send_error(websocket, "processor_id is required")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
processor = self.processor_manager.get_processor(processor_id)
|
||||||
|
if processor is None:
|
||||||
|
await self._send_error(websocket, f"Processor {processor_id} not found")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get full state with sweep history
|
||||||
|
full_state = processor.get_full_state()
|
||||||
|
|
||||||
|
# Get current result to include data and plotly_config
|
||||||
|
current_result = processor.recalculate()
|
||||||
|
|
||||||
|
response = {
|
||||||
|
"type": "processor_state",
|
||||||
|
"processor_id": processor_id,
|
||||||
|
"state": full_state,
|
||||||
|
"current_data": {
|
||||||
|
"data": current_result.data if current_result else {},
|
||||||
|
"plotly_config": current_result.plotly_config if current_result else {},
|
||||||
|
"timestamp": current_result.timestamp if current_result else datetime.now().timestamp(),
|
||||||
|
} if current_result else None,
|
||||||
|
}
|
||||||
|
await websocket.send_text(json.dumps(response))
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
logger.error("Error getting processor state", processor_id=processor_id, error=repr(exc))
|
||||||
|
await self._send_error(websocket, f"Error getting processor state: {exc}")
|
||||||
|
|
||||||
# --------------------------------------------------------------------- #
|
# --------------------------------------------------------------------- #
|
||||||
# Outbound helpers
|
# Outbound helpers
|
||||||
# --------------------------------------------------------------------- #
|
# --------------------------------------------------------------------- #
|
||||||
@ -224,13 +266,17 @@ class ProcessorWebSocketHandler:
|
|||||||
return obj
|
return obj
|
||||||
|
|
||||||
def _result_to_message(self, processor_id: str, result: ProcessedResult) -> dict[str, Any]:
|
def _result_to_message(self, processor_id: str, result: ProcessedResult) -> dict[str, Any]:
|
||||||
"""Convert a `ProcessedResult` into a JSON-serializable message."""
|
"""
|
||||||
|
Convert a `ProcessedResult` into a lightweight JSON-serializable message for broadcasting.
|
||||||
|
|
||||||
|
Note: This excludes heavy data fields like 'data' to minimize websocket traffic.
|
||||||
|
Use get_processor_state command to retrieve full data including sweep history.
|
||||||
|
"""
|
||||||
return {
|
return {
|
||||||
"type": "processor_result",
|
"type": "processor_result",
|
||||||
"processor_id": processor_id,
|
"processor_id": processor_id,
|
||||||
"timestamp": result.timestamp,
|
"timestamp": result.timestamp,
|
||||||
"data": self._make_json_serializable(result.data),
|
"plotly_config": result.plotly_config,
|
||||||
"plotly_config": self._make_json_serializable(result.plotly_config),
|
|
||||||
"ui_parameters": [asdict(param) for param in result.ui_parameters],
|
"ui_parameters": [asdict(param) for param in result.ui_parameters],
|
||||||
"metadata": self._make_json_serializable(result.metadata),
|
"metadata": self._make_json_serializable(result.metadata),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -164,6 +164,9 @@ export class WebSocketManager {
|
|||||||
case 'processor_history':
|
case 'processor_history':
|
||||||
this.emit('processor_history', payload);
|
this.emit('processor_history', payload);
|
||||||
break;
|
break;
|
||||||
|
case 'processor_state':
|
||||||
|
this.emit('processor_state', payload);
|
||||||
|
break;
|
||||||
case 'error':
|
case 'error':
|
||||||
console.error('Server error:', payload);
|
console.error('Server error:', payload);
|
||||||
console.error('Error details:', {
|
console.error('Error details:', {
|
||||||
@ -322,6 +325,14 @@ export class WebSocketManager {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Get full processor state including sweep history (EXISTS ON BACKEND) */
|
||||||
|
getProcessorState(processorId) {
|
||||||
|
return this.send({
|
||||||
|
type: 'get_processor_state',
|
||||||
|
processor_id: processorId
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// === Events ===
|
// === Events ===
|
||||||
on(event, callback) {
|
on(event, callback) {
|
||||||
if (!this.eventListeners.has(event)) this.eventListeners.set(event, []);
|
if (!this.eventListeners.has(event)) this.eventListeners.set(event, []);
|
||||||
|
|||||||
Reference in New Issue
Block a user