refactor(llm): streamline chat response handling by simplifying content accumulation and removing unnecessary thinking content processing

This commit is contained in:
perf3ct 2025-06-02 23:25:15 +00:00
parent aad92b57c7
commit e7e04b7ccd
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
4 changed files with 87 additions and 398 deletions

View File

@@ -62,15 +62,11 @@ export async function setupStreamingResponse(
): Promise<void> { ): Promise<void> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let assistantResponse = ''; let assistantResponse = '';
let postToolResponse = ''; // Separate accumulator for post-tool execution content
let receivedAnyContent = false; let receivedAnyContent = false;
let receivedPostToolContent = false; // Track if we've started receiving post-tool content
let timeoutId: number | null = null; let timeoutId: number | null = null;
let initialTimeoutId: number | null = null; let initialTimeoutId: number | null = null;
let cleanupTimeoutId: number | null = null; let cleanupTimeoutId: number | null = null;
let receivedAnyMessage = false; let receivedAnyMessage = false;
let toolsExecuted = false; // Flag to track if tools were executed in this session
let toolExecutionCompleted = false; // Flag to track if tool execution is completed
let eventListener: ((event: Event) => void) | null = null; let eventListener: ((event: Event) => void) | null = null;
let lastMessageTimestamp = 0; let lastMessageTimestamp = 0;
@@ -118,28 +114,14 @@ export async function setupStreamingResponse(
resolve(); resolve();
}; };
// Function to schedule cleanup with ability to cancel // Set initial timeout to catch cases where no message is received at all
const scheduleCleanup = (delay: number) => { initialTimeoutId = window.setTimeout(() => {
// Clear any existing cleanup timeout if (!receivedAnyMessage) {
if (cleanupTimeoutId) { console.error(`[${responseId}] No initial message received within timeout`);
window.clearTimeout(cleanupTimeoutId); performCleanup();
reject(new Error('No response received from server'));
} }
}, 10000);
console.log(`[${responseId}] Scheduling listener cleanup in ${delay}ms`);
// Set new cleanup timeout
cleanupTimeoutId = window.setTimeout(() => {
// Only clean up if no messages received recently (in last 2 seconds)
const timeSinceLastMessage = Date.now() - lastMessageTimestamp;
if (timeSinceLastMessage > 2000) {
performCleanup();
} else {
console.log(`[${responseId}] Received message recently, delaying cleanup`);
// Reschedule cleanup
scheduleCleanup(2000);
}
}, delay);
};
// Create a message handler for CustomEvents // Create a message handler for CustomEvents
eventListener = (event: Event) => { eventListener = (event: Event) => {
@@ -161,7 +143,7 @@ export async function setupStreamingResponse(
cleanupTimeoutId = null; cleanupTimeoutId = null;
} }
console.log(`[${responseId}] LLM Stream message received via CustomEvent: chatNoteId=${noteId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}, type=${message.type || 'llm-stream'}`); console.log(`[${responseId}] LLM Stream message received: content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}`);
// Mark first message received // Mark first message received
if (!receivedAnyMessage) { if (!receivedAnyMessage) {
@@ -175,109 +157,33 @@ export async function setupStreamingResponse(
} }
} }
// Handle specific message types // Handle error
if (message.type === 'tool_execution_start') { if (message.error) {
toolsExecuted = true; // Mark that tools were executed console.error(`[${responseId}] Stream error: ${message.error}`);
onThinkingUpdate('Executing tools...'); performCleanup();
// Also trigger tool execution UI with a specific format reject(new Error(message.error));
onToolExecution({ return;
action: 'start',
tool: 'tools',
result: 'Executing tools...'
});
return; // Skip accumulating content from this message
} }
if (message.type === 'tool_result' && message.toolExecution) { // Handle thinking updates - only show if showThinking is enabled
toolsExecuted = true; // Mark that tools were executed if (message.thinking && messageParams.showThinking) {
console.log(`[${responseId}] Processing tool result: ${JSON.stringify(message.toolExecution)}`); console.log(`[${responseId}] Received thinking: ${message.thinking.substring(0, 100)}...`);
onThinkingUpdate(message.thinking);
}
// If tool execution doesn't have an action, add 'result' as the default // Handle tool execution updates
if (!message.toolExecution.action) { if (message.toolExecution) {
message.toolExecution.action = 'result'; console.log(`[${responseId}] Tool execution update:`, message.toolExecution);
}
// First send a 'start' action to ensure the container is created
onToolExecution({
action: 'start',
tool: 'tools',
result: 'Tool execution initialized'
});
// Then send the actual tool execution data
onToolExecution(message.toolExecution); onToolExecution(message.toolExecution);
// Mark tool execution as completed if this is a result or error
if (message.toolExecution.action === 'result' || message.toolExecution.action === 'complete' || message.toolExecution.action === 'error') {
toolExecutionCompleted = true;
console.log(`[${responseId}] Tool execution completed`);
}
return; // Skip accumulating content from this message
}
if (message.type === 'tool_execution_error' && message.toolExecution) {
toolsExecuted = true; // Mark that tools were executed
toolExecutionCompleted = true; // Mark tool execution as completed
onToolExecution({
...message.toolExecution,
action: 'error',
error: message.toolExecution.error || 'Unknown error during tool execution'
});
return; // Skip accumulating content from this message
}
if (message.type === 'tool_completion_processing') {
toolsExecuted = true; // Mark that tools were executed
toolExecutionCompleted = true; // Tools are done, now processing the result
onThinkingUpdate('Generating response with tool results...');
// Also trigger tool execution UI with a specific format
onToolExecution({
action: 'generating',
tool: 'tools',
result: 'Generating response with tool results...'
});
return; // Skip accumulating content from this message
} }
// Handle content updates // Handle content updates
if (message.content) { if (message.content) {
console.log(`[${responseId}] Received content chunk of length ${message.content.length}, preview: "${message.content.substring(0, 50)}${message.content.length > 50 ? '...' : ''}"`); // Simply append the new content - no complex deduplication
assistantResponse += message.content;
// If tools were executed and completed, and we're now getting new content,
// this is likely the final response after tool execution from Anthropic
if (toolsExecuted && toolExecutionCompleted && message.content) {
console.log(`[${responseId}] Post-tool execution content detected`);
// If this is the first post-tool chunk, indicate we're starting a new response
if (!receivedPostToolContent) {
receivedPostToolContent = true;
postToolResponse = ''; // Clear any previous post-tool response
console.log(`[${responseId}] First post-tool content chunk, starting fresh accumulation`);
}
// Accumulate post-tool execution content
postToolResponse += message.content;
console.log(`[${responseId}] Accumulated post-tool content, now ${postToolResponse.length} chars`);
// Update the UI with the accumulated post-tool content
// This replaces the pre-tool content with our accumulated post-tool content
onContentUpdate(postToolResponse, message.done || false);
} else {
// Standard content handling for non-tool cases or initial tool response
// Check if this is a duplicated message containing the same content we already have
if (message.done && assistantResponse.includes(message.content)) {
console.log(`[${responseId}] Ignoring duplicated content in done message`);
} else {
// Add to our accumulated response
assistantResponse += message.content;
}
// Update the UI immediately with each chunk
onContentUpdate(assistantResponse, message.done || false);
}
// Update the UI immediately with each chunk
onContentUpdate(assistantResponse, message.done || false);
receivedAnyContent = true; receivedAnyContent = true;
// Reset timeout since we got content // Reset timeout since we got content
@@ -288,150 +194,32 @@ export async function setupStreamingResponse(
// Set new timeout // Set new timeout
timeoutId = window.setTimeout(() => { timeoutId = window.setTimeout(() => {
console.warn(`[${responseId}] Stream timeout for chat note ${noteId}`); console.warn(`[${responseId}] Stream timeout for chat note ${noteId}`);
// Clean up
performCleanup(); performCleanup();
reject(new Error('Stream timeout')); reject(new Error('Stream timeout'));
}, 30000); }, 30000);
} }
// Handle tool execution updates (legacy format and standard format with llm-stream type)
if (message.toolExecution) {
// Only process if we haven't already handled this message via specific message types
if (message.type === 'llm-stream' || !message.type) {
console.log(`[${responseId}] Received tool execution update: action=${message.toolExecution.action || 'unknown'}`);
toolsExecuted = true; // Mark that tools were executed
// Mark tool execution as completed if this is a result or error
if (message.toolExecution.action === 'result' ||
message.toolExecution.action === 'complete' ||
message.toolExecution.action === 'error') {
toolExecutionCompleted = true;
console.log(`[${responseId}] Tool execution completed via toolExecution message`);
}
onToolExecution(message.toolExecution);
}
}
// Handle tool calls from the raw data or direct in message (OpenAI format)
const toolCalls = message.tool_calls || (message.raw && message.raw.tool_calls);
if (toolCalls && Array.isArray(toolCalls)) {
console.log(`[${responseId}] Received tool calls: ${toolCalls.length} tools`);
toolsExecuted = true; // Mark that tools were executed
// First send a 'start' action to ensure the container is created
onToolExecution({
action: 'start',
tool: 'tools',
result: 'Tool execution initialized'
});
// Then process each tool call
for (const toolCall of toolCalls) {
let args = toolCall.function?.arguments || {};
// Try to parse arguments if they're a string
if (typeof args === 'string') {
try {
args = JSON.parse(args);
} catch (e) {
console.log(`[${responseId}] Could not parse tool arguments as JSON: ${e}`);
args = { raw: args };
}
}
onToolExecution({
action: 'executing',
tool: toolCall.function?.name || 'unknown',
toolCallId: toolCall.id,
args: args
});
}
}
// Handle thinking state updates
if (message.thinking) {
console.log(`[${responseId}] Received thinking update: ${message.thinking.substring(0, 50)}...`);
onThinkingUpdate(message.thinking);
}
// Handle completion // Handle completion
if (message.done) { if (message.done) {
console.log(`[${responseId}] Stream completed for chat note ${noteId}, has content: ${!!message.content}, content length: ${message.content?.length || 0}, current response: ${assistantResponse.length} chars`); console.log(`[${responseId}] Stream completed for chat note ${noteId}, final response: ${assistantResponse.length} chars`);
// Dump message content to console for debugging // Clear all timeouts
if (message.content) {
console.log(`[${responseId}] CONTENT IN DONE MESSAGE (first 200 chars): "${message.content.substring(0, 200)}..."`);
// Check if the done message contains the exact same content as our accumulated response
// We normalize by removing whitespace to avoid false negatives due to spacing differences
const normalizedMessage = message.content.trim();
const normalizedResponse = assistantResponse.trim();
if (normalizedMessage === normalizedResponse) {
console.log(`[${responseId}] Final message is identical to accumulated response, no need to update`);
}
// If the done message is longer but contains our accumulated response, use the done message
else if (normalizedMessage.includes(normalizedResponse) && normalizedMessage.length > normalizedResponse.length) {
console.log(`[${responseId}] Final message is more complete than accumulated response, using it`);
assistantResponse = message.content;
}
// If the done message is different and not already included, append it to avoid duplication
else if (!normalizedResponse.includes(normalizedMessage) && normalizedMessage.length > 0) {
console.log(`[${responseId}] Final message has unique content, using it`);
assistantResponse = message.content;
}
// Otherwise, we already have the content accumulated, so no need to update
else {
console.log(`[${responseId}] Already have this content accumulated, not updating`);
}
}
// Clear timeout if set
if (timeoutId !== null) { if (timeoutId !== null) {
window.clearTimeout(timeoutId); window.clearTimeout(timeoutId);
timeoutId = null; timeoutId = null;
} }
// Always mark as done when we receive the done flag // Schedule cleanup after a brief delay to ensure all processing is complete
onContentUpdate(assistantResponse, true); cleanupTimeoutId = window.setTimeout(() => {
performCleanup();
// Set a longer delay before cleanup to allow for post-tool execution messages }, 100);
// Especially important for Anthropic which may send final message after tool execution
const cleanupDelay = toolsExecuted ? 15000 : 1000; // 15 seconds if tools were used, otherwise 1 second
console.log(`[${responseId}] Setting cleanup delay of ${cleanupDelay}ms since toolsExecuted=${toolsExecuted}`);
scheduleCleanup(cleanupDelay);
} }
}; };
// Register event listener for the custom event // Register the event listener for WebSocket messages
try { window.addEventListener('llm-stream-message', eventListener);
window.addEventListener('llm-stream-message', eventListener);
console.log(`[${responseId}] Event listener added for llm-stream-message events`);
} catch (err) {
console.error(`[${responseId}] Error setting up event listener:`, err);
reject(err);
return;
}
// Set initial timeout for receiving any message console.log(`[${responseId}] Event listener registered, waiting for messages...`);
initialTimeoutId = window.setTimeout(() => {
console.warn(`[${responseId}] No messages received for initial period in chat note ${noteId}`);
if (!receivedAnyMessage) {
console.error(`[${responseId}] WebSocket connection not established for chat note ${noteId}`);
if (timeoutId !== null) {
window.clearTimeout(timeoutId);
}
// Clean up
cleanupEventListener(eventListener);
// Show error message to user
reject(new Error('WebSocket connection not established'));
}
}, 10000);
}); });
} }

View File

@@ -1068,16 +1068,6 @@ export default class LlmChatPanel extends BasicWidget {
* Update the UI with streaming content * Update the UI with streaming content
*/ */
private updateStreamingUI(assistantResponse: string, isDone: boolean = false) { private updateStreamingUI(assistantResponse: string, isDone: boolean = false) {
// Parse and handle thinking content if present
if (!isDone) {
const thinkingContent = this.parseThinkingContent(assistantResponse);
if (thinkingContent) {
this.updateThinkingText(thinkingContent);
// Don't display the raw response with think tags in the chat
return;
}
}
// Get the existing assistant message or create a new one // Get the existing assistant message or create a new one
let assistantMessageEl = this.noteContextChatMessages.querySelector('.assistant-message:last-child'); let assistantMessageEl = this.noteContextChatMessages.querySelector('.assistant-message:last-child');
@@ -1099,12 +1089,9 @@ export default class LlmChatPanel extends BasicWidget {
assistantMessageEl.appendChild(messageContent); assistantMessageEl.appendChild(messageContent);
} }
// Clean the response to remove thinking tags before displaying // Update the content with the current response (no thinking content removal)
const cleanedResponse = this.removeThinkingTags(assistantResponse);
// Update the content
const messageContent = assistantMessageEl.querySelector('.message-content') as HTMLElement; const messageContent = assistantMessageEl.querySelector('.message-content') as HTMLElement;
messageContent.innerHTML = formatMarkdown(cleanedResponse); messageContent.innerHTML = formatMarkdown(assistantResponse);
// Apply syntax highlighting if this is the final update // Apply syntax highlighting if this is the final update
if (isDone) { if (isDone) {
@@ -1120,65 +1107,24 @@ export default class LlmChatPanel extends BasicWidget {
this.messages.lastIndexOf(assistantMessages[assistantMessages.length - 1]) : -1; this.messages.lastIndexOf(assistantMessages[assistantMessages.length - 1]) : -1;
if (lastAssistantMsgIndex >= 0) { if (lastAssistantMsgIndex >= 0) {
// Update existing message with cleaned content // Update existing message
this.messages[lastAssistantMsgIndex].content = cleanedResponse; this.messages[lastAssistantMsgIndex].content = assistantResponse;
} else { } else {
// Add new message with cleaned content // Add new message
this.messages.push({ this.messages.push({
role: 'assistant', role: 'assistant',
content: cleanedResponse content: assistantResponse
}); });
} }
// Hide loading indicator // Save the data
hideLoadingIndicator(this.loadingIndicator); this.saveCurrentData();
// DON'T save here immediately - let the server save the accumulated response first
// to avoid race conditions. We'll reload the data from the server after a short delay.
console.log("Stream completed, waiting for server to save then reloading data...");
setTimeout(async () => {
try {
console.log("About to reload data from server...");
const currentMessageCount = this.messages.length;
console.log(`Current client message count before reload: ${currentMessageCount}`);
// Reload the data from the server which should have the complete conversation
const reloadSuccess = await this.loadSavedData();
const newMessageCount = this.messages.length;
console.log(`Reload success: ${reloadSuccess}, message count after reload: ${newMessageCount}`);
if (reloadSuccess && newMessageCount > currentMessageCount) {
console.log("Successfully reloaded data with more complete conversation");
} else if (!reloadSuccess) {
console.warn("Reload failed, keeping current client state");
} else {
console.warn("Reload succeeded but message count didn't increase");
}
} catch (error) {
console.error("Failed to reload data after stream completion:", error);
// Fallback: save our current state if reload fails
this.saveCurrentData().catch(err => {
console.error("Failed to save assistant response to note:", err);
});
}
}, 1500); // Wait 1.5 seconds for server to complete its save
} }
// Scroll to bottom // Scroll to bottom
this.chatContainer.scrollTop = this.chatContainer.scrollHeight; this.chatContainer.scrollTop = this.chatContainer.scrollHeight;
} }
/**
* Remove thinking tags from response content
*/
private removeThinkingTags(content: string): string {
if (!content) return content;
// Remove <think>...</think> blocks from the content
return content.replace(/<think>[\s\S]*?<\/think>/gi, '').trim();
}
/** /**
* Handle general errors in the send message flow * Handle general errors in the send message flow
*/ */

View File

@@ -858,92 +858,52 @@ async function streamMessage(req: Request, res: Response) {
} }
} }
// Create request parameters for the pipeline // Import the WebSocket service to send immediate feedback
const requestParams = {
chatNoteId: chatNoteId,
content: enhancedContent,
useAdvancedContext: useAdvancedContext === true,
showThinking: showThinking === true,
stream: true // Always stream for this endpoint
};
// Create a fake request/response pair to pass to the handler
const fakeReq = {
...req,
method: 'GET', // Set to GET to indicate streaming
query: {
stream: 'true', // Set stream param - don't use format: 'stream' to avoid confusion
useAdvancedContext: String(useAdvancedContext === true),
showThinking: String(showThinking === true)
},
params: {
chatNoteId: chatNoteId
},
// Make sure the enhanced content is available to the handler
body: {
content: enhancedContent,
useAdvancedContext: useAdvancedContext === true,
showThinking: showThinking === true
}
} as unknown as Request;
// Log to verify correct parameters
log.info(`WebSocket stream settings - useAdvancedContext=${useAdvancedContext === true}, in query=${fakeReq.query.useAdvancedContext}, in body=${fakeReq.body.useAdvancedContext}`);
// Extra safety to ensure the parameters are passed correctly
if (useAdvancedContext === true) {
log.info(`Enhanced context IS enabled for this request`);
} else {
log.info(`Enhanced context is NOT enabled for this request`);
}
// Process the request in the background
Promise.resolve().then(async () => {
try {
await restChatService.handleSendMessage(fakeReq, res);
} catch (error) {
log.error(`Background message processing error: ${error}`);
// Import the WebSocket service
const wsService = (await import('../../services/ws.js')).default;
// Define LLMStreamMessage interface
interface LLMStreamMessage {
type: 'llm-stream';
chatNoteId: string;
content?: string;
thinking?: string;
toolExecution?: any;
done?: boolean;
error?: string;
raw?: unknown;
}
// Send error to client via WebSocket
wsService.sendMessageToAllClients({
type: 'llm-stream',
chatNoteId: chatNoteId,
error: `Error processing message: ${error}`,
done: true
} as LLMStreamMessage);
}
});
// Import the WebSocket service
const wsService = (await import('../../services/ws.js')).default; const wsService = (await import('../../services/ws.js')).default;
// Let the client know streaming has started via WebSocket (helps client confirm connection is working) // Let the client know streaming has started
wsService.sendMessageToAllClients({ wsService.sendMessageToAllClients({
type: 'llm-stream', type: 'llm-stream',
chatNoteId: chatNoteId, chatNoteId: chatNoteId,
thinking: 'Initializing streaming LLM response...' thinking: showThinking ? 'Initializing streaming LLM response...' : undefined
}); });
// Let the client know streaming has started via HTTP response // Process the streaming request directly
return { try {
success: true, const result = await restChatService.handleSendMessage({
message: 'Streaming started', ...req,
chatNoteId: chatNoteId method: 'GET', // Indicate streaming mode
}; query: {
...req.query,
stream: 'true' // Add the required stream parameter
},
body: {
content: enhancedContent,
useAdvancedContext: useAdvancedContext === true,
showThinking: showThinking === true
},
params: { chatNoteId }
} as unknown as Request, res);
// Since we're streaming, the result will be null
return {
success: true,
message: 'Streaming started',
chatNoteId: chatNoteId
};
} catch (error) {
log.error(`Error during streaming: ${error}`);
// Send error to client via WebSocket
wsService.sendMessageToAllClients({
type: 'llm-stream',
chatNoteId: chatNoteId,
error: `Error processing message: ${error}`,
done: true
});
throw error;
}
} catch (error: any) { } catch (error: any) {
log.error(`Error starting message stream: ${error.message}`); log.error(`Error starting message stream: ${error.message}`);
throw error; throw error;

View File

@@ -231,21 +231,16 @@ class RestChatService {
if (data) { if (data) {
message.content = data; message.content = data;
// Handle accumulation carefully - if this appears to be a complete response // Simple accumulation - just append the new data
// (done=true and data is much longer than current accumulated), replace rather than append accumulatedContentRef.value += data;
if (done && data.length > accumulatedContentRef.value.length && data.includes(accumulatedContentRef.value)) {
// This looks like a complete final response that includes what we've accumulated
accumulatedContentRef.value = data;
} else {
// Normal incremental accumulation
accumulatedContentRef.value += data;
}
} }
// Only include thinking if explicitly present in rawChunk
if (rawChunk && 'thinking' in rawChunk && rawChunk.thinking) { if (rawChunk && 'thinking' in rawChunk && rawChunk.thinking) {
message.thinking = rawChunk.thinking as string; message.thinking = rawChunk.thinking as string;
} }
// Only include tool execution if explicitly present in rawChunk
if (rawChunk && 'toolExecution' in rawChunk && rawChunk.toolExecution) { if (rawChunk && 'toolExecution' in rawChunk && rawChunk.toolExecution) {
const toolExec = rawChunk.toolExecution; const toolExec = rawChunk.toolExecution;
message.toolExecution = { message.toolExecution = {
@@ -262,7 +257,7 @@ class RestChatService {
// Send WebSocket message // Send WebSocket message
wsService.sendMessageToAllClients(message); wsService.sendMessageToAllClients(message);
// Send SSE response // Send SSE response for compatibility
const responseData: any = { content: data, done }; const responseData: any = { content: data, done };
if (rawChunk?.toolExecution) { if (rawChunk?.toolExecution) {
responseData.toolExecution = rawChunk.toolExecution; responseData.toolExecution = rawChunk.toolExecution;