/**
 * Simplified service to handle chat API interactions
 * Works directly with ChatStorageService - no complex session management
 */
import log from "../../log.js";
import type { Request, Response } from "express";
import type { Message, ChatCompletionOptions } from "../ai_interface.js";
import aiServiceManager from "../ai_service_manager.js";
import { ChatPipeline } from "../pipeline/chat_pipeline.js";
import type { ChatPipelineInput } from "../pipeline/interfaces.js";
import options from "../../options.js";
import { ToolHandler } from "./handlers/tool_handler.js";
import type { LLMStreamMessage } from "../interfaces/chat_ws_messages.js";
import chatStorageService from '../chat_storage_service.js';
import {
    isAIEnabled,
    getSelectedModelConfig,
} from '../config/configuration_helpers.js';
/**
 * Simplified service to handle chat API interactions
 */
class RestChatService {
/**
* Check if the database is initialized
*/
isDatabaseInitialized(): boolean {
try {
options.getOption('initialized');
return true;
} catch (error) {
return false;
}
}
/**
* Handle a message sent to an LLM and get a response
* Simplified to work directly with chat storage
2025-04-16 19:35:09 +00:00
*/
async handleSendMessage(req: Request, res: Response) {
log.info("=== Starting simplified handleSendMessage ===");
2025-04-16 19:35:09 +00:00
try {
// Extract parameters
let content, useAdvancedContext, showThinking, chatNoteId;
2025-04-16 19:35:09 +00:00
if (req.method === 'POST') {
const requestBody = req.body || {};
content = requestBody.content;
useAdvancedContext = requestBody.useAdvancedContext || false;
showThinking = requestBody.showThinking || false;
log.info(`LLM POST message: chatNoteId=${req.params.chatNoteId}, contentLength=${content ? content.length : 0}`);
2025-04-16 19:35:09 +00:00
} else if (req.method === 'GET') {
useAdvancedContext = req.query.useAdvancedContext === 'true' || (req.body && req.body.useAdvancedContext === true);
showThinking = req.query.showThinking === 'true' || (req.body && req.body.showThinking === true);
content = req.body && req.body.content ? req.body.content : '';
log.info(`LLM GET stream: chatNoteId=${req.params.chatNoteId}`);
2025-04-16 19:35:09 +00:00
}
chatNoteId = req.params.chatNoteId;
2025-04-16 19:35:09 +00:00
// Validate inputs
2025-04-16 19:35:09 +00:00
if (req.method === 'GET' && req.query.stream !== 'true') {
throw new Error('Stream parameter must be set to true for GET/streaming requests');
}
if (req.method === 'POST' && (!content || typeof content !== 'string' || content.trim().length === 0)) {
throw new Error('Content cannot be empty');
}
// Check if AI is enabled
2025-04-16 19:35:09 +00:00
const aiEnabled = await options.getOptionBool('aiEnabled');
if (!aiEnabled) {
return { error: "AI features are disabled. Please enable them in the settings." };
2025-04-16 19:35:09 +00:00
}
// Check database initialization first
if (!this.isDatabaseInitialized()) {
throw new Error("Database is not initialized");
2025-04-16 19:35:09 +00:00
}
// Get or create AI service - will throw meaningful error if not possible
await aiServiceManager.getOrCreateAnyService();
// Load or create chat directly from storage
let chat = await chatStorageService.getChat(chatNoteId);
2025-04-16 19:35:09 +00:00
if (!chat && req.method === 'GET') {
throw new Error('Chat Note not found, cannot create session for streaming');
2025-04-16 19:35:09 +00:00
}
if (!chat && req.method === 'POST') {
log.info(`Creating new chat note with ID: ${chatNoteId}`);
chat = await chatStorageService.createChat('New Chat');
// Update the chat ID to match the requested ID if possible
// In practice, we'll use the generated ID
chatNoteId = chat.id;
}
2025-04-16 19:35:09 +00:00
if (!chat) {
throw new Error('Failed to create or retrieve chat');
}
2025-04-16 19:35:09 +00:00
// For POST requests, add the user message to the chat immediately
// This ensures user messages are always saved
if (req.method === 'POST' && content) {
chat.messages.push({
role: 'user',
content
});
// Save immediately to ensure user message is saved
await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
log.info(`Added and saved user message: "${content.substring(0, 50)}${content.length > 50 ? '...' : ''}"`);
2025-04-16 19:35:09 +00:00
}
// Initialize tools
await ToolHandler.ensureToolsInitialized();
// Create and use the chat pipeline
2025-04-16 19:35:09 +00:00
const pipeline = new ChatPipeline({
enableStreaming: req.method === 'GET',
enableMetrics: true,
maxToolCallIterations: 5
});
// Get user's preferred model
const preferredModel = await this.getPreferredModel();
2025-04-16 19:35:09 +00:00
const pipelineOptions = {
useAdvancedContext: useAdvancedContext === true,
systemPrompt: chat.messages.find(m => m.role === 'system')?.content,
model: preferredModel,
2025-04-16 19:35:09 +00:00
stream: !!(req.method === 'GET' || req.query.format === 'stream' || req.query.stream === 'true'),
chatNoteId: chatNoteId
2025-04-16 19:35:09 +00:00
};
log.info(`Pipeline options: ${JSON.stringify({ useAdvancedContext: pipelineOptions.useAdvancedContext, stream: pipelineOptions.stream })}`);
2025-04-16 19:35:09 +00:00
// Import WebSocket service for streaming
2025-04-16 19:35:09 +00:00
const wsService = await import('../../ws.js');
const accumulatedContentRef = { value: '' };
2025-04-16 19:35:09 +00:00
const pipelineInput: ChatPipelineInput = {
messages: chat.messages.map(msg => ({
2025-04-16 19:35:09 +00:00
role: msg.role as 'user' | 'assistant' | 'system',
content: msg.content
})),
query: content || '',
noteId: undefined, // TODO: Add context note support if needed
2025-04-16 19:35:09 +00:00
showThinking: showThinking,
options: pipelineOptions,
streamCallback: req.method === 'GET' ? (data, done, rawChunk) => {
this.handleStreamCallback(data, done, rawChunk, wsService.default, chatNoteId, res, accumulatedContentRef, chat);
2025-04-16 19:35:09 +00:00
} : undefined
};
// Execute the pipeline
const response = await pipeline.execute(pipelineInput);
if (req.method === 'POST') {
// Add assistant response to chat
chat.messages.push({
2025-04-16 19:35:09 +00:00
role: 'assistant',
content: response.text || ''
2025-04-16 19:35:09 +00:00
});
// Save the updated chat back to storage (single source of truth)
await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
log.info(`Saved non-streaming assistant response: ${(response.text || '').length} characters`);
2025-04-16 19:35:09 +00:00
// Extract sources if available
const sources = (response as any).sources || [];
2025-04-16 19:35:09 +00:00
return {
content: response.text || '',
sources: sources,
metadata: {
model: response.model,
provider: response.provider,
lastUpdated: new Date().toISOString()
2025-04-16 19:35:09 +00:00
}
};
} else {
// For streaming, response is already sent via WebSocket/SSE
// The accumulatedContentRef will have been saved in handleStreamCallback when done=true
2025-04-16 19:35:09 +00:00
return null;
}
} catch (error: any) {
log.error(`Error processing message: ${error}`);
return { error: `Error processing your request: ${error.message}` };
2025-04-16 19:35:09 +00:00
}
}
/**
* Simplified stream callback handler
2025-04-16 19:35:09 +00:00
*/
private async handleStreamCallback(
2025-04-16 19:35:09 +00:00
data: string | null,
done: boolean,
rawChunk: any,
wsService: any,
chatNoteId: string,
res: Response,
accumulatedContentRef: { value: string },
chat: { id: string; messages: Message[]; title: string }
2025-04-16 19:35:09 +00:00
) {
const message: LLMStreamMessage = {
type: 'llm-stream',
chatNoteId: chatNoteId,
done: done
2025-04-16 19:35:09 +00:00
};
if (data) {
message.content = data;
// Simple accumulation - just append the new data
accumulatedContentRef.value += data;
2025-04-16 19:35:09 +00:00
}
// Only include thinking if explicitly present in rawChunk
2025-04-16 19:35:09 +00:00
if (rawChunk && 'thinking' in rawChunk && rawChunk.thinking) {
message.thinking = rawChunk.thinking as string;
}
// Only include tool execution if explicitly present in rawChunk
2025-04-16 19:35:09 +00:00
if (rawChunk && 'toolExecution' in rawChunk && rawChunk.toolExecution) {
const toolExec = rawChunk.toolExecution;
message.toolExecution = {
tool: typeof toolExec.tool === 'string' ? toolExec.tool : toolExec.tool?.name,
2025-04-16 19:35:09 +00:00
result: toolExec.result,
args: 'arguments' in toolExec ?
(typeof toolExec.arguments === 'object' ? toolExec.arguments as Record<string, unknown> : {}) : {},
2025-04-16 19:35:09 +00:00
action: 'action' in toolExec ? toolExec.action as string : undefined,
toolCallId: 'toolCallId' in toolExec ? toolExec.toolCallId as string : undefined,
error: 'error' in toolExec ? toolExec.error as string : undefined
};
}
// Send WebSocket message
2025-04-16 19:35:09 +00:00
wsService.sendMessageToAllClients(message);
// When streaming is complete, save the accumulated content to the chat note
2025-04-16 19:35:09 +00:00
if (done) {
try {
// Only save if we have accumulated content
if (accumulatedContentRef.value) {
// Add assistant response to chat
chat.messages.push({
role: 'assistant',
content: accumulatedContentRef.value
});
// Save the updated chat back to storage
await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
log.info(`Saved streaming assistant response: ${accumulatedContentRef.value.length} characters`);
}
} catch (error) {
// Log error but don't break the response flow
log.error(`Error saving streaming response: ${error}`);
}
// Note: For WebSocket-only streaming, we don't end the HTTP response here
// since it was already handled by the calling endpoint
2025-04-16 19:35:09 +00:00
}
}
/**
* Create a new chat
2025-04-16 19:35:09 +00:00
*/
async createSession(req: Request, res: Response) {
try {
const options: any = req.body || {};
const title = options.title || 'Chat Session';
let noteId = options.noteId || options.chatNoteId;
// Check if currentNoteId is already an AI Chat note
2025-06-02 00:56:19 +00:00
if (!noteId && options.currentNoteId) {
const becca = (await import('../../../becca/becca.js')).default;
const note = becca.notes[options.currentNoteId];
if (note) {
try {
const content = note.getContent();
if (content) {
const contentStr = typeof content === 'string' ? content : content.toString();
const parsedContent = JSON.parse(contentStr);
if (parsedContent.messages && Array.isArray(parsedContent.messages)) {
2025-06-02 00:56:19 +00:00
noteId = options.currentNoteId;
log.info(`Using existing AI Chat note ${noteId} as session`);
}
}
} catch (_) {
2025-06-02 00:56:19 +00:00
// Not JSON content, so not an AI Chat note
}
}
}
// Create new chat if needed
2025-06-02 00:56:19 +00:00
if (!noteId) {
const newChat = await chatStorageService.createChat(title);
2025-06-02 00:56:19 +00:00
noteId = newChat.id;
log.info(`Created new Chat Note with ID: ${noteId}`);
} else {
log.info(`Using existing Chat Note with ID: ${noteId}`);
}
2025-04-16 19:35:09 +00:00
return {
id: noteId,
title: title,
createdAt: new Date(),
noteId: noteId
2025-04-16 19:35:09 +00:00
};
} catch (error: any) {
log.error(`Error creating chat session: ${error.message || 'Unknown error'}`);
throw new Error(`Failed to create chat session: ${error.message || 'Unknown error'}`);
2025-04-16 19:35:09 +00:00
}
}
/**
* Get a chat by ID
2025-04-16 19:35:09 +00:00
*/
async getSession(req: Request, res: Response): Promise<any> {
2025-04-16 19:35:09 +00:00
try {
const { sessionId } = req.params;
const chat = await chatStorageService.getChat(sessionId);
if (!chat) {
2025-04-16 19:35:09 +00:00
res.status(404).json({
error: true,
message: `Session with ID ${sessionId} not found`,
code: 'session_not_found',
sessionId
});
return null;
2025-04-16 19:35:09 +00:00
}
return {
id: chat.id,
title: chat.title,
createdAt: chat.createdAt,
lastActive: chat.updatedAt,
messages: chat.messages,
metadata: chat.metadata || {}
2025-04-16 19:35:09 +00:00
};
} catch (error: any) {
log.error(`Error getting chat session: ${error.message || 'Unknown error'}`);
2025-04-16 19:35:09 +00:00
throw new Error(`Failed to get session: ${error.message || 'Unknown error'}`);
}
}
/**
* Delete a chat
2025-04-16 19:35:09 +00:00
*/
async deleteSession(req: Request, res: Response) {
try {
const { sessionId } = req.params;
const success = await chatStorageService.deleteChat(sessionId);
2025-04-16 19:35:09 +00:00
if (!success) {
throw new Error(`Session with ID ${sessionId} not found`);
}
return {
success: true,
message: `Session ${sessionId} deleted successfully`
};
} catch (error: any) {
log.error(`Error deleting chat session: ${error.message || 'Unknown error'}`);
2025-04-16 19:35:09 +00:00
throw new Error(`Failed to delete session: ${error.message || 'Unknown error'}`);
}
}
/**
* Get all chats
*/
async getAllSessions() {
try {
const chats = await chatStorageService.getAllChats();
return {
sessions: chats.map(chat => ({
id: chat.id,
title: chat.title,
createdAt: chat.createdAt,
lastActive: chat.updatedAt,
messageCount: chat.messages.length
}))
};
} catch (error: any) {
log.error(`Error listing sessions: ${error}`);
throw new Error(`Failed to list sessions: ${error}`);
}
}
2025-06-02 02:38:21 +00:00
/**
* Get the user's preferred model
2025-06-02 02:38:21 +00:00
*/
async getPreferredModel(): Promise<string | undefined> {
2025-06-02 02:38:21 +00:00
try {
const validConfig = await getSelectedModelConfig();
if (!validConfig) {
log.error('No valid AI model configuration found');
return undefined;
2025-06-02 02:38:21 +00:00
}
return validConfig.model;
2025-06-02 02:38:21 +00:00
} catch (error) {
log.error(`Error getting preferred model: ${error}`);
return undefined;
2025-06-02 02:38:21 +00:00
}
}
2025-04-16 19:35:09 +00:00
}
// Create singleton instance shared by all importers of this module
const restChatService = new RestChatService();
export default restChatService;