this works, mostly

const cleanupDelay = toolsExecuted ? 15000 : 1000
This commit is contained in:
perf3ct 2025-04-15 22:22:31 +00:00
parent edd075cba1
commit 76d13f682e
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232

View File

@ -53,13 +53,53 @@ export async function setupStreamingResponse(
let receivedAnyContent = false; let receivedAnyContent = false;
let timeoutId: number | null = null; let timeoutId: number | null = null;
let initialTimeoutId: number | null = null; let initialTimeoutId: number | null = null;
let cleanupTimeoutId: number | null = null;
let receivedAnyMessage = false; let receivedAnyMessage = false;
let toolsExecuted = false; // Flag to track if tools were executed in this session
let toolExecutionCompleted = false; // Flag to track if tool execution is completed
let eventListener: ((event: Event) => void) | null = null; let eventListener: ((event: Event) => void) | null = null;
let lastMessageTimestamp = 0;
// Create a unique identifier for this response process // Create a unique identifier for this response process
const responseId = `llm-stream-${Date.now()}-${Math.floor(Math.random() * 1000)}`; const responseId = `llm-stream-${Date.now()}-${Math.floor(Math.random() * 1000)}`;
console.log(`[${responseId}] Setting up WebSocket streaming for session ${sessionId}`); console.log(`[${responseId}] Setting up WebSocket streaming for session ${sessionId}`);
// Function to safely perform cleanup
const performCleanup = () => {
if (cleanupTimeoutId) {
window.clearTimeout(cleanupTimeoutId);
cleanupTimeoutId = null;
}
console.log(`[${responseId}] Performing final cleanup of event listener`);
cleanupEventListener(eventListener);
onComplete();
resolve();
};
// Function to schedule cleanup with ability to cancel
const scheduleCleanup = (delay: number) => {
// Clear any existing cleanup timeout
if (cleanupTimeoutId) {
window.clearTimeout(cleanupTimeoutId);
}
console.log(`[${responseId}] Scheduling listener cleanup in ${delay}ms`);
// Set new cleanup timeout
cleanupTimeoutId = window.setTimeout(() => {
// Only clean up if no messages received recently (in last 2 seconds)
const timeSinceLastMessage = Date.now() - lastMessageTimestamp;
if (timeSinceLastMessage > 2000) {
performCleanup();
} else {
console.log(`[${responseId}] Received message recently, delaying cleanup`);
// Reschedule cleanup
scheduleCleanup(2000);
}
}, delay);
};
// Create a message handler for CustomEvents // Create a message handler for CustomEvents
eventListener = (event: Event) => { eventListener = (event: Event) => {
const customEvent = event as CustomEvent; const customEvent = event as CustomEvent;
@ -70,6 +110,16 @@ export async function setupStreamingResponse(
return; return;
} }
// Update last message timestamp
lastMessageTimestamp = Date.now();
// Cancel any pending cleanup when we receive a new message
if (cleanupTimeoutId) {
console.log(`[${responseId}] Cancelling scheduled cleanup due to new message`);
window.clearTimeout(cleanupTimeoutId);
cleanupTimeoutId = null;
}
console.log(`[${responseId}] LLM Stream message received via CustomEvent: session=${sessionId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}, type=${message.type || 'llm-stream'}`); console.log(`[${responseId}] LLM Stream message received via CustomEvent: session=${sessionId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}, type=${message.type || 'llm-stream'}`);
// Mark first message received // Mark first message received
@ -86,6 +136,7 @@ export async function setupStreamingResponse(
// Handle specific message types // Handle specific message types
if (message.type === 'tool_execution_start') { if (message.type === 'tool_execution_start') {
toolsExecuted = true; // Mark that tools were executed
onThinkingUpdate('Executing tools...'); onThinkingUpdate('Executing tools...');
// Also trigger tool execution UI with a specific format // Also trigger tool execution UI with a specific format
onToolExecution({ onToolExecution({
@ -97,6 +148,7 @@ export async function setupStreamingResponse(
} }
if (message.type === 'tool_result' && message.toolExecution) { if (message.type === 'tool_result' && message.toolExecution) {
toolsExecuted = true; // Mark that tools were executed
console.log(`[${responseId}] Processing tool result: ${JSON.stringify(message.toolExecution)}`); console.log(`[${responseId}] Processing tool result: ${JSON.stringify(message.toolExecution)}`);
// If tool execution doesn't have an action, add 'result' as the default // If tool execution doesn't have an action, add 'result' as the default
@ -113,10 +165,19 @@ export async function setupStreamingResponse(
// Then send the actual tool execution data // Then send the actual tool execution data
onToolExecution(message.toolExecution); onToolExecution(message.toolExecution);
// Mark tool execution as completed if this is a result or error
if (message.toolExecution.action === 'result' || message.toolExecution.action === 'complete' || message.toolExecution.action === 'error') {
toolExecutionCompleted = true;
console.log(`[${responseId}] Tool execution completed`);
}
return; // Skip accumulating content from this message return; // Skip accumulating content from this message
} }
if (message.type === 'tool_execution_error' && message.toolExecution) { if (message.type === 'tool_execution_error' && message.toolExecution) {
toolsExecuted = true; // Mark that tools were executed
toolExecutionCompleted = true; // Mark tool execution as completed
onToolExecution({ onToolExecution({
...message.toolExecution, ...message.toolExecution,
action: 'error', action: 'error',
@ -126,6 +187,8 @@ export async function setupStreamingResponse(
} }
if (message.type === 'tool_completion_processing') { if (message.type === 'tool_completion_processing') {
toolsExecuted = true; // Mark that tools were executed
toolExecutionCompleted = true; // Tools are done, now processing the result
onThinkingUpdate('Generating response with tool results...'); onThinkingUpdate('Generating response with tool results...');
// Also trigger tool execution UI with a specific format // Also trigger tool execution UI with a specific format
onToolExecution({ onToolExecution({
@ -138,10 +201,21 @@ export async function setupStreamingResponse(
// Handle content updates // Handle content updates
if (message.content) { if (message.content) {
receivedAnyContent = true;
console.log(`[${responseId}] Received content chunk of length ${message.content.length}, preview: "${message.content.substring(0, 50)}${message.content.length > 50 ? '...' : ''}"`); console.log(`[${responseId}] Received content chunk of length ${message.content.length}, preview: "${message.content.substring(0, 50)}${message.content.length > 50 ? '...' : ''}"`);
// If tools were executed and completed, and we're now getting new content,
// this is likely the final response after tool execution from Anthropic
if (toolsExecuted && toolExecutionCompleted && message.content) {
console.log(`[${responseId}] Post-tool execution content detected, resetting previous content`);
// Reset accumulated response for post-tool execution response
assistantResponse = message.content;
// Update the UI with the fresh content
onContentUpdate(assistantResponse, message.done || false);
} else {
// Standard content handling for non-tool cases or initial tool response
// Check if this is a duplicated message containing the same content we already have // Check if this is a duplicated message containing the same content we already have
if (message.done && assistantResponse.includes(message.content)) { if (message.done && assistantResponse.includes(message.content)) {
console.log(`[${responseId}] Ignoring duplicated content in done message`); console.log(`[${responseId}] Ignoring duplicated content in done message`);
@ -151,7 +225,10 @@ export async function setupStreamingResponse(
} }
// Update the UI immediately with each chunk // Update the UI immediately with each chunk
onContentUpdate(assistantResponse, false); onContentUpdate(assistantResponse, message.done || false);
}
receivedAnyContent = true;
// Reset timeout since we got content // Reset timeout since we got content
if (timeoutId !== null) { if (timeoutId !== null) {
@ -163,7 +240,7 @@ export async function setupStreamingResponse(
console.warn(`[${responseId}] Stream timeout for session ${sessionId}`); console.warn(`[${responseId}] Stream timeout for session ${sessionId}`);
// Clean up // Clean up
cleanupEventListener(eventListener); performCleanup();
reject(new Error('Stream timeout')); reject(new Error('Stream timeout'));
}, 30000); }, 30000);
} }
@ -173,6 +250,16 @@ export async function setupStreamingResponse(
// Only process if we haven't already handled this message via specific message types // Only process if we haven't already handled this message via specific message types
if (message.type === 'llm-stream' || !message.type) { if (message.type === 'llm-stream' || !message.type) {
console.log(`[${responseId}] Received tool execution update: action=${message.toolExecution.action || 'unknown'}`); console.log(`[${responseId}] Received tool execution update: action=${message.toolExecution.action || 'unknown'}`);
toolsExecuted = true; // Mark that tools were executed
// Mark tool execution as completed if this is a result or error
if (message.toolExecution.action === 'result' ||
message.toolExecution.action === 'complete' ||
message.toolExecution.action === 'error') {
toolExecutionCompleted = true;
console.log(`[${responseId}] Tool execution completed via toolExecution message`);
}
onToolExecution(message.toolExecution); onToolExecution(message.toolExecution);
} }
} }
@ -181,6 +268,7 @@ export async function setupStreamingResponse(
const toolCalls = message.tool_calls || (message.raw && message.raw.tool_calls); const toolCalls = message.tool_calls || (message.raw && message.raw.tool_calls);
if (toolCalls && Array.isArray(toolCalls)) { if (toolCalls && Array.isArray(toolCalls)) {
console.log(`[${responseId}] Received tool calls: ${toolCalls.length} tools`); console.log(`[${responseId}] Received tool calls: ${toolCalls.length} tools`);
toolsExecuted = true; // Mark that tools were executed
// First send a 'start' action to ensure the container is created // First send a 'start' action to ensure the container is created
onToolExecution({ onToolExecution({
@ -233,33 +321,20 @@ export async function setupStreamingResponse(
timeoutId = null; timeoutId = null;
} }
// Check if we have content in the done message - ONLY process if we haven't received any content yet // Make sure the final message is displayed
if (message.content && !receivedAnyContent) { if (message.content && !assistantResponse.includes(message.content)) {
console.log(`[${responseId}] Processing content in done message: ${message.content.length} chars`); console.log(`[${responseId}] Final message has unique content, using it`);
receivedAnyContent = true;
// Use content from done message as full response
console.log(`[${responseId}] Using content from done message as full response`);
assistantResponse = message.content; assistantResponse = message.content;
onContentUpdate(assistantResponse, true);
} else if (message.content) {
// We already have content, signal as done but don't duplicate
console.log(`[${responseId}] Content in done message ignored as we already have streamed content`);
onContentUpdate(assistantResponse, true);
} else {
// No content in done message, just mark as done
onContentUpdate(assistantResponse, true);
} }
// Set a short delay before cleanup to allow any immediately following // Always mark as done when we receive the done flag
// tool execution messages to be processed onContentUpdate(assistantResponse, true);
setTimeout(() => {
// Clean up and resolve // Set a longer delay before cleanup to allow for post-tool execution messages
console.log(`[${responseId}] Cleaning up event listener after delay`); // Especially important for Anthropic which may send final message after tool execution
cleanupEventListener(eventListener); const cleanupDelay = toolsExecuted ? 15000 : 1000; // 15 seconds if tools were used, otherwise 1 second
onComplete(); console.log(`[${responseId}] Setting cleanup delay of ${cleanupDelay}ms since toolsExecuted=${toolsExecuted}`);
resolve(); scheduleCleanup(cleanupDelay);
}, 1000); // 1 second delay to allow tool execution messages to arrive
} }
}; };