From 263c869091d68682e6a1186d6628aab82f2ffe6e Mon Sep 17 00:00:00 2001
From: perf3ct
Date: Sat, 12 Apr 2025 19:09:25 +0000
Subject: [PATCH] hmm

---
 src/services/llm/ai_interface.ts             |  64 +++---
 src/services/llm/providers/ollama_service.ts |   2 +-
 src/services/llm/providers/stream_handler.ts |  22 +-
 src/services/llm/rest_chat_service.ts        | 202 +++++++++++++++++--
 4 files changed, 238 insertions(+), 52 deletions(-)

diff --git a/src/services/llm/ai_interface.ts b/src/services/llm/ai_interface.ts
index ca07b0ae8..3f75bf113 100644
--- a/src/services/llm/ai_interface.ts
+++ b/src/services/llm/ai_interface.ts
@@ -15,29 +15,29 @@ export interface Message {
 
 /**
  * Interface for streaming response chunks
- * 
+ *
  * This is the standardized format for all streaming chunks across
  * different providers (OpenAI, Anthropic, Ollama, etc.).
  * The original provider-specific chunks are available through
  * the extended interface in the stream_manager.
- * 
+ *
  * See STREAMING.md for complete documentation on streaming usage.
  */
 export interface StreamChunk {
     /** The text content in this chunk (may be empty for status updates) */
     text: string;
-    
+
     /** Whether this is the final chunk in the stream */
     done: boolean;
-    
+
     /** Optional token usage statistics (rarely available in streaming mode) */
     usage?: {
         promptTokens?: number;
         completionTokens?: number;
         totalTokens?: number;
     };
-    
-    /** 
+
+    /**
      * Raw provider-specific data from the original response chunk
      * This can include thinking state, tool execution info, etc.
      */
@@ -46,20 +46,20 @@ export interface StreamChunk {
 
 /**
  * Options for chat completion requests
- * 
+ *
  * Key properties:
  * - stream: If true, the response will be streamed
 * - model: Model name to use
 * - provider: Provider to use (openai, anthropic, ollama, etc.)
 * - enableTools: If true, enables tool support
- * 
+ *
 * The stream option is particularly important and should be consistently handled
 * throughout the pipeline. It should be explicitly set to true or false.
- * 
+ *
 * Streaming supports two approaches:
 * 1. Callback-based: Provide a streamCallback to receive chunks directly
 * 2. API-based: Use the stream property in the response to process chunks
- * 
+ *
 * See STREAMING.md for complete documentation on streaming usage.
 */
export interface ChatCompletionOptions {
@@ -74,7 +74,7 @@ export interface ChatCompletionOptions {
    preserveSystemPrompt?: boolean; // Whether to preserve existing system message
    bypassFormatter?: boolean;      // Whether to bypass the message formatter entirely
    expectsJsonResponse?: boolean;  // Whether this request expects a JSON response
-    
+
    /**
     * Whether to stream the response
     * When true, response will be delivered incrementally via either:
@@ -82,70 +82,82 @@ export interface ChatCompletionOptions {
     * - The stream property in the response object
     */
    stream?: boolean;
-    
+
    /**
     * Optional callback function for streaming responses
     * When provided along with stream:true, this function will be called
     * for each chunk of the response.
-     * 
+     *
     * @param text The text content in this chunk
     * @param isDone Whether this is the final chunk
     * @param originalChunk Optional original provider-specific chunk for advanced usage
     */
    streamCallback?: (text: string, isDone: boolean, originalChunk?: any) => Promise<void> | void;
-    
+
    enableTools?: boolean;            // Whether to enable tool calling
    tools?: any[];                    // Tools to provide to the LLM
    useAdvancedContext?: boolean;     // Whether to use advanced context enrichment
    toolExecutionStatus?: any[];      // Status information about executed tools for feedback
    providerMetadata?: ModelMetadata; // Metadata about the provider and model capabilities
+
+    /**
+     * Maximum number of tool execution iterations
+     * Used to prevent infinite loops in tool execution
+     */
+    maxToolIterations?: number;
+
+    /**
+     * Current tool execution iteration counter
+     * Internal use for tracking nested tool executions
+     */
+    currentToolIteration?: number;
 }
 
 /**
  * Response from a chat completion request
- * 
+ *
 * When streaming is used, the behavior depends on how streaming was requested:
- * 
+ *
 * 1. With streamCallback: The text field contains the complete response
 *    collected from all chunks, and the stream property is not present.
- * 
+ *
 * 2. Without streamCallback: The text field is initially empty, and the
 *    stream property provides a function to process chunks and collect
 *    the complete response.
- * 
+ *
 * See STREAMING.md for complete documentation on streaming usage.
 */
export interface ChatResponse {
-    /** 
-     * The complete text response. 
+    /**
+     * The complete text response.
     * If streaming was used with streamCallback, this contains the collected response.
     * If streaming was used without streamCallback, this is initially empty.
     */
    text: string;
-    
+
    /** The model that generated the response */
    model: string;
-    
+
    /** The provider that served the request (openai, anthropic, ollama, etc.) */
    provider: string;
-    
+
    /** Token usage statistics (may not be available when streaming) */
    usage?: {
        promptTokens?: number;
        completionTokens?: number;
        totalTokens?: number;
    };
-    
+
    /**
     * Stream processor function - only present when streaming is enabled
     * without a streamCallback. When called with a chunk processor function,
     * it returns a Promise that resolves to the complete response text.
-     * 
+     *
     * @param callback Function to process each chunk of the stream
     * @returns Promise resolving to the complete text after stream processing
     */
    stream?: (callback: (chunk: StreamChunk) => Promise<void> | void) => Promise<string>;
-    
+
    /** Tool calls from the LLM (if tools were used and the model supports them) */
    tool_calls?: ToolCall[] | any[];
 }
diff --git a/src/services/llm/providers/ollama_service.ts b/src/services/llm/providers/ollama_service.ts
index 71bd8bdd8..b65f435ae 100644
--- a/src/services/llm/providers/ollama_service.ts
+++ b/src/services/llm/providers/ollama_service.ts
@@ -328,7 +328,7 @@ export class OllamaService extends BaseAIService {
                         responseToolCalls = toolCalls;
                     }
 
-                    // Send to callback
+                    // Send to callback - directly pass the content without accumulating
                     await callback({
                         text: chunk.message?.content || '',
                         done: false, // Add done property to satisfy StreamChunk
diff --git a/src/services/llm/providers/stream_handler.ts b/src/services/llm/providers/stream_handler.ts
index 192e729e1..29aa7b114 100644
--- a/src/services/llm/providers/stream_handler.ts
+++ b/src/services/llm/providers/stream_handler.ts
@@ -145,24 +145,20 @@ export class StreamProcessor {
  */
 export function createStreamHandler(
     options: StreamProcessingOptions,
-    streamImplementation: (callback: (chunk: StreamChunk) => Promise<void>) => Promise<string>
-) {
-    // Return a standard stream handler function that providers can use
-    return async (callback: (chunk: BaseStreamChunk) => Promise<void>): Promise<string> => {
-        let completeText = '';
+    processFn: (
+        callback: (chunk: StreamChunk) => Promise<void> | void
+    ) => Promise<string>
+): (callback: (chunk: StreamChunk) => Promise<void> | void) => Promise<string> {
+    return async (callback) => {
         let chunkCount = 0;
 
         try {
-            // Call the provided implementation
-            return await streamImplementation(async (chunk: StreamChunk) => {
+            // Run the processor function with our callback
+            return await processFn(async (chunk) => {
                 chunkCount++;
 
-                // Process the chunk
-                if (chunk.text) {
-                    completeText += chunk.text;
-                }
-
-                // Forward to callback - ensure done is always boolean for BaseStreamChunk
+                // Pass each chunk directly to the callback as it arrives
+                // without modifying or accumulating its content
                 await callback({
                     text: chunk.text || '',
                     done: !!chunk.done, // Ensure done is boolean
diff --git a/src/services/llm/rest_chat_service.ts b/src/services/llm/rest_chat_service.ts
index 4b390a56b..b8379beaa 100644
--- a/src/services/llm/rest_chat_service.ts
+++ b/src/services/llm/rest_chat_service.ts
@@ -939,19 +939,197 @@ class RestChatService {
                         tool_calls: response.tool_calls
                     }, ...toolResults];
 
-                    // Use non-streaming for the follow-up to get a complete response
-                    const followUpOptions = { ...chatOptions, stream: false, enableTools: false };
+                    // Preserve streaming for follow-up if it was enabled in the original request
+                    const followUpOptions = {
+                        ...chatOptions,
+                        // Only disable streaming if it wasn't explicitly requested
+                        stream: chatOptions.stream === true,
+                        // Allow tools but track iterations to prevent infinite loops
+                        enableTools: true,
+                        maxToolIterations: chatOptions.maxToolIterations || 5,
+                        currentToolIteration: 1 // Start counting tool iterations
+                    };
+
                     const followUpResponse = await service.generateChatCompletion(toolMessages, followUpOptions);
 
-                    messageContent = followUpResponse.text || "";
+                    // Handle streaming follow-up response if streaming is enabled
+                    if (followUpOptions.stream && followUpResponse.stream) {
+                        log.info(`Streaming follow-up response after tool execution`);
+                        let followUpContent = '';
 
-                    // Send the complete response with done flag in the same message
-                    wsService.sendMessageToAllClients({
-                        type: 'llm-stream',
-                        sessionId,
-                        content: messageContent,
-                        done: true
-                    } as LLMStreamMessage);
+                        // Process the streaming response
+                        await followUpResponse.stream(async (chunk: StreamChunk) => {
+                            if (chunk.text) {
+                                followUpContent += chunk.text;
+
+                                // Send each chunk via WebSocket
+                                wsService.sendMessageToAllClients({
+                                    type: 'llm-stream',
+                                    sessionId,
+                                    content: chunk.text
+                                } as LLMStreamMessage);
+                            }
+
+                            // Signal completion when done
+                            if (chunk.done) {
+                                // Check if there are more tool calls to execute
+                                if (followUpResponse.tool_calls && followUpResponse.tool_calls.length > 0 &&
+                                    followUpOptions.currentToolIteration < followUpOptions.maxToolIterations) {
+
+                                    log.info(`Found ${followUpResponse.tool_calls.length} more tool calls in iteration ${followUpOptions.currentToolIteration}`);
+
+                                    // Execute these tool calls in another iteration
+                                    // First, capture the current content for the assistant message
+                                    const assistantMessage = {
+                                        role: 'assistant' as const,
+                                        content: followUpContent,
+                                        tool_calls: followUpResponse.tool_calls
+                                    };
+
+                                    // Execute the tools from this follow-up
+                                    const nextToolResults = await this.executeToolCalls(followUpResponse);
+
+                                    // Create a new messages array with the latest tool results
+                                    const nextToolMessages = [...toolMessages, assistantMessage, ...nextToolResults];
+
+                                    // Increment the tool iteration counter for the next call
+                                    const nextFollowUpOptions = {
+                                        ...followUpOptions,
+                                        currentToolIteration: followUpOptions.currentToolIteration + 1
+                                    };
+
+                                    log.info(`Making another follow-up request with ${nextToolResults.length} tool results (iteration ${nextFollowUpOptions.currentToolIteration}/${nextFollowUpOptions.maxToolIterations})`);
+
+                                    // Make another follow-up request
+                                    const nextResponse = await service.generateChatCompletion(nextToolMessages, nextFollowUpOptions);
+
+                                    // Handle this new response (recursive streaming if needed)
+                                    if (nextFollowUpOptions.stream && nextResponse.stream) {
+                                        let nextContent = followUpContent; // Start with the existing content
+
+                                        await nextResponse.stream(async (nextChunk: StreamChunk) => {
+                                            if (nextChunk.text) {
+                                                nextContent += nextChunk.text;
+
+                                                // Stream this content to the client
+                                                wsService.sendMessageToAllClients({
+                                                    type: 'llm-stream',
+                                                    sessionId,
+                                                    content: nextChunk.text
+                                                } as LLMStreamMessage);
+                                            }
+
+                                            if (nextChunk.done) {
+                                                // Final completion message
+                                                wsService.sendMessageToAllClients({
+                                                    type: 'llm-stream',
+                                                    sessionId,
+                                                    done: true
+                                                } as LLMStreamMessage);
+
+                                                // Update message content with the complete response after all iterations
+                                                messageContent = nextContent;
+
+                                                // Store in session history
+                                                session.messages.push({
+                                                    role: 'assistant',
+                                                    content: messageContent,
+                                                    timestamp: new Date()
+                                                });
+                                            }
+                                        });
+                                    } else {
+                                        // For non-streaming next response
+                                        messageContent = nextResponse.text || "";
+
+                                        // Send the final complete message
+                                        wsService.sendMessageToAllClients({
+                                            type: 'llm-stream',
+                                            sessionId,
+                                            content: messageContent,
+                                            done: true
+                                        } as LLMStreamMessage);
+
+                                        // Store in session
+                                        session.messages.push({
+                                            role: 'assistant',
+                                            content: messageContent,
+                                            timestamp: new Date()
+                                        });
+                                    }
+                                } else {
+                                    // No more tool calls or reached iteration limit
+                                    wsService.sendMessageToAllClients({
+                                        type: 'llm-stream',
+                                        sessionId,
+                                        done: true
+                                    } as LLMStreamMessage);
+
+                                    // Update message content for session storage
+                                    messageContent = followUpContent;
+
+                                    // Store the final response in the session
+                                    session.messages.push({
+                                        role: 'assistant',
+                                        content: messageContent,
+                                        timestamp: new Date()
+                                    });
+                                }
+                            }
+                        });
+                    } else {
+                        // Non-streaming follow-up handling (original behavior)
+                        messageContent = followUpResponse.text || "";
+
+                        // Check if there are more tool calls to execute
+                        if (followUpResponse.tool_calls && followUpResponse.tool_calls.length > 0 &&
+                            followUpOptions.currentToolIteration < (followUpOptions.maxToolIterations || 5)) {
+
+                            log.info(`Found ${followUpResponse.tool_calls.length} more tool calls in non-streaming follow-up (iteration ${followUpOptions.currentToolIteration})`);
+
+                            // Execute these tool calls in another iteration
+                            const assistantMessage = {
+                                role: 'assistant' as const,
+                                content: messageContent,
+                                tool_calls: followUpResponse.tool_calls
+                            };
+
+                            // Execute the next round of tools
+                            const nextToolResults = await this.executeToolCalls(followUpResponse);
+
+                            // Create a new messages array with the latest tool results
+                            const nextToolMessages = [...toolMessages, assistantMessage, ...nextToolResults];
+
+                            // Increment the tool iteration counter for the next call
+                            const nextFollowUpOptions = {
+                                ...followUpOptions,
+                                currentToolIteration: followUpOptions.currentToolIteration + 1
+                            };
+
+                            log.info(`Making another non-streaming follow-up request (iteration ${nextFollowUpOptions.currentToolIteration}/${nextFollowUpOptions.maxToolIterations || 5})`);
+
+                            // Make another follow-up request
+                            const nextResponse = await service.generateChatCompletion(nextToolMessages, nextFollowUpOptions);
+
+                            // Update the message content with the final response
+                            messageContent = nextResponse.text || "";
+                        }
+
+                        // Send the complete response with done flag in the same message
+                        wsService.sendMessageToAllClients({
+                            type: 'llm-stream',
+                            sessionId,
+                            content: messageContent,
+                            done: true
+                        } as LLMStreamMessage);
+
+                        // Store the response in the session
+                        session.messages.push({
+                            role: 'assistant',
+                            content: messageContent,
+                            timestamp: new Date()
+                        });
+                    }
 
                     // Store the response in the session
                     session.messages.push({
@@ -1438,11 +1438,11 @@ class RestChatService {
         if (registeredTools.length === 0) {
             log.info("No tools found in registry.");
             log.info("Note: Tools should be initialized in the AIServiceManager constructor.");
-            
+
             // Create AI service manager instance to trigger tool initialization
             const aiServiceManager = (await import('./ai_service_manager.js')).default;
             aiServiceManager.getInstance();
-            
+
             // Check again after AIServiceManager instantiation
             const tools = toolRegistry.getAllTools();
             log.info(`After AIServiceManager instantiation: ${tools.length} tools available`);
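
For reference, the control flow added to rest_chat_service.ts above amounts to a bounded tool-calling loop: after a response that contains tool_calls, the tools are executed, their results are appended to the conversation, and the model is called again until it stops requesting tools or maxToolIterations is reached. Below is a minimal TypeScript sketch of that loop only, with hypothetical stand-ins (runTools for executeToolCalls, sendChunk for the wsService broadcast, generate for service.generateChatCompletion, SketchMessage/SketchResponse for the message shapes); it is illustrative, not the actual Trilium code, and it omits the per-chunk streaming that the patch interleaves with the iterations.

    // Sketch only: simplified from the patch above, with hypothetical helper names.
    interface SketchMessage {
        role: 'user' | 'assistant' | 'tool';
        content: string;
        tool_calls?: any[];
    }

    interface SketchResponse {
        text: string;
        tool_calls?: any[];
    }

    async function runFollowUpLoop(
        generate: (msgs: SketchMessage[], opts: any) => Promise<SketchResponse>, // stand-in for service.generateChatCompletion
        runTools: (response: SketchResponse) => Promise<SketchMessage[]>,        // stand-in for this.executeToolCalls
        sendChunk: (text: string, done: boolean) => void,                        // stand-in for the WebSocket broadcast
        messages: SketchMessage[],
        maxToolIterations = 5
    ): Promise<string> {
        let response = await generate(messages, { enableTools: true, maxToolIterations, currentToolIteration: 1 });

        // Keep feeding tool results back to the model until it stops calling tools
        // or the iteration cap is hit (this is what prevents infinite tool loops).
        for (let iteration = 1; iteration <= maxToolIterations; iteration++) {
            if (!response.tool_calls || response.tool_calls.length === 0) {
                break;
            }

            const toolResults = await runTools(response);
            messages = [
                ...messages,
                { role: 'assistant', content: response.text, tool_calls: response.tool_calls },
                ...toolResults
            ];

            response = await generate(messages, {
                enableTools: true,
                maxToolIterations,
                currentToolIteration: iteration + 1
            });
        }

        sendChunk(response.text, true); // final content carries the done flag
        return response.text;
    }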