diff --git a/src/services/llm/providers/ollama_service.ts b/src/services/llm/providers/ollama_service.ts
index da3c42845..71bd8bdd8 100644
--- a/src/services/llm/providers/ollama_service.ts
+++ b/src/services/llm/providers/ollama_service.ts
@@ -1,4 +1,3 @@
-import options from '../../options.js';
 import { BaseAIService } from '../base_ai_service.js';
 import type { Message, ChatCompletionOptions, ChatResponse, StreamChunk } from '../ai_interface.js';
 import { OllamaMessageFormatter } from '../formatters/ollama_formatter.js';
@@ -8,6 +7,8 @@ import toolRegistry from '../tools/tool_registry.js';
 import type { OllamaOptions } from './provider_options.js';
 import { getOllamaOptions } from './providers.js';
 import { Ollama, type ChatRequest, type ChatResponse as OllamaChatResponse } from 'ollama';
+import options from '../../options.js';
+import { StreamProcessor, createStreamHandler } from './stream_handler.js';
 
 // Add an interface for tool execution feedback status
 interface ToolExecutionStatus {
@@ -268,127 +269,86 @@ export class OllamaService extends BaseAIService {
             // Log detailed information about the streaming setup
             log.info(`Ollama streaming details: model=${providerOptions.model}, streamCallback=${opts.streamCallback ? 'provided' : 'not provided'}`);
 
-            // Create a stream handler function that processes the SDK's stream
-            const streamHandler = async (callback: (chunk: StreamChunk) => Promise<void> | void): Promise<string> => {
-                let completeText = '';
-                let responseToolCalls: any[] = [];
-                let chunkCount = 0;
+            // Create a stream handler using our reusable StreamProcessor
+            const streamHandler = createStreamHandler(
+                {
+                    providerName: this.getName(),
+                    modelName: providerOptions.model,
+                    streamCallback: opts.streamCallback
+                },
+                async (callback) => {
+                    let completeText = '';
+                    let responseToolCalls: any[] = [];
+                    let chunkCount = 0;
 
-                try {
-                    // Create streaming request
-                    const streamingRequest = {
-                        ...requestOptions,
-                        stream: true as const // Use const assertion to fix the type
-                    };
-
-                    log.info(`Creating Ollama streaming request with options: model=${streamingRequest.model}, stream=${streamingRequest.stream}, tools=${streamingRequest.tools ? streamingRequest.tools.length : 0}`);
-
-                    // Get the async iterator
-                    log.info(`Calling Ollama chat API with streaming enabled`);
-                    let streamIterator;
                     try {
-                        log.info(`About to call client.chat with streaming request to ${options.getOption('ollamaBaseUrl')}`);
-                        log.info(`Stream request: model=${streamingRequest.model}, messages count=${streamingRequest.messages?.length || 0}`);
+                        // Create streaming request
+                        const streamingRequest = {
+                            ...requestOptions,
+                            stream: true as const
+                        };
 
-                        // Check if we can connect to Ollama by getting available models
+                        log.info(`Creating Ollama streaming request with options: model=${streamingRequest.model}, stream=${streamingRequest.stream}, tools=${streamingRequest.tools ? streamingRequest.tools.length : 0}`);
+
+                        // Perform health check
                         try {
                             log.info(`Performing Ollama health check...`);
                             const healthCheck = await client.list();
                             log.info(`Ollama health check successful. Available models: ${healthCheck.models.map(m => m.name).join(', ')}`);
                         } catch (healthError) {
                             log.error(`Ollama health check failed: ${healthError instanceof Error ? healthError.message : String(healthError)}`);
-                            log.error(`This indicates a connection issue to the Ollama server at ${options.getOption('ollamaBaseUrl')}`);
                             throw new Error(`Unable to connect to Ollama server: ${healthError instanceof Error ? healthError.message : String(healthError)}`);
                         }
 
-                        // Make the streaming request
-                        log.info(`Proceeding with Ollama streaming request after successful health check`);
-                        streamIterator = await client.chat(streamingRequest);
-
-                        log.info(`Successfully obtained Ollama stream iterator`);
+                        // Get the stream iterator
+                        log.info(`Getting stream iterator from Ollama`);
+                        const streamIterator = await client.chat(streamingRequest);
 
                         if (!streamIterator || typeof streamIterator[Symbol.asyncIterator] !== 'function') {
-                            log.error(`Invalid stream iterator returned: ${JSON.stringify(streamIterator)}`);
-                            throw new Error('Stream iterator is not valid');
+                            throw new Error('Invalid stream iterator returned');
                         }
-                    } catch (error) {
-                        log.error(`Error getting stream iterator: ${error instanceof Error ? error.message : String(error)}`);
-                        log.error(`Error stack: ${error instanceof Error ? error.stack : 'No stack trace'}`);
-                        throw error;
-                    }
 
-                    // Process each chunk
-                    try {
-                        log.info(`About to start processing stream chunks`);
+                        // Process each chunk using our stream processor
                         for await (const chunk of streamIterator) {
                             chunkCount++;
 
-                            // Log first chunk and then periodic updates
-                            if (chunkCount === 1 || chunkCount % 10 === 0) {
-                                log.info(`Processing Ollama stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`);
+                            // Process the chunk and update our accumulated text
+                            const result = await StreamProcessor.processChunk(
+                                chunk,
+                                completeText,
+                                chunkCount,
+                                { providerName: this.getName(), modelName: providerOptions.model }
+                            );
+
+                            completeText = result.completeText;
+
+                            // Extract any tool calls
+                            const toolCalls = StreamProcessor.extractToolCalls(chunk);
+                            if (toolCalls.length > 0) {
+                                responseToolCalls = toolCalls;
                             }
 
-                            // Accumulate text
-                            if (chunk.message?.content) {
-                                const newContent = chunk.message.content;
-                                completeText += newContent;
-
-                                if (chunkCount === 1) {
-                                    log.info(`First content chunk received: "${newContent.substring(0, 50)}${newContent.length > 50 ? '...' : ''}"`);
-                                }
-                            }
-
-                            // Check for tool calls
-                            if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
-                                responseToolCalls = [...chunk.message.tool_calls];
-                                log.info(`Received tool calls in stream: ${chunk.message.tool_calls.length} tools`);
-                            }
-
-                            // Send the chunk to the caller
+                            // Send to callback
                             await callback({
                                 text: chunk.message?.content || '',
-                                done: false, // Never mark as done during chunk processing
-                                raw: chunk // Include the raw chunk for advanced processing
+                                done: false, // Add done property to satisfy StreamChunk
+                                raw: chunk
                             });
 
-                            // If this is the done chunk, log it
-                            if (chunk.done) {
-                                log.info(`Reached final chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
+                            // Log completion
+                            if (chunk.done && !result.logged) {
+                                log.info(`Reached final chunk after ${chunkCount} chunks, content length: ${completeText.length} chars`);
                             }
                         }
 
-                        log.info(`Completed streaming from Ollama: processed ${chunkCount} chunks, total content: ${completeText.length} chars`);
-
-                        // Signal completion with a separate final callback after all processing is done
-                        await callback({
-                            text: '',
-                            done: true
-                        });
-                    } catch (streamProcessError) {
-                        log.error(`Error processing Ollama stream: ${streamProcessError instanceof Error ? streamProcessError.message : String(streamProcessError)}`);
-                        log.error(`Stream process error stack: ${streamProcessError instanceof Error ? streamProcessError.stack : 'No stack trace'}`);
-
-                        // Try to signal completion with error
-                        try {
-                            await callback({
-                                text: '',
-                                done: true,
-                                raw: { error: streamProcessError instanceof Error ? streamProcessError.message : String(streamProcessError) }
-                            });
-                        } catch (finalError) {
-                            log.error(`Error sending final error chunk: ${finalError}`);
-                        }
-
-                        throw streamProcessError;
+                        return completeText;
+                    } catch (error) {
+                        log.error(`Error in Ollama streaming: ${error}`);
+                        log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
+                        throw error;
                     }
-
-                    return completeText;
-                } catch (error) {
-                    log.error(`Error in Ollama streaming: ${error}`);
-                    log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
-                    throw error;
                 }
-            };
+            );
 
             // Handle direct streamCallback if provided
             if (opts.streamCallback) {
@@ -452,67 +412,45 @@ export class OllamaService extends BaseAIService {
                         chunkCount++;
                         finalChunk = chunk;
 
-                        // Log first chunk and periodic updates
-                        if (chunkCount === 1 || chunkCount % 10 === 0) {
-                            log.info(`Processing Ollama direct stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`);
-                        }
+                        // Process chunk with StreamProcessor
+                        const result = await StreamProcessor.processChunk(
+                            chunk,
+                            completeText,
+                            chunkCount,
+                            { providerName: this.getName(), modelName: providerOptions.model }
+                        );
 
-                        // Accumulate text
-                        if (chunk.message?.content) {
-                            const newContent = chunk.message.content;
-                            completeText += newContent;
+                        completeText = result.completeText;
 
-                            if (chunkCount === 1) {
-                                log.info(`First direct content chunk: "${newContent.substring(0, 50)}${newContent.length > 50 ? '...' : ''}"`);
+                        // Extract tool calls
+                        const toolCalls = StreamProcessor.extractToolCalls(chunk);
+                        if (toolCalls.length > 0) {
+                            responseToolCalls = toolCalls;
                         }
-                        }
 
-                        // Check for tool calls
-                        if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
-                            responseToolCalls = [...chunk.message.tool_calls];
-                            log.info(`Received tool calls in direct stream: ${chunk.message.tool_calls.length} tools`);
-                        }
-
-                        // Call the callback with the current chunk content
-                        if (opts.streamCallback) {
-                            try {
-                                // Only mark as done on the final chunk if we have actual content
-                                // This ensures consistent behavior with and without tool calls
-                                // We'll send a separate final callback after the loop completes
-                                const shouldMarkAsDone = false; // Never mark as done during chunk processing
-
-                                await opts.streamCallback(
+                        // Call the callback with the current chunk content
+                        if (opts.streamCallback) {
+                            await StreamProcessor.sendChunkToCallback(
+                                opts.streamCallback,
                                 chunk.message?.content || '',
-                                shouldMarkAsDone,
-                                chunk
+                                false, // Never mark as done during processing
+                                chunk,
+                                chunkCount
                             );
+                        }
 
-                            if (chunkCount === 1) {
-                                log.info(`Successfully called streamCallback with first chunk`);
-                            }
-                        } catch (callbackError) {
-                            log.error(`Error in streamCallback: ${callbackError}`);
+                        // If this is the done chunk, log it
+                        if (chunk.done && !result.logged) {
+                            log.info(`Reached final direct chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
                         }
                     }
 
-                    // If this is the done chunk, log it
-                    if (chunk.done) {
-                        log.info(`Reached final direct chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
+                    // Send one final callback with done=true after all chunks have been processed
+                    if (opts.streamCallback) {
+                        await StreamProcessor.sendFinalCallback(opts.streamCallback, completeText);
                     }
-                    }
 
-                    // Send one final callback with done=true after all chunks have been processed
-                    // This ensures we get the complete response regardless of tool calls
-                    if (opts.streamCallback) {
-                        try {
-                            log.info(`Sending final done=true callback after processing all chunks`);
-                            await opts.streamCallback('', true, { done: true });
-                        } catch (finalCallbackError) {
-                            log.error(`Error in final streamCallback: ${finalCallbackError}`);
-                        }
-                    }
-
-                    log.info(`Completed direct streaming from Ollama: processed ${chunkCount} chunks, final content: ${completeText.length} chars`);
+                    log.info(`Completed direct streaming from Ollama: processed ${chunkCount} chunks, final content: ${completeText.length} chars`);
                 } catch (iterationError) {
                     log.error(`Error iterating through Ollama stream chunks: ${iterationError instanceof Error ? iterationError.message : String(iterationError)}`);
                     log.error(`Iteration error stack: ${iterationError instanceof Error ? iterationError.stack : 'No stack trace'}`);
@@ -520,17 +458,17 @@ export class OllamaService extends BaseAIService {
                 }
 
                 // Create the final response after streaming is complete
-                return {
-                    text: completeText,
-                    model: providerOptions.model,
-                    provider: this.getName(),
-                    tool_calls: this.transformToolCalls(responseToolCalls),
-                    usage: {
+                return StreamProcessor.createFinalResponse(
+                    completeText,
+                    providerOptions.model,
+                    this.getName(),
+                    this.transformToolCalls(responseToolCalls),
+                    {
                         promptTokens: finalChunk?.prompt_eval_count || 0,
                         completionTokens: finalChunk?.eval_count || 0,
                         totalTokens: (finalChunk?.prompt_eval_count || 0) + (finalChunk?.eval_count || 0)
                     }
-                };
+                );
             } catch (error) {
                 log.error(`Error in Ollama streaming with callback: ${error}`);
                 log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
@@ -543,7 +481,8 @@ export class OllamaService extends BaseAIService {
             text: '', // Initial text is empty, will be populated during streaming
             model: providerOptions.model,
             provider: this.getName(),
-            stream: streamHandler
+            stream: streamHandler as (callback: (chunk: StreamChunk) => Promise<void> | void) => Promise<string>
+        };
     }
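For orientation, here is a minimal consumer-side sketch (not part of this diff) of how the `stream` property returned in the last hunk above might be driven. The `printStreamedResponse` helper is hypothetical; the callback shape follows the `streamHandler` cast shown above, i.e. the handler receives `StreamChunk` objects and resolves with the full accumulated text.

```typescript
import type { ChatResponse, StreamChunk } from '../ai_interface.js';

// Hypothetical helper: drive a streaming ChatResponse and print chunks as they arrive.
async function printStreamedResponse(response: ChatResponse): Promise<void> {
    if (!response.stream) {
        // Non-streaming path: the text is already complete.
        console.log(response.text);
        return;
    }

    // The stream handler invokes our callback once per chunk and resolves
    // with the complete accumulated text once the stream has finished.
    const fullText = await response.stream(async (chunk: StreamChunk) => {
        if (chunk.text) {
            process.stdout.write(chunk.text);
        }
        if (chunk.done) {
            process.stdout.write('\n');
        }
    });

    console.log(`Received ${fullText.length} characters in total`);
}
```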
diff --git a/src/services/llm/providers/stream_handler.ts b/src/services/llm/providers/stream_handler.ts
new file mode 100644
index 000000000..192e729e1
--- /dev/null
+++ b/src/services/llm/providers/stream_handler.ts
@@ -0,0 +1,189 @@
+/**
+ * Stream Handler - Reusable streaming implementation for LLM providers
+ *
+ * This module provides common streaming utilities that can be used by any LLM provider.
+ * It abstracts the complexities of handling streaming responses and tool executions.
+ */
+
+import type { StreamChunk as BaseStreamChunk, ChatCompletionOptions } from '../ai_interface.js';
+import log from '../../log.js';
+
+/**
+ * Extended StreamChunk interface that makes 'done' optional for internal use
+ */
+export interface StreamChunk extends Omit<BaseStreamChunk, 'done'> {
+    done?: boolean;
+}
+
+/**
+ * Stream processing options
+ */
+export interface StreamProcessingOptions {
+    streamCallback?: (text: string, done: boolean, chunk?: any) => Promise<void> | void;
+    providerName: string;
+    modelName: string;
+}
+
+/**
+ * Stream processor that handles common streaming operations
+ */
+export class StreamProcessor {
+    /**
+     * Process an individual chunk from a streaming response
+     */
+    static async processChunk(
+        chunk: any,
+        completeText: string,
+        chunkCount: number,
+        options: StreamProcessingOptions
+    ): Promise<{completeText: string, logged: boolean}> {
+        let textToAdd = '';
+        let logged = false;
+
+        // Log first chunk and periodic updates
+        if (chunkCount === 1 || chunkCount % 10 === 0) {
+            log.info(`Processing ${options.providerName} stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`);
+            logged = true;
+        }
+
+        // Extract content if available
+        if (chunk.message?.content) {
+            textToAdd = chunk.message.content;
+            const newCompleteText = completeText + textToAdd;
+
+            if (chunkCount === 1) {
+                log.info(`First content chunk: "${textToAdd.substring(0, 50)}${textToAdd.length > 50 ? '...' : ''}"`);
+            }
+
+            return { completeText: newCompleteText, logged };
+        }
+
+        return { completeText, logged };
+    }
+
+    /**
+     * Send a streaming chunk to the callback
+     */
+    static async sendChunkToCallback(
+        callback: (text: string, done: boolean, chunk?: any) => Promise<void> | void,
+        content: string,
+        done: boolean,
+        chunk: any,
+        chunkNumber: number
+    ): Promise<void> {
+        try {
+            const result = callback(content || '', done, chunk);
+            // Handle both Promise and void return types
+            if (result instanceof Promise) {
+                await result;
+            }
+
+            if (chunkNumber === 1) {
+                log.info(`Successfully called streamCallback with first chunk`);
+            }
+        } catch (callbackError) {
+            log.error(`Error in streamCallback: ${callbackError}`);
+        }
+    }
+
+    /**
+     * Send final completion callback
+     */
+    static async sendFinalCallback(
+        callback: (text: string, done: boolean, chunk?: any) => Promise<void> | void,
+        completeText: string
+    ): Promise<void> {
+        try {
+            log.info(`Sending final done=true callback after processing all chunks`);
+            const result = callback('', true, { done: true });
+            // Handle both Promise and void return types
+            if (result instanceof Promise) {
+                await result;
+            }
+        } catch (finalCallbackError) {
+            log.error(`Error in final streamCallback: ${finalCallbackError}`);
+        }
+    }
+
+    /**
+     * Detect and extract tool calls from a response chunk
+     */
+    static extractToolCalls(chunk: any): any[] {
+        if (chunk.message?.tool_calls &&
+            Array.isArray(chunk.message.tool_calls) &&
+            chunk.message.tool_calls.length > 0) {
+
+            log.info(`Detected ${chunk.message.tool_calls.length} tool calls in stream chunk`);
+            return [...chunk.message.tool_calls];
+        }
+
+        return [];
+    }
+
+    /**
+     * Create a standard response object from streaming results
+     */
+    static createFinalResponse(
+        completeText: string,
+        modelName: string,
+        providerName: string,
+        toolCalls: any[],
+        usage: any = {}
+    ) {
+        return {
+            text: completeText,
+            model: modelName,
+            provider: providerName,
+            tool_calls: toolCalls,
+            usage
+        };
+    }
+}
+
+/**
+ * Create a streaming handler that follows a consistent pattern
+ */
+export function createStreamHandler(
+    options: StreamProcessingOptions,
+    streamImplementation: (callback: (chunk: StreamChunk) => Promise<void>) => Promise<string>
+) {
+    // Return a standard stream handler function that providers can use
+    return async (callback: (chunk: BaseStreamChunk) => Promise<void>): Promise<string> => {
+        let completeText = '';
+        let chunkCount = 0;
+
+        try {
+            // Call the provided implementation
+            return await streamImplementation(async (chunk: StreamChunk) => {
+                chunkCount++;
+
+                // Process the chunk
+                if (chunk.text) {
+                    completeText += chunk.text;
+                }
+
+                // Forward to callback - ensure done is always boolean for BaseStreamChunk
+                await callback({
+                    text: chunk.text || '',
+                    done: !!chunk.done, // Ensure done is boolean
+                    raw: chunk.raw || chunk // Include raw data
+                });
+            });
+        } catch (error) {
+            log.error(`Error in stream handler: ${error}`);
+            throw error;
+        } finally {
+            // Always ensure a final done=true chunk is sent
+            if (chunkCount > 0) {
+                try {
+                    await callback({
+                        text: '',
+                        done: true
+                    });
+                } catch (e) {
+                    log.error(`Error sending final chunk: ${e}`);
+                }
+            }
+        }
+    };
+}
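As a rough sketch of how another provider could reuse these utilities, the example below wires a fake SDK stream through `createStreamHandler` and `StreamProcessor`. The `fakeSdkStream` generator, provider name, and model name are invented for illustration; the chunk shape mirrors the Ollama-style `message.content`/`done` fields that `StreamProcessor.processChunk` inspects.

```typescript
import { createStreamHandler, StreamProcessor } from './stream_handler.js';

// Stand-in for a real SDK call that yields Ollama-shaped chunks.
async function* fakeSdkStream() {
    yield { message: { content: 'Hello, ' }, done: false };
    yield { message: { content: 'world!' }, done: true };
}

const streamHandler = createStreamHandler(
    { providerName: 'example', modelName: 'example-model' },
    async (callback) => {
        let completeText = '';
        let chunkCount = 0;

        for await (const chunk of fakeSdkStream()) {
            chunkCount++;

            // Accumulate text (and get periodic logging) via the shared processor.
            const result = await StreamProcessor.processChunk(
                chunk,
                completeText,
                chunkCount,
                { providerName: 'example', modelName: 'example-model' }
            );
            completeText = result.completeText;

            // Forward the chunk; createStreamHandler emits the final done=true chunk itself.
            await callback({ text: chunk.message?.content || '', done: false, raw: chunk });
        }

        return completeText;
    }
);

// Usage: drive the handler with a chunk callback and await the accumulated text.
async function demo(): Promise<void> {
    const text = await streamHandler(async (chunk) => {
        if (chunk.text) {
            process.stdout.write(chunk.text);
        }
    });
    console.log(`\nComplete response: ${text}`);
}

void demo();
```

The `finally` block in `createStreamHandler` guarantees a trailing done=true chunk, so a provider implementation only needs to forward content chunks and return the accumulated text.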