From d2ba270fdff739dd994b942014b10f0250a032ce Mon Sep 17 00:00:00 2001 From: perf3ct Date: Tue, 3 Jun 2025 00:18:45 +0000 Subject: [PATCH] fix(llm): sending messages no longer throws an error at first --- apps/server/src/routes/api/llm.ts | 106 ++++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 29 deletions(-) diff --git a/apps/server/src/routes/api/llm.ts b/apps/server/src/routes/api/llm.ts index 9aaf2fe81..cec29ffec 100644 --- a/apps/server/src/routes/api/llm.ts +++ b/apps/server/src/routes/api/llm.ts @@ -809,20 +809,26 @@ async function indexNote(req: Request, res: Response) { async function streamMessage(req: Request, res: Response) { log.info("=== Starting streamMessage ==="); try { - // Set up the response headers for streaming first, before any data is sent - res.setHeader('Content-Type', 'text/event-stream'); - res.setHeader('Cache-Control', 'no-cache'); - res.setHeader('Connection', 'keep-alive'); - const chatNoteId = req.params.chatNoteId; const { content, useAdvancedContext, showThinking, mentions } = req.body; if (!content || typeof content !== 'string' || content.trim().length === 0) { - // Early return with error - res.write(`data: ${JSON.stringify({ error: 'Content cannot be empty', done: true })}\n\n`); - res.end(); - return; + return res.status(400).json({ + success: false, + error: 'Content cannot be empty' + }); } + + // IMPORTANT: Immediately send a success response to the initial POST request + // The client is waiting for this to confirm streaming has been initiated + res.status(200).json({ + success: true, + message: 'Streaming initiated successfully' + }); + log.info(`Sent immediate success response for streaming setup`); + + // Create a new response object for streaming through WebSocket only + // We won't use HTTP streaming since we've already sent the HTTP response // Get or create chat directly from storage (simplified approach) let chat = await chatStorageService.getChat(chatNoteId); @@ -883,28 +889,70 @@ async function streamMessage(req: Request, res: Response) { thinking: showThinking ? 'Initializing streaming LLM response...' : undefined }); - // Process the streaming request directly + // Instead of trying to reimplement the streaming logic ourselves, + // delegate to restChatService but set up the correct protocol: + // 1. We've already sent a success response to the initial POST + // 2. Now we'll have restChatService process the actual streaming through WebSocket try { - // Call the streaming handler - it will handle the response streaming - // IMPORTANT: We do not await this because we don't want to try sending a response - // after the streaming has completed - the stream handler takes care of ending the response - restChatService.handleSendMessage({ - ...req, - method: 'GET', // Indicate streaming mode - query: { - ...req.query, - stream: 'true' // Add the required stream parameter - }, - body: { - content: enhancedContent, - useAdvancedContext: useAdvancedContext === true, - showThinking: showThinking === true - }, - params: { chatNoteId } - } as unknown as Request, res); + // Import the WebSocket service for sending messages + const wsService = (await import('../../services/ws.js')).default; - // Don't return or send any additional response here - // handleSendMessage handles the full streaming response cycle and will end the response + // Create a simple pass-through response object that won't write to the HTTP response + // but will allow restChatService to send WebSocket messages + const dummyResponse = { + writableEnded: false, + // Implement methods that would normally be used by restChatService + write: (_chunk: string) => { + // Silent no-op - we're only using WebSocket + return true; + }, + end: (_chunk?: string) => { + // Log when streaming is complete via WebSocket + log.info(`[${chatNoteId}] Completed HTTP response handling during WebSocket streaming`); + return dummyResponse; + }, + setHeader: (name: string, _value: string) => { + // Only log for content-type to reduce noise + if (name.toLowerCase() === 'content-type') { + log.info(`[${chatNoteId}] Setting up streaming for WebSocket only`); + } + return dummyResponse; + } + }; + + // Process the streaming now through WebSocket only + try { + log.info(`[${chatNoteId}] Processing LLM streaming through WebSocket after successful initiation at ${new Date().toISOString()}`); + + // Call restChatService with our enhanced request and dummy response + // The important part is setting method to GET to indicate streaming mode + await restChatService.handleSendMessage({ + ...req, + method: 'GET', // Indicate streaming mode + query: { + ...req.query, + stream: 'true' // Add the required stream parameter + }, + body: { + content: enhancedContent, + useAdvancedContext: useAdvancedContext === true, + showThinking: showThinking === true + }, + params: { chatNoteId } + } as unknown as Request, dummyResponse as unknown as Response); + + log.info(`[${chatNoteId}] WebSocket streaming completed at ${new Date().toISOString()}`); + } catch (streamError) { + log.error(`[${chatNoteId}] Error during WebSocket streaming: ${streamError}`); + + // Send error message through WebSocket + wsService.sendMessageToAllClients({ + type: 'llm-stream', + chatNoteId: chatNoteId, + error: `Error during streaming: ${streamError}`, + done: true + }); + } } catch (error) { log.error(`Error during streaming: ${error}`);