diff --git a/apps/server/src/routes/api/llm.ts b/apps/server/src/routes/api/llm.ts
index bd56830bb..9aaf2fe81 100644
--- a/apps/server/src/routes/api/llm.ts
+++ b/apps/server/src/routes/api/llm.ts
@@ -809,11 +809,19 @@ async function indexNote(req: Request, res: Response) {
 async function streamMessage(req: Request, res: Response) {
     log.info("=== Starting streamMessage ===");
     try {
+        // Set up the response headers for streaming first, before any data is sent
+        res.setHeader('Content-Type', 'text/event-stream');
+        res.setHeader('Cache-Control', 'no-cache');
+        res.setHeader('Connection', 'keep-alive');
+
         const chatNoteId = req.params.chatNoteId;
         const { content, useAdvancedContext, showThinking, mentions } = req.body;
 
         if (!content || typeof content !== 'string' || content.trim().length === 0) {
-            throw new Error('Content cannot be empty');
+            // Early return with error
+            res.write(`data: ${JSON.stringify({ error: 'Content cannot be empty', done: true })}\n\n`);
+            res.end();
+            return;
         }
 
         // Get or create chat directly from storage (simplified approach)
@@ -823,6 +831,14 @@ async function streamMessage(req: Request, res: Response) {
             chat = await chatStorageService.createChat('New Chat');
             log.info(`Created new chat with ID: ${chat.id} for stream request`);
         }
+
+        // Add the user message to the chat immediately
+        chat.messages.push({
+            role: 'user',
+            content
+        });
+        // Save the chat to ensure the user message is recorded
+        await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
 
         // Process mentions if provided
         let enhancedContent = content;
@@ -831,7 +847,6 @@ async function streamMessage(req: Request, res: Response) {
 
             // Import note service to get note content
             const becca = (await import('../../becca/becca.js')).default;
-
             const mentionContexts: string[] = [];
 
             for (const mention of mentions) {
@@ -870,7 +885,10 @@ async function streamMessage(req: Request, res: Response) {
 
         // Process the streaming request directly
        try {
-            const result = await restChatService.handleSendMessage({
+            // Call the streaming handler - it will handle the response streaming
+            // IMPORTANT: We do not await this because we don't want to try sending a response
+            // after the streaming has completed - the stream handler takes care of ending the response
+            restChatService.handleSendMessage({
                 ...req,
                 method: 'GET', // Indicate streaming mode
                 query: {
@@ -884,13 +902,9 @@ async function streamMessage(req: Request, res: Response) {
                 },
                 params: { chatNoteId }
             } as unknown as Request, res);
-
-            // Since we're streaming, the result will be null
-            return {
-                success: true,
-                message: 'Streaming started',
-                chatNoteId: chatNoteId
-            };
+
+            // Don't return or send any additional response here
+            // handleSendMessage handles the full streaming response cycle and will end the response
         } catch (error) {
             log.error(`Error during streaming: ${error}`);
 
@@ -901,12 +915,21 @@ async function streamMessage(req: Request, res: Response) {
                 error: `Error processing message: ${error}`,
                 done: true
             });
-
-            throw error;
+
+            // Only write to the response if it hasn't been ended yet
+            if (!res.writableEnded) {
+                res.write(`data: ${JSON.stringify({ error: `Error processing message: ${error}`, done: true })}\n\n`);
+                res.end();
+            }
         }
     } catch (error: any) {
         log.error(`Error starting message stream: ${error.message}`);
-        throw error;
+
+        // Only write to the response if it hasn't been ended yet
+        if (!res.writableEnded) {
+            res.write(`data: ${JSON.stringify({ error: `Error starting message stream: ${error.message}`, done: true })}\n\n`);
+            res.end();
+        }
     }
 }
 
diff --git a/apps/server/src/services/llm/chat/rest_chat_service.ts b/apps/server/src/services/llm/chat/rest_chat_service.ts
index 2f67422f3..1ad3d7a22 100644
--- a/apps/server/src/services/llm/chat/rest_chat_service.ts
+++ b/apps/server/src/services/llm/chat/rest_chat_service.ts
@@ -116,13 +116,16 @@ class RestChatService {
                 throw new Error('Failed to create or retrieve chat');
             }
 
-            // For POST requests, add the user message
+            // For POST requests, add the user message to the chat immediately
+            // This ensures user messages are always saved
             if (req.method === 'POST' && content) {
                 chat.messages.push({
                     role: 'user',
                     content
                 });
-                log.info(`Processing LLM message: "${content.substring(0, 50)}${content.length > 50 ? '...' : ''}"`);
+                // Save immediately to ensure user message is saved
+                await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
+                log.info(`Added and saved user message: "${content.substring(0, 50)}${content.length > 50 ? '...' : ''}"`);
             }
 
             // Initialize tools
@@ -162,7 +165,7 @@ class RestChatService {
                 showThinking: showThinking,
                 options: pipelineOptions,
                 streamCallback: req.method === 'GET' ? (data, done, rawChunk) => {
-                    this.handleStreamCallback(data, done, rawChunk, wsService.default, chatNoteId, res, accumulatedContentRef);
+                    this.handleStreamCallback(data, done, rawChunk, wsService.default, chatNoteId, res, accumulatedContentRef, chat);
                 } : undefined
             };
 
@@ -178,6 +181,7 @@ class RestChatService {
 
                 // Save the updated chat back to storage (single source of truth)
                 await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
+                log.info(`Saved non-streaming assistant response: ${(response.text || '').length} characters`);
 
                 // Extract sources if available
                 const sources = (response as any).sources || [];
@@ -193,16 +197,7 @@ class RestChatService {
                 };
             } else {
                 // For streaming, response is already sent via WebSocket/SSE
-                // Save the accumulated content - prefer accumulated content over response.text
-                const finalContent = accumulatedContentRef.value || response.text || '';
-                if (finalContent) {
-                    chat.messages.push({
-                        role: 'assistant',
-                        content: finalContent
-                    });
-                    await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
-                    log.info(`Saved accumulated streaming content: ${finalContent.length} characters`);
-                }
+                // The accumulatedContentRef will have been saved in handleStreamCallback when done=true
                 return null;
             }
         } catch (error: any) {
@@ -214,14 +209,15 @@ class RestChatService {
     /**
     * Simplified stream callback handler
     */
-    private handleStreamCallback(
+    private async handleStreamCallback(
        data: string | null,
        done: boolean,
        rawChunk: any,
        wsService: any,
        chatNoteId: string,
        res: Response,
-        accumulatedContentRef: { value: string }
+        accumulatedContentRef: { value: string },
+        chat: { id: string; messages: Message[]; title: string }
    ) {
        const message: LLMStreamMessage = {
            type: 'llm-stream',
@@ -264,7 +260,28 @@ class RestChatService {
        }
 
        res.write(`data: ${JSON.stringify(responseData)}\n\n`);
+
+        // When streaming is complete, save the accumulated content to the chat note
        if (done) {
+            try {
+                // Only save if we have accumulated content
+                if (accumulatedContentRef.value) {
+                    // Add assistant response to chat
+                    chat.messages.push({
+                        role: 'assistant',
+                        content: accumulatedContentRef.value
+                    });
+
+                    // Save the updated chat back to storage
+                    await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
+                    log.info(`Saved streaming assistant response: ${accumulatedContentRef.value.length} characters`);
+                }
+            } catch (error) {
+                // Log error but don't break the response flow
+                log.error(`Error saving streaming response: ${error}`);
+            }
+
+            // End the response
            res.end();
        }
    }
@@ -295,7 +312,7 @@ class RestChatService {
                    log.info(`Using existing AI Chat note ${noteId} as session`);
                }
            }
-        } catch (e) {
+        } catch (_) {
            // Not JSON content, so not an AI Chat note
        }
    }
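
For reviewers, here is a minimal standalone sketch of the SSE pattern this patch applies in `streamMessage`: headers go out before any body is written, every event is sent as a `data:` frame, and late error writes are guarded by `res.writableEnded` so nothing touches a response that the streaming handler already closed. The handler name and payload shape below are illustrative only and are not part of the diff.

```ts
import type { Request, Response } from "express";

// Hypothetical demo handler; only the header/write/end/writableEnded pattern
// mirrors the patch above, the route and payload shape are made up.
export function sseSketchHandler(_req: Request, res: Response) {
    // Headers must be set before the first write, as in the patched streamMessage().
    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");

    try {
        // Each chunk is a separate SSE frame terminated by a blank line.
        res.write(`data: ${JSON.stringify({ content: "partial chunk", done: false })}\n\n`);
        res.write(`data: ${JSON.stringify({ done: true })}\n\n`);
        res.end();
    } catch (error) {
        // Guard against double-ending the response, mirroring the !res.writableEnded checks.
        if (!res.writableEnded) {
            res.write(`data: ${JSON.stringify({ error: String(error), done: true })}\n\n`);
            res.end();
        }
    }
}
```

The same guard is what allows the fire-and-forget call to `handleSendMessage` to own the response lifecycle: the outer catch blocks can still report an error over SSE, but only when the stream has not already been closed.

Similarly, a minimal sketch of the save-on-done flow added to `handleStreamCallback`, assuming a hypothetical `saveChat` persistence function and `ChatMessage` shape standing in for `chatStorageService.updateChat` and the chat note's message list: chunks accumulate in a shared ref and are written back exactly once, when the callback fires with `done === true`.

```ts
interface ChatMessage { role: "user" | "assistant"; content: string; }

// Hypothetical persistence hook standing in for chatStorageService.updateChat().
type SaveChat = (messages: ChatMessage[]) => Promise<void>;

// Called once per streamed chunk; persists the accumulated text only on the final call.
async function onStreamChunk(
    chunk: string | null,
    done: boolean,
    accumulatedRef: { value: string },
    messages: ChatMessage[],
    saveChat: SaveChat
): Promise<void> {
    if (chunk) {
        accumulatedRef.value += chunk;
    }
    if (done && accumulatedRef.value) {
        messages.push({ role: "assistant", content: accumulatedRef.value });
        try {
            await saveChat(messages);
        } catch (error) {
            // As in the patch, a failed save must not break the response flow.
            console.error(`Error saving streaming response: ${error}`);
        }
    }
}
```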