refactor(llm): fix "Cannot set headers after they are sent" by setting SSE headers before any response data is written

This commit is contained in:
perf3ct 2025-06-02 23:54:38 +00:00
parent e7e04b7ccd
commit ab3758c9b3
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
2 changed files with 69 additions and 29 deletions

View File

@ -809,11 +809,19 @@ async function indexNote(req: Request, res: Response) {
async function streamMessage(req: Request, res: Response) { async function streamMessage(req: Request, res: Response) {
log.info("=== Starting streamMessage ==="); log.info("=== Starting streamMessage ===");
try { try {
// Set up the response headers for streaming first, before any data is sent
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const chatNoteId = req.params.chatNoteId; const chatNoteId = req.params.chatNoteId;
const { content, useAdvancedContext, showThinking, mentions } = req.body; const { content, useAdvancedContext, showThinking, mentions } = req.body;
if (!content || typeof content !== 'string' || content.trim().length === 0) { if (!content || typeof content !== 'string' || content.trim().length === 0) {
throw new Error('Content cannot be empty'); // Early return with error
res.write(`data: ${JSON.stringify({ error: 'Content cannot be empty', done: true })}\n\n`);
res.end();
return;
} }
// Get or create chat directly from storage (simplified approach) // Get or create chat directly from storage (simplified approach)
@ -823,6 +831,14 @@ async function streamMessage(req: Request, res: Response) {
chat = await chatStorageService.createChat('New Chat'); chat = await chatStorageService.createChat('New Chat');
log.info(`Created new chat with ID: ${chat.id} for stream request`); log.info(`Created new chat with ID: ${chat.id} for stream request`);
} }
// Add the user message to the chat immediately
chat.messages.push({
role: 'user',
content
});
// Save the chat to ensure the user message is recorded
await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
// Process mentions if provided // Process mentions if provided
let enhancedContent = content; let enhancedContent = content;
@ -831,7 +847,6 @@ async function streamMessage(req: Request, res: Response) {
// Import note service to get note content // Import note service to get note content
const becca = (await import('../../becca/becca.js')).default; const becca = (await import('../../becca/becca.js')).default;
const mentionContexts: string[] = []; const mentionContexts: string[] = [];
for (const mention of mentions) { for (const mention of mentions) {
@ -870,7 +885,10 @@ async function streamMessage(req: Request, res: Response) {
// Process the streaming request directly // Process the streaming request directly
try { try {
const result = await restChatService.handleSendMessage({ // Call the streaming handler - it will handle the response streaming
// IMPORTANT: We do not await this because we don't want to try sending a response
// after the streaming has completed - the stream handler takes care of ending the response
restChatService.handleSendMessage({
...req, ...req,
method: 'GET', // Indicate streaming mode method: 'GET', // Indicate streaming mode
query: { query: {
@ -884,13 +902,9 @@ async function streamMessage(req: Request, res: Response) {
}, },
params: { chatNoteId } params: { chatNoteId }
} as unknown as Request, res); } as unknown as Request, res);
// Since we're streaming, the result will be null // Don't return or send any additional response here
return { // handleSendMessage handles the full streaming response cycle and will end the response
success: true,
message: 'Streaming started',
chatNoteId: chatNoteId
};
} catch (error) { } catch (error) {
log.error(`Error during streaming: ${error}`); log.error(`Error during streaming: ${error}`);
@ -901,12 +915,21 @@ async function streamMessage(req: Request, res: Response) {
error: `Error processing message: ${error}`, error: `Error processing message: ${error}`,
done: true done: true
}); });
throw error; // Only write to the response if it hasn't been ended yet
if (!res.writableEnded) {
res.write(`data: ${JSON.stringify({ error: `Error processing message: ${error}`, done: true })}\n\n`);
res.end();
}
} }
} catch (error: any) { } catch (error: any) {
log.error(`Error starting message stream: ${error.message}`); log.error(`Error starting message stream: ${error.message}`);
throw error;
// Only write to the response if it hasn't been ended yet
if (!res.writableEnded) {
res.write(`data: ${JSON.stringify({ error: `Error starting message stream: ${error.message}`, done: true })}\n\n`);
res.end();
}
} }
} }

View File

@ -116,13 +116,16 @@ class RestChatService {
throw new Error('Failed to create or retrieve chat'); throw new Error('Failed to create or retrieve chat');
} }
// For POST requests, add the user message // For POST requests, add the user message to the chat immediately
// This ensures user messages are always saved
if (req.method === 'POST' && content) { if (req.method === 'POST' && content) {
chat.messages.push({ chat.messages.push({
role: 'user', role: 'user',
content content
}); });
log.info(`Processing LLM message: "${content.substring(0, 50)}${content.length > 50 ? '...' : ''}"`); // Save immediately to ensure user message is saved
await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
log.info(`Added and saved user message: "${content.substring(0, 50)}${content.length > 50 ? '...' : ''}"`);
} }
// Initialize tools // Initialize tools
@ -162,7 +165,7 @@ class RestChatService {
showThinking: showThinking, showThinking: showThinking,
options: pipelineOptions, options: pipelineOptions,
streamCallback: req.method === 'GET' ? (data, done, rawChunk) => { streamCallback: req.method === 'GET' ? (data, done, rawChunk) => {
this.handleStreamCallback(data, done, rawChunk, wsService.default, chatNoteId, res, accumulatedContentRef); this.handleStreamCallback(data, done, rawChunk, wsService.default, chatNoteId, res, accumulatedContentRef, chat);
} : undefined } : undefined
}; };
@ -178,6 +181,7 @@ class RestChatService {
// Save the updated chat back to storage (single source of truth) // Save the updated chat back to storage (single source of truth)
await chatStorageService.updateChat(chat.id, chat.messages, chat.title); await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
log.info(`Saved non-streaming assistant response: ${(response.text || '').length} characters`);
// Extract sources if available // Extract sources if available
const sources = (response as any).sources || []; const sources = (response as any).sources || [];
@ -193,16 +197,7 @@ class RestChatService {
}; };
} else { } else {
// For streaming, response is already sent via WebSocket/SSE // For streaming, response is already sent via WebSocket/SSE
// Save the accumulated content - prefer accumulated content over response.text // The accumulatedContentRef will have been saved in handleStreamCallback when done=true
const finalContent = accumulatedContentRef.value || response.text || '';
if (finalContent) {
chat.messages.push({
role: 'assistant',
content: finalContent
});
await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
log.info(`Saved accumulated streaming content: ${finalContent.length} characters`);
}
return null; return null;
} }
} catch (error: any) { } catch (error: any) {
@ -214,14 +209,15 @@ class RestChatService {
/** /**
* Simplified stream callback handler * Simplified stream callback handler
*/ */
private handleStreamCallback( private async handleStreamCallback(
data: string | null, data: string | null,
done: boolean, done: boolean,
rawChunk: any, rawChunk: any,
wsService: any, wsService: any,
chatNoteId: string, chatNoteId: string,
res: Response, res: Response,
accumulatedContentRef: { value: string } accumulatedContentRef: { value: string },
chat: { id: string; messages: Message[]; title: string }
) { ) {
const message: LLMStreamMessage = { const message: LLMStreamMessage = {
type: 'llm-stream', type: 'llm-stream',
@ -264,7 +260,28 @@ class RestChatService {
} }
res.write(`data: ${JSON.stringify(responseData)}\n\n`); res.write(`data: ${JSON.stringify(responseData)}\n\n`);
// When streaming is complete, save the accumulated content to the chat note
if (done) { if (done) {
try {
// Only save if we have accumulated content
if (accumulatedContentRef.value) {
// Add assistant response to chat
chat.messages.push({
role: 'assistant',
content: accumulatedContentRef.value
});
// Save the updated chat back to storage
await chatStorageService.updateChat(chat.id, chat.messages, chat.title);
log.info(`Saved streaming assistant response: ${accumulatedContentRef.value.length} characters`);
}
} catch (error) {
// Log error but don't break the response flow
log.error(`Error saving streaming response: ${error}`);
}
// End the response
res.end(); res.end();
} }
} }
@ -295,7 +312,7 @@ class RestChatService {
log.info(`Using existing AI Chat note ${noteId} as session`); log.info(`Using existing AI Chat note ${noteId} as session`);
} }
} }
} catch (e) { } catch (_) {
// Not JSON content, so not an AI Chat note // Not JSON content, so not an AI Chat note
} }
} }