diff --git a/src/services/llm/pipeline/chat_pipeline.ts b/src/services/llm/pipeline/chat_pipeline.ts
index 073cf6d7a..c3051030e 100644
--- a/src/services/llm/pipeline/chat_pipeline.ts
+++ b/src/services/llm/pipeline/chat_pipeline.ts
@@ -127,7 +127,7 @@ export class ChatPipeline {
         // Determine if we should use tools or semantic context
         const useTools = modelSelection.options.enableTools === true;
         const useEnhancedContext = input.options?.useAdvancedContext === true;
-        
+
         // Log details about the advanced context parameter
         log.info(`Enhanced context option check: input.options=${JSON.stringify(input.options || {})}`);
         log.info(`Enhanced context decision: useEnhancedContext=${useEnhancedContext}, hasQuery=${!!input.query}`);
@@ -236,17 +236,17 @@ export class ChatPipeline {
         const streamFormatRequested = input.format === 'stream';
         const streamRequestedInOptions = modelSelection.options.stream === true;
         const streamCallbackAvailable = typeof streamCallback === 'function';
-        
+
         log.info(`[ChatPipeline] Request type info - Format: ${input.format || 'not specified'}, Options from pipelineInput: ${JSON.stringify({stream: input.options?.stream})}`);
         log.info(`[ChatPipeline] Stream settings - config.enableStreaming: ${streamEnabledInConfig}, format parameter: ${input.format}, modelSelection.options.stream: ${modelSelection.options.stream}, streamCallback available: ${streamCallbackAvailable}`);
-        
+
         // IMPORTANT: Respect the existing stream option but with special handling for callbacks:
         // 1. If a stream callback is available, streaming MUST be enabled for it to work
         // 2. Otherwise, preserve the original stream setting from input options
-        
+
         // First, determine what the stream value should be based on various factors:
         let shouldEnableStream = modelSelection.options.stream;
-        
+
         if (streamCallbackAvailable) {
             // If we have a stream callback, we NEED to enable streaming
             // This is critical for GET requests with EventSource
@@ -265,12 +265,12 @@ export class ChatPipeline {
             log.info(`[ChatPipeline] No explicit stream settings, using config default: ${streamEnabledInConfig}`);
             shouldEnableStream = streamEnabledInConfig;
         }
-        
+
         // Set the final stream option
         modelSelection.options.stream = shouldEnableStream;
-        
+
         log.info(`[ChatPipeline] Final streaming decision: stream=${shouldEnableStream}, will stream to client=${streamCallbackAvailable && shouldEnableStream}`);
-        
+
         // STAGE 5 & 6: Handle LLM completion and tool execution loop
         log.info(`========== STAGE 5: LLM COMPLETION ==========`);

@@ -282,7 +282,7 @@ export class ChatPipeline {
         this.updateStageMetrics('llmCompletion', llmStartTime);
         log.info(`Received LLM response from model: ${completion.response.model}, provider: ${completion.response.provider}`);

-        // Handle streaming if enabled and available 
+        // Handle streaming if enabled and available
         // Use shouldEnableStream variable which contains our streaming decision
         if (shouldEnableStream && completion.response.stream && streamCallback) {
             // Setup stream handler that passes chunks through response processing
@@ -344,7 +344,8 @@ export class ChatPipeline {
                 // If streaming was enabled, send an update to the user
                 if (isStreaming && streamCallback) {
                     streamingPaused = true;
-                    await streamCallback('', true); // Signal pause in streaming
+                    // IMPORTANT: Don't send done:true here, as it causes the client to stop processing messages
+                    // Instead, send a marker message that indicates tools will be executed
                     await streamCallback('\n\n[Executing tools...]\n\n', false);
                 }

@@ -566,8 +567,15 @@ export class ChatPipeline {

             // If streaming was paused for tool execution, resume it now with the final response
             if (isStreaming && streamCallback && streamingPaused) {
+                // First log for debugging
+                log.info(`Resuming streaming with final response: ${currentResponse.text.length} chars`);
+
                 // Resume streaming with the final response text
+                // This is where we send the definitive done:true signal with the complete content
                 await streamCallback(currentResponse.text, true);
+
+                // Log confirmation
+                log.info(`Sent final response with done=true signal`);
             }
         } else if (toolsEnabled) {
             log.info(`========== NO TOOL CALLS DETECTED ==========`);
diff --git a/src/services/llm/providers/ollama_service.ts b/src/services/llm/providers/ollama_service.ts
index e83c86b87..6121f0c52 100644
--- a/src/services/llm/providers/ollama_service.ts
+++ b/src/services/llm/providers/ollama_service.ts
@@ -476,29 +476,14 @@ export class OllamaService extends BaseAIService {
                     // Call the callback with the current chunk content
                     if (opts.streamCallback) {
                         try {
-                            // For the final chunk, make sure to send the complete text with done=true
-                            if (chunk.done) {
-                                log.info(`Sending final callback with done=true and complete content (${completeText.length} chars)`);
-                                await opts.streamCallback(
-                                    completeText, // Send the full accumulated content for the final chunk
-                                    true,
-                                    { ...chunk, message: { ...chunk.message, content: completeText } }
-                                );
-                            } else if (chunk.message?.content) {
-                                // For content chunks, send them as they come
-                                await opts.streamCallback(
-                                    chunk.message.content,
-                                    !!chunk.done,
-                                    chunk
-                                );
-                            } else if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
-                                // For tool call chunks, send an empty content string but include the tool calls
-                                await opts.streamCallback(
-                                    '',
-                                    !!chunk.done,
-                                    chunk
-                                );
-                            }
+                            // Don't send done:true when tool calls are present to avoid premature completion
+                            const shouldMarkAsDone = !!chunk.done && !responseToolCalls.length;
+
+                            await opts.streamCallback(
+                                chunk.message?.content || '',
+                                shouldMarkAsDone,
+                                chunk
+                            );

                             if (chunkCount === 1) {
                                 log.info(`Successfully called streamCallback with first chunk`);
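The chat_pipeline.ts hunks rely on the stream callback treating only done=true as end-of-message: the "[Executing tools...]" marker goes out with done=false, and the resumed callback after tool execution carries the complete text with done=true. The sketch below is a hypothetical consumer-side handler written against that (text, done) contract; it is not code from this repository, and the render/onComplete hooks are illustrative stand-ins.

// Hypothetical consumer of the (text, done) stream callback contract used above.
// Assumes interim chunks (including the "[Executing tools...]" marker) arrive with
// done=false, and that the final done=true call carries the definitive full text.
type StreamHandler = (text: string, done: boolean) => Promise<void>;

function createStreamConsumer(
    render: (partial: string) => void,        // stand-in: update the UI incrementally
    onComplete: (fullText: string) => void    // stand-in: finalize the message
): StreamHandler {
    let shown = '';
    return async (text, done) => {
        if (done) {
            // Only the resumed final callback ends the message; its payload is the
            // complete response, so it replaces the incrementally shown text.
            onComplete(text);
        } else {
            // Interim content, tool-execution markers included, never closes the stream.
            shown += text;
            render(shown);
        }
    };
}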
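On the provider side, the new shouldMarkAsDone guard keeps the Ollama service from signalling completion while tool calls are still queued, leaving the pipeline to send the real done=true after the tools have run. Below is a rough standalone restatement of that guard, with OllamaChunk as an assumed minimal chunk shape and toolCallsSoFar standing in for the surrounding responseToolCalls accumulator.

// Assumed minimal shape of a streamed Ollama chunk; the provider's real type
// carries more fields.
interface OllamaChunk {
    done?: boolean;
    message?: { content?: string; tool_calls?: unknown[] };
}

// Mirrors the guard added in ollama_service.ts: only report done when the
// provider says the stream ended AND no tool calls are pending execution.
function shouldMarkAsDone(chunk: OllamaChunk, toolCallsSoFar: unknown[]): boolean {
    return !!chunk.done && toolCallsSoFar.length === 0;
}

// Expected behaviour:
//   shouldMarkAsDone({ done: true }, [])              === true   // plain completion
//   shouldMarkAsDone({ done: true }, [{ name: 'x' }]) === false  // tools pending; pipeline finishes later
//   shouldMarkAsDone({ done: false }, [])             === false  // mid-stream chunk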