do a better job of implementing a stream_handler

perf3ct 2025-04-12 17:13:37 +00:00
parent 253dbf92fa
commit 519076148d
2 changed files with 277 additions and 149 deletions

View File

@@ -1,4 +1,3 @@
import options from '../../options.js';
import { BaseAIService } from '../base_ai_service.js';
import type { Message, ChatCompletionOptions, ChatResponse, StreamChunk } from '../ai_interface.js';
import { OllamaMessageFormatter } from '../formatters/ollama_formatter.js';
@@ -8,6 +7,8 @@ import toolRegistry from '../tools/tool_registry.js';
import type { OllamaOptions } from './provider_options.js';
import { getOllamaOptions } from './providers.js';
import { Ollama, type ChatRequest, type ChatResponse as OllamaChatResponse } from 'ollama';
import options from '../../options.js';
import { StreamProcessor, createStreamHandler } from './stream_handler.js';
// Add an interface for tool execution feedback status
interface ToolExecutionStatus {
@@ -268,127 +269,86 @@ export class OllamaService extends BaseAIService {
// Log detailed information about the streaming setup
log.info(`Ollama streaming details: model=${providerOptions.model}, streamCallback=${opts.streamCallback ? 'provided' : 'not provided'}`);
// Create a stream handler function that processes the SDK's stream
const streamHandler = async (callback: (chunk: StreamChunk) => Promise<void> | void): Promise<string> => {
let completeText = '';
let responseToolCalls: any[] = [];
let chunkCount = 0;
// Create a stream handler using our reusable StreamProcessor
const streamHandler = createStreamHandler(
{
providerName: this.getName(),
modelName: providerOptions.model,
streamCallback: opts.streamCallback
},
async (callback) => {
let completeText = '';
let responseToolCalls: any[] = [];
let chunkCount = 0;
try {
// Create streaming request
const streamingRequest = {
...requestOptions,
stream: true as const // Use const assertion to fix the type
};
log.info(`Creating Ollama streaming request with options: model=${streamingRequest.model}, stream=${streamingRequest.stream}, tools=${streamingRequest.tools ? streamingRequest.tools.length : 0}`);
// Get the async iterator
log.info(`Calling Ollama chat API with streaming enabled`);
let streamIterator;
try {
log.info(`About to call client.chat with streaming request to ${options.getOption('ollamaBaseUrl')}`);
log.info(`Stream request: model=${streamingRequest.model}, messages count=${streamingRequest.messages?.length || 0}`);
// Create streaming request
const streamingRequest = {
...requestOptions,
stream: true as const
};
// Check if we can connect to Ollama by getting available models
log.info(`Creating Ollama streaming request with options: model=${streamingRequest.model}, stream=${streamingRequest.stream}, tools=${streamingRequest.tools ? streamingRequest.tools.length : 0}`);
// Perform health check
try {
log.info(`Performing Ollama health check...`);
const healthCheck = await client.list();
log.info(`Ollama health check successful. Available models: ${healthCheck.models.map(m => m.name).join(', ')}`);
} catch (healthError) {
log.error(`Ollama health check failed: ${healthError instanceof Error ? healthError.message : String(healthError)}`);
log.error(`This indicates a connection issue to the Ollama server at ${options.getOption('ollamaBaseUrl')}`);
throw new Error(`Unable to connect to Ollama server: ${healthError instanceof Error ? healthError.message : String(healthError)}`);
}
// Make the streaming request
log.info(`Proceeding with Ollama streaming request after successful health check`);
streamIterator = await client.chat(streamingRequest);
log.info(`Successfully obtained Ollama stream iterator`);
// Get the stream iterator
log.info(`Getting stream iterator from Ollama`);
const streamIterator = await client.chat(streamingRequest);
if (!streamIterator || typeof streamIterator[Symbol.asyncIterator] !== 'function') {
log.error(`Invalid stream iterator returned: ${JSON.stringify(streamIterator)}`);
throw new Error('Stream iterator is not valid');
throw new Error('Invalid stream iterator returned');
}
} catch (error) {
log.error(`Error getting stream iterator: ${error instanceof Error ? error.message : String(error)}`);
log.error(`Error stack: ${error instanceof Error ? error.stack : 'No stack trace'}`);
throw error;
}
// Process each chunk
try {
log.info(`About to start processing stream chunks`);
// Process each chunk using our stream processor
for await (const chunk of streamIterator) {
chunkCount++;
// Log first chunk and then periodic updates
if (chunkCount === 1 || chunkCount % 10 === 0) {
log.info(`Processing Ollama stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`);
// Process the chunk and update our accumulated text
const result = await StreamProcessor.processChunk(
chunk,
completeText,
chunkCount,
{ providerName: this.getName(), modelName: providerOptions.model }
);
completeText = result.completeText;
// Extract any tool calls
const toolCalls = StreamProcessor.extractToolCalls(chunk);
if (toolCalls.length > 0) {
responseToolCalls = toolCalls;
}
// Accumulate text
if (chunk.message?.content) {
const newContent = chunk.message.content;
completeText += newContent;
if (chunkCount === 1) {
log.info(`First content chunk received: "${newContent.substring(0, 50)}${newContent.length > 50 ? '...' : ''}"`);
}
}
// Check for tool calls
if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
responseToolCalls = [...chunk.message.tool_calls];
log.info(`Received tool calls in stream: ${chunk.message.tool_calls.length} tools`);
}
// Send the chunk to the caller
// Send to callback
await callback({
text: chunk.message?.content || '',
done: false, // Never mark as done during chunk processing
raw: chunk // Include the raw chunk for advanced processing
done: false, // Add done property to satisfy StreamChunk
raw: chunk
});
// If this is the done chunk, log it
if (chunk.done) {
log.info(`Reached final chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
// Log completion
if (chunk.done && !result.logged) {
log.info(`Reached final chunk after ${chunkCount} chunks, content length: ${completeText.length} chars`);
}
}
log.info(`Completed streaming from Ollama: processed ${chunkCount} chunks, total content: ${completeText.length} chars`);
// Signal completion with a separate final callback after all processing is done
await callback({
text: '',
done: true
});
} catch (streamProcessError) {
log.error(`Error processing Ollama stream: ${streamProcessError instanceof Error ? streamProcessError.message : String(streamProcessError)}`);
log.error(`Stream process error stack: ${streamProcessError instanceof Error ? streamProcessError.stack : 'No stack trace'}`);
// Try to signal completion with error
try {
await callback({
text: '',
done: true,
raw: { error: streamProcessError instanceof Error ? streamProcessError.message : String(streamProcessError) }
});
} catch (finalError) {
log.error(`Error sending final error chunk: ${finalError}`);
}
throw streamProcessError;
return completeText;
} catch (error) {
log.error(`Error in Ollama streaming: ${error}`);
log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
throw error;
}
return completeText;
} catch (error) {
log.error(`Error in Ollama streaming: ${error}`);
log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
throw error;
}
};
);
// Handle direct streamCallback if provided
if (opts.streamCallback) {
@@ -452,67 +412,45 @@ export class OllamaService extends BaseAIService {
chunkCount++;
finalChunk = chunk;
// Log first chunk and periodic updates
if (chunkCount === 1 || chunkCount % 10 === 0) {
log.info(`Processing Ollama direct stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`);
}
// Process chunk with StreamProcessor
const result = await StreamProcessor.processChunk(
chunk,
completeText,
chunkCount,
{ providerName: this.getName(), modelName: providerOptions.model }
);
// Accumulate text
if (chunk.message?.content) {
const newContent = chunk.message.content;
completeText += newContent;
completeText = result.completeText;
if (chunkCount === 1) {
log.info(`First direct content chunk: "${newContent.substring(0, 50)}${newContent.length > 50 ? '...' : ''}"`);
// Extract tool calls
const toolCalls = StreamProcessor.extractToolCalls(chunk);
if (toolCalls.length > 0) {
responseToolCalls = toolCalls;
}
}
// Check for tool calls
if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
responseToolCalls = [...chunk.message.tool_calls];
log.info(`Received tool calls in direct stream: ${chunk.message.tool_calls.length} tools`);
}
// Call the callback with the current chunk content
if (opts.streamCallback) {
try {
// Only mark as done on the final chunk if we have actual content
// This ensures consistent behavior with and without tool calls
// We'll send a separate final callback after the loop completes
const shouldMarkAsDone = false; // Never mark as done during chunk processing
await opts.streamCallback(
// Call the callback with the current chunk content
if (opts.streamCallback) {
await StreamProcessor.sendChunkToCallback(
opts.streamCallback,
chunk.message?.content || '',
shouldMarkAsDone,
chunk
false, // Never mark as done during processing
chunk,
chunkCount
);
}
if (chunkCount === 1) {
log.info(`Successfully called streamCallback with first chunk`);
}
} catch (callbackError) {
log.error(`Error in streamCallback: ${callbackError}`);
// If this is the done chunk, log it
if (chunk.done && !result.logged) {
log.info(`Reached final direct chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
}
}
// If this is the done chunk, log it
if (chunk.done) {
log.info(`Reached final direct chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
// Send one final callback with done=true after all chunks have been processed
if (opts.streamCallback) {
await StreamProcessor.sendFinalCallback(opts.streamCallback, completeText);
}
}
// Send one final callback with done=true after all chunks have been processed
// This ensures we get the complete response regardless of tool calls
if (opts.streamCallback) {
try {
log.info(`Sending final done=true callback after processing all chunks`);
await opts.streamCallback('', true, { done: true });
} catch (finalCallbackError) {
log.error(`Error in final streamCallback: ${finalCallbackError}`);
}
}
log.info(`Completed direct streaming from Ollama: processed ${chunkCount} chunks, final content: ${completeText.length} chars`);
} catch (iterationError) {
log.error(`Error iterating through Ollama stream chunks: ${iterationError instanceof Error ? iterationError.message : String(iterationError)}`);
log.error(`Iteration error stack: ${iterationError instanceof Error ? iterationError.stack : 'No stack trace'}`);
@@ -520,17 +458,17 @@ export class OllamaService extends BaseAIService {
}
// Create the final response after streaming is complete
return {
text: completeText,
model: providerOptions.model,
provider: this.getName(),
tool_calls: this.transformToolCalls(responseToolCalls),
usage: {
return StreamProcessor.createFinalResponse(
completeText,
providerOptions.model,
this.getName(),
this.transformToolCalls(responseToolCalls),
{
promptTokens: finalChunk?.prompt_eval_count || 0,
completionTokens: finalChunk?.eval_count || 0,
totalTokens: (finalChunk?.prompt_eval_count || 0) + (finalChunk?.eval_count || 0)
}
};
);
} catch (error) {
log.error(`Error in Ollama streaming with callback: ${error}`);
log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
@@ -543,7 +481,8 @@ export class OllamaService extends BaseAIService {
text: '', // Initial text is empty, will be populated during streaming
model: providerOptions.model,
provider: this.getName(),
stream: streamHandler
stream: streamHandler as (callback: (chunk: StreamChunk) => Promise<void> | void) => Promise<string>
};
}
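
With this change the Ollama service hands the stream handler back on the response object instead of consuming the stream itself. A minimal sketch of how a caller might drive that handler, assuming ChatResponse exposes the text/stream fields shown in the return value above; the consumeResponse helper, the onDelta callback, and the import path are hypothetical and only for illustration:

import type { ChatResponse, StreamChunk } from '../ai_interface.js';

// Hypothetical consumer: drives the provider-supplied stream handler,
// forwards text deltas as they arrive, and resolves with the complete text.
async function consumeResponse(
    response: ChatResponse,
    onDelta: (text: string) => void
): Promise<string> {
    if (!response.stream) {
        // Non-streaming path: text is already populated.
        return response.text;
    }
    return await response.stream(async (chunk: StreamChunk) => {
        if (chunk.text) {
            onDelta(chunk.text);
        }
        // chunk.done === true marks the final, empty completion chunk.
    });
}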

View File

@@ -0,0 +1,189 @@
/**
* Stream Handler - Reusable streaming implementation for LLM providers
*
* This module provides common streaming utilities that can be used by any LLM provider.
* It abstracts the complexities of handling streaming responses and tool executions.
*/
import type { StreamChunk as BaseStreamChunk, ChatCompletionOptions } from '../ai_interface.js';
import log from '../../log.js';
/**
* Extended StreamChunk interface that makes 'done' optional for internal use
*/
export interface StreamChunk extends Omit<BaseStreamChunk, 'done'> {
done?: boolean;
}
/**
* Stream processing options
*/
export interface StreamProcessingOptions {
streamCallback?: (text: string, done: boolean, chunk?: any) => Promise<void> | void;
providerName: string;
modelName: string;
}
/**
* Stream processor that handles common streaming operations
*/
export class StreamProcessor {
/**
* Process an individual chunk from a streaming response
*/
static async processChunk(
chunk: any,
completeText: string,
chunkCount: number,
options: StreamProcessingOptions
): Promise<{completeText: string, logged: boolean}> {
let textToAdd = '';
let logged = false;
// Log first chunk and periodic updates
if (chunkCount === 1 || chunkCount % 10 === 0) {
log.info(`Processing ${options.providerName} stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`);
logged = true;
}
// Extract content if available
if (chunk.message?.content) {
textToAdd = chunk.message.content;
const newCompleteText = completeText + textToAdd;
if (chunkCount === 1) {
log.info(`First content chunk: "${textToAdd.substring(0, 50)}${textToAdd.length > 50 ? '...' : ''}"`);
}
return { completeText: newCompleteText, logged };
}
return { completeText, logged };
}
/**
* Send a streaming chunk to the callback
*/
static async sendChunkToCallback(
callback: (text: string, done: boolean, chunk?: any) => Promise<void> | void,
content: string,
done: boolean,
chunk: any,
chunkNumber: number
): Promise<void> {
try {
const result = callback(content || '', done, chunk);
// Handle both Promise and void return types
if (result instanceof Promise) {
await result;
}
if (chunkNumber === 1) {
log.info(`Successfully called streamCallback with first chunk`);
}
} catch (callbackError) {
log.error(`Error in streamCallback: ${callbackError}`);
}
}
/**
* Send final completion callback
*/
static async sendFinalCallback(
callback: (text: string, done: boolean, chunk?: any) => Promise<void> | void,
completeText: string
): Promise<void> {
try {
log.info(`Sending final done=true callback after processing all chunks`);
const result = callback('', true, { done: true });
// Handle both Promise and void return types
if (result instanceof Promise) {
await result;
}
} catch (finalCallbackError) {
log.error(`Error in final streamCallback: ${finalCallbackError}`);
}
}
/**
* Detect and extract tool calls from a response chunk
*/
static extractToolCalls(chunk: any): any[] {
if (chunk.message?.tool_calls &&
Array.isArray(chunk.message.tool_calls) &&
chunk.message.tool_calls.length > 0) {
log.info(`Detected ${chunk.message.tool_calls.length} tool calls in stream chunk`);
return [...chunk.message.tool_calls];
}
return [];
}
/**
* Create a standard response object from streaming results
*/
static createFinalResponse(
completeText: string,
modelName: string,
providerName: string,
toolCalls: any[],
usage: any = {}
) {
return {
text: completeText,
model: modelName,
provider: providerName,
tool_calls: toolCalls,
usage
};
}
}
/**
* Create a streaming handler that follows a consistent pattern
*/
export function createStreamHandler(
options: StreamProcessingOptions,
streamImplementation: (callback: (chunk: StreamChunk) => Promise<void>) => Promise<string>
) {
// Return a standard stream handler function that providers can use
return async (callback: (chunk: BaseStreamChunk) => Promise<void>): Promise<string> => {
let completeText = '';
let chunkCount = 0;
try {
// Call the provided implementation
return await streamImplementation(async (chunk: StreamChunk) => {
chunkCount++;
// Process the chunk
if (chunk.text) {
completeText += chunk.text;
}
// Forward to callback - ensure done is always boolean for BaseStreamChunk
await callback({
text: chunk.text || '',
done: !!chunk.done, // Ensure done is boolean
raw: chunk.raw || chunk // Include raw data
});
});
} catch (error) {
log.error(`Error in stream handler: ${error}`);
throw error;
} finally {
// Always ensure a final done=true chunk is sent
if (chunkCount > 0) {
try {
await callback({
text: '',
done: true
});
} catch (e) {
log.error(`Error sending final chunk: ${e}`);
}
}
}
};
}
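
For reference, a minimal sketch of how another provider could reuse these utilities. The provider name, model, and sdkStream iterable are assumptions for illustration only, and the sketch assumes the SDK's chunks expose message.content the way Ollama's do; it is not part of this commit:

import { StreamProcessor, createStreamHandler } from './stream_handler.js';

// Hypothetical provider wiring: wraps an SDK's async-iterable stream in the
// shared handler so callers get the same chunk/done semantics as the Ollama service.
function buildExampleStreamHandler(modelName: string, sdkStream: AsyncIterable<any>) {
    return createStreamHandler(
        { providerName: 'example-provider', modelName }, // hypothetical provider name
        async (callback) => {
            let completeText = '';
            let chunkCount = 0;
            for await (const chunk of sdkStream) {
                chunkCount++;
                // Accumulate text and handle periodic logging via the shared processor.
                const result = await StreamProcessor.processChunk(
                    chunk,
                    completeText,
                    chunkCount,
                    { providerName: 'example-provider', modelName }
                );
                completeText = result.completeText;
                // Forward the delta; createStreamHandler appends the final done=true chunk.
                await callback({
                    text: chunk.message?.content || '',
                    done: false,
                    raw: chunk
                });
            }
            return completeText;
        }
    );
}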