mirror of
https://github.com/TriliumNext/Notes.git
synced 2025-07-29 19:12:27 +08:00
tool calling works, but still no response yet
closer.. nice definitely remove this for now
This commit is contained in:
parent
6df87fc163
commit
edd075cba1
@ -1,417 +0,0 @@
|
||||
# Trilium LLM Pipeline Documentation
|
||||
|
||||
This document provides an overview of the LLM Pipeline architecture in Trilium Notes, explains the data flow, and provides instructions for extending the pipeline with new providers or stages.
|
||||
|
||||
## Overview
|
||||
|
||||
The LLM Pipeline is a modular architecture that handles the flow of data for LLM chat interactions in Trilium Notes. It breaks down the complex process of context retrieval, message preparation, model selection, completion generation, and response processing into separate, reusable stages.
|
||||
|
||||
## Pipeline Data Flow
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
Input[Chat Input] --> Pipeline
|
||||
|
||||
subgraph Pipeline
|
||||
direction TB
|
||||
Context[Context Extraction] --> MessagePrep[Message Preparation]
|
||||
MessagePrep --> ModelSelection[Model Selection]
|
||||
ModelSelection --> LLMCompletion[LLM Completion]
|
||||
LLMCompletion --> ResponseProcess[Response Processing]
|
||||
end
|
||||
|
||||
Pipeline --> Output[Chat Response]
|
||||
|
||||
subgraph Optional
|
||||
direction TB
|
||||
ToolExecution[Tool Execution]
|
||||
end
|
||||
|
||||
ResponseProcess -.-> ToolExecution
|
||||
ToolExecution -.-> MessagePrep
|
||||
```
|
||||
|
||||
## Pipeline Architecture
|
||||
|
||||
The pipeline is composed of modular stages that can be configured for different use cases:
|
||||
|
||||
```mermaid
|
||||
classDiagram
|
||||
class ChatPipeline {
|
||||
+stages
|
||||
+config: ChatPipelineConfig
|
||||
+metrics: PipelineMetrics
|
||||
+constructor(config?: ChatPipelineConfig)
|
||||
+execute(input: ChatPipelineInput): Promise~ChatResponse~
|
||||
+getMetrics(): PipelineMetrics
|
||||
+resetMetrics(): void
|
||||
}
|
||||
|
||||
class BasePipelineStage {
|
||||
+name: string
|
||||
+execute(input: PipelineInput): Promise~PipelineOutput~
|
||||
#process(input: PipelineInput): Promise~PipelineOutput~
|
||||
}
|
||||
|
||||
ChatPipeline --> ContextExtractionStage
|
||||
ChatPipeline --> SemanticContextExtractionStage
|
||||
ChatPipeline --> AgentToolsContextStage
|
||||
ChatPipeline --> MessagePreparationStage
|
||||
ChatPipeline --> ModelSelectionStage
|
||||
ChatPipeline --> LLMCompletionStage
|
||||
ChatPipeline --> ResponseProcessingStage
|
||||
|
||||
BasePipelineStage <|-- ContextExtractionStage
|
||||
BasePipelineStage <|-- SemanticContextExtractionStage
|
||||
BasePipelineStage <|-- AgentToolsContextStage
|
||||
BasePipelineStage <|-- MessagePreparationStage
|
||||
BasePipelineStage <|-- ModelSelectionStage
|
||||
BasePipelineStage <|-- LLMCompletionStage
|
||||
BasePipelineStage <|-- ResponseProcessingStage
|
||||
```
|
||||
|
||||
## Pipeline Stages
|
||||
|
||||
Each stage in the pipeline has a specific responsibility:
|
||||
|
||||
1. **Context Extraction**: Retrieves relevant context from notes based on user query
|
||||
2. **Message Preparation**: Formats messages with context in provider-specific way
|
||||
3. **Model Selection**: Chooses appropriate model based on query complexity
|
||||
4. **LLM Completion**: Generates completion using the selected provider and model
|
||||
5. **Response Processing**: Post-processes the response (handles "thinking" output, formats markdown, etc.)
|
||||
|
||||
## Provider-Specific Message Formatting
|
||||
|
||||
Different LLM providers handle context in different ways for optimal performance:
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
Messages[User Messages] --> Formatter[Message Formatter]
|
||||
Context[Note Context] --> Formatter
|
||||
SystemPrompt[System Prompt] --> Formatter
|
||||
|
||||
Formatter --> Factory{Provider Type}
|
||||
Factory -->|OpenAI| OpenAIFormatter[OpenAI Formatter]
|
||||
Factory -->|Claude| ClaudeFormatter[Claude Formatter]
|
||||
Factory -->|Ollama| OllamaFormatter[Ollama Formatter]
|
||||
Factory -->|Other| DefaultFormatter[Default Formatter]
|
||||
|
||||
OpenAIFormatter --> OpenAIMessage[Optimized Messages]
|
||||
ClaudeFormatter --> ClaudeMessage[Optimized Messages]
|
||||
OllamaFormatter --> OllamaMessage[Optimized Messages]
|
||||
DefaultFormatter --> DefaultMessage[Generic Messages]
|
||||
```
|
||||
|
||||
## Multiple Pipeline Configurations
|
||||
|
||||
The chat service now supports multiple pipeline configurations for different use cases:
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
ChatService --> DefaultPipeline[Default Pipeline]
|
||||
ChatService --> AgentPipeline[Agent Pipeline]
|
||||
ChatService --> PerformancePipeline[Performance Pipeline]
|
||||
|
||||
DefaultPipeline --> DefaultConfig[enableStreaming: true<br>enableMetrics: true]
|
||||
AgentPipeline --> AgentConfig[enableStreaming: true<br>enableMetrics: true<br>maxToolCallIterations: 5]
|
||||
PerformancePipeline --> PerformanceConfig[enableStreaming: false<br>enableMetrics: true]
|
||||
```
|
||||
|
||||
## Adding a New LLM Provider
|
||||
|
||||
To add a new LLM provider to Trilium, follow these steps:
|
||||
|
||||
### 1. Implement the AIService Interface
|
||||
|
||||
Create a new file in `src/services/llm/providers/your_provider_service.ts`:
|
||||
|
||||
```typescript
|
||||
import type { Message, ChatCompletionOptions, ChatResponse, AIService } from '../ai_interface.js';
|
||||
import log from '../../log.js';
|
||||
import options from '../../options.js';
|
||||
|
||||
export class YourProviderService implements AIService {
|
||||
async generateChatCompletion(messages: Message[], options?: ChatCompletionOptions): Promise<ChatResponse> {
|
||||
// Implement API call to your provider
|
||||
// Return response in standardized format
|
||||
return {
|
||||
text: "Response text from your provider",
|
||||
model: options?.model || "default-model",
|
||||
provider: "your-provider-name"
|
||||
};
|
||||
}
|
||||
|
||||
isAvailable(): boolean {
|
||||
// Check if API key or other required config exists
|
||||
const apiKey = options.getOption('yourProviderApiKey');
|
||||
return !!apiKey;
|
||||
}
|
||||
|
||||
getName(): string {
|
||||
return 'your-provider-name';
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 2. Create a Message Formatter
|
||||
|
||||
Create a custom message formatter in `src/services/llm/pipeline/interfaces/message_formatter.ts`:
|
||||
|
||||
```typescript
|
||||
export class YourProviderMessageFormatter extends BaseMessageFormatter {
|
||||
formatMessages(messages: Message[], systemPrompt?: string, context?: string): Message[] {
|
||||
// Format messages optimally for your provider
|
||||
const formattedMessages: Message[] = [];
|
||||
|
||||
// Add system message
|
||||
if (systemPrompt) {
|
||||
formattedMessages.push({
|
||||
role: 'system',
|
||||
content: systemPrompt
|
||||
});
|
||||
}
|
||||
|
||||
// Format context according to provider's best practices
|
||||
if (context) {
|
||||
// Example: Add context in provider-specific format
|
||||
formattedMessages.push({
|
||||
role: 'user',
|
||||
content: `Reference information: ${context}`
|
||||
});
|
||||
}
|
||||
|
||||
// Add the rest of messages
|
||||
formattedMessages.push(...this.getMessagesWithoutSystem(messages));
|
||||
|
||||
return formattedMessages;
|
||||
}
|
||||
}
|
||||
|
||||
// Register your formatter with the factory
|
||||
MessageFormatterFactory.registerFormatter('your-provider-name', new YourProviderMessageFormatter());
|
||||
```
|
||||
|
||||
### 3. Register the Provider in AIServiceManager
|
||||
|
||||
Update `src/services/llm/ai_service_manager.ts`:
|
||||
|
||||
```typescript
|
||||
import { YourProviderService } from './providers/your_provider_service.js';
|
||||
|
||||
// Add your provider to the services object
|
||||
private services: Record<ServiceProviders, AIService> = {
|
||||
openai: new OpenAIService(),
|
||||
anthropic: new AnthropicService(),
|
||||
ollama: new OllamaService(),
|
||||
'your-provider-name': new YourProviderService()
|
||||
};
|
||||
|
||||
// Add it to the default provider order
|
||||
private providerOrder: ServiceProviders[] = ['openai', 'anthropic', 'ollama', 'your-provider-name'];
|
||||
```
|
||||
|
||||
### 4. Add Provider Settings to Options
|
||||
|
||||
In the appropriate UI file, add settings for your provider:
|
||||
|
||||
```typescript
|
||||
// Example settings
|
||||
{
|
||||
name: 'yourProviderApiKey',
|
||||
value: '',
|
||||
isSensitive: true
|
||||
},
|
||||
{
|
||||
name: 'yourProviderDefaultModel',
|
||||
value: 'default-model-name'
|
||||
}
|
||||
```
|
||||
|
||||
## Using the Pipeline via ChatService
|
||||
|
||||
The ChatService now provides a unified `processMessage` method for all chat interactions:
|
||||
|
||||
```typescript
|
||||
import chatService from '../services/llm/chat_service.js';
|
||||
|
||||
// Process a regular message
|
||||
const session = await chatService.processMessage(
|
||||
'session-id',
|
||||
'What can you tell me about this note?',
|
||||
{
|
||||
chatOptions: {
|
||||
model: 'openai:gpt-4',
|
||||
temperature: 0.7
|
||||
},
|
||||
streamCallback: (text, isDone) => {
|
||||
console.log('Received text:', text);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Process a context-aware message
|
||||
const session = await chatService.processMessage(
|
||||
'session-id',
|
||||
'What can you tell me about this note?',
|
||||
{
|
||||
noteId: 'note-id-for-context',
|
||||
chatOptions: { showThinking: true },
|
||||
pipelineType: 'agent' // Use agent pipeline
|
||||
}
|
||||
);
|
||||
```
|
||||
|
||||
## Using Pipeline Configurations
|
||||
|
||||
You can create specialized pipelines for different use cases:
|
||||
|
||||
```typescript
|
||||
const PIPELINE_CONFIGS = {
|
||||
default: {
|
||||
enableStreaming: true,
|
||||
enableMetrics: true
|
||||
},
|
||||
agent: {
|
||||
enableStreaming: true,
|
||||
enableMetrics: true,
|
||||
maxToolCallIterations: 5
|
||||
},
|
||||
performance: {
|
||||
enableStreaming: false,
|
||||
enableMetrics: true
|
||||
}
|
||||
};
|
||||
|
||||
// Create a pipeline with custom config
|
||||
const pipeline = new ChatPipeline(PIPELINE_CONFIGS.agent);
|
||||
```
|
||||
|
||||
## Pipeline Metrics
|
||||
|
||||
The pipeline now includes built-in performance metrics:
|
||||
|
||||
```typescript
|
||||
// Get pipeline metrics
|
||||
const metrics = chatService.getPipelineMetrics('default');
|
||||
console.log('Total executions:', metrics.totalExecutions);
|
||||
console.log('Average execution time:', metrics.averageExecutionTime, 'ms');
|
||||
|
||||
// Get stage-specific metrics
|
||||
for (const [stage, stageMetrics] of Object.entries(metrics.stageMetrics)) {
|
||||
console.log(`Stage ${stage}:`, stageMetrics.averageExecutionTime, 'ms');
|
||||
}
|
||||
|
||||
// Reset metrics
|
||||
chatService.resetPipelineMetrics();
|
||||
```
|
||||
|
||||
## Streaming Support
|
||||
|
||||
The pipeline now has built-in streaming support:
|
||||
|
||||
```typescript
|
||||
// Create a pipeline with streaming enabled
|
||||
const pipeline = new ChatPipeline({ enableStreaming: true });
|
||||
|
||||
// Execute with streaming callback
|
||||
const response = await pipeline.execute({
|
||||
messages: [...],
|
||||
options: { stream: true },
|
||||
streamCallback: (text, isDone) => {
|
||||
// Update UI with streaming response
|
||||
updateChatUI(text);
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
## Extending the Pipeline with Custom Stages
|
||||
|
||||
You can create custom pipeline stages:
|
||||
|
||||
1. Define your stage interface in `interfaces.ts`
|
||||
2. Create a new stage class that extends `BasePipelineStage`
|
||||
3. Instantiate your stage in the `ChatPipeline` constructor
|
||||
4. Modify the `execute` method to include your stage
|
||||
|
||||
Example custom stage:
|
||||
|
||||
```typescript
|
||||
export class CustomStage extends BasePipelineStage<CustomInput, CustomOutput> {
|
||||
constructor() {
|
||||
super('CustomStage');
|
||||
}
|
||||
|
||||
protected async process(input: CustomInput): Promise<CustomOutput> {
|
||||
// Process input and return output
|
||||
return { result: 'processed data' };
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Tool Execution for Agentic Features
|
||||
|
||||
For implementing agentic features with tool execution:
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant User
|
||||
participant Pipeline as LLM Pipeline
|
||||
participant LLM
|
||||
participant Tools as Tool Executor
|
||||
|
||||
User->>Pipeline: User Query
|
||||
Pipeline->>LLM: Formatted Query + Context
|
||||
LLM->>Pipeline: Response with Tool Calls
|
||||
Pipeline->>Tools: Execute Tool Calls
|
||||
Tools->>Pipeline: Tool Results
|
||||
Pipeline->>LLM: Original Messages + Tool Results
|
||||
LLM->>Pipeline: Final Response
|
||||
Pipeline->>User: Formatted Response
|
||||
```
|
||||
|
||||
To implement tool execution:
|
||||
|
||||
1. Create a new `ToolExecutionStage` that:
|
||||
- Parses tool calls from LLM response
|
||||
- Executes the appropriate tools
|
||||
- Returns results formatted for next LLM call
|
||||
|
||||
2. Modify the pipeline to recursively handle tool calls:
|
||||
- If response contains tool calls, execute tools
|
||||
- Feed results back to message preparation
|
||||
- Call LLM completion again with updated messages
|
||||
- Repeat until no more tool calls or max iterations reached
|
||||
- The pipeline already has a `maxToolCallIterations` config for this purpose
|
||||
|
||||
## Error Handling
|
||||
|
||||
All stages include built-in error handling. Errors are logged and propagated up the pipeline, where they're caught and displayed to the user as a friendly error message.
|
||||
|
||||
To add custom error handling to a stage:
|
||||
|
||||
```typescript
|
||||
protected async process(input: YourInput): Promise<YourOutput> {
|
||||
try {
|
||||
// Your processing logic
|
||||
} catch (error) {
|
||||
log.error(`Custom error in stage: ${error.message}`);
|
||||
throw new Error('User-friendly error message');
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
The pipeline implements several performance optimizations:
|
||||
|
||||
1. **Lazy Loading**: Components are only initialized when needed
|
||||
2. **Caching**: Context extraction results are cached when possible
|
||||
3. **Response Streaming**: Supports streaming for immediate feedback
|
||||
4. **Performance Metrics**: Built-in timing metrics for each stage
|
||||
|
||||
When extending the pipeline, consider these best practices:
|
||||
|
||||
- Use the built-in metrics to identify bottlenecks
|
||||
- Cache expensive operations
|
||||
- Consider using the "performance" pipeline configuration for use cases where streaming isn't needed
|
||||
- Use the appropriate level of context for the query complexity
|
@ -318,6 +318,24 @@ export class ChatPipeline {
|
||||
// Enhanced tool_calls detection - check both direct property and getter
|
||||
let hasToolCalls = false;
|
||||
|
||||
log.info(`[TOOL CALL DEBUG] Starting tool call detection for provider: ${currentResponse.provider}`);
|
||||
// Check response object structure
|
||||
log.info(`[TOOL CALL DEBUG] Response properties: ${Object.keys(currentResponse).join(', ')}`);
|
||||
|
||||
// Try to access tool_calls as a property
|
||||
if ('tool_calls' in currentResponse) {
|
||||
log.info(`[TOOL CALL DEBUG] tool_calls exists as a direct property`);
|
||||
log.info(`[TOOL CALL DEBUG] tool_calls type: ${typeof currentResponse.tool_calls}`);
|
||||
|
||||
if (currentResponse.tool_calls && Array.isArray(currentResponse.tool_calls)) {
|
||||
log.info(`[TOOL CALL DEBUG] tool_calls is an array with length: ${currentResponse.tool_calls.length}`);
|
||||
} else {
|
||||
log.info(`[TOOL CALL DEBUG] tool_calls is not an array or is empty: ${JSON.stringify(currentResponse.tool_calls)}`);
|
||||
}
|
||||
} else {
|
||||
log.info(`[TOOL CALL DEBUG] tool_calls does not exist as a direct property`);
|
||||
}
|
||||
|
||||
// First check the direct property
|
||||
if (currentResponse.tool_calls && currentResponse.tool_calls.length > 0) {
|
||||
hasToolCalls = true;
|
||||
@ -326,24 +344,50 @@ export class ChatPipeline {
|
||||
}
|
||||
// Check if it might be a getter (for dynamic tool_calls collection)
|
||||
else {
|
||||
log.info(`[TOOL CALL DEBUG] Direct property check failed, trying getter approach`);
|
||||
try {
|
||||
const toolCallsDesc = Object.getOwnPropertyDescriptor(currentResponse, 'tool_calls');
|
||||
|
||||
if (toolCallsDesc) {
|
||||
log.info(`[TOOL CALL DEBUG] Found property descriptor for tool_calls: ${JSON.stringify({
|
||||
configurable: toolCallsDesc.configurable,
|
||||
enumerable: toolCallsDesc.enumerable,
|
||||
hasGetter: !!toolCallsDesc.get,
|
||||
hasSetter: !!toolCallsDesc.set
|
||||
})}`);
|
||||
} else {
|
||||
log.info(`[TOOL CALL DEBUG] No property descriptor found for tool_calls`);
|
||||
}
|
||||
|
||||
if (toolCallsDesc && typeof toolCallsDesc.get === 'function') {
|
||||
log.info(`[TOOL CALL DEBUG] Attempting to call the tool_calls getter`);
|
||||
const dynamicToolCalls = toolCallsDesc.get.call(currentResponse);
|
||||
|
||||
log.info(`[TOOL CALL DEBUG] Getter returned: ${JSON.stringify(dynamicToolCalls)}`);
|
||||
|
||||
if (dynamicToolCalls && dynamicToolCalls.length > 0) {
|
||||
hasToolCalls = true;
|
||||
log.info(`Response has dynamic tool_calls with ${dynamicToolCalls.length} tools`);
|
||||
log.info(`Dynamic tool calls details: ${JSON.stringify(dynamicToolCalls)}`);
|
||||
// Ensure property is available for subsequent code
|
||||
currentResponse.tool_calls = dynamicToolCalls;
|
||||
log.info(`[TOOL CALL DEBUG] Updated currentResponse.tool_calls with dynamic values`);
|
||||
} else {
|
||||
log.info(`[TOOL CALL DEBUG] Getter returned no valid tool calls`);
|
||||
}
|
||||
} else {
|
||||
log.info(`[TOOL CALL DEBUG] No getter function found for tool_calls`);
|
||||
}
|
||||
} catch (e) {
|
||||
} catch (e: any) {
|
||||
log.error(`Error checking dynamic tool_calls: ${e}`);
|
||||
log.error(`[TOOL CALL DEBUG] Error details: ${e.stack || 'No stack trace'}`);
|
||||
}
|
||||
}
|
||||
|
||||
log.info(`Response has tool_calls: ${hasToolCalls ? 'true' : 'false'}`);
|
||||
if (hasToolCalls && currentResponse.tool_calls) {
|
||||
log.info(`[TOOL CALL DEBUG] Final tool_calls that will be used: ${JSON.stringify(currentResponse.tool_calls)}`);
|
||||
}
|
||||
|
||||
// Tool execution loop
|
||||
if (toolsEnabled && hasToolCalls && currentResponse.tool_calls) {
|
||||
@ -638,14 +682,39 @@ export class ChatPipeline {
|
||||
// If streaming was paused for tool execution, resume it now with the final response
|
||||
if (isStreaming && streamCallback && streamingPaused) {
|
||||
// First log for debugging
|
||||
log.info(`Resuming streaming with final response: ${currentResponse.text.length} chars`);
|
||||
const responseText = currentResponse.text || "";
|
||||
log.info(`Resuming streaming with final response: ${responseText.length} chars`);
|
||||
|
||||
if (responseText.length > 0) {
|
||||
// Resume streaming with the final response text
|
||||
// This is where we send the definitive done:true signal with the complete content
|
||||
streamCallback(currentResponse.text, true);
|
||||
streamCallback(responseText, true);
|
||||
log.info(`Sent final response with done=true signal and text content`);
|
||||
} else {
|
||||
// For Anthropic, sometimes text is empty but response is in stream
|
||||
if (currentResponse.provider === 'Anthropic' && currentResponse.stream) {
|
||||
log.info(`Detected empty response text for Anthropic provider with stream, sending stream content directly`);
|
||||
// For Anthropic with stream mode, we need to stream the final response
|
||||
if (currentResponse.stream) {
|
||||
await currentResponse.stream(async (chunk: StreamChunk) => {
|
||||
// Process the chunk
|
||||
const processedChunk = await this.processStreamChunk(chunk, input.options);
|
||||
|
||||
// Log confirmation
|
||||
log.info(`Sent final response with done=true signal`);
|
||||
// Forward to callback
|
||||
streamCallback(
|
||||
processedChunk.text,
|
||||
processedChunk.done || chunk.done || false,
|
||||
chunk
|
||||
);
|
||||
});
|
||||
log.info(`Completed streaming final Anthropic response after tool execution`);
|
||||
}
|
||||
} else {
|
||||
// Empty response with done=true as fallback
|
||||
streamCallback('', true);
|
||||
log.info(`Sent empty final response with done=true signal`);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (toolsEnabled) {
|
||||
log.info(`========== NO TOOL CALLS DETECTED ==========`);
|
||||
|
@ -91,6 +91,7 @@ export class AnthropicService extends BaseAIService {
|
||||
|
||||
// Convert OpenAI-style function tools to Anthropic format
|
||||
const anthropicTools = this.convertToolsToAnthropicFormat(opts.tools);
|
||||
|
||||
requestParams.tools = anthropicTools;
|
||||
|
||||
// Add tool_choice parameter if specified
|
||||
@ -111,6 +112,7 @@ export class AnthropicService extends BaseAIService {
|
||||
// Log request summary
|
||||
log.info(`Making ${providerOptions.stream ? 'streaming' : 'non-streaming'} request to Anthropic API with model: ${providerOptions.model}`);
|
||||
|
||||
|
||||
// Handle streaming responses
|
||||
if (providerOptions.stream) {
|
||||
return this.handleStreamingResponse(client, requestParams, opts, providerOptions);
|
||||
@ -118,6 +120,9 @@ export class AnthropicService extends BaseAIService {
|
||||
// Non-streaming request
|
||||
const response = await client.messages.create(requestParams);
|
||||
|
||||
// Log the complete response for debugging
|
||||
log.info(`[DEBUG] Complete Anthropic API response: ${JSON.stringify(response, null, 2)}`);
|
||||
|
||||
// Get the assistant's response text from the content blocks
|
||||
const textContent = response.content
|
||||
.filter((block: any) => block.type === 'text')
|
||||
@ -145,7 +150,7 @@ export class AnthropicService extends BaseAIService {
|
||||
type: 'function', // Convert back to function type for internal use
|
||||
function: {
|
||||
name: block.name,
|
||||
arguments: block.input || '{}'
|
||||
arguments: JSON.stringify(block.input || {})
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -178,183 +183,216 @@ export class AnthropicService extends BaseAIService {
|
||||
/**
|
||||
* Handle streaming response from Anthropic
|
||||
*
|
||||
* Simplified implementation that leverages the Anthropic SDK's streaming capabilities
|
||||
* Uses the MessageStream class from the Anthropic SDK
|
||||
*/
|
||||
private async handleStreamingResponse(
|
||||
client: any,
|
||||
client: Anthropic,
|
||||
params: any,
|
||||
opts: ChatCompletionOptions,
|
||||
providerOptions: AnthropicOptions
|
||||
): Promise<ChatResponse> {
|
||||
// Create a list to collect tool calls during streaming
|
||||
const collectedToolCalls: any[] = [];
|
||||
// Create a ChatResponse object that follows our interface requirements
|
||||
const response: ChatResponse = {
|
||||
text: '',
|
||||
model: providerOptions.model,
|
||||
provider: this.getName(),
|
||||
|
||||
// Create a stream handler function that processes the SDK's stream
|
||||
const streamHandler = async (callback: (chunk: StreamChunk) => Promise<void> | void): Promise<string> => {
|
||||
let completeText = '';
|
||||
let currentToolCall: any = null;
|
||||
// Define the stream function that will be used by consumers
|
||||
stream: async (callback) => {
|
||||
// Accumulated response
|
||||
let fullText = '';
|
||||
let toolCalls: any[] = [];
|
||||
|
||||
try {
|
||||
// Request a streaming response from Anthropic
|
||||
log.info(`Starting Anthropic streaming request to: ${providerOptions.baseUrl}/v1/messages`);
|
||||
log.info(`Creating Anthropic streaming request for model: ${providerOptions.model}`);
|
||||
|
||||
const streamResponse = await client.messages.create({
|
||||
// Request options to pass to the Anthropic SDK
|
||||
const requestOptions = {};
|
||||
|
||||
// Create a message stream using the SDK's stream method
|
||||
// This properly types the streaming response
|
||||
const stream = client.messages.stream({
|
||||
...params,
|
||||
stream: true
|
||||
});
|
||||
}, requestOptions);
|
||||
|
||||
// Process each chunk in the stream
|
||||
for await (const chunk of streamResponse) {
|
||||
if (chunk.type === 'content_block_delta' && chunk.delta?.type === 'text_delta') {
|
||||
const text = chunk.delta.text || '';
|
||||
completeText += text;
|
||||
// Track active tool calls by ID
|
||||
const activeToolCalls = new Map<string, any>();
|
||||
|
||||
// Send the chunk to the caller
|
||||
await callback({
|
||||
text,
|
||||
// Listen for text deltas
|
||||
stream.on('text', (textDelta) => {
|
||||
fullText += textDelta;
|
||||
|
||||
// Pass the text chunk to the caller
|
||||
callback({
|
||||
text: textDelta,
|
||||
done: false,
|
||||
raw: chunk // Include the raw chunk for advanced processing
|
||||
raw: { type: 'text', text: textDelta }
|
||||
});
|
||||
}
|
||||
// Process tool use events - different format in Anthropic API
|
||||
else if (chunk.type === 'content_block_start' && chunk.content_block?.type === 'tool_use') {
|
||||
// Start collecting a new tool call - convert to our internal format (OpenAI-like)
|
||||
currentToolCall = {
|
||||
id: chunk.content_block.id,
|
||||
type: 'function', // Convert to function type for internal consistency
|
||||
});
|
||||
|
||||
// Listen for content blocks starting - used for tool calls
|
||||
stream.on('contentBlock', async (block) => {
|
||||
if (block.type === 'tool_use') {
|
||||
// Create a structured tool call in our expected format
|
||||
const toolCall = {
|
||||
id: block.id,
|
||||
type: 'function',
|
||||
function: {
|
||||
name: chunk.content_block.name,
|
||||
arguments: ''
|
||||
name: block.name,
|
||||
arguments: JSON.stringify(block.input || {})
|
||||
}
|
||||
};
|
||||
|
||||
// Log the tool use event
|
||||
log.info(`Streaming: Tool use started: ${chunk.content_block.name}`);
|
||||
// Store in our active tools map
|
||||
activeToolCalls.set(block.id, toolCall);
|
||||
|
||||
// Send the tool call event
|
||||
// Notify about tool execution start
|
||||
await callback({
|
||||
text: '',
|
||||
done: false,
|
||||
toolExecution: {
|
||||
type: 'start',
|
||||
tool: currentToolCall
|
||||
tool: toolCall
|
||||
},
|
||||
raw: chunk
|
||||
raw: block
|
||||
});
|
||||
}
|
||||
// Process tool input deltas
|
||||
else if (chunk.type === 'content_block_delta' && chunk.delta?.type === 'tool_use_delta' && currentToolCall) {
|
||||
// Accumulate tool input
|
||||
if (chunk.delta.input) {
|
||||
currentToolCall.function.arguments += chunk.delta.input;
|
||||
});
|
||||
|
||||
// Send the tool input update
|
||||
// Listen for input JSON updates (tool arguments)
|
||||
stream.on('inputJson', async (jsonFragment) => {
|
||||
// Find the most recent tool call
|
||||
if (activeToolCalls.size > 0) {
|
||||
const lastToolId = Array.from(activeToolCalls.keys()).pop();
|
||||
if (lastToolId) {
|
||||
const toolCall = activeToolCalls.get(lastToolId);
|
||||
|
||||
// Update the arguments
|
||||
if (toolCall.function.arguments === '{}') {
|
||||
toolCall.function.arguments = jsonFragment;
|
||||
} else {
|
||||
toolCall.function.arguments += jsonFragment;
|
||||
}
|
||||
|
||||
// Notify about the update
|
||||
await callback({
|
||||
text: '',
|
||||
done: false,
|
||||
toolExecution: {
|
||||
type: 'update',
|
||||
tool: currentToolCall
|
||||
tool: toolCall
|
||||
},
|
||||
raw: chunk
|
||||
raw: { type: 'json_fragment', data: jsonFragment }
|
||||
});
|
||||
}
|
||||
}
|
||||
// Process tool use completion
|
||||
else if (chunk.type === 'content_block_stop' && currentToolCall) {
|
||||
// Add the completed tool call to our list
|
||||
collectedToolCalls.push({ ...currentToolCall });
|
||||
});
|
||||
|
||||
// Log the tool completion
|
||||
log.info(`Streaming: Tool use completed: ${currentToolCall.function.name}`);
|
||||
// Listen for message completion
|
||||
stream.on('message', async (message) => {
|
||||
// Process any tool calls from the message
|
||||
if (message.content) {
|
||||
// Find tool use blocks in the content
|
||||
const toolUseBlocks = message.content.filter(
|
||||
block => block.type === 'tool_use'
|
||||
);
|
||||
|
||||
// Send the tool completion event
|
||||
// Convert tool use blocks to our expected format
|
||||
if (toolUseBlocks.length > 0) {
|
||||
toolCalls = toolUseBlocks.map(block => {
|
||||
if (block.type === 'tool_use') {
|
||||
return {
|
||||
id: block.id,
|
||||
type: 'function',
|
||||
function: {
|
||||
name: block.name,
|
||||
arguments: JSON.stringify(block.input || {})
|
||||
}
|
||||
};
|
||||
}
|
||||
return null;
|
||||
}).filter(Boolean);
|
||||
|
||||
// For any active tool calls, mark them as complete
|
||||
for (const [toolId, toolCall] of activeToolCalls.entries()) {
|
||||
await callback({
|
||||
text: '',
|
||||
done: false,
|
||||
toolExecution: {
|
||||
type: 'complete',
|
||||
tool: currentToolCall
|
||||
tool: toolCall
|
||||
},
|
||||
tool_calls: collectedToolCalls.length > 0 ? collectedToolCalls : undefined,
|
||||
raw: chunk
|
||||
raw: { type: 'tool_complete', toolId }
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Extract text from text blocks
|
||||
const textBlocks = message.content.filter(
|
||||
block => block.type === 'text'
|
||||
) as Array<{ type: 'text', text: string }>;
|
||||
|
||||
// Update fullText if needed
|
||||
if (textBlocks.length > 0) {
|
||||
const allText = textBlocks.map(block => block.text).join('');
|
||||
// Only update if different from what we've accumulated
|
||||
if (allText !== fullText) {
|
||||
fullText = allText;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Reset current tool call
|
||||
currentToolCall = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Signal completion with all tool calls
|
||||
log.info(`Streaming complete, collected ${collectedToolCalls.length} tool calls`);
|
||||
if (collectedToolCalls.length > 0) {
|
||||
log.info(`Tool calls detected in final response: ${JSON.stringify(collectedToolCalls)}`);
|
||||
// Listen for the final message
|
||||
stream.on('finalMessage', async (message) => {
|
||||
// Set the response text and tool calls
|
||||
response.text = fullText;
|
||||
if (toolCalls.length > 0) {
|
||||
response.tool_calls = toolCalls;
|
||||
}
|
||||
|
||||
// Send final completion with full text and all tool calls
|
||||
await callback({
|
||||
text: '',
|
||||
text: fullText,
|
||||
done: true,
|
||||
tool_calls: collectedToolCalls.length > 0 ? collectedToolCalls : undefined
|
||||
tool_calls: toolCalls.length > 0 ? toolCalls : undefined,
|
||||
raw: message
|
||||
});
|
||||
});
|
||||
|
||||
return completeText;
|
||||
// Listen for errors
|
||||
stream.on('error', (error) => {
|
||||
log.error(`Anthropic streaming error: ${error}`);
|
||||
throw error;
|
||||
});
|
||||
|
||||
// Wait for the stream to complete
|
||||
await stream.done();
|
||||
|
||||
return fullText;
|
||||
} catch (error) {
|
||||
log.error(`Error in Anthropic streaming: ${error}`);
|
||||
log.error(`Anthropic streaming error: ${error}`);
|
||||
|
||||
// More detailed error logging
|
||||
// Enhanced error diagnostic for Anthropic SDK errors
|
||||
if (error instanceof Error) {
|
||||
log.error(`[DEBUG] Error name: ${error.name}`);
|
||||
log.error(`[DEBUG] Error message: ${error.message}`);
|
||||
log.error(`[DEBUG] Error stack: ${error.stack}`);
|
||||
log.error(`Error name: ${error.name}`);
|
||||
log.error(`Error message: ${error.message}`);
|
||||
|
||||
// If there's response data in the error, log that too
|
||||
const anyError = error as any;
|
||||
if (anyError.response) {
|
||||
log.error(`Error response status: ${anyError.response.status}`);
|
||||
log.error(`Error response data: ${JSON.stringify(anyError.response.data)}`);
|
||||
// Type cast to access potential Anthropic API error properties
|
||||
const apiError = error as any;
|
||||
if (apiError.status) {
|
||||
log.error(`API status: ${apiError.status}`);
|
||||
}
|
||||
if (apiError.error) {
|
||||
log.error(`API error details: ${JSON.stringify(apiError.error)}`);
|
||||
}
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
// Create a custom stream function that captures tool calls
|
||||
const captureToolCallsStream = async (callback: (chunk: StreamChunk) => Promise<void> | void): Promise<string> => {
|
||||
// Use the original stream handler but wrap it to capture tool calls
|
||||
return streamHandler(async (chunk: StreamChunk) => {
|
||||
// If the chunk has tool calls, update our collection
|
||||
if (chunk.tool_calls && chunk.tool_calls.length > 0) {
|
||||
// Update our collection with new tool calls
|
||||
chunk.tool_calls.forEach(toolCall => {
|
||||
// Only add if it's not already in the collection
|
||||
if (!collectedToolCalls.some(tc => tc.id === toolCall.id)) {
|
||||
collectedToolCalls.push(toolCall);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Call the original callback
|
||||
return callback(chunk);
|
||||
});
|
||||
};
|
||||
|
||||
// Return a response object with the stream handler and tool_calls property
|
||||
const response: ChatResponse = {
|
||||
text: '', // Initial text is empty, will be populated during streaming
|
||||
model: providerOptions.model,
|
||||
provider: this.getName(),
|
||||
stream: captureToolCallsStream
|
||||
};
|
||||
|
||||
// Define a getter for tool_calls that will return the collected tool calls
|
||||
Object.defineProperty(response, 'tool_calls', {
|
||||
get: function() {
|
||||
return collectedToolCalls.length > 0 ? collectedToolCalls : undefined;
|
||||
},
|
||||
enumerable: true
|
||||
});
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
@ -369,20 +407,89 @@ export class AnthropicService extends BaseAIService {
|
||||
if (msg.role === 'system') {
|
||||
// System messages are handled separately in the API call
|
||||
continue;
|
||||
} else if (msg.role === 'user' || msg.role === 'assistant') {
|
||||
// Convert to Anthropic format
|
||||
} else if (msg.role === 'user') {
|
||||
// Convert user message to Anthropic format
|
||||
anthropicMessages.push({
|
||||
role: msg.role,
|
||||
content: msg.content
|
||||
});
|
||||
} else if (msg.role === 'assistant') {
|
||||
// Assistant messages need special handling for tool_calls
|
||||
if (msg.tool_calls && msg.tool_calls.length > 0) {
|
||||
// Create content blocks array for tool calls
|
||||
const content = [];
|
||||
|
||||
// Add text content if present
|
||||
if (msg.content) {
|
||||
content.push({
|
||||
type: 'text',
|
||||
text: msg.content
|
||||
});
|
||||
}
|
||||
|
||||
// Add tool_use blocks for each tool call
|
||||
for (const toolCall of msg.tool_calls) {
|
||||
if (toolCall.function && toolCall.function.name) {
|
||||
try {
|
||||
// Parse arguments if they're a string
|
||||
let parsedArgs = toolCall.function.arguments;
|
||||
if (typeof parsedArgs === 'string') {
|
||||
try {
|
||||
parsedArgs = JSON.parse(parsedArgs);
|
||||
} catch (e) {
|
||||
// Keep as string if parsing fails
|
||||
log.info(`Could not parse tool arguments as JSON: ${e}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Add tool_use block
|
||||
content.push({
|
||||
type: 'tool_use',
|
||||
id: toolCall.id || `tool_${Date.now()}`,
|
||||
name: toolCall.function.name,
|
||||
input: parsedArgs
|
||||
});
|
||||
} catch (e) {
|
||||
log.error(`Error processing tool call: ${e}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add the assistant message with content blocks
|
||||
anthropicMessages.push({
|
||||
role: 'assistant',
|
||||
content
|
||||
});
|
||||
} else {
|
||||
// Regular assistant message without tool calls
|
||||
anthropicMessages.push({
|
||||
role: 'assistant',
|
||||
content: msg.content
|
||||
});
|
||||
}
|
||||
} else if (msg.role === 'tool') {
|
||||
// Tool response messages - typically follow a tool call from the assistant
|
||||
// Tool response messages need to be properly formatted as tool_result
|
||||
if (msg.tool_call_id) {
|
||||
// Format as a tool_result message
|
||||
anthropicMessages.push({
|
||||
role: 'user',
|
||||
content: [
|
||||
{
|
||||
type: 'tool_result',
|
||||
tool_use_id: msg.tool_call_id,
|
||||
content: msg.content
|
||||
}
|
||||
]
|
||||
});
|
||||
} else {
|
||||
// Fallback if no tool_call_id is present
|
||||
anthropicMessages.push({
|
||||
role: 'user',
|
||||
content: msg.content
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return anthropicMessages;
|
||||
}
|
||||
@ -397,6 +504,8 @@ export class AnthropicService extends BaseAIService {
|
||||
return [];
|
||||
}
|
||||
|
||||
log.info(`[TOOL DEBUG] Converting ${tools.length} tools to Anthropic format`);
|
||||
|
||||
// Filter out invalid tools
|
||||
const validTools = tools.filter(tool => {
|
||||
if (!tool || typeof tool !== 'object') {
|
||||
@ -420,9 +529,28 @@ export class AnthropicService extends BaseAIService {
|
||||
}
|
||||
|
||||
// Convert tools to Anthropic format
|
||||
return validTools.map((tool: any) => {
|
||||
const convertedTools = validTools.map((tool: any) => {
|
||||
// Convert from OpenAI format to Anthropic format
|
||||
if (tool.type === 'function' && tool.function) {
|
||||
log.info(`[TOOL DEBUG] Converting function tool: ${tool.function.name}`);
|
||||
|
||||
// Check the parameters structure
|
||||
if (tool.function.parameters) {
|
||||
log.info(`[TOOL DEBUG] Parameters for ${tool.function.name}:`);
|
||||
log.info(`[TOOL DEBUG] - Type: ${tool.function.parameters.type}`);
|
||||
log.info(`[TOOL DEBUG] - Properties: ${JSON.stringify(tool.function.parameters.properties || {})}`);
|
||||
log.info(`[TOOL DEBUG] - Required: ${JSON.stringify(tool.function.parameters.required || [])}`);
|
||||
|
||||
// Check if the required array is present and properly populated
|
||||
if (!tool.function.parameters.required || !Array.isArray(tool.function.parameters.required)) {
|
||||
log.error(`[TOOL DEBUG] WARNING: Tool ${tool.function.name} missing required array in parameters`);
|
||||
} else if (tool.function.parameters.required.length === 0) {
|
||||
log.error(`[TOOL DEBUG] WARNING: Tool ${tool.function.name} has empty required array - Anthropic may send empty inputs`);
|
||||
}
|
||||
} else {
|
||||
log.error(`[TOOL DEBUG] WARNING: Tool ${tool.function.name} has no parameters defined`);
|
||||
}
|
||||
|
||||
return {
|
||||
name: tool.function.name,
|
||||
description: tool.function.description || '',
|
||||
@ -432,6 +560,7 @@ export class AnthropicService extends BaseAIService {
|
||||
|
||||
// Handle already converted Anthropic format (from our temporary fix)
|
||||
if (tool.type === 'custom' && tool.custom) {
|
||||
log.info(`[TOOL DEBUG] Converting custom tool: ${tool.custom.name}`);
|
||||
return {
|
||||
name: tool.custom.name,
|
||||
description: tool.custom.description || '',
|
||||
@ -441,6 +570,7 @@ export class AnthropicService extends BaseAIService {
|
||||
|
||||
// If the tool is already in the correct Anthropic format
|
||||
if (tool.name && (tool.input_schema || tool.parameters)) {
|
||||
log.info(`[TOOL DEBUG] Tool already in Anthropic format: ${tool.name}`);
|
||||
return {
|
||||
name: tool.name,
|
||||
description: tool.description || '',
|
||||
@ -451,5 +581,7 @@ export class AnthropicService extends BaseAIService {
|
||||
log.error(`Unhandled tool format encountered`);
|
||||
return null;
|
||||
}).filter(Boolean); // Filter out any null values
|
||||
|
||||
return convertedTools;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user