This commit is contained in:
perf3ct 2025-04-12 19:09:25 +00:00
parent 6bba1be5f4
commit 263c869091
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
4 changed files with 238 additions and 52 deletions

View File

@ -15,29 +15,29 @@ export interface Message {
/**
* Interface for streaming response chunks
*
*
* This is the standardized format for all streaming chunks across
* different providers (OpenAI, Anthropic, Ollama, etc.).
* The original provider-specific chunks are available through
* the extended interface in the stream_manager.
*
*
* See STREAMING.md for complete documentation on streaming usage.
*/
export interface StreamChunk {
/** The text content in this chunk (may be empty for status updates) */
text: string;
/** Whether this is the final chunk in the stream */
done: boolean;
/** Optional token usage statistics (rarely available in streaming mode) */
usage?: {
promptTokens?: number;
completionTokens?: number;
totalTokens?: number;
};
/**
/**
* Raw provider-specific data from the original response chunk
* This can include thinking state, tool execution info, etc.
*/
@ -46,20 +46,20 @@ export interface StreamChunk {
/**
* Options for chat completion requests
*
*
* Key properties:
* - stream: If true, the response will be streamed
* - model: Model name to use
* - provider: Provider to use (openai, anthropic, ollama, etc.)
* - enableTools: If true, enables tool support
*
*
* The stream option is particularly important and should be consistently handled
* throughout the pipeline. It should be explicitly set to true or false.
*
*
* Streaming supports two approaches:
* 1. Callback-based: Provide a streamCallback to receive chunks directly
* 2. API-based: Use the stream property in the response to process chunks
*
*
* See STREAMING.md for complete documentation on streaming usage.
*/
export interface ChatCompletionOptions {
@ -74,7 +74,7 @@ export interface ChatCompletionOptions {
preserveSystemPrompt?: boolean; // Whether to preserve existing system message
bypassFormatter?: boolean; // Whether to bypass the message formatter entirely
expectsJsonResponse?: boolean; // Whether this request expects a JSON response
/**
* Whether to stream the response
* When true, response will be delivered incrementally via either:
@ -82,70 +82,82 @@ export interface ChatCompletionOptions {
* - The stream property in the response object
*/
stream?: boolean;
/**
* Optional callback function for streaming responses
* When provided along with stream:true, this function will be called
* for each chunk of the response.
*
*
* @param text The text content in this chunk
* @param isDone Whether this is the final chunk
* @param originalChunk Optional original provider-specific chunk for advanced usage
*/
streamCallback?: (text: string, isDone: boolean, originalChunk?: any) => Promise<void> | void;
enableTools?: boolean; // Whether to enable tool calling
tools?: any[]; // Tools to provide to the LLM
useAdvancedContext?: boolean; // Whether to use advanced context enrichment
toolExecutionStatus?: any[]; // Status information about executed tools for feedback
providerMetadata?: ModelMetadata; // Metadata about the provider and model capabilities
/**
* Maximum number of tool execution iterations
* Used to prevent infinite loops in tool execution
*/
maxToolIterations?: number;
/**
* Current tool execution iteration counter
* Internal use for tracking nested tool executions
*/
currentToolIteration?: number;
}
/**
* Response from a chat completion request
*
*
* When streaming is used, the behavior depends on how streaming was requested:
*
*
* 1. With streamCallback: The text field contains the complete response
* collected from all chunks, and the stream property is not present.
*
*
* 2. Without streamCallback: The text field is initially empty, and the
* stream property provides a function to process chunks and collect
* the complete response.
*
*
* See STREAMING.md for complete documentation on streaming usage.
*/
export interface ChatResponse {
/**
* The complete text response.
/**
* The complete text response.
* If streaming was used with streamCallback, this contains the collected response.
* If streaming was used without streamCallback, this is initially empty.
*/
text: string;
/** The model that generated the response */
model: string;
/** The provider that served the request (openai, anthropic, ollama, etc.) */
provider: string;
/** Token usage statistics (may not be available when streaming) */
usage?: {
promptTokens?: number;
completionTokens?: number;
totalTokens?: number;
};
/**
* Stream processor function - only present when streaming is enabled
* without a streamCallback. When called with a chunk processor function,
* it returns a Promise that resolves to the complete response text.
*
*
* @param callback Function to process each chunk of the stream
* @returns Promise resolving to the complete text after stream processing
*/
stream?: (callback: (chunk: StreamChunk) => Promise<void> | void) => Promise<string>;
/** Tool calls from the LLM (if tools were used and the model supports them) */
tool_calls?: ToolCall[] | any[];
}

View File

@ -328,7 +328,7 @@ export class OllamaService extends BaseAIService {
responseToolCalls = toolCalls;
}
// Send to callback
// Send to callback - directly pass the content without accumulating
await callback({
text: chunk.message?.content || '',
done: false, // Add done property to satisfy StreamChunk

View File

@ -145,24 +145,20 @@ export class StreamProcessor {
*/
export function createStreamHandler(
options: StreamProcessingOptions,
streamImplementation: (callback: (chunk: StreamChunk) => Promise<void>) => Promise<string>
) {
// Return a standard stream handler function that providers can use
return async (callback: (chunk: BaseStreamChunk) => Promise<void>): Promise<string> => {
let completeText = '';
processFn: (
callback: (chunk: StreamChunk) => Promise<void> | void
) => Promise<string>
): (callback: (chunk: StreamChunk) => Promise<void> | void) => Promise<string> {
return async (callback) => {
let chunkCount = 0;
try {
// Call the provided implementation
return await streamImplementation(async (chunk: StreamChunk) => {
// Run the processor function with our callback
return await processFn(async (chunk) => {
chunkCount++;
// Process the chunk
if (chunk.text) {
completeText += chunk.text;
}
// Forward to callback - ensure done is always boolean for BaseStreamChunk
// Pass each chunk directly to the callback as it arrives
// without modifying or accumulating its content
await callback({
text: chunk.text || '',
done: !!chunk.done, // Ensure done is boolean

View File

@ -939,19 +939,197 @@ class RestChatService {
tool_calls: response.tool_calls
}, ...toolResults];
// Use non-streaming for the follow-up to get a complete response
const followUpOptions = { ...chatOptions, stream: false, enableTools: false };
// Preserve streaming for follow-up if it was enabled in the original request
const followUpOptions = {
...chatOptions,
// Only disable streaming if it wasn't explicitly requested
stream: chatOptions.stream === true,
// Allow tools but track iterations to prevent infinite loops
enableTools: true,
maxToolIterations: chatOptions.maxToolIterations || 5,
currentToolIteration: 1 // Start counting tool iterations
};
const followUpResponse = await service.generateChatCompletion(toolMessages, followUpOptions);
messageContent = followUpResponse.text || "";
// Handle streaming follow-up response if streaming is enabled
if (followUpOptions.stream && followUpResponse.stream) {
log.info(`Streaming follow-up response after tool execution`);
let followUpContent = '';
// Send the complete response with done flag in the same message
wsService.sendMessageToAllClients({
type: 'llm-stream',
sessionId,
content: messageContent,
done: true
} as LLMStreamMessage);
// Process the streaming response
await followUpResponse.stream(async (chunk: StreamChunk) => {
if (chunk.text) {
followUpContent += chunk.text;
// Send each chunk via WebSocket
wsService.sendMessageToAllClients({
type: 'llm-stream',
sessionId,
content: chunk.text
} as LLMStreamMessage);
}
// Signal completion when done
if (chunk.done) {
// Check if there are more tool calls to execute
if (followUpResponse.tool_calls && followUpResponse.tool_calls.length > 0 &&
followUpOptions.currentToolIteration < followUpOptions.maxToolIterations) {
log.info(`Found ${followUpResponse.tool_calls.length} more tool calls in iteration ${followUpOptions.currentToolIteration}`);
// Execute these tool calls in another iteration
// First, capture the current content for the assistant message
const assistantMessage = {
role: 'assistant' as const,
content: followUpContent,
tool_calls: followUpResponse.tool_calls
};
// Execute the tools from this follow-up
const nextToolResults = await this.executeToolCalls(followUpResponse);
// Create a new messages array with the latest tool results
const nextToolMessages = [...toolMessages, assistantMessage, ...nextToolResults];
// Increment the tool iteration counter for the next call
const nextFollowUpOptions = {
...followUpOptions,
currentToolIteration: followUpOptions.currentToolIteration + 1
};
log.info(`Making another follow-up request with ${nextToolResults.length} tool results (iteration ${nextFollowUpOptions.currentToolIteration}/${nextFollowUpOptions.maxToolIterations})`);
// Make another follow-up request
const nextResponse = await service.generateChatCompletion(nextToolMessages, nextFollowUpOptions);
// Handle this new response (recursive streaming if needed)
if (nextFollowUpOptions.stream && nextResponse.stream) {
let nextContent = followUpContent; // Start with the existing content
await nextResponse.stream(async (nextChunk: StreamChunk) => {
if (nextChunk.text) {
nextContent += nextChunk.text;
// Stream this content to the client
wsService.sendMessageToAllClients({
type: 'llm-stream',
sessionId,
content: nextChunk.text
} as LLMStreamMessage);
}
if (nextChunk.done) {
// Final completion message
wsService.sendMessageToAllClients({
type: 'llm-stream',
sessionId,
done: true
} as LLMStreamMessage);
// Update message content with the complete response after all iterations
messageContent = nextContent;
// Store in session history
session.messages.push({
role: 'assistant',
content: messageContent,
timestamp: new Date()
});
}
});
} else {
// For non-streaming next response
messageContent = nextResponse.text || "";
// Send the final complete message
wsService.sendMessageToAllClients({
type: 'llm-stream',
sessionId,
content: messageContent,
done: true
} as LLMStreamMessage);
// Store in session
session.messages.push({
role: 'assistant',
content: messageContent,
timestamp: new Date()
});
}
} else {
// No more tool calls or reached iteration limit
wsService.sendMessageToAllClients({
type: 'llm-stream',
sessionId,
done: true
} as LLMStreamMessage);
// Update message content for session storage
messageContent = followUpContent;
// Store the final response in the session
session.messages.push({
role: 'assistant',
content: messageContent,
timestamp: new Date()
});
}
}
});
} else {
// Non-streaming follow-up handling (original behavior)
messageContent = followUpResponse.text || "";
// Check if there are more tool calls to execute
if (followUpResponse.tool_calls && followUpResponse.tool_calls.length > 0 &&
followUpOptions.currentToolIteration < (followUpOptions.maxToolIterations || 5)) {
log.info(`Found ${followUpResponse.tool_calls.length} more tool calls in non-streaming follow-up (iteration ${followUpOptions.currentToolIteration})`);
// Execute these tool calls in another iteration
const assistantMessage = {
role: 'assistant' as const,
content: messageContent,
tool_calls: followUpResponse.tool_calls
};
// Execute the next round of tools
const nextToolResults = await this.executeToolCalls(followUpResponse);
// Create a new messages array with the latest tool results
const nextToolMessages = [...toolMessages, assistantMessage, ...nextToolResults];
// Increment the tool iteration counter for the next call
const nextFollowUpOptions = {
...followUpOptions,
currentToolIteration: followUpOptions.currentToolIteration + 1
};
log.info(`Making another non-streaming follow-up request (iteration ${nextFollowUpOptions.currentToolIteration}/${nextFollowUpOptions.maxToolIterations || 5})`);
// Make another follow-up request
const nextResponse = await service.generateChatCompletion(nextToolMessages, nextFollowUpOptions);
// Update the message content with the final response
messageContent = nextResponse.text || "";
}
// Send the complete response with done flag in the same message
wsService.sendMessageToAllClients({
type: 'llm-stream',
sessionId,
content: messageContent,
done: true
} as LLMStreamMessage);
// Store the response in the session
session.messages.push({
role: 'assistant',
content: messageContent,
timestamp: new Date()
});
}
// Store the response in the session
session.messages.push({
@ -1438,11 +1616,11 @@ class RestChatService {
if (registeredTools.length === 0) {
log.info("No tools found in registry.");
log.info("Note: Tools should be initialized in the AIServiceManager constructor.");
// Create AI service manager instance to trigger tool initialization
const aiServiceManager = (await import('./ai_service_manager.js')).default;
aiServiceManager.getInstance();
// Check again after AIServiceManager instantiation
const tools = toolRegistry.getAllTools();
log.info(`After AIServiceManager instantiation: ${tools.length} tools available`);