diff --git a/apps/client/src/widgets/llm_chat/communication.ts b/apps/client/src/widgets/llm_chat/communication.ts
index 87687cab9..614add7ad 100644
--- a/apps/client/src/widgets/llm_chat/communication.ts
+++ b/apps/client/src/widgets/llm_chat/communication.ts
@@ -62,15 +62,11 @@ export async function setupStreamingResponse(
 ): Promise<void> {
     return new Promise((resolve, reject) => {
         let assistantResponse = '';
-        let postToolResponse = ''; // Separate accumulator for post-tool execution content
         let receivedAnyContent = false;
-        let receivedPostToolContent = false; // Track if we've started receiving post-tool content
         let timeoutId: number | null = null;
         let initialTimeoutId: number | null = null;
         let cleanupTimeoutId: number | null = null;
         let receivedAnyMessage = false;
-        let toolsExecuted = false; // Flag to track if tools were executed in this session
-        let toolExecutionCompleted = false; // Flag to track if tool execution is completed
         let eventListener: ((event: Event) => void) | null = null;
         let lastMessageTimestamp = 0;
 
@@ -118,28 +114,14 @@ export async function setupStreamingResponse(
             resolve();
         };
 
-        // Function to schedule cleanup with ability to cancel
-        const scheduleCleanup = (delay: number) => {
-            // Clear any existing cleanup timeout
-            if (cleanupTimeoutId) {
-                window.clearTimeout(cleanupTimeoutId);
+        // Set initial timeout to catch cases where no message is received at all
+        initialTimeoutId = window.setTimeout(() => {
+            if (!receivedAnyMessage) {
+                console.error(`[${responseId}] No initial message received within timeout`);
+                performCleanup();
+                reject(new Error('No response received from server'));
             }
-
-            console.log(`[${responseId}] Scheduling listener cleanup in ${delay}ms`);
-
-            // Set new cleanup timeout
-            cleanupTimeoutId = window.setTimeout(() => {
-                // Only clean up if no messages received recently (in last 2 seconds)
-                const timeSinceLastMessage = Date.now() - lastMessageTimestamp;
-                if (timeSinceLastMessage > 2000) {
-                    performCleanup();
-                } else {
-                    console.log(`[${responseId}] Received message recently, delaying cleanup`);
-                    // Reschedule cleanup
-                    scheduleCleanup(2000);
-                }
-            }, delay);
-        };
+        }, 10000);
 
         // Create a message handler for CustomEvents
         eventListener = (event: Event) => {
@@ -161,7 +143,7 @@ export async function setupStreamingResponse(
                 cleanupTimeoutId = null;
             }
 
-            console.log(`[${responseId}] LLM Stream message received via CustomEvent: chatNoteId=${noteId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}, type=${message.type || 'llm-stream'}`);
+            console.log(`[${responseId}] LLM Stream message received: content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}`);
 
             // Mark first message received
             if (!receivedAnyMessage) {
@@ -175,109 +157,33 @@ export async function setupStreamingResponse(
                 }
             }
 
-            // Handle specific message types
-            if (message.type === 'tool_execution_start') {
-                toolsExecuted = true; // Mark that tools were executed
-                onThinkingUpdate('Executing tools...');
-                // Also trigger tool execution UI with a specific format
-                onToolExecution({
-                    action: 'start',
-                    tool: 'tools',
-                    result: 'Executing tools...'
-                });
-                return; // Skip accumulating content from this message
+            // Handle error
+            if (message.error) {
+                console.error(`[${responseId}] Stream error: ${message.error}`);
+                performCleanup();
+                reject(new Error(message.error));
+                return;
             }
 
-            if (message.type === 'tool_result' && message.toolExecution) {
-                toolsExecuted = true; // Mark that tools were executed
-                console.log(`[${responseId}] Processing tool result: ${JSON.stringify(message.toolExecution)}`);
+            // Handle thinking updates - only show if showThinking is enabled
+            if (message.thinking && messageParams.showThinking) {
+                console.log(`[${responseId}] Received thinking: ${message.thinking.substring(0, 100)}...`);
+                onThinkingUpdate(message.thinking);
+            }
 
-                // If tool execution doesn't have an action, add 'result' as the default
-                if (!message.toolExecution.action) {
-                    message.toolExecution.action = 'result';
-                }
-
-                // First send a 'start' action to ensure the container is created
-                onToolExecution({
-                    action: 'start',
-                    tool: 'tools',
-                    result: 'Tool execution initialized'
-                });
-
-                // Then send the actual tool execution data
+            // Handle tool execution updates
+            if (message.toolExecution) {
+                console.log(`[${responseId}] Tool execution update:`, message.toolExecution);
                 onToolExecution(message.toolExecution);
-
-                // Mark tool execution as completed if this is a result or error
-                if (message.toolExecution.action === 'result' || message.toolExecution.action === 'complete' || message.toolExecution.action === 'error') {
-                    toolExecutionCompleted = true;
-                    console.log(`[${responseId}] Tool execution completed`);
-                }
-
-                return; // Skip accumulating content from this message
-            }
-
-            if (message.type === 'tool_execution_error' && message.toolExecution) {
-                toolsExecuted = true; // Mark that tools were executed
-                toolExecutionCompleted = true; // Mark tool execution as completed
-                onToolExecution({
-                    ...message.toolExecution,
-                    action: 'error',
-                    error: message.toolExecution.error || 'Unknown error during tool execution'
-                });
-                return; // Skip accumulating content from this message
-            }
-
-            if (message.type === 'tool_completion_processing') {
-                toolsExecuted = true; // Mark that tools were executed
-                toolExecutionCompleted = true; // Tools are done, now processing the result
-                onThinkingUpdate('Generating response with tool results...');
-                // Also trigger tool execution UI with a specific format
-                onToolExecution({
-                    action: 'generating',
-                    tool: 'tools',
-                    result: 'Generating response with tool results...'
-                });
-                return; // Skip accumulating content from this message
             }
 
             // Handle content updates
             if (message.content) {
-                console.log(`[${responseId}] Received content chunk of length ${message.content.length}, preview: "${message.content.substring(0, 50)}${message.content.length > 50 ? '...' : ''}"`);
-
-                // If tools were executed and completed, and we're now getting new content,
-                // this is likely the final response after tool execution from Anthropic
-                if (toolsExecuted && toolExecutionCompleted && message.content) {
-                    console.log(`[${responseId}] Post-tool execution content detected`);
-
-                    // If this is the first post-tool chunk, indicate we're starting a new response
-                    if (!receivedPostToolContent) {
-                        receivedPostToolContent = true;
-                        postToolResponse = ''; // Clear any previous post-tool response
-                        console.log(`[${responseId}] First post-tool content chunk, starting fresh accumulation`);
-                    }
-
-                    // Accumulate post-tool execution content
-                    postToolResponse += message.content;
-                    console.log(`[${responseId}] Accumulated post-tool content, now ${postToolResponse.length} chars`);
-
-                    // Update the UI with the accumulated post-tool content
-                    // This replaces the pre-tool content with our accumulated post-tool content
-                    onContentUpdate(postToolResponse, message.done || false);
-                } else {
-                    // Standard content handling for non-tool cases or initial tool response
-
-                    // Check if this is a duplicated message containing the same content we already have
-                    if (message.done && assistantResponse.includes(message.content)) {
-                        console.log(`[${responseId}] Ignoring duplicated content in done message`);
-                    } else {
-                        // Add to our accumulated response
-                        assistantResponse += message.content;
-                    }
-
-                    // Update the UI immediately with each chunk
-                    onContentUpdate(assistantResponse, message.done || false);
-                }
+                // Simply append the new content - no complex deduplication
+                assistantResponse += message.content;
+                // Update the UI immediately with each chunk
+                onContentUpdate(assistantResponse, message.done || false);
 
                 receivedAnyContent = true;
 
                 // Reset timeout since we got content
@@ -288,150 +194,32 @@ export async function setupStreamingResponse(
                 // Set new timeout
                 timeoutId = window.setTimeout(() => {
                     console.warn(`[${responseId}] Stream timeout for chat note ${noteId}`);
-
-                    // Clean up
                     performCleanup();
                     reject(new Error('Stream timeout'));
                 }, 30000);
             }
 
-            // Handle tool execution updates (legacy format and standard format with llm-stream type)
-            if (message.toolExecution) {
-                // Only process if we haven't already handled this message via specific message types
-                if (message.type === 'llm-stream' || !message.type) {
-                    console.log(`[${responseId}] Received tool execution update: action=${message.toolExecution.action || 'unknown'}`);
-                    toolsExecuted = true; // Mark that tools were executed
-
-                    // Mark tool execution as completed if this is a result or error
-                    if (message.toolExecution.action === 'result' ||
-                        message.toolExecution.action === 'complete' ||
-                        message.toolExecution.action === 'error') {
-                        toolExecutionCompleted = true;
-                        console.log(`[${responseId}] Tool execution completed via toolExecution message`);
-                    }
-
-                    onToolExecution(message.toolExecution);
-                }
-            }
-
-            // Handle tool calls from the raw data or direct in message (OpenAI format)
-            const toolCalls = message.tool_calls || (message.raw && message.raw.tool_calls);
-            if (toolCalls && Array.isArray(toolCalls)) {
-                console.log(`[${responseId}] Received tool calls: ${toolCalls.length} tools`);
-                toolsExecuted = true; // Mark that tools were executed
-
-                // First send a 'start' action to ensure the container is created
-                onToolExecution({
-                    action: 'start',
-                    tool: 'tools',
-                    result: 'Tool execution initialized'
-                });
-
-                // Then process each tool call
-                for (const toolCall of toolCalls) {
-                    let args = toolCall.function?.arguments || {};
-
-                    // Try to parse arguments if they're a string
-                    if (typeof args === 'string') {
-                        try {
-                            args = JSON.parse(args);
-                        } catch (e) {
-                            console.log(`[${responseId}] Could not parse tool arguments as JSON: ${e}`);
-                            args = { raw: args };
-                        }
-                    }
-
-                    onToolExecution({
-                        action: 'executing',
-                        tool: toolCall.function?.name || 'unknown',
-                        toolCallId: toolCall.id,
-                        args: args
-                    });
-                }
-            }
-
-            // Handle thinking state updates
-            if (message.thinking) {
-                console.log(`[${responseId}] Received thinking update: ${message.thinking.substring(0, 50)}...`);
-                onThinkingUpdate(message.thinking);
-            }
-
             // Handle completion
             if (message.done) {
-                console.log(`[${responseId}] Stream completed for chat note ${noteId}, has content: ${!!message.content}, content length: ${message.content?.length || 0}, current response: ${assistantResponse.length} chars`);
+                console.log(`[${responseId}] Stream completed for chat note ${noteId}, final response: ${assistantResponse.length} chars`);
 
-                // Dump message content to console for debugging
-                if (message.content) {
-                    console.log(`[${responseId}] CONTENT IN DONE MESSAGE (first 200 chars): "${message.content.substring(0, 200)}..."`);
-
-                    // Check if the done message contains the exact same content as our accumulated response
-                    // We normalize by removing whitespace to avoid false negatives due to spacing differences
-                    const normalizedMessage = message.content.trim();
-                    const normalizedResponse = assistantResponse.trim();
-
-                    if (normalizedMessage === normalizedResponse) {
-                        console.log(`[${responseId}] Final message is identical to accumulated response, no need to update`);
-                    }
-                    // If the done message is longer but contains our accumulated response, use the done message
-                    else if (normalizedMessage.includes(normalizedResponse) && normalizedMessage.length > normalizedResponse.length) {
-                        console.log(`[${responseId}] Final message is more complete than accumulated response, using it`);
-                        assistantResponse = message.content;
-                    }
-                    // If the done message is different and not already included, append it to avoid duplication
-                    else if (!normalizedResponse.includes(normalizedMessage) && normalizedMessage.length > 0) {
-                        console.log(`[${responseId}] Final message has unique content, using it`);
-                        assistantResponse = message.content;
-                    }
-                    // Otherwise, we already have the content accumulated, so no need to update
-                    else {
-                        console.log(`[${responseId}] Already have this content accumulated, not updating`);
-                    }
-                }
-
-                // Clear timeout if set
+                // Clear all timeouts
                 if (timeoutId !== null) {
                     window.clearTimeout(timeoutId);
                     timeoutId = null;
                 }
 
-                // Always mark as done when we receive the done flag
-                onContentUpdate(assistantResponse, true);
-
-                // Set a longer delay before cleanup to allow for post-tool execution messages
-                // Especially important for Anthropic which may send final message after tool execution
-                const cleanupDelay = toolsExecuted ? 15000 : 1000; // 15 seconds if tools were used, otherwise 1 second
-                console.log(`[${responseId}] Setting cleanup delay of ${cleanupDelay}ms since toolsExecuted=${toolsExecuted}`);
-                scheduleCleanup(cleanupDelay);
+                // Schedule cleanup after a brief delay to ensure all processing is complete
+                cleanupTimeoutId = window.setTimeout(() => {
+                    performCleanup();
+                }, 100);
             }
         };
 
-        // Register event listener for the custom event
-        try {
-            window.addEventListener('llm-stream-message', eventListener);
-            console.log(`[${responseId}] Event listener added for llm-stream-message events`);
-        } catch (err) {
-            console.error(`[${responseId}] Error setting up event listener:`, err);
-            reject(err);
-            return;
-        }
+        // Register the event listener for WebSocket messages
+        window.addEventListener('llm-stream-message', eventListener);
 
-        // Set initial timeout for receiving any message
-        initialTimeoutId = window.setTimeout(() => {
-            console.warn(`[${responseId}] No messages received for initial period in chat note ${noteId}`);
-            if (!receivedAnyMessage) {
-                console.error(`[${responseId}] WebSocket connection not established for chat note ${noteId}`);
-
-                if (timeoutId !== null) {
-                    window.clearTimeout(timeoutId);
-                }
-
-                // Clean up
-                cleanupEventListener(eventListener);
-
-                // Show error message to user
-                reject(new Error('WebSocket connection not established'));
-            }
-        }, 10000);
+        console.log(`[${responseId}] Event listener registered, waiting for messages...`);
     });
 }
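The simplified handler treats every `llm-stream-message` CustomEvent uniformly: error, thinking, toolExecution, content, then done. A minimal sketch of how a test could drive it, assuming (as the surrounding code suggests) the payload is unwrapped from `(event as CustomEvent).detail`; the `emit` helper and the note id are illustrative:

```ts
// Illustrative only: drive the simplified handler with synthetic events
// (assumes a DOM test environment such as vitest + jsdom).
function emit(detail: Record<string, unknown>): void {
    window.dispatchEvent(new CustomEvent('llm-stream-message', { detail }));
}

emit({ chatNoteId: 'note1', thinking: 'planning...' }); // -> onThinkingUpdate (if showThinking)
emit({ chatNoteId: 'note1', content: 'Hello' });        // -> appended, UI updated
emit({ chatNoteId: 'note1', content: ' world' });       // -> appended again
emit({ chatNoteId: 'note1', done: true });              // -> cleanup scheduled after 100 ms
```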
diff --git a/apps/client/src/widgets/llm_chat/llm_chat_panel.ts b/apps/client/src/widgets/llm_chat/llm_chat_panel.ts
index b9b3228ef..ce9b5dd12 100644
--- a/apps/client/src/widgets/llm_chat/llm_chat_panel.ts
+++ b/apps/client/src/widgets/llm_chat/llm_chat_panel.ts
@@ -1068,16 +1068,6 @@ export default class LlmChatPanel extends BasicWidget {
      * Update the UI with streaming content
      */
     private updateStreamingUI(assistantResponse: string, isDone: boolean = false) {
-        // Parse and handle thinking content if present
-        if (!isDone) {
-            const thinkingContent = this.parseThinkingContent(assistantResponse);
-            if (thinkingContent) {
-                this.updateThinkingText(thinkingContent);
-                // Don't display the raw response with think tags in the chat
-                return;
-            }
-        }
-
        // Get the existing assistant message or create a new one
        let assistantMessageEl = this.noteContextChatMessages.querySelector('.assistant-message:last-child');
 
@@ -1099,12 +1089,9 @@ export default class LlmChatPanel extends BasicWidget {
             assistantMessageEl.appendChild(messageContent);
         }
 
-        // Clean the response to remove thinking tags before displaying
-        const cleanedResponse = this.removeThinkingTags(assistantResponse);
-
-        // Update the content
+        // Update the content with the current response (no thinking content removal)
         const messageContent = assistantMessageEl.querySelector('.message-content') as HTMLElement;
-        messageContent.innerHTML = formatMarkdown(cleanedResponse);
+        messageContent.innerHTML = formatMarkdown(assistantResponse);
 
         // Apply syntax highlighting if this is the final update
         if (isDone) {
@@ -1120,65 +1107,24 @@ export default class LlmChatPanel extends BasicWidget {
                 this.messages.lastIndexOf(assistantMessages[assistantMessages.length - 1]) : -1;
 
             if (lastAssistantMsgIndex >= 0) {
-                // Update existing message with cleaned content
-                this.messages[lastAssistantMsgIndex].content = cleanedResponse;
+                // Update existing message
+                this.messages[lastAssistantMsgIndex].content = assistantResponse;
             } else {
-                // Add new message with cleaned content
+                // Add new message
                 this.messages.push({
                     role: 'assistant',
-                    content: cleanedResponse
+                    content: assistantResponse
                 });
             }
 
-            // Hide loading indicator
-            hideLoadingIndicator(this.loadingIndicator);
-
-            // DON'T save here immediately - let the server save the accumulated response first
-            // to avoid race conditions. We'll reload the data from the server after a short delay.
-            console.log("Stream completed, waiting for server to save then reloading data...");
-            setTimeout(async () => {
-                try {
-                    console.log("About to reload data from server...");
-                    const currentMessageCount = this.messages.length;
-                    console.log(`Current client message count before reload: ${currentMessageCount}`);
-
-                    // Reload the data from the server which should have the complete conversation
-                    const reloadSuccess = await this.loadSavedData();
-
-                    const newMessageCount = this.messages.length;
-                    console.log(`Reload success: ${reloadSuccess}, message count after reload: ${newMessageCount}`);
-
-                    if (reloadSuccess && newMessageCount > currentMessageCount) {
-                        console.log("Successfully reloaded data with more complete conversation");
-                    } else if (!reloadSuccess) {
-                        console.warn("Reload failed, keeping current client state");
-                    } else {
-                        console.warn("Reload succeeded but message count didn't increase");
-                    }
-                } catch (error) {
-                    console.error("Failed to reload data after stream completion:", error);
-                    // Fallback: save our current state if reload fails
-                    this.saveCurrentData().catch(err => {
-                        console.error("Failed to save assistant response to note:", err);
-                    });
-                }
-            }, 1500); // Wait 1.5 seconds for server to complete its save
+            // Save the data
+            this.saveCurrentData();
         }
 
         // Scroll to bottom
         this.chatContainer.scrollTop = this.chatContainer.scrollHeight;
     }
 
-    /**
-     * Remove thinking tags from response content
-     */
-    private removeThinkingTags(content: string): string {
-        if (!content) return content;
-
-        // Remove <think>...</think> blocks from the content
-        return content.replace(/<think>[\s\S]*?<\/think>/gi, '').trim();
-    }
-
     /**
      * Handle general errors in the send message flow
     */
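For reference, the stripping behavior deleted above amounts to a single transform. If a provider ever inlines reasoning as `<think>` tags again, the equivalent is (a standalone sketch, not part of the patch):

```ts
// Equivalent of the removed removeThinkingTags(): drop <think>...</think>
// blocks and trim whatever remains.
const stripThinkTags = (content: string): string =>
    content.replace(/<think>[\s\S]*?<\/think>/gi, '').trim();

console.log(stripThinkTags('<think>chain of thought</think>Final answer'));
// -> "Final answer"
```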
diff --git a/apps/server/src/routes/api/llm.ts b/apps/server/src/routes/api/llm.ts
index 05e8ee879..bd56830bb 100644
--- a/apps/server/src/routes/api/llm.ts
+++ b/apps/server/src/routes/api/llm.ts
@@ -858,92 +858,52 @@ async function streamMessage(req: Request, res: Response) {
             }
         }
 
-        // Create request parameters for the pipeline
-        const requestParams = {
-            chatNoteId: chatNoteId,
-            content: enhancedContent,
-            useAdvancedContext: useAdvancedContext === true,
-            showThinking: showThinking === true,
-            stream: true // Always stream for this endpoint
-        };
-
-        // Create a fake request/response pair to pass to the handler
-        const fakeReq = {
-            ...req,
-            method: 'GET', // Set to GET to indicate streaming
-            query: {
-                stream: 'true', // Set stream param - don't use format: 'stream' to avoid confusion
-                useAdvancedContext: String(useAdvancedContext === true),
-                showThinking: String(showThinking === true)
-            },
-            params: {
-                chatNoteId: chatNoteId
-            },
-            // Make sure the enhanced content is available to the handler
-            body: {
-                content: enhancedContent,
-                useAdvancedContext: useAdvancedContext === true,
-                showThinking: showThinking === true
-            }
-        } as unknown as Request;
-
-        // Log to verify correct parameters
-        log.info(`WebSocket stream settings - useAdvancedContext=${useAdvancedContext === true}, in query=${fakeReq.query.useAdvancedContext}, in body=${fakeReq.body.useAdvancedContext}`);
-        // Extra safety to ensure the parameters are passed correctly
-        if (useAdvancedContext === true) {
-            log.info(`Enhanced context IS enabled for this request`);
-        } else {
-            log.info(`Enhanced context is NOT enabled for this request`);
-        }
-
-        // Process the request in the background
-        Promise.resolve().then(async () => {
-            try {
-                await restChatService.handleSendMessage(fakeReq, res);
-            } catch (error) {
-                log.error(`Background message processing error: ${error}`);
-
-                // Import the WebSocket service
-                const wsService = (await import('../../services/ws.js')).default;
-
-                // Define LLMStreamMessage interface
-                interface LLMStreamMessage {
-                    type: 'llm-stream';
-                    chatNoteId: string;
-                    content?: string;
-                    thinking?: string;
-                    toolExecution?: any;
-                    done?: boolean;
-                    error?: string;
-                    raw?: unknown;
-                }
-
-                // Send error to client via WebSocket
-                wsService.sendMessageToAllClients({
-                    type: 'llm-stream',
-                    chatNoteId: chatNoteId,
-                    error: `Error processing message: ${error}`,
-                    done: true
-                } as LLMStreamMessage);
-            }
-        });
-
-        // Import the WebSocket service
+        // Import the WebSocket service to send immediate feedback
        const wsService = (await import('../../services/ws.js')).default;
 
-        // Let the client know streaming has started via WebSocket (helps client confirm connection is working)
+        // Let the client know streaming has started
         wsService.sendMessageToAllClients({
             type: 'llm-stream',
             chatNoteId: chatNoteId,
-            thinking: 'Initializing streaming LLM response...'
+            thinking: showThinking ? 'Initializing streaming LLM response...' : undefined
         });
 
-        // Let the client know streaming has started via HTTP response
-        return {
-            success: true,
-            message: 'Streaming started',
-            chatNoteId: chatNoteId
-        };
+        // Process the streaming request directly
+        try {
+            const result = await restChatService.handleSendMessage({
+                ...req,
+                method: 'GET', // Indicate streaming mode
+                query: {
+                    ...req.query,
+                    stream: 'true' // Add the required stream parameter
+                },
+                body: {
+                    content: enhancedContent,
+                    useAdvancedContext: useAdvancedContext === true,
+                    showThinking: showThinking === true
+                },
+                params: { chatNoteId }
+            } as unknown as Request, res);
+
+            // Since we're streaming, the result will be null
+            return {
+                success: true,
+                message: 'Streaming started',
+                chatNoteId: chatNoteId
+            };
+        } catch (error) {
+            log.error(`Error during streaming: ${error}`);
+
+            // Send error to client via WebSocket
+            wsService.sendMessageToAllClients({
+                type: 'llm-stream',
+                chatNoteId: chatNoteId,
+                error: `Error processing message: ${error}`,
+                done: true
+            });
+
+            throw error;
+        }
    } catch (error: any) {
        log.error(`Error starting message stream: ${error.message}`);
        throw error;
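The practical effect of awaiting `handleSendMessage` directly is that failures now propagate to the caller of `streamMessage` instead of dying inside a detached promise. A hedged sketch of the wiring this enables; the route path, the `Router` registration, and `streamMessage` being importable here are all assumptions for illustration:

```ts
import { Router, type NextFunction, type Request, type Response } from 'express';

// Hypothetical registration; the real route lives elsewhere in the API setup.
const router = Router();

router.post('/llm/chat/:chatNoteId/messages/stream', async (req: Request, res: Response, next: NextFunction) => {
    try {
        // streamMessage now rejects when handleSendMessage throws, so errors
        // reach Express error middleware as well as the WebSocket clients.
        const ack = await streamMessage(req, res);
        if (!res.headersSent) {
            res.json(ack); // { success: true, message: 'Streaming started', chatNoteId }
        }
    } catch (err) {
        next(err);
    }
});
```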
diff --git a/apps/server/src/services/llm/chat/rest_chat_service.ts b/apps/server/src/services/llm/chat/rest_chat_service.ts
index 53c682862..2f67422f3 100644
--- a/apps/server/src/services/llm/chat/rest_chat_service.ts
+++ b/apps/server/src/services/llm/chat/rest_chat_service.ts
@@ -231,21 +231,16 @@ class RestChatService {
             if (data) {
                 message.content = data;
 
-                // Handle accumulation carefully - if this appears to be a complete response
-                // (done=true and data is much longer than current accumulated), replace rather than append
-                if (done && data.length > accumulatedContentRef.value.length && data.includes(accumulatedContentRef.value)) {
-                    // This looks like a complete final response that includes what we've accumulated
-                    accumulatedContentRef.value = data;
-                } else {
-                    // Normal incremental accumulation
-                    accumulatedContentRef.value += data;
-                }
+                // Simple accumulation - just append the new data
+                accumulatedContentRef.value += data;
             }
 
+            // Only include thinking if explicitly present in rawChunk
             if (rawChunk && 'thinking' in rawChunk && rawChunk.thinking) {
                 message.thinking = rawChunk.thinking as string;
             }
 
+            // Only include tool execution if explicitly present in rawChunk
             if (rawChunk && 'toolExecution' in rawChunk && rawChunk.toolExecution) {
                 const toolExec = rawChunk.toolExecution;
                 message.toolExecution = {
@@ -262,7 +257,7 @@ class RestChatService {
             // Send WebSocket message
             wsService.sendMessageToAllClients(message);
 
-            // Send SSE response
+            // Send SSE response for compatibility
             const responseData: any = { content: data, done };
             if (rawChunk?.toolExecution) {
                 responseData.toolExecution = rawChunk.toolExecution;
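Taken together, both sides now assume append-only streaming: the server appends every `data` chunk and the client appends every `content` chunk, so a provider must not resend the full accumulated text on its final `done` frame. A minimal model of the new accumulator (standalone sketch; the ref shape mirrors `accumulatedContentRef` above):

```ts
// Append-only accumulation as now implemented on both client and server.
const accumulatedContentRef = { value: '' };

function accumulate(data: string | undefined): string {
    if (data) {
        accumulatedContentRef.value += data; // no replace/includes heuristics
    }
    return accumulatedContentRef.value;
}

accumulate('Hello');
accumulate(' world');
accumulate(undefined); // a done-only frame carries no content
console.log(accumulatedContentRef.value); // "Hello world"
```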