fix the duplicated...messages through the websocket?

This commit is contained in:
perf3ct 2025-04-15 01:29:12 +00:00
parent 3bddb60ecc
commit aadb8cce5d
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
2 changed files with 41 additions and 43 deletions

View File

@ -98,19 +98,19 @@ export async function setupStreamingResponse(
if (message.type === 'tool_result' && message.toolExecution) { if (message.type === 'tool_result' && message.toolExecution) {
console.log(`[${responseId}] Processing tool result: ${JSON.stringify(message.toolExecution)}`); console.log(`[${responseId}] Processing tool result: ${JSON.stringify(message.toolExecution)}`);
// If tool execution doesn't have an action, add 'result' as the default // If tool execution doesn't have an action, add 'result' as the default
if (!message.toolExecution.action) { if (!message.toolExecution.action) {
message.toolExecution.action = 'result'; message.toolExecution.action = 'result';
} }
// First send a 'start' action to ensure the container is created // First send a 'start' action to ensure the container is created
onToolExecution({ onToolExecution({
action: 'start', action: 'start',
tool: 'tools', tool: 'tools',
result: 'Tool execution initialized' result: 'Tool execution initialized'
}); });
// Then send the actual tool execution data // Then send the actual tool execution data
onToolExecution(message.toolExecution); onToolExecution(message.toolExecution);
return; // Skip accumulating content from this message return; // Skip accumulating content from this message
@ -142,8 +142,13 @@ export async function setupStreamingResponse(
console.log(`[${responseId}] Received content chunk of length ${message.content.length}, preview: "${message.content.substring(0, 50)}${message.content.length > 50 ? '...' : ''}"`); console.log(`[${responseId}] Received content chunk of length ${message.content.length}, preview: "${message.content.substring(0, 50)}${message.content.length > 50 ? '...' : ''}"`);
// Add to our accumulated response // Check if this is a duplicated message containing the same content we already have
assistantResponse += message.content; if (message.done && assistantResponse.includes(message.content)) {
console.log(`[${responseId}] Ignoring duplicated content in done message`);
} else {
// Add to our accumulated response
assistantResponse += message.content;
}
// Update the UI immediately with each chunk // Update the UI immediately with each chunk
onContentUpdate(assistantResponse, false); onContentUpdate(assistantResponse, false);
@ -171,23 +176,23 @@ export async function setupStreamingResponse(
onToolExecution(message.toolExecution); onToolExecution(message.toolExecution);
} }
} }
// Handle tool calls from the raw data or direct in message (OpenAI format) // Handle tool calls from the raw data or direct in message (OpenAI format)
const toolCalls = message.tool_calls || (message.raw && message.raw.tool_calls); const toolCalls = message.tool_calls || (message.raw && message.raw.tool_calls);
if (toolCalls && Array.isArray(toolCalls)) { if (toolCalls && Array.isArray(toolCalls)) {
console.log(`[${responseId}] Received tool calls: ${toolCalls.length} tools`); console.log(`[${responseId}] Received tool calls: ${toolCalls.length} tools`);
// First send a 'start' action to ensure the container is created // First send a 'start' action to ensure the container is created
onToolExecution({ onToolExecution({
action: 'start', action: 'start',
tool: 'tools', tool: 'tools',
result: 'Tool execution initialized' result: 'Tool execution initialized'
}); });
// Then process each tool call // Then process each tool call
for (const toolCall of toolCalls) { for (const toolCall of toolCalls) {
let args = toolCall.function?.arguments || {}; let args = toolCall.function?.arguments || {};
// Try to parse arguments if they're a string // Try to parse arguments if they're a string
if (typeof args === 'string') { if (typeof args === 'string') {
try { try {
@ -197,7 +202,7 @@ export async function setupStreamingResponse(
args = { raw: args }; args = { raw: args };
} }
} }
onToolExecution({ onToolExecution({
action: 'executing', action: 'executing',
tool: toolCall.function?.name || 'unknown', tool: toolCall.function?.name || 'unknown',
@ -228,25 +233,21 @@ export async function setupStreamingResponse(
timeoutId = null; timeoutId = null;
} }
// Check if we have content in the done message // Check if we have content in the done message - ONLY process if we haven't received any content yet
if (message.content) { if (message.content && !receivedAnyContent) {
console.log(`[${responseId}] Processing content in done message: ${message.content.length} chars`); console.log(`[${responseId}] Processing content in done message: ${message.content.length} chars`);
receivedAnyContent = true; receivedAnyContent = true;
// Replace current response if we didn't have content before or if it's empty // Use content from done message as full response
if (assistantResponse.length === 0) { console.log(`[${responseId}] Using content from done message as full response`);
console.log(`[${responseId}] Using content from done message as full response`); assistantResponse = message.content;
assistantResponse = message.content; onContentUpdate(assistantResponse, true);
} } else if (message.content) {
// Otherwise append it if it's different // We already have content, signal as done but don't duplicate
else if (message.content !== assistantResponse) { console.log(`[${responseId}] Content in done message ignored as we already have streamed content`);
console.log(`[${responseId}] Appending content from done message to existing response`); onContentUpdate(assistantResponse, true);
assistantResponse += message.content; } else {
} // No content in done message, just mark as done
else {
console.log(`[${responseId}] Content in done message is identical to existing response, not appending`);
}
onContentUpdate(assistantResponse, true); onContentUpdate(assistantResponse, true);
} }

View File

@ -544,11 +544,8 @@ class RestChatService {
if (done) { if (done) {
streamFinished = true; streamFinished = true;
// Always send the accumulated content with the done=true message // Don't send another "done:true" message here - we'll let the streaming handler
// This ensures the client receives the complete content even if earlier messages were missed // handle the completion notification with its own done:true message
message.content = messageContent;
log.info(`Stream complete, sending final message with ${messageContent.length} chars of content`);
// Store the response in the session when done // Store the response in the session when done
session.messages.push({ session.messages.push({
@ -1241,17 +1238,17 @@ class RestChatService {
toolExecution: chunk.raw.toolExecution toolExecution: chunk.raw.toolExecution
} as LLMStreamMessage); } as LLMStreamMessage);
} }
// Handle direct tool_calls in the response (for OpenAI) // Handle direct tool_calls in the response (for OpenAI)
if (chunk.tool_calls && chunk.tool_calls.length > 0) { if (chunk.tool_calls && chunk.tool_calls.length > 0) {
log.info(`Detected direct tool_calls in stream chunk: ${chunk.tool_calls.length} tools`); log.info(`Detected direct tool_calls in stream chunk: ${chunk.tool_calls.length} tools`);
// Send tool execution notification // Send tool execution notification
wsService.sendMessageToAllClients({ wsService.sendMessageToAllClients({
type: 'tool_execution_start', type: 'tool_execution_start',
sessionId sessionId
} as LLMStreamMessage); } as LLMStreamMessage);
// Process each tool call // Process each tool call
for (const toolCall of chunk.tool_calls) { for (const toolCall of chunk.tool_calls) {
// Process arguments // Process arguments
@ -1264,7 +1261,7 @@ class RestChatService {
args = { raw: args }; args = { raw: args };
} }
} }
// Format into a standardized tool execution message // Format into a standardized tool execution message
wsService.sendMessageToAllClients({ wsService.sendMessageToAllClients({
type: 'tool_result', type: 'tool_result',
@ -1278,17 +1275,17 @@ class RestChatService {
} as LLMStreamMessage); } as LLMStreamMessage);
} }
} }
// Also handle tool_calls in raw data if present but not directly in chunk // Also handle tool_calls in raw data if present but not directly in chunk
if (!chunk.tool_calls && chunk.raw?.tool_calls && Array.isArray(chunk.raw.tool_calls)) { if (!chunk.tool_calls && chunk.raw?.tool_calls && Array.isArray(chunk.raw.tool_calls)) {
log.info(`Detected tool_calls in raw data: ${chunk.raw.tool_calls.length} tools`); log.info(`Detected tool_calls in raw data: ${chunk.raw.tool_calls.length} tools`);
// Send tool execution notification if we haven't already // Send tool execution notification if we haven't already
wsService.sendMessageToAllClients({ wsService.sendMessageToAllClients({
type: 'tool_execution_start', type: 'tool_execution_start',
sessionId sessionId
} as LLMStreamMessage); } as LLMStreamMessage);
// Process each tool call // Process each tool call
for (const toolCall of chunk.raw.tool_calls) { for (const toolCall of chunk.raw.tool_calls) {
// Process arguments // Process arguments
@ -1301,7 +1298,7 @@ class RestChatService {
args = { raw: args }; args = { raw: args };
} }
} }
// Format into a standardized tool execution message // Format into a standardized tool execution message
wsService.sendMessageToAllClients({ wsService.sendMessageToAllClients({
type: 'tool_result', type: 'tool_result',
@ -1358,15 +1355,15 @@ class RestChatService {
// Only send final done message if it wasn't already sent with content // Only send final done message if it wasn't already sent with content
// This ensures we don't duplicate the content but still mark completion // This ensures we don't duplicate the content but still mark completion
if (!chunk.text) { if (!chunk.text) {
// Send final message with both content and done flag together log.info(`No content in final chunk, sending explicit completion message`);
// Send final message with done flag only (no content)
// This avoids sending the entire messageContent again and causing duplicates
wsService.sendMessageToAllClients({ wsService.sendMessageToAllClients({
type: 'llm-stream', type: 'llm-stream',
sessionId, sessionId,
content: messageContent, // Send the accumulated content
done: true done: true
} as LLMStreamMessage); } as LLMStreamMessage);
log.info(`Sent explicit final completion message with accumulated content`);
} else { } else {
log.info(`Final done flag was already sent with content chunk, no need for extra message`); log.info(`Final done flag was already sent with content chunk, no need for extra message`);
} }