From 7b498cf384d1ffd389e62dda1967ace746316670 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Mon, 2 Jun 2025 22:30:59 +0000 Subject: [PATCH] refactor(llm): update chat saving logic to prevent race conditions between client and server --- .../src/widgets/llm_chat/llm_chat_panel.ts | 48 ++++++++++++++----- .../services/llm/chat/rest_chat_service.ts | 25 +++++++--- 2 files changed, 54 insertions(+), 19 deletions(-) diff --git a/apps/client/src/widgets/llm_chat/llm_chat_panel.ts b/apps/client/src/widgets/llm_chat/llm_chat_panel.ts index fc6a28f26..1350302ac 100644 --- a/apps/client/src/widgets/llm_chat/llm_chat_panel.ts +++ b/apps/client/src/widgets/llm_chat/llm_chat_panel.ts @@ -951,9 +951,9 @@ export default class LlmChatPanel extends BasicWidget { } } - // Save the updated data to the note - this.saveCurrentData() - .catch(err => console.error("Failed to save data after streaming completed:", err)); + // DON'T save here - let the server handle saving the complete conversation + // to avoid race conditions between client and server saves + console.log("Updated metadata after streaming completion, server should save"); }) .catch(err => console.error("Error fetching session data after streaming:", err)); } @@ -991,11 +991,9 @@ export default class LlmChatPanel extends BasicWidget { console.log(`Cached tool execution for ${toolData.tool} to be saved later`); - // Save immediately after receiving a tool execution - // This ensures we don't lose tool execution data if streaming fails - this.saveCurrentData().catch(err => { - console.error("Failed to save tool execution data:", err); - }); + // DON'T save immediately during streaming - let the server handle saving + // to avoid race conditions between client and server saves + console.log(`Tool execution cached, will be saved by server`); } }, // Complete handler @@ -1078,10 +1076,36 @@ export default class LlmChatPanel extends BasicWidget { // Hide loading indicator hideLoadingIndicator(this.loadingIndicator); - // Save the final state to the Chat Note - this.saveCurrentData().catch(err => { - console.error("Failed to save assistant response to note:", err); - }); + // DON'T save here immediately - let the server save the accumulated response first + // to avoid race conditions. We'll reload the data from the server after a short delay. + console.log("Stream completed, waiting for server to save then reloading data..."); + setTimeout(async () => { + try { + console.log("About to reload data from server..."); + const currentMessageCount = this.messages.length; + console.log(`Current client message count before reload: ${currentMessageCount}`); + + // Reload the data from the server which should have the complete conversation + const reloadSuccess = await this.loadSavedData(); + + const newMessageCount = this.messages.length; + console.log(`Reload success: ${reloadSuccess}, message count after reload: ${newMessageCount}`); + + if (reloadSuccess && newMessageCount > currentMessageCount) { + console.log("Successfully reloaded data with more complete conversation"); + } else if (!reloadSuccess) { + console.warn("Reload failed, keeping current client state"); + } else { + console.warn("Reload succeeded but message count didn't increase"); + } + } catch (error) { + console.error("Failed to reload data after stream completion:", error); + // Fallback: save our current state if reload fails + this.saveCurrentData().catch(err => { + console.error("Failed to save assistant response to note:", err); + }); + } + }, 3000); // Wait 3 seconds for server to complete its save } // Scroll to bottom diff --git a/apps/server/src/services/llm/chat/rest_chat_service.ts b/apps/server/src/services/llm/chat/rest_chat_service.ts index b6b0bc820..53c682862 100644 --- a/apps/server/src/services/llm/chat/rest_chat_service.ts +++ b/apps/server/src/services/llm/chat/rest_chat_service.ts @@ -150,7 +150,7 @@ class RestChatService { // Import WebSocket service for streaming const wsService = await import('../../ws.js'); - let accumulatedContent = ''; + const accumulatedContentRef = { value: '' }; const pipelineInput: ChatPipelineInput = { messages: chat.messages.map(msg => ({ @@ -162,8 +162,7 @@ class RestChatService { showThinking: showThinking, options: pipelineOptions, streamCallback: req.method === 'GET' ? (data, done, rawChunk) => { - this.handleStreamCallback(data, done, rawChunk, wsService.default, chatNoteId, res); - if (data) accumulatedContent += data; + this.handleStreamCallback(data, done, rawChunk, wsService.default, chatNoteId, res, accumulatedContentRef); } : undefined }; @@ -194,13 +193,15 @@ class RestChatService { }; } else { // For streaming, response is already sent via WebSocket/SSE - // Save the accumulated content - if (accumulatedContent) { + // Save the accumulated content - prefer accumulated content over response.text + const finalContent = accumulatedContentRef.value || response.text || ''; + if (finalContent) { chat.messages.push({ role: 'assistant', - content: accumulatedContent + content: finalContent }); await chatStorageService.updateChat(chat.id, chat.messages, chat.title); + log.info(`Saved accumulated streaming content: ${finalContent.length} characters`); } return null; } @@ -219,7 +220,8 @@ class RestChatService { rawChunk: any, wsService: any, chatNoteId: string, - res: Response + res: Response, + accumulatedContentRef: { value: string } ) { const message: LLMStreamMessage = { type: 'llm-stream', @@ -229,6 +231,15 @@ class RestChatService { if (data) { message.content = data; + // Handle accumulation carefully - if this appears to be a complete response + // (done=true and data is much longer than current accumulated), replace rather than append + if (done && data.length > accumulatedContentRef.value.length && data.includes(accumulatedContentRef.value)) { + // This looks like a complete final response that includes what we've accumulated + accumulatedContentRef.value = data; + } else { + // Normal incremental accumulation + accumulatedContentRef.value += data; + } } if (rawChunk && 'thinking' in rawChunk && rawChunk.thinking) {