From 76d13f682e0b071e3a1899e4483766ac304272ce Mon Sep 17 00:00:00 2001 From: perf3ct Date: Tue, 15 Apr 2025 22:22:31 +0000 Subject: [PATCH] this works, mostly const cleanupDelay = toolsExecuted ? 15000 : 1000 --- .../app/widgets/llm_chat/communication.ts | 143 +++++++++++++----- 1 file changed, 109 insertions(+), 34 deletions(-) diff --git a/src/public/app/widgets/llm_chat/communication.ts b/src/public/app/widgets/llm_chat/communication.ts index 42c21ab95..c77d54cff 100644 --- a/src/public/app/widgets/llm_chat/communication.ts +++ b/src/public/app/widgets/llm_chat/communication.ts @@ -53,13 +53,53 @@ export async function setupStreamingResponse( let receivedAnyContent = false; let timeoutId: number | null = null; let initialTimeoutId: number | null = null; + let cleanupTimeoutId: number | null = null; let receivedAnyMessage = false; + let toolsExecuted = false; // Flag to track if tools were executed in this session + let toolExecutionCompleted = false; // Flag to track if tool execution is completed let eventListener: ((event: Event) => void) | null = null; + let lastMessageTimestamp = 0; // Create a unique identifier for this response process const responseId = `llm-stream-${Date.now()}-${Math.floor(Math.random() * 1000)}`; console.log(`[${responseId}] Setting up WebSocket streaming for session ${sessionId}`); + // Function to safely perform cleanup + const performCleanup = () => { + if (cleanupTimeoutId) { + window.clearTimeout(cleanupTimeoutId); + cleanupTimeoutId = null; + } + + console.log(`[${responseId}] Performing final cleanup of event listener`); + cleanupEventListener(eventListener); + onComplete(); + resolve(); + }; + + // Function to schedule cleanup with ability to cancel + const scheduleCleanup = (delay: number) => { + // Clear any existing cleanup timeout + if (cleanupTimeoutId) { + window.clearTimeout(cleanupTimeoutId); + } + + console.log(`[${responseId}] Scheduling listener cleanup in ${delay}ms`); + + // Set new cleanup timeout + cleanupTimeoutId = window.setTimeout(() => { + // Only clean up if no messages received recently (in last 2 seconds) + const timeSinceLastMessage = Date.now() - lastMessageTimestamp; + if (timeSinceLastMessage > 2000) { + performCleanup(); + } else { + console.log(`[${responseId}] Received message recently, delaying cleanup`); + // Reschedule cleanup + scheduleCleanup(2000); + } + }, delay); + }; + // Create a message handler for CustomEvents eventListener = (event: Event) => { const customEvent = event as CustomEvent; @@ -70,6 +110,16 @@ export async function setupStreamingResponse( return; } + // Update last message timestamp + lastMessageTimestamp = Date.now(); + + // Cancel any pending cleanup when we receive a new message + if (cleanupTimeoutId) { + console.log(`[${responseId}] Cancelling scheduled cleanup due to new message`); + window.clearTimeout(cleanupTimeoutId); + cleanupTimeoutId = null; + } + console.log(`[${responseId}] LLM Stream message received via CustomEvent: session=${sessionId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}, type=${message.type || 'llm-stream'}`); // Mark first message received @@ -86,6 +136,7 @@ export async function setupStreamingResponse( // Handle specific message types if (message.type === 'tool_execution_start') { + toolsExecuted = true; // Mark that tools were executed onThinkingUpdate('Executing tools...'); // Also trigger tool execution UI with a specific format onToolExecution({ @@ -97,6 +148,7 @@ export async function setupStreamingResponse( } if (message.type === 'tool_result' && message.toolExecution) { + toolsExecuted = true; // Mark that tools were executed console.log(`[${responseId}] Processing tool result: ${JSON.stringify(message.toolExecution)}`); // If tool execution doesn't have an action, add 'result' as the default @@ -113,10 +165,19 @@ export async function setupStreamingResponse( // Then send the actual tool execution data onToolExecution(message.toolExecution); + + // Mark tool execution as completed if this is a result or error + if (message.toolExecution.action === 'result' || message.toolExecution.action === 'complete' || message.toolExecution.action === 'error') { + toolExecutionCompleted = true; + console.log(`[${responseId}] Tool execution completed`); + } + return; // Skip accumulating content from this message } if (message.type === 'tool_execution_error' && message.toolExecution) { + toolsExecuted = true; // Mark that tools were executed + toolExecutionCompleted = true; // Mark tool execution as completed onToolExecution({ ...message.toolExecution, action: 'error', @@ -126,6 +187,8 @@ export async function setupStreamingResponse( } if (message.type === 'tool_completion_processing') { + toolsExecuted = true; // Mark that tools were executed + toolExecutionCompleted = true; // Tools are done, now processing the result onThinkingUpdate('Generating response with tool results...'); // Also trigger tool execution UI with a specific format onToolExecution({ @@ -138,20 +201,34 @@ export async function setupStreamingResponse( // Handle content updates if (message.content) { - receivedAnyContent = true; - console.log(`[${responseId}] Received content chunk of length ${message.content.length}, preview: "${message.content.substring(0, 50)}${message.content.length > 50 ? '...' : ''}"`); - // Check if this is a duplicated message containing the same content we already have - if (message.done && assistantResponse.includes(message.content)) { - console.log(`[${responseId}] Ignoring duplicated content in done message`); + // If tools were executed and completed, and we're now getting new content, + // this is likely the final response after tool execution from Anthropic + if (toolsExecuted && toolExecutionCompleted && message.content) { + console.log(`[${responseId}] Post-tool execution content detected, resetting previous content`); + + // Reset accumulated response for post-tool execution response + assistantResponse = message.content; + + // Update the UI with the fresh content + onContentUpdate(assistantResponse, message.done || false); } else { - // Add to our accumulated response - assistantResponse += message.content; + // Standard content handling for non-tool cases or initial tool response + + // Check if this is a duplicated message containing the same content we already have + if (message.done && assistantResponse.includes(message.content)) { + console.log(`[${responseId}] Ignoring duplicated content in done message`); + } else { + // Add to our accumulated response + assistantResponse += message.content; + } + + // Update the UI immediately with each chunk + onContentUpdate(assistantResponse, message.done || false); } - // Update the UI immediately with each chunk - onContentUpdate(assistantResponse, false); + receivedAnyContent = true; // Reset timeout since we got content if (timeoutId !== null) { @@ -163,7 +240,7 @@ export async function setupStreamingResponse( console.warn(`[${responseId}] Stream timeout for session ${sessionId}`); // Clean up - cleanupEventListener(eventListener); + performCleanup(); reject(new Error('Stream timeout')); }, 30000); } @@ -173,6 +250,16 @@ export async function setupStreamingResponse( // Only process if we haven't already handled this message via specific message types if (message.type === 'llm-stream' || !message.type) { console.log(`[${responseId}] Received tool execution update: action=${message.toolExecution.action || 'unknown'}`); + toolsExecuted = true; // Mark that tools were executed + + // Mark tool execution as completed if this is a result or error + if (message.toolExecution.action === 'result' || + message.toolExecution.action === 'complete' || + message.toolExecution.action === 'error') { + toolExecutionCompleted = true; + console.log(`[${responseId}] Tool execution completed via toolExecution message`); + } + onToolExecution(message.toolExecution); } } @@ -181,6 +268,7 @@ export async function setupStreamingResponse( const toolCalls = message.tool_calls || (message.raw && message.raw.tool_calls); if (toolCalls && Array.isArray(toolCalls)) { console.log(`[${responseId}] Received tool calls: ${toolCalls.length} tools`); + toolsExecuted = true; // Mark that tools were executed // First send a 'start' action to ensure the container is created onToolExecution({ @@ -233,33 +321,20 @@ export async function setupStreamingResponse( timeoutId = null; } - // Check if we have content in the done message - ONLY process if we haven't received any content yet - if (message.content && !receivedAnyContent) { - console.log(`[${responseId}] Processing content in done message: ${message.content.length} chars`); - receivedAnyContent = true; - - // Use content from done message as full response - console.log(`[${responseId}] Using content from done message as full response`); + // Make sure the final message is displayed + if (message.content && !assistantResponse.includes(message.content)) { + console.log(`[${responseId}] Final message has unique content, using it`); assistantResponse = message.content; - onContentUpdate(assistantResponse, true); - } else if (message.content) { - // We already have content, signal as done but don't duplicate - console.log(`[${responseId}] Content in done message ignored as we already have streamed content`); - onContentUpdate(assistantResponse, true); - } else { - // No content in done message, just mark as done - onContentUpdate(assistantResponse, true); } - // Set a short delay before cleanup to allow any immediately following - // tool execution messages to be processed - setTimeout(() => { - // Clean up and resolve - console.log(`[${responseId}] Cleaning up event listener after delay`); - cleanupEventListener(eventListener); - onComplete(); - resolve(); - }, 1000); // 1 second delay to allow tool execution messages to arrive + // Always mark as done when we receive the done flag + onContentUpdate(assistantResponse, true); + + // Set a longer delay before cleanup to allow for post-tool execution messages + // Especially important for Anthropic which may send final message after tool execution + const cleanupDelay = toolsExecuted ? 15000 : 1000; // 15 seconds if tools were used, otherwise 1 second + console.log(`[${responseId}] Setting cleanup delay of ${cleanupDelay}ms since toolsExecuted=${toolsExecuted}`); + scheduleCleanup(cleanupDelay); } };