From 451e5ea31f1d203f750483ea6aba3ecc07828ac0 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Thu, 10 Apr 2025 21:00:12 +0000 Subject: [PATCH] getting closer to streaming? even closer? closer streaming... this is darn close --- src/public/app/services/ws.ts | 43 ++ src/public/app/widgets/llm_chat_panel.ts | 594 +++++++++++++----- src/routes/api/llm.ts | 158 +++++ src/routes/routes.ts | 1 + src/services/llm/ai_interface.ts | 10 + src/services/llm/pipeline/chat_pipeline.ts | 12 +- .../pipeline/stages/llm_completion_stage.ts | 114 ++-- .../llm/providers/anthropic_service.ts | 40 +- src/services/llm/providers/ollama_service.ts | 468 ++++++++++---- src/services/llm/providers/openai_service.ts | 138 ++-- src/services/llm/rest_chat_service.ts | 441 +++++++++---- src/services/ws.ts | 27 +- 12 files changed, 1484 insertions(+), 562 deletions(-) diff --git a/src/public/app/services/ws.ts b/src/public/app/services/ws.ts index 7f735c458..db79b1a5e 100644 --- a/src/public/app/services/ws.ts +++ b/src/public/app/services/ws.ts @@ -127,6 +127,49 @@ async function handleMessage(event: MessageEvent) { appContext.triggerEvent("apiLogMessages", { noteId: message.noteId, messages: message.messages }); } else if (message.type === "toast") { toastService.showMessage(message.message); + } else if (message.type === "llm-stream") { + // ENHANCED LOGGING FOR DEBUGGING + console.log(`[WS-CLIENT] >>> RECEIVED LLM STREAM MESSAGE <<<`); + console.log(`[WS-CLIENT] Message details: sessionId=${message.sessionId}, hasContent=${!!message.content}, contentLength=${message.content ? message.content.length : 0}, hasThinking=${!!message.thinking}, hasToolExecution=${!!message.toolExecution}, isDone=${!!message.done}`); + + if (message.content) { + console.log(`[WS-CLIENT] CONTENT PREVIEW: "${message.content.substring(0, 50)}..."`); + } + + // Create the event with detailed logging + console.log(`[WS-CLIENT] Creating CustomEvent 'llm-stream-message'`); + const llmStreamEvent = new CustomEvent('llm-stream-message', { detail: message }); + + // Dispatch to multiple targets to ensure delivery + try { + console.log(`[WS-CLIENT] Dispatching event to window`); + window.dispatchEvent(llmStreamEvent); + console.log(`[WS-CLIENT] Event dispatched to window`); + + // Also try document for completeness + console.log(`[WS-CLIENT] Dispatching event to document`); + document.dispatchEvent(new CustomEvent('llm-stream-message', { detail: message })); + console.log(`[WS-CLIENT] Event dispatched to document`); + } catch (err) { + console.error(`[WS-CLIENT] Error dispatching event:`, err); + } + + // Debug current listeners (though we can't directly check for specific event listeners) + console.log(`[WS-CLIENT] Active event listeners should receive this message now`); + + // Detailed logging based on message type + if (message.content) { + console.log(`[WS-CLIENT] Content message: ${message.content.length} chars`); + } else if (message.thinking) { + console.log(`[WS-CLIENT] Thinking update: "${message.thinking}"`); + } else if (message.toolExecution) { + console.log(`[WS-CLIENT] Tool execution: action=${message.toolExecution.action}, tool=${message.toolExecution.tool || 'unknown'}`); + if (message.toolExecution.result) { + console.log(`[WS-CLIENT] Tool result preview: "${String(message.toolExecution.result).substring(0, 50)}..."`); + } + } else if (message.done) { + console.log(`[WS-CLIENT] Completion signal received`); + } } else if (message.type === "execute-script") { // TODO: Remove after porting the file // @ts-ignore diff --git 
a/src/public/app/widgets/llm_chat_panel.ts b/src/public/app/widgets/llm_chat_panel.ts index 8db826841..90f40c42f 100644 --- a/src/public/app/widgets/llm_chat_panel.ts +++ b/src/public/app/widgets/llm_chat_panel.ts @@ -7,6 +7,7 @@ import { t } from "../services/i18n.js"; import libraryLoader from "../services/library_loader.js"; import { applySyntaxHighlight } from "../services/syntax_highlight.js"; import options from "../services/options.js"; +import ws from "../services/ws.js"; import { marked } from "marked"; // Import the LLM Chat CSS @@ -105,6 +106,8 @@ export default class LlmChatPanel extends BasicWidget { private validationWarning!: HTMLElement; private sessionId: string | null = null; private currentNoteId: string | null = null; + private _messageHandlerId: number | null = null; + private _messageHandler: any = null; // Callbacks for data persistence private onSaveData: ((data: any) => Promise) | null = null; @@ -178,6 +181,15 @@ export default class LlmChatPanel extends BasicWidget { return this.$widget; } + cleanup() { + console.log(`LlmChatPanel cleanup called, removing any active WebSocket subscriptions`); + + // No need to manually clean up the event listeners, as they will be garbage collected + // when the component is destroyed. We only need to clean up references. + this._messageHandler = null; + this._messageHandlerId = null; + } + /** * Set the callbacks for data persistence */ @@ -375,16 +387,15 @@ export default class LlmChatPanel extends BasicWidget { // Create the message parameters const messageParams = { content, - contextNoteId: this.currentNoteId, useAdvancedContext, showThinking }; - // First try to use streaming (preferred method) + // Try websocket streaming (preferred method) try { await this.setupStreamingResponse(messageParams); } catch (streamingError) { - console.warn("Streaming request failed, falling back to direct response:", streamingError); + console.warn("WebSocket streaming failed, falling back to direct response:", streamingError); // If streaming fails, fall back to direct response const handled = await this.handleDirectResponse(messageParams); @@ -424,12 +435,14 @@ export default class LlmChatPanel extends BasicWidget { */ private async handleDirectResponse(messageParams: any): Promise { try { - // Add format parameter to maintain consistency with the streaming GET request + // Create a copy of the params without any streaming flags const postParams = { ...messageParams, - format: 'stream' // Match the format parameter used in the GET streaming request + stream: false // Explicitly set to false to ensure we get a direct response }; + console.log(`Sending direct POST request for session ${this.sessionId}`); + // Send the message via POST request with the updated params const postResponse = await server.post(`llm/sessions/${this.sessionId}/messages`, postParams); @@ -474,184 +487,318 @@ export default class LlmChatPanel extends BasicWidget { } /** - * Set up streaming response from the server + * Set up streaming response via WebSocket */ private async setupStreamingResponse(messageParams: any): Promise { + const content = messageParams.content || ''; const useAdvancedContext = messageParams.useAdvancedContext; const showThinking = messageParams.showThinking; - // Set up streaming via EventSource - explicitly add stream=true parameter to ensure consistency - const streamUrl = `./api/llm/sessions/${this.sessionId}/messages?format=stream&stream=true&useAdvancedContext=${useAdvancedContext}&showThinking=${showThinking}`; - return new 
Promise((resolve, reject) => { - const source = new EventSource(streamUrl); let assistantResponse = ''; let receivedAnyContent = false; let timeoutId: number | null = null; + let initialTimeoutId: number | null = null; + let receivedAnyMessage = false; + let eventListener: ((event: Event) => void) | null = null; - // Set up timeout for streaming response - timeoutId = this.setupStreamingTimeout(source); + // Create a unique identifier for this response process + const responseId = `llm-stream-${Date.now()}-${Math.floor(Math.random() * 1000)}`; + console.log(`[${responseId}] Setting up WebSocket streaming for session ${this.sessionId}`); - // Handle streaming response - source.onmessage = (event) => { - try { - if (event.data === '[DONE]') { - // Stream completed successfully - this.handleStreamingComplete(source, timeoutId, receivedAnyContent, assistantResponse); - resolve(); - return; + // Create a message handler for CustomEvents + eventListener = (event: Event) => { + const customEvent = event as CustomEvent; + const message = customEvent.detail; + + // Only process messages for our session + if (!message || message.sessionId !== this.sessionId) { + return; + } + + console.log(`[${responseId}] LLM Stream message received via CustomEvent: session=${this.sessionId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}`); + + // Mark first message received + if (!receivedAnyMessage) { + receivedAnyMessage = true; + console.log(`[${responseId}] First message received for session ${this.sessionId}`); + + // Clear the initial timeout since we've received a message + if (initialTimeoutId !== null) { + window.clearTimeout(initialTimeoutId); + initialTimeoutId = null; + } + } + + // Handle content updates + if (message.content) { + receivedAnyContent = true; + assistantResponse += message.content; + + // Update the UI immediately + this.updateStreamingUI(assistantResponse); + + // Reset timeout since we got content + if (timeoutId !== null) { + window.clearTimeout(timeoutId); } - const data = JSON.parse(event.data); - console.log("Received streaming data:", data); + // Set new timeout + timeoutId = window.setTimeout(() => { + console.warn(`[${responseId}] Stream timeout for session ${this.sessionId}`); - // Handle both content and error cases - if (data.content) { - receivedAnyContent = true; - assistantResponse += data.content; - - // Update the UI with the accumulated response - this.updateStreamingUI(assistantResponse); - } else if (data.toolExecution) { - // Handle tool execution info - this.showToolExecutionInfo(data.toolExecution); - // When tool execution info is received, also show the loading indicator - // in case it's not already visible - this.loadingIndicator.style.display = 'flex'; - } else if (data.error) { - // Handle error message - this.hideLoadingIndicator(); - this.addMessageToChat('assistant', `Error: ${data.error}`); - - if (timeoutId !== null) { - window.clearTimeout(timeoutId); + // Save what we have + if (assistantResponse) { + console.log(`[${responseId}] Saving partial response due to timeout (${assistantResponse.length} chars)`); + this.messages.push({ + role: 'assistant', + content: assistantResponse, + timestamp: new Date() + }); + this.saveCurrentData().catch(err => { + console.error(`[${responseId}] Failed to save partial response:`, err); + }); } - source.close(); - reject(new Error(data.error)); - return; + // Clean up + 
this.cleanupEventListener(eventListener); + this.hideLoadingIndicator(); + reject(new Error('Stream timeout')); + }, 30000); + } + + // Handle tool execution updates + if (message.toolExecution) { + console.log(`[${responseId}] Received tool execution update: action=${message.toolExecution.action || 'unknown'}`); + this.showToolExecutionInfo(message.toolExecution); + this.loadingIndicator.style.display = 'flex'; + } + + // Handle thinking state updates + if (message.thinking) { + console.log(`[${responseId}] Received thinking update: ${message.thinking.substring(0, 50)}...`); + this.showThinkingState(message.thinking); + this.loadingIndicator.style.display = 'flex'; + } + + // Handle completion + if (message.done) { + console.log(`[${responseId}] Stream completed for session ${this.sessionId}, has content: ${!!message.content}, content length: ${message.content?.length || 0}, current response: ${assistantResponse.length} chars`); + + // Dump message content to console for debugging + if (message.content) { + console.log(`[${responseId}] CONTENT IN DONE MESSAGE (first 200 chars): "${message.content.substring(0, 200)}..."`); } - // Scroll to the bottom - this.chatContainer.scrollTop = this.chatContainer.scrollHeight; - } catch (e) { - console.error('Error parsing SSE message:', e, 'Raw data:', event.data); - reject(e); - } - }; + // Clear timeout if set + if (timeoutId !== null) { + window.clearTimeout(timeoutId); + timeoutId = null; + } - // Handle streaming errors - source.onerror = (err) => { - console.error("EventSource error:", err); - source.close(); - this.hideLoadingIndicator(); + // Check if we have content in the done message + // This is particularly important for Ollama which often sends the entire response in one message + if (message.content) { + console.log(`[${responseId}] Processing content in done message: ${message.content.length} chars`); + receivedAnyContent = true; - // Clear the timeout if there was an error - if (timeoutId !== null) { - window.clearTimeout(timeoutId); - } + // Replace current response if we didn't have content before or if it's empty + if (assistantResponse.length === 0) { + console.log(`[${responseId}] Using content from done message as full response`); + assistantResponse = message.content; + } + // Otherwise append it if it's different + else if (message.content !== assistantResponse) { + console.log(`[${responseId}] Appending content from done message to existing response`); + assistantResponse += message.content; + } + else { + console.log(`[${responseId}] Content in done message is identical to existing response, not appending`); + } - // Only reject if we haven't received any content yet - if (!receivedAnyContent) { - reject(new Error('Error connecting to the LLM streaming service')); - } else { - // If we've already received some content, consider it a successful but incomplete response - this.handleStreamingComplete(source, timeoutId, receivedAnyContent, assistantResponse); + this.updateStreamingUI(assistantResponse); + } + + // Save the final response + if (assistantResponse) { + console.log(`[${responseId}] Saving final response of ${assistantResponse.length} chars`); + this.messages.push({ + role: 'assistant', + content: assistantResponse, + timestamp: new Date() + }); + + this.saveCurrentData().catch(err => { + console.error(`[${responseId}] Failed to save final response:`, err); + }); + } else { + // If we didn't receive any content at all, show a generic message + console.log(`[${responseId}] No content received for session 
${this.sessionId}`); + const defaultMessage = 'I processed your request, but I don\'t have any specific information to share at the moment.'; + this.processAssistantResponse(defaultMessage); + } + + // Clean up and resolve + this.cleanupEventListener(eventListener); + this.hideLoadingIndicator(); resolve(); } }; + + // Register event listener for the custom event + try { + window.addEventListener('llm-stream-message', eventListener); + console.log(`[${responseId}] Event listener added for llm-stream-message events`); + } catch (err) { + console.error(`[${responseId}] Error setting up event listener:`, err); + reject(err); + return; + } + + // Set initial timeout for receiving any message + initialTimeoutId = window.setTimeout(() => { + console.warn(`[${responseId}] No messages received for initial period in session ${this.sessionId}`); + if (!receivedAnyMessage) { + console.error(`[${responseId}] WebSocket connection not established for session ${this.sessionId}`); + + if (timeoutId !== null) { + window.clearTimeout(timeoutId); + } + + // Clean up + this.cleanupEventListener(eventListener); + this.hideLoadingIndicator(); + + // Show error message to user + const errorMessage = 'Connection error: Unable to establish WebSocket streaming.'; + this.processAssistantResponse(errorMessage); + reject(new Error('WebSocket connection not established')); + } + }, 10000); + + // Send the streaming request to start the process + console.log(`[${responseId}] Sending HTTP POST request to initiate streaming: /llm/sessions/${this.sessionId}/messages/stream`); + server.post(`llm/sessions/${this.sessionId}/messages/stream`, { + content, + useAdvancedContext, + showThinking, + stream: true // Explicitly indicate this is a streaming request + }).catch(err => { + console.error(`[${responseId}] HTTP error sending streaming request for session ${this.sessionId}:`, err); + + // Clean up timeouts + if (initialTimeoutId !== null) { + window.clearTimeout(initialTimeoutId); + initialTimeoutId = null; + } + + if (timeoutId !== null) { + window.clearTimeout(timeoutId); + timeoutId = null; + } + + // Clean up event listener + this.cleanupEventListener(eventListener); + + reject(err); + }); }); } /** - * Set up timeout for streaming response - * @returns Timeout ID for the created timeout + * Clean up an event listener */ - private setupStreamingTimeout(source: EventSource): number { - // Set a timeout to handle case where streaming doesn't work properly - return window.setTimeout(() => { - // If we haven't received any content after a reasonable timeout (10 seconds), - // add a fallback message and close the stream - this.hideLoadingIndicator(); - const errorMessage = 'I\'m having trouble generating a response right now. 
Please try again later.'; - this.processAssistantResponse(errorMessage); - source.close(); - }, 10000); + private cleanupEventListener(listener: ((event: Event) => void) | null): void { + if (listener) { + try { + window.removeEventListener('llm-stream-message', listener); + console.log(`Successfully removed event listener`); + } catch (err) { + console.error(`Error removing event listener:`, err); + } + } } /** * Update the UI with streaming content as it arrives */ private updateStreamingUI(assistantResponse: string) { + const logId = `ui-update-${Date.now()}`; + console.log(`[${logId}] Updating UI with response text: ${assistantResponse.length} chars`); + + if (!this.noteContextChatMessages) { + console.error(`[${logId}] noteContextChatMessages element not available`); + return; + } + + // Check if we already have an assistant message element to update const assistantElement = this.noteContextChatMessages.querySelector('.assistant-message:last-child .message-content'); + if (assistantElement) { - assistantElement.innerHTML = this.formatMarkdown(assistantResponse); - // Apply syntax highlighting to any code blocks in the updated content - applySyntaxHighlight($(assistantElement as HTMLElement)); + console.log(`[${logId}] Found existing assistant message element, updating content`); + try { + // Format markdown and update the element + const formattedContent = this.formatMarkdown(assistantResponse); + + // Ensure content is properly formatted + if (!formattedContent || formattedContent.trim() === '') { + console.warn(`[${logId}] Formatted content is empty, using original content`); + assistantElement.textContent = assistantResponse; + } else { + assistantElement.innerHTML = formattedContent; + } + + // Apply syntax highlighting to any code blocks in the updated content + applySyntaxHighlight($(assistantElement as HTMLElement)); + + console.log(`[${logId}] Successfully updated existing element with ${formattedContent.length} chars of HTML`); + } catch (err) { + console.error(`[${logId}] Error updating existing element:`, err); + // Fallback to text content if HTML update fails + try { + assistantElement.textContent = assistantResponse; + console.log(`[${logId}] Fallback to text content successful`); + } catch (fallbackErr) { + console.error(`[${logId}] Even fallback update failed:`, fallbackErr); + } + } } else { - this.addMessageToChat('assistant', assistantResponse); - } - } + console.log(`[${logId}] No existing assistant message element found, creating new one`); + try { + this.addMessageToChat('assistant', assistantResponse); + console.log(`[${logId}] Successfully added new assistant message`); + } catch (err) { + console.error(`[${logId}] Error adding new message:`, err); - /** - * Handle completion of streaming response - */ - private handleStreamingComplete( - source: EventSource, - timeoutId: number | null, - receivedAnyContent: boolean, - assistantResponse: string - ) { - // Stream completed - source.close(); - this.hideLoadingIndicator(); - - // Clear the timeout since we're done - if (timeoutId !== null) { - window.clearTimeout(timeoutId); + // Last resort emergency approach - create element directly + try { + console.log(`[${logId}] Attempting emergency DOM update`); + const emergencyElement = document.createElement('div'); + emergencyElement.className = 'chat-message assistant-message mb-3 d-flex'; + emergencyElement.innerHTML = ` +
+                        <div class="message-content">
+                            ${assistantResponse}
+                        </div>
+ `; + this.noteContextChatMessages.appendChild(emergencyElement); + console.log(`[${logId}] Emergency DOM update successful`); + } catch (emergencyErr) { + console.error(`[${logId}] Emergency DOM update failed:`, emergencyErr); + } + } } - // If we didn't receive any content but the stream completed normally, - // display a message to the user - if (!receivedAnyContent) { - const defaultMessage = 'I processed your request, but I don\'t have any specific information to share at the moment.'; - this.processAssistantResponse(defaultMessage); - } else if (assistantResponse) { - // Save the completed streaming response to the message array - this.messages.push({ - role: 'assistant', - content: assistantResponse, - timestamp: new Date() - }); - - // Save to note - this.saveCurrentData().catch(err => { - console.error("Failed to save assistant response to note:", err); - }); - } - } - - /** - * Handle errors during streaming response - */ - private handleStreamingError( - source: EventSource, - timeoutId: number | null, - receivedAnyContent: boolean - ) { - source.close(); - this.hideLoadingIndicator(); - - // Clear the timeout if there was an error - if (timeoutId !== null) { - window.clearTimeout(timeoutId); - } - - // Only show error message if we haven't received any content yet - if (!receivedAnyContent) { - // Instead of automatically showing the error message in the chat, - // throw an error so the parent function can handle the fallback - throw new Error('Error connecting to the LLM streaming service'); + // Always try to scroll to the latest content + try { + if (this.chatContainer) { + this.chatContainer.scrollTop = this.chatContainer.scrollHeight; + console.log(`[${logId}] Scrolled to latest content`); + } + } catch (scrollErr) { + console.error(`[${logId}] Error scrolling to latest content:`, scrollErr); } } @@ -755,32 +902,111 @@ export default class LlmChatPanel extends BasicWidget { } private showLoadingIndicator() { - this.loadingIndicator.style.display = 'flex'; - // Reset the tool execution area when starting a new request, but keep it visible - // We'll make it visible when we get our first tool execution event - this.toolExecutionInfo.style.display = 'none'; - this.toolExecutionSteps.innerHTML = ''; + const logId = `ui-${Date.now()}`; + console.log(`[${logId}] Showing loading indicator and preparing tool execution display`); + + // Ensure elements exist before trying to modify them + if (!this.loadingIndicator || !this.toolExecutionInfo || !this.toolExecutionSteps) { + console.error(`[${logId}] UI elements not properly initialized`); + return; + } + + // Force display of loading indicator + try { + this.loadingIndicator.style.display = 'flex'; + + // Make sure tool execution info area is always visible even before we get the first event + // This helps avoid the UI getting stuck in "Processing..." state + this.toolExecutionInfo.style.display = 'block'; + + // Clear previous tool steps but add a placeholder + this.toolExecutionSteps.innerHTML = ` +
+                <div class="tool-step my-1">
+                    Initializing...
+                </div>
+ `; + + // Force a UI update by accessing element properties + const forceUpdate = this.loadingIndicator.offsetHeight; + + // Verify display states + console.log(`[${logId}] Loading indicator display state: ${this.loadingIndicator.style.display}`); + console.log(`[${logId}] Tool execution info display state: ${this.toolExecutionInfo.style.display}`); + + console.log(`[${logId}] Loading indicator and tool execution area initialized`); + } catch (err) { + console.error(`[${logId}] Error showing loading indicator:`, err); + } } private hideLoadingIndicator() { - this.loadingIndicator.style.display = 'none'; - this.toolExecutionInfo.style.display = 'none'; + const logId = `ui-${Date.now()}`; + console.log(`[${logId}] Hiding loading indicator and tool execution area`); + + // Ensure elements exist before trying to modify them + if (!this.loadingIndicator || !this.toolExecutionInfo) { + console.error(`[${logId}] UI elements not properly initialized`); + return; + } + + // Properly reset DOM elements + try { + // First hide the tool execution info area + this.toolExecutionInfo.style.display = 'none'; + + // Force a UI update by accessing element properties + const forceUpdate1 = this.toolExecutionInfo.offsetHeight; + + // Then hide the loading indicator + this.loadingIndicator.style.display = 'none'; + + // Force another UI update + const forceUpdate2 = this.loadingIndicator.offsetHeight; + + // Verify display states immediately + console.log(`[${logId}] Loading indicator display state: ${this.loadingIndicator.style.display}`); + console.log(`[${logId}] Tool execution info display state: ${this.toolExecutionInfo.style.display}`); + + // Add a delay to double-check that UI updates are complete + setTimeout(() => { + console.log(`[${logId}] Verification after hide timeout: loading indicator display=${this.loadingIndicator.style.display}, tool execution info display=${this.toolExecutionInfo.style.display}`); + + // Force display none again in case something changed it + if (this.loadingIndicator.style.display !== 'none') { + console.log(`[${logId}] Loading indicator still visible after timeout, forcing hidden`); + this.loadingIndicator.style.display = 'none'; + } + + if (this.toolExecutionInfo.style.display !== 'none') { + console.log(`[${logId}] Tool execution info still visible after timeout, forcing hidden`); + this.toolExecutionInfo.style.display = 'none'; + } + }, 100); + } catch (err) { + console.error(`[${logId}] Error hiding loading indicator:`, err); + } } - + /** * Show tool execution information in the UI */ private showToolExecutionInfo(toolExecutionData: any) { + console.log(`Showing tool execution info: ${JSON.stringify(toolExecutionData)}`); + // Make sure tool execution info section is visible this.toolExecutionInfo.style.display = 'block'; - + this.loadingIndicator.style.display = 'flex'; // Ensure loading indicator is shown during tool execution + // Create a new step element to show the tool being executed const stepElement = document.createElement('div'); stepElement.className = 'tool-step my-1'; - + // Basic styling for the step let stepHtml = ''; - + if (toolExecutionData.action === 'start') { // Tool execution starting stepHtml = ` @@ -814,20 +1040,26 @@ export default class LlmChatPanel extends BasicWidget { `; } - - stepElement.innerHTML = stepHtml; - this.toolExecutionSteps.appendChild(stepElement); - - // Scroll to bottom of tool execution steps - this.toolExecutionSteps.scrollTop = this.toolExecutionSteps.scrollHeight; + + if (stepHtml) { + stepElement.innerHTML = 
stepHtml; + this.toolExecutionSteps.appendChild(stepElement); + + // Scroll to bottom of tool execution steps + this.toolExecutionSteps.scrollTop = this.toolExecutionSteps.scrollHeight; + + console.log(`Added new tool execution step to UI`); + } else { + console.log(`No HTML generated for tool execution data:`, toolExecutionData); + } } - + /** * Format tool arguments for display */ private formatToolArgs(args: any): string { if (!args || typeof args !== 'object') return ''; - + return Object.entries(args) .map(([key, value]) => { // Format the value based on its type @@ -843,25 +1075,25 @@ export default class LlmChatPanel extends BasicWidget { } else { displayValue = String(value); } - + return `${this.escapeHtml(key)}: ${this.escapeHtml(displayValue)}`; }) .join(', '); } - + /** * Format tool results for display */ private formatToolResult(result: any): string { if (result === undefined || result === null) return ''; - + // Try to format as JSON if it's an object if (typeof result === 'object') { try { // Get a preview of structured data const entries = Object.entries(result); if (entries.length === 0) return 'Empty result'; - + // Just show first 2 key-value pairs if there are many const preview = entries.slice(0, 2).map(([key, val]) => { let valPreview; @@ -876,22 +1108,22 @@ export default class LlmChatPanel extends BasicWidget { } return `${key}: ${valPreview}`; }).join(', '); - + return entries.length > 2 ? `${preview}, ... (${entries.length} properties)` : preview; } catch (e) { return String(result).substring(0, 100) + (String(result).length > 100 ? '...' : ''); } } - + // For string results if (typeof result === 'string') { return result.length > 100 ? result.substring(0, 97) + '...' : result; } - + // Default formatting return String(result).substring(0, 100) + (String(result).length > 100 ? '...' 
: ''); } - + /** * Simple HTML escaping for safer content display */ @@ -899,7 +1131,7 @@ export default class LlmChatPanel extends BasicWidget { if (typeof text !== 'string') { text = String(text || ''); } - + return text .replace(/&/g, '&') .replace(/ + + ${this.escapeHtml(thinking)} + + `; + + this.toolExecutionInfo.style.display = 'block'; + this.toolExecutionSteps.appendChild(toolExecutionStep); + this.toolExecutionSteps.scrollTop = this.toolExecutionSteps.scrollHeight; + } + /** * Validate embedding providers configuration * Check if there are issues with the embedding providers that might affect LLM functionality @@ -1067,4 +1319,4 @@ export default class LlmChatPanel extends BasicWidget { this.validationWarning.style.display = 'none'; } } -} +} \ No newline at end of file diff --git a/src/routes/api/llm.ts b/src/routes/api/llm.ts index 5ca8b8774..959cbc71e 100644 --- a/src/routes/api/llm.ts +++ b/src/routes/api/llm.ts @@ -791,6 +791,163 @@ async function indexNote(req: Request, res: Response) { } } +/** + * @swagger + * /api/llm/sessions/{sessionId}/messages/stream: + * post: + * summary: Start a streaming response session via WebSockets + * operationId: llm-stream-message + * parameters: + * - name: sessionId + * in: path + * required: true + * schema: + * type: string + * requestBody: + * required: true + * content: + * application/json: + * schema: + * type: object + * properties: + * content: + * type: string + * description: The user message to send to the LLM + * useAdvancedContext: + * type: boolean + * description: Whether to use advanced context extraction + * showThinking: + * type: boolean + * description: Whether to show thinking process in the response + * responses: + * '200': + * description: Streaming started successfully + * '404': + * description: Session not found + * '500': + * description: Error processing request + * security: + * - session: [] + * tags: ["llm"] + */ +async function streamMessage(req: Request, res: Response) { + log.info("=== Starting streamMessage ==="); + try { + const sessionId = req.params.sessionId; + const { content, useAdvancedContext, showThinking } = req.body; + + if (!content || typeof content !== 'string' || content.trim().length === 0) { + throw new Error('Content cannot be empty'); + } + + // Check if session exists + const session = restChatService.getSessions().get(sessionId); + if (!session) { + throw new Error('Session not found'); + } + + // Update last active timestamp + session.lastActive = new Date(); + + // Add user message to the session + session.messages.push({ + role: 'user', + content, + timestamp: new Date() + }); + + // Create request parameters for the pipeline + const requestParams = { + sessionId, + content, + useAdvancedContext: useAdvancedContext === true, + showThinking: showThinking === true, + stream: true // Always stream for this endpoint + }; + + // Create a fake request/response pair to pass to the handler + const fakeReq = { + ...req, + method: 'GET', // Set to GET to indicate streaming + query: { + stream: 'true', // Set stream param - don't use format: 'stream' to avoid confusion + useAdvancedContext: String(useAdvancedContext === true), + showThinking: String(showThinking === true) + }, + params: { + sessionId + }, + // Make sure the original content is available to the handler + body: { + content, + useAdvancedContext: useAdvancedContext === true, + showThinking: showThinking === true + } + } as unknown as Request; + + // Log to verify correct parameters + log.info(`WebSocket stream settings - 
useAdvancedContext=${useAdvancedContext === true}, in query=${fakeReq.query.useAdvancedContext}, in body=${fakeReq.body.useAdvancedContext}`); + // Extra safety to ensure the parameters are passed correctly + if (useAdvancedContext === true) { + log.info(`Enhanced context IS enabled for this request`); + } else { + log.info(`Enhanced context is NOT enabled for this request`); + } + + // Process the request in the background + Promise.resolve().then(async () => { + try { + await restChatService.handleSendMessage(fakeReq, res); + } catch (error) { + log.error(`Background message processing error: ${error}`); + + // Import the WebSocket service + const wsService = (await import('../../services/ws.js')).default; + + // Define LLMStreamMessage interface + interface LLMStreamMessage { + type: 'llm-stream'; + sessionId: string; + content?: string; + thinking?: string; + toolExecution?: any; + done?: boolean; + error?: string; + raw?: unknown; + } + + // Send error to client via WebSocket + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + error: `Error processing message: ${error}`, + done: true + } as LLMStreamMessage); + } + }); + + // Import the WebSocket service + const wsService = (await import('../../services/ws.js')).default; + + // Let the client know streaming has started via WebSocket (helps client confirm connection is working) + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + thinking: 'Initializing streaming LLM response...' + }); + + // Let the client know streaming has started via HTTP response + return { + success: true, + message: 'Streaming started', + sessionId + }; + } catch (error: any) { + log.error(`Error starting message stream: ${error.message}`); + throw error; + } +} + export default { // Chat session management createSession, @@ -799,6 +956,7 @@ export default { listSessions, deleteSession, sendMessage, + streamMessage, // Add new streaming endpoint // Knowledge base index management getIndexStats, diff --git a/src/routes/routes.ts b/src/routes/routes.ts index 6dbbe0567..30780a5e8 100644 --- a/src/routes/routes.ts +++ b/src/routes/routes.ts @@ -400,6 +400,7 @@ function register(app: express.Application) { apiRoute(DEL, "/api/llm/sessions/:sessionId", llmRoute.deleteSession); apiRoute(PST, "/api/llm/sessions/:sessionId/messages", llmRoute.sendMessage); apiRoute(GET, "/api/llm/sessions/:sessionId/messages", llmRoute.sendMessage); + apiRoute(PST, "/api/llm/sessions/:sessionId/messages/stream", llmRoute.streamMessage); // LLM index management endpoints - reorganized for REST principles apiRoute(GET, "/api/llm/indexes/stats", llmRoute.getIndexStats); diff --git a/src/services/llm/ai_interface.ts b/src/services/llm/ai_interface.ts index 6b7c89ab7..ca07b0ae8 100644 --- a/src/services/llm/ai_interface.ts +++ b/src/services/llm/ai_interface.ts @@ -1,12 +1,16 @@ import type { ToolCall } from './tools/tool_interfaces.js'; import type { ModelMetadata } from './providers/provider_options.js'; +/** + * Interface for chat messages between client and LLM models + */ export interface Message { role: 'user' | 'assistant' | 'system' | 'tool'; content: string; name?: string; tool_call_id?: string; tool_calls?: ToolCall[] | any[]; + sessionId?: string; // Optional session ID for WebSocket communication } /** @@ -32,6 +36,12 @@ export interface StreamChunk { completionTokens?: number; totalTokens?: number; }; + + /** + * Raw provider-specific data from the original response chunk + * This can include thinking state, tool execution info, etc. 
+ */ + raw?: any; } /** diff --git a/src/services/llm/pipeline/chat_pipeline.ts b/src/services/llm/pipeline/chat_pipeline.ts index 4c300ce52..073cf6d7a 100644 --- a/src/services/llm/pipeline/chat_pipeline.ts +++ b/src/services/llm/pipeline/chat_pipeline.ts @@ -127,6 +127,10 @@ export class ChatPipeline { // Determine if we should use tools or semantic context const useTools = modelSelection.options.enableTools === true; const useEnhancedContext = input.options?.useAdvancedContext === true; + + // Log details about the advanced context parameter + log.info(`Enhanced context option check: input.options=${JSON.stringify(input.options || {})}`); + log.info(`Enhanced context decision: useEnhancedContext=${useEnhancedContext}, hasQuery=${!!input.query}`); // Early return if we don't have a query or enhanced context is disabled if (!input.query || !useEnhancedContext) { @@ -431,8 +435,8 @@ export class ChatPipeline { ...modelSelection.options, // Ensure tool support is still enabled for follow-up requests enableTools: true, - // Disable streaming during tool execution follow-ups - stream: false, + // Preserve original streaming setting for tool execution follow-ups + stream: modelSelection.options.stream, // Add tool execution status for Ollama provider ...(currentResponse.provider === 'Ollama' ? { toolExecutionStatus } : {}) } @@ -498,6 +502,8 @@ export class ChatPipeline { messages: currentMessages, options: { ...modelSelection.options, + // Preserve streaming for error follow-up + stream: modelSelection.options.stream, // For Ollama, include tool execution status ...(currentResponse.provider === 'Ollama' ? { toolExecutionStatus } : {}) } @@ -547,6 +553,8 @@ export class ChatPipeline { options: { ...modelSelection.options, enableTools: false, // Disable tools for the final response + // Preserve streaming setting for max iterations response + stream: modelSelection.options.stream, // For Ollama, include tool execution status ...(currentResponse.provider === 'Ollama' ? 
{ toolExecutionStatus } : {}) } diff --git a/src/services/llm/pipeline/stages/llm_completion_stage.ts b/src/services/llm/pipeline/stages/llm_completion_stage.ts index f6fc7730b..9b181ca91 100644 --- a/src/services/llm/pipeline/stages/llm_completion_stage.ts +++ b/src/services/llm/pipeline/stages/llm_completion_stage.ts @@ -1,12 +1,12 @@ import { BasePipelineStage } from '../pipeline_stage.js'; import type { LLMCompletionInput } from '../interfaces.js'; -import type { ChatCompletionOptions, ChatResponse } from '../../ai_interface.js'; +import type { ChatCompletionOptions, ChatResponse, StreamChunk } from '../../ai_interface.js'; import aiServiceManager from '../../ai_service_manager.js'; import toolRegistry from '../../tools/tool_registry.js'; import log from '../../../log.js'; /** - * Pipeline stage for LLM completion + * Pipeline stage for LLM completion with enhanced streaming support */ export class LLMCompletionStage extends BasePipelineStage { constructor() { @@ -15,88 +15,124 @@ export class LLMCompletionStage extends BasePipelineStage { const { messages, options, provider } = input; - // Log input options, particularly focusing on the stream option + // Log input options log.info(`[LLMCompletionStage] Input options: ${JSON.stringify({ model: options.model, provider, stream: options.stream, enableTools: options.enableTools })}`); - log.info(`[LLMCompletionStage] Stream option in input: ${options.stream}, type: ${typeof options.stream}`); // Create a deep copy of options to avoid modifying the original const updatedOptions: ChatCompletionOptions = JSON.parse(JSON.stringify(options)); - // IMPORTANT: Handle stream option carefully: - // 1. If it's undefined, leave it undefined (provider will use defaults) - // 2. If explicitly set to true/false, ensure it's a proper boolean + // Handle stream option explicitly if (options.stream !== undefined) { updatedOptions.stream = options.stream === true; - log.info(`[LLMCompletionStage] Stream explicitly provided in options, set to: ${updatedOptions.stream}`); - } else { - // If undefined, leave it undefined so provider can use its default behavior - log.info(`[LLMCompletionStage] Stream option not explicitly set, leaving as undefined`); + log.info(`[LLMCompletionStage] Stream explicitly set to: ${updatedOptions.stream}`); } - // If this is a direct (non-stream) call to Ollama but has the stream flag, - // ensure we set additional metadata to maintain proper state - if (updatedOptions.stream && !provider && updatedOptions.providerMetadata?.provider === 'ollama') { - log.info(`[LLMCompletionStage] This is an Ollama request with stream=true, ensuring provider config is consistent`); + // Add capture of raw provider data for streaming + if (updatedOptions.stream) { + // Add a function to capture raw provider data in stream chunks + const originalStreamCallback = updatedOptions.streamCallback; + updatedOptions.streamCallback = async (text, done, rawProviderData) => { + // Create an enhanced chunk with the raw provider data + const enhancedChunk = { + text, + done, + // Include raw provider data if available + raw: rawProviderData + }; + + // Call the original callback if provided + if (originalStreamCallback) { + return originalStreamCallback(text, done, enhancedChunk); + } + }; } - log.info(`[LLMCompletionStage] Copied options: ${JSON.stringify({ - model: updatedOptions.model, - stream: updatedOptions.stream, - enableTools: updatedOptions.enableTools - })}`); - // Check if tools should be enabled if (updatedOptions.enableTools !== false) { - // Get 
all available tools from the registry const toolDefinitions = toolRegistry.getAllToolDefinitions(); - if (toolDefinitions.length > 0) { - // Enable tools and add them to the options updatedOptions.enableTools = true; updatedOptions.tools = toolDefinitions; log.info(`Adding ${toolDefinitions.length} tools to LLM request`); } } - // Determine which provider to use - prioritize in this order: - // 1. Explicit provider parameter (legacy approach) - // 2. Provider from metadata - // 3. Auto-selection + // Determine which provider to use let selectedProvider = provider; - - // If no explicit provider is specified, check for provider metadata if (!selectedProvider && updatedOptions.providerMetadata?.provider) { selectedProvider = updatedOptions.providerMetadata.provider; log.info(`Using provider ${selectedProvider} from metadata for model ${updatedOptions.model}`); } log.info(`Generating LLM completion, provider: ${selectedProvider || 'auto'}, model: ${updatedOptions?.model || 'default'}`); - log.info(`[LLMCompletionStage] Options before service call: ${JSON.stringify({ - model: updatedOptions.model, - stream: updatedOptions.stream, - enableTools: updatedOptions.enableTools - })}`); - // If provider is specified (either explicit or from metadata), use that specific provider + // Use specific provider if available if (selectedProvider && aiServiceManager.isProviderAvailable(selectedProvider)) { const service = aiServiceManager.getService(selectedProvider); - log.info(`[LLMCompletionStage] Using specific service for ${selectedProvider}, stream option: ${updatedOptions.stream}`); + log.info(`[LLMCompletionStage] Using specific service for ${selectedProvider}`); + + // Generate completion and wrap with enhanced stream handling const response = await service.generateChatCompletion(messages, updatedOptions); + + // If streaming is enabled, enhance the stream method + if (response.stream && typeof response.stream === 'function' && updatedOptions.stream) { + const originalStream = response.stream; + + // Replace the stream method with an enhanced version that captures and forwards raw data + response.stream = async (callback) => { + return originalStream(async (chunk) => { + // Forward the chunk with any additional provider-specific data + // Create an enhanced chunk with provider info + const enhancedChunk: StreamChunk = { + ...chunk, + // If the provider didn't include raw data, add minimal info + raw: chunk.raw || { + provider: selectedProvider, + model: response.model + } + }; + return callback(enhancedChunk); + }); + }; + } + return { response }; } - // Otherwise use the service manager to select an available provider - log.info(`[LLMCompletionStage] Using auto-selected service, stream option: ${updatedOptions.stream}`); + // Use auto-selection if no specific provider + log.info(`[LLMCompletionStage] Using auto-selected service`); const response = await aiServiceManager.generateChatCompletion(messages, updatedOptions); + + // Add similar stream enhancement for auto-selected provider + if (response.stream && typeof response.stream === 'function' && updatedOptions.stream) { + const originalStream = response.stream; + response.stream = async (callback) => { + return originalStream(async (chunk) => { + // Create an enhanced chunk with provider info + const enhancedChunk: StreamChunk = { + ...chunk, + raw: chunk.raw || { + provider: response.provider, + model: response.model + } + }; + return callback(enhancedChunk); + }); + }; + } + return { response }; } } diff --git 
a/src/services/llm/providers/anthropic_service.ts b/src/services/llm/providers/anthropic_service.ts index 523ebbcc3..8f03404a0 100644 --- a/src/services/llm/providers/anthropic_service.ts +++ b/src/services/llm/providers/anthropic_service.ts @@ -112,6 +112,8 @@ export class AnthropicService extends BaseAIService { /** * Handle streaming response from Anthropic + * + * Simplified implementation that leverages the Anthropic SDK's streaming capabilities */ private async handleStreamingResponse( client: any, @@ -119,26 +121,29 @@ export class AnthropicService extends BaseAIService { opts: ChatCompletionOptions, providerOptions: AnthropicOptions ): Promise { - let completeText = ''; - - // Create a function that will return a Promise that resolves with the final text + // Create a stream handler function that processes the SDK's stream const streamHandler = async (callback: (chunk: StreamChunk) => Promise | void): Promise => { + let completeText = ''; + try { + // Request a streaming response from Anthropic const streamResponse = await client.messages.create({ ...params, stream: true }); + // Process each chunk in the stream for await (const chunk of streamResponse) { + // Only process text content deltas if (chunk.type === 'content_block_delta' && chunk.delta?.type === 'text_delta') { const text = chunk.delta.text || ''; completeText += text; - // Call the callback with the chunk + // Send the chunk to the caller await callback({ text, done: false, - usage: {} // Usage stats not available in chunks + raw: chunk // Include the raw chunk for advanced processing }); } } @@ -146,11 +151,7 @@ export class AnthropicService extends BaseAIService { // Signal completion await callback({ text: '', - done: true, - usage: { - // We don't have token usage information in streaming mode from the chunks - totalTokens: completeText.length / 4 // Rough estimate - } + done: true }); return completeText; @@ -160,25 +161,12 @@ export class AnthropicService extends BaseAIService { } }; - // If a stream callback was provided in the options, set up immediate streaming - if (opts.streamCallback) { - // Start streaming in the background - void streamHandler(async (chunk) => { - if (opts.streamCallback) { - await opts.streamCallback(chunk.text, chunk.done); - } - }); - } - + // Return a response object with the stream handler return { - text: completeText, // This will be empty initially until streaming completes + text: '', // Initial text is empty, will be populated during streaming model: providerOptions.model, provider: this.getName(), - stream: streamHandler, - usage: { - // We don't have token counts initially with streaming - totalTokens: 0 - } + stream: streamHandler }; } diff --git a/src/services/llm/providers/ollama_service.ts b/src/services/llm/providers/ollama_service.ts index 64b66aa48..e83c86b87 100644 --- a/src/services/llm/providers/ollama_service.ts +++ b/src/services/llm/providers/ollama_service.ts @@ -1,6 +1,6 @@ import options from '../../options.js'; import { BaseAIService } from '../base_ai_service.js'; -import type { Message, ChatCompletionOptions, ChatResponse } from '../ai_interface.js'; +import type { Message, ChatCompletionOptions, ChatResponse, StreamChunk } from '../ai_interface.js'; import { OllamaMessageFormatter } from '../formatters/ollama_formatter.js'; import log from '../../log.js'; import type { ToolCall } from '../tools/tool_interfaces.js'; @@ -37,7 +37,37 @@ export class OllamaService extends BaseAIService { if (!baseUrl) { throw new Error('Ollama base URL is not configured'); } - 
this.client = new Ollama({ host: baseUrl }); + + log.info(`Creating new Ollama client with base URL: ${baseUrl}`); + + // Create client with debug options + try { + this.client = new Ollama({ + host: baseUrl, + fetch: (url, init) => { + log.info(`Ollama API request to: ${url}`); + log.info(`Ollama API request method: ${init?.method || 'GET'}`); + log.info(`Ollama API request headers: ${JSON.stringify(init?.headers || {})}`); + + // Call the actual fetch + return fetch(url, init).then(response => { + log.info(`Ollama API response status: ${response.status}`); + if (!response.ok) { + log.error(`Ollama API error response: ${response.statusText}`); + } + return response; + }).catch(error => { + log.error(`Ollama API fetch error: ${error.message}`); + throw error; + }); + } + }); + + log.info(`Ollama client successfully created`); + } catch (error) { + log.error(`Error creating Ollama client: ${error}`); + throw error; + } } return this.client; } @@ -88,11 +118,6 @@ export class OllamaService extends BaseAIService { log.info(`Sending to Ollama with formatted messages: ${messagesToSend.length}`); } - // Log request details - log.info(`========== OLLAMA API REQUEST ==========`); - log.info(`Model: ${providerOptions.model}, Messages: ${messagesToSend.length}`); - log.info(`Stream: ${opts.streamCallback ? true : false}`); - // Get tools if enabled let tools = []; if (providerOptions.enableTools !== false) { @@ -119,48 +144,18 @@ export class OllamaService extends BaseAIService { } } - // Check message structure and log detailed information about each message - messagesToSend.forEach((msg: any, index: number) => { - const keys = Object.keys(msg); - log.info(`Message ${index}, Role: ${msg.role}, Keys: ${keys.join(', ')}`); - - // Log message content preview - if (msg.content && typeof msg.content === 'string') { - const contentPreview = msg.content.length > 200 - ? `${msg.content.substring(0, 200)}...` - : msg.content; - log.info(`Message ${index} content: ${contentPreview}`); - } - - // Log tool-related details - if (keys.includes('tool_calls')) { - log.info(`Message ${index} has ${msg.tool_calls.length} tool calls`); - } - - if (keys.includes('tool_call_id')) { - log.info(`Message ${index} is a tool response for tool call ID: ${msg.tool_call_id}`); - } - - if (keys.includes('name') && msg.role === 'tool') { - log.info(`Message ${index} is from tool: ${msg.name}`); - } - }); - - // Get client instance - const client = this.getClient(); - // Convert our message format to Ollama's format const convertedMessages = messagesToSend.map(msg => { const converted: any = { role: msg.role, content: msg.content }; - + if (msg.tool_calls) { converted.tool_calls = msg.tool_calls.map(tc => { // For Ollama, arguments must be an object, not a string let processedArgs = tc.function.arguments; - + // If arguments is a string, try to parse it as JSON if (typeof processedArgs === 'string') { try { @@ -171,7 +166,7 @@ export class OllamaService extends BaseAIService { processedArgs = { raw: processedArgs }; } } - + return { id: tc.id, function: { @@ -181,18 +176,18 @@ export class OllamaService extends BaseAIService { }; }); } - + if (msg.tool_call_id) { converted.tool_call_id = msg.tool_call_id; } - + if (msg.name) { converted.name = msg.name; } - + return converted; }); - + // Prepare base request options const baseRequestOptions = { model: providerOptions.model, @@ -202,85 +197,29 @@ export class OllamaService extends BaseAIService { tools: tools.length > 0 ? 
tools : undefined }; + // Get client instance + const client = this.getClient(); + // Handle streaming - if (opts.streamCallback) { - let responseText = ''; - let responseToolCalls: any[] = []; - - log.info(`Using streaming mode with Ollama client`); - - let streamResponse: OllamaChatResponse | null = null; - - // Create streaming request - const streamingRequest = { - ...baseRequestOptions, - stream: true as const // Use const assertion to fix the type - }; - - // Get the async iterator - const streamIterator = await client.chat(streamingRequest); - - // Process each chunk - for await (const chunk of streamIterator) { - // Save the last chunk for final stats - streamResponse = chunk; - - // Accumulate text - if (chunk.message?.content) { - responseText += chunk.message.content; - } - - // Check for tool calls - if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) { - responseToolCalls = [...chunk.message.tool_calls]; - } - - // Call the callback with the current chunk content - if (opts.streamCallback) { - // Original callback expects text content, isDone flag, and optional original chunk - opts.streamCallback( - chunk.message?.content || '', - !!chunk.done, - chunk - ); - } - } - - // Create the final response after streaming is complete - return { - text: responseText, - model: providerOptions.model, - provider: this.getName(), - tool_calls: this.transformToolCalls(responseToolCalls), - usage: { - promptTokens: streamResponse?.prompt_eval_count || 0, - completionTokens: streamResponse?.eval_count || 0, - totalTokens: (streamResponse?.prompt_eval_count || 0) + (streamResponse?.eval_count || 0) - } - }; + if (opts.stream || opts.streamCallback) { + return this.handleStreamingResponse(client, baseRequestOptions, opts, providerOptions); } else { // Non-streaming request log.info(`Using non-streaming mode with Ollama client`); - + // Create non-streaming request const nonStreamingRequest = { ...baseRequestOptions, stream: false as const // Use const assertion for type safety }; - + const response = await client.chat(nonStreamingRequest); - + // Log response details log.info(`========== OLLAMA API RESPONSE ==========`); log.info(`Model: ${response.model}, Content length: ${response.message?.content?.length || 0} chars`); log.info(`Tokens: ${response.prompt_eval_count || 0} prompt, ${response.eval_count || 0} completion, ${(response.prompt_eval_count || 0) + (response.eval_count || 0)} total`); - - // Log content preview - const contentPreview = response.message?.content && response.message.content.length > 300 - ? 
`${response.message.content.substring(0, 300)}...` - : response.message?.content || ''; - log.info(`Response content: ${contentPreview}`); - + // Handle the response and extract tool calls if present const chatResponse: ChatResponse = { text: response.message?.content || '', @@ -292,15 +231,13 @@ export class OllamaService extends BaseAIService { totalTokens: (response.prompt_eval_count || 0) + (response.eval_count || 0) } }; - + // Add tool calls if present if (response.message?.tool_calls && response.message.tool_calls.length > 0) { log.info(`Ollama response includes ${response.message.tool_calls.length} tool calls`); chatResponse.tool_calls = this.transformToolCalls(response.message.tool_calls); - log.info(`Transformed tool calls: ${JSON.stringify(chatResponse.tool_calls)}`); } - - log.info(`========== END OLLAMA RESPONSE ==========`); + return chatResponse; } } catch (error: any) { @@ -315,6 +252,303 @@ export class OllamaService extends BaseAIService { } } + /** + * Handle streaming response from Ollama + * + * Simplified implementation that leverages the Ollama SDK's streaming capabilities + */ + private async handleStreamingResponse( + client: Ollama, + requestOptions: any, + opts: ChatCompletionOptions, + providerOptions: OllamaOptions + ): Promise { + log.info(`Using streaming mode with Ollama client`); + + // Log detailed information about the streaming setup + log.info(`Ollama streaming details: model=${providerOptions.model}, streamCallback=${opts.streamCallback ? 'provided' : 'not provided'}`); + + // Create a stream handler function that processes the SDK's stream + const streamHandler = async (callback: (chunk: StreamChunk) => Promise | void): Promise => { + let completeText = ''; + let responseToolCalls: any[] = []; + let chunkCount = 0; + + try { + // Create streaming request + const streamingRequest = { + ...requestOptions, + stream: true as const // Use const assertion to fix the type + }; + + log.info(`Creating Ollama streaming request with options: model=${streamingRequest.model}, stream=${streamingRequest.stream}, tools=${streamingRequest.tools ? streamingRequest.tools.length : 0}`); + + // Get the async iterator + log.info(`Calling Ollama chat API with streaming enabled`); + let streamIterator; + try { + log.info(`About to call client.chat with streaming request to ${options.getOption('ollamaBaseUrl')}`); + log.info(`Stream request: model=${streamingRequest.model}, messages count=${streamingRequest.messages?.length || 0}`); + + // Check if we can connect to Ollama by getting available models + try { + log.info(`Performing Ollama health check...`); + const healthCheck = await client.list(); + log.info(`Ollama health check successful. Available models: ${healthCheck.models.map(m => m.name).join(', ')}`); + } catch (healthError) { + log.error(`Ollama health check failed: ${healthError instanceof Error ? healthError.message : String(healthError)}`); + log.error(`This indicates a connection issue to the Ollama server at ${options.getOption('ollamaBaseUrl')}`); + throw new Error(`Unable to connect to Ollama server: ${healthError instanceof Error ? 
healthError.message : String(healthError)}`); + } + + // Make the streaming request + log.info(`Proceeding with Ollama streaming request after successful health check`); + streamIterator = await client.chat(streamingRequest); + + log.info(`Successfully obtained Ollama stream iterator`); + + if (!streamIterator || typeof streamIterator[Symbol.asyncIterator] !== 'function') { + log.error(`Invalid stream iterator returned: ${JSON.stringify(streamIterator)}`); + throw new Error('Stream iterator is not valid'); + } + } catch (error) { + log.error(`Error getting stream iterator: ${error instanceof Error ? error.message : String(error)}`); + log.error(`Error stack: ${error instanceof Error ? error.stack : 'No stack trace'}`); + throw error; + } + + // Process each chunk + try { + log.info(`About to start processing stream chunks`); + for await (const chunk of streamIterator) { + chunkCount++; + + // Log first chunk and then periodic updates + if (chunkCount === 1 || chunkCount % 10 === 0) { + log.info(`Processing Ollama stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`); + } + + // Accumulate text + if (chunk.message?.content) { + const newContent = chunk.message.content; + completeText += newContent; + + if (chunkCount === 1) { + log.info(`First content chunk received: "${newContent.substring(0, 50)}${newContent.length > 50 ? '...' : ''}"`); + } + } + + // Check for tool calls + if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) { + responseToolCalls = [...chunk.message.tool_calls]; + log.info(`Received tool calls in stream: ${chunk.message.tool_calls.length} tools`); + } + + // Send the chunk to the caller + await callback({ + text: chunk.message?.content || '', + done: !!chunk.done, + raw: chunk // Include the raw chunk for advanced processing + }); + + // If this is the done chunk, log it + if (chunk.done) { + log.info(`Reached final chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`); + } + } + + log.info(`Completed streaming from Ollama: processed ${chunkCount} chunks, total content: ${completeText.length} chars`); + + // Signal completion + await callback({ + text: '', + done: true + }); + } catch (streamProcessError) { + log.error(`Error processing Ollama stream: ${streamProcessError instanceof Error ? streamProcessError.message : String(streamProcessError)}`); + log.error(`Stream process error stack: ${streamProcessError instanceof Error ? streamProcessError.stack : 'No stack trace'}`); + + // Try to signal completion with error + try { + await callback({ + text: '', + done: true, + raw: { error: streamProcessError instanceof Error ? streamProcessError.message : String(streamProcessError) } + }); + } catch (finalError) { + log.error(`Error sending final error chunk: ${finalError}`); + } + + throw streamProcessError; + } + + return completeText; + } catch (error) { + log.error(`Error in Ollama streaming: ${error}`); + log.error(`Error details: ${error instanceof Error ? 
error.stack : 'No stack trace available'}`); + throw error; + } + }; + + // Handle direct streamCallback if provided + if (opts.streamCallback) { + let completeText = ''; + let responseToolCalls: any[] = []; + let finalChunk: OllamaChatResponse | null = null; + let chunkCount = 0; + + try { + // Create streaming request + const streamingRequest = { + ...requestOptions, + stream: true as const + }; + + log.info(`Starting Ollama direct streamCallback processing with model ${providerOptions.model}`); + + // Get the async iterator + log.info(`Calling Ollama chat API for direct streaming`); + let streamIterator; + try { + log.info(`About to call client.chat with streaming request to ${options.getOption('ollamaBaseUrl')}`); + log.info(`Model: ${streamingRequest.model}, Stream: ${streamingRequest.stream}`); + log.info(`Messages count: ${streamingRequest.messages.length}`); + log.info(`First message: role=${streamingRequest.messages[0].role}, content preview=${streamingRequest.messages[0].content?.substring(0, 50) || 'empty'}`); + + // Perform health check before streaming + try { + log.info(`Performing Ollama health check before direct streaming...`); + const healthCheck = await client.list(); + log.info(`Ollama health check successful. Available models: ${healthCheck.models.map(m => m.name).join(', ')}`); + } catch (healthError) { + log.error(`Ollama health check failed: ${healthError instanceof Error ? healthError.message : String(healthError)}`); + log.error(`This indicates a connection issue to the Ollama server at ${options.getOption('ollamaBaseUrl')}`); + throw new Error(`Unable to connect to Ollama server: ${healthError instanceof Error ? healthError.message : String(healthError)}`); + } + + // Proceed with streaming after successful health check + log.info(`Making Ollama streaming request after successful health check`); + streamIterator = await client.chat(streamingRequest); + + log.info(`Successfully obtained Ollama stream iterator for direct callback`); + + // Check if the stream iterator is valid + if (!streamIterator || typeof streamIterator[Symbol.asyncIterator] !== 'function') { + log.error(`Invalid stream iterator returned from Ollama: ${JSON.stringify(streamIterator)}`); + throw new Error('Invalid stream iterator returned from Ollama'); + } + + log.info(`Stream iterator is valid, beginning processing`); + } catch (error) { + log.error(`Error getting stream iterator from Ollama: ${error instanceof Error ? error.message : String(error)}`); + log.error(`Error stack: ${error instanceof Error ? error.stack : 'No stack trace'}`); + throw error; + } + + // Process each chunk + try { + log.info(`Starting to iterate through stream chunks`); + for await (const chunk of streamIterator) { + chunkCount++; + finalChunk = chunk; + + // Log first chunk and periodic updates + if (chunkCount === 1 || chunkCount % 10 === 0) { + log.info(`Processing Ollama direct stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`); + } + + // Accumulate text + if (chunk.message?.content) { + const newContent = chunk.message.content; + completeText += newContent; + + if (chunkCount === 1) { + log.info(`First direct content chunk: "${newContent.substring(0, 50)}${newContent.length > 50 ? '...' 
: ''}"`); + } + } + + // Check for tool calls + if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) { + responseToolCalls = [...chunk.message.tool_calls]; + log.info(`Received tool calls in direct stream: ${chunk.message.tool_calls.length} tools`); + } + + // Call the callback with the current chunk content + if (opts.streamCallback) { + try { + // For the final chunk, make sure to send the complete text with done=true + if (chunk.done) { + log.info(`Sending final callback with done=true and complete content (${completeText.length} chars)`); + await opts.streamCallback( + completeText, // Send the full accumulated content for the final chunk + true, + { ...chunk, message: { ...chunk.message, content: completeText } } + ); + } else if (chunk.message?.content) { + // For content chunks, send them as they come + await opts.streamCallback( + chunk.message.content, + !!chunk.done, + chunk + ); + } else if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) { + // For tool call chunks, send an empty content string but include the tool calls + await opts.streamCallback( + '', + !!chunk.done, + chunk + ); + } + + if (chunkCount === 1) { + log.info(`Successfully called streamCallback with first chunk`); + } + } catch (callbackError) { + log.error(`Error in streamCallback: ${callbackError}`); + } + } + + // If this is the done chunk, log it + if (chunk.done) { + log.info(`Reached final direct chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`); + } + } + + log.info(`Completed direct streaming from Ollama: processed ${chunkCount} chunks, final content: ${completeText.length} chars`); + } catch (iterationError) { + log.error(`Error iterating through Ollama stream chunks: ${iterationError instanceof Error ? iterationError.message : String(iterationError)}`); + log.error(`Iteration error stack: ${iterationError instanceof Error ? iterationError.stack : 'No stack trace'}`); + throw iterationError; + } + + // Create the final response after streaming is complete + return { + text: completeText, + model: providerOptions.model, + provider: this.getName(), + tool_calls: this.transformToolCalls(responseToolCalls), + usage: { + promptTokens: finalChunk?.prompt_eval_count || 0, + completionTokens: finalChunk?.eval_count || 0, + totalTokens: (finalChunk?.prompt_eval_count || 0) + (finalChunk?.eval_count || 0) + } + }; + } catch (error) { + log.error(`Error in Ollama streaming with callback: ${error}`); + log.error(`Error details: ${error instanceof Error ? 
error.stack : 'No stack trace available'}`); + throw error; + } + } + + // Return a response object with the stream handler + return { + text: '', // Initial text is empty, will be populated during streaming + model: providerOptions.model, + provider: this.getName(), + stream: streamHandler + }; + } + /** * Transform Ollama tool calls to the standard format expected by the pipeline */ @@ -322,14 +556,14 @@ export class OllamaService extends BaseAIService { if (!toolCalls || !Array.isArray(toolCalls) || toolCalls.length === 0) { return []; } - + return toolCalls.map((toolCall, index) => { // Generate a unique ID if none is provided const id = toolCall.id || `tool-call-${Date.now()}-${index}`; - + // Handle arguments based on their type let processedArguments: Record | string = toolCall.function?.arguments || {}; - + if (typeof processedArguments === 'string') { try { processedArguments = JSON.parse(processedArguments); @@ -339,7 +573,7 @@ export class OllamaService extends BaseAIService { processedArguments = { raw: processedArguments }; } } - + return { id, type: 'function', diff --git a/src/services/llm/providers/openai_service.ts b/src/services/llm/providers/openai_service.ts index 15e68ca1d..f537dfd8d 100644 --- a/src/services/llm/providers/openai_service.ts +++ b/src/services/llm/providers/openai_service.ts @@ -70,103 +70,61 @@ export class OpenAIService extends BaseAIService { if (providerOptions.stream) { params.stream = true; + // Get stream from OpenAI SDK const stream = await client.chat.completions.create(params); - let fullText = ''; - // If a direct callback is provided, use it - if (providerOptions.streamCallback) { - // Process the stream with the callback - try { - // The stream is an AsyncIterable - if (Symbol.asyncIterator in stream) { - for await (const chunk of stream as AsyncIterable) { - const content = chunk.choices[0]?.delta?.content || ''; - if (content) { - fullText += content; - await providerOptions.streamCallback(content, false, chunk); - } - - // If this is the last chunk - if (chunk.choices[0]?.finish_reason) { - await providerOptions.streamCallback('', true, chunk); - } - } - } else { - console.error('Stream is not iterable, falling back to non-streaming response'); - - // If we get a non-streaming response somehow - if ('choices' in stream) { - const content = stream.choices[0]?.message?.content || ''; - fullText = content; - if (providerOptions.streamCallback) { - await providerOptions.streamCallback(content, true, stream); - } - } - } - } catch (error) { - console.error('Error processing stream:', error); - throw error; - } - - return { - text: fullText, - model: params.model, - provider: this.getName(), - usage: {} // Usage stats aren't available with streaming - }; - } else { - // Use the more flexible stream interface - return { - text: '', // Initial empty text, will be filled by stream processing - model: params.model, - provider: this.getName(), - usage: {}, // Usage stats aren't available with streaming - stream: async (callback) => { - let completeText = ''; - - try { - // The stream is an AsyncIterable - if (Symbol.asyncIterator in stream) { - for await (const chunk of stream as AsyncIterable) { - const content = chunk.choices[0]?.delta?.content || ''; - const isDone = !!chunk.choices[0]?.finish_reason; - - if (content) { - completeText += content; - } - - // Call the provided callback with the StreamChunk interface - await callback({ - text: content, - done: isDone - }); - - if (isDone) { - break; - } - } - } else { - console.warn('Stream is not 
iterable, falling back to non-streaming response'); + // Return a response with the stream handler + return { + text: '', // Initial empty text, will be populated during streaming + model: params.model, + provider: this.getName(), + stream: async (callback) => { + let completeText = ''; + + try { + // Process the stream + if (Symbol.asyncIterator in stream) { + for await (const chunk of stream as AsyncIterable) { + const content = chunk.choices[0]?.delta?.content || ''; + const isDone = !!chunk.choices[0]?.finish_reason; - // If we get a non-streaming response somehow - if ('choices' in stream) { - const content = stream.choices[0]?.message?.content || ''; - completeText = content; - await callback({ - text: content, - done: true - }); + if (content) { + completeText += content; + } + + // Send the chunk to the caller with raw data + await callback({ + text: content, + done: isDone, + raw: chunk // Include the raw chunk for advanced processing + }); + + if (isDone) { + break; } } - } catch (error) { - console.error('Error processing stream:', error); - throw error; + } else { + // Fallback for non-iterable response + console.warn('Stream is not iterable, falling back to non-streaming response'); + + if ('choices' in stream) { + const content = stream.choices[0]?.message?.content || ''; + completeText = content; + await callback({ + text: content, + done: true, + raw: stream + }); + } } - - return completeText; + } catch (error) { + console.error('Error processing stream:', error); + throw error; } - }; - } + + return completeText; + } + }; } else { // Non-streaming response params.stream = false; diff --git a/src/services/llm/rest_chat_service.ts b/src/services/llm/rest_chat_service.ts index 53a789f92..5760375d7 100644 --- a/src/services/llm/rest_chat_service.ts +++ b/src/services/llm/rest_chat_service.ts @@ -1,6 +1,26 @@ import log from "../log.js"; import type { Request, Response } from "express"; -import type { Message, ChatCompletionOptions } from "./ai_interface.js"; +import type { Message, ChatCompletionOptions, ChatResponse, StreamChunk } from "./ai_interface.js"; + +/** + * Interface for WebSocket LLM streaming messages + */ +interface LLMStreamMessage { + type: 'llm-stream'; + sessionId: string; + content?: string; + thinking?: string; + toolExecution?: { + action?: string; + tool?: string; + result?: string; + error?: string; + args?: Record; + }; + done?: boolean; + error?: string; + raw?: unknown; +} import contextService from "./context/services/context_service.js"; import { LLM_CONSTANTS } from './constants/provider_constants.js'; import { ERROR_PROMPTS } from './constants/llm_prompt_constants.js'; @@ -290,22 +310,24 @@ class RestChatService { // Add logging for POST requests log.info(`LLM POST message: sessionId=${req.params.sessionId}, useAdvancedContext=${useAdvancedContext}, showThinking=${showThinking}, contentLength=${content ? 
content.length : 0}`); } else if (req.method === 'GET') { - // For GET (streaming) requests, get format from query params - // The content should have been sent in a previous POST request - useAdvancedContext = req.query.useAdvancedContext === 'true'; - showThinking = req.query.showThinking === 'true'; - content = ''; // We don't need content for GET requests + // For GET (streaming) requests, get parameters from query params and body + // For streaming requests, we need the content from the body + useAdvancedContext = req.query.useAdvancedContext === 'true' || (req.body && req.body.useAdvancedContext === true); + showThinking = req.query.showThinking === 'true' || (req.body && req.body.showThinking === true); + content = req.body && req.body.content ? req.body.content : ''; - // Add logging for GET requests + // Add detailed logging for GET requests log.info(`LLM GET stream: sessionId=${req.params.sessionId}, useAdvancedContext=${useAdvancedContext}, showThinking=${showThinking}`); + log.info(`Parameters from query: useAdvancedContext=${req.query.useAdvancedContext}, showThinking=${req.query.showThinking}`); + log.info(`Parameters from body: useAdvancedContext=${req.body?.useAdvancedContext}, showThinking=${req.body?.showThinking}, content=${content ? `${content.substring(0, 20)}...` : 'none'}`); } // Get sessionId from URL params since it's part of the route sessionId = req.params.sessionId; - // For GET requests, ensure we have the format=stream parameter - if (req.method === 'GET' && (!req.query.format || req.query.format !== 'stream')) { - throw new Error('Stream format parameter is required for GET requests'); + // For GET requests, ensure we have the stream parameter + if (req.method === 'GET' && req.query.stream !== 'true') { + throw new Error('Stream parameter must be set to true for GET/streaming requests'); } // For POST requests, validate the content @@ -443,6 +465,33 @@ class RestChatService { log.info("Executing chat pipeline..."); + // Create options object for better tracking + const pipelineOptions = { + // Force useAdvancedContext to be a boolean, no matter what + useAdvancedContext: useAdvancedContext === true, + systemPrompt: session.messages.find(m => m.role === 'system')?.content, + temperature: session.metadata.temperature, + maxTokens: session.metadata.maxTokens, + model: session.metadata.model, + // Set stream based on request type, but ensure it's explicitly a boolean value + // GET requests or format=stream parameter indicates streaming should be used + stream: !!(req.method === 'GET' || req.query.format === 'stream' || req.query.stream === 'true') + }; + + // Log the options to verify what's being sent to the pipeline + log.info(`Pipeline input options: ${JSON.stringify({ + useAdvancedContext: pipelineOptions.useAdvancedContext, + stream: pipelineOptions.stream + })}`); + + // Import the WebSocket service for direct access + const wsService = await import('../../services/ws.js'); + + // Create a stream callback wrapper + // This will ensure we properly handle all streaming messages + let messageContent = ''; + let streamFinished = false; + // Prepare the pipeline input const pipelineInput: ChatPipelineInput = { messages: session.messages.map(msg => ({ @@ -452,30 +501,109 @@ class RestChatService { query: content, noteId: session.noteContext ?? 
undefined, showThinking: showThinking, - options: { - useAdvancedContext: useAdvancedContext, - systemPrompt: session.messages.find(m => m.role === 'system')?.content, - temperature: session.metadata.temperature, - maxTokens: session.metadata.maxTokens, - model: session.metadata.model, - // Set stream based on request type, but ensure it's explicitly a boolean value - // GET requests or format=stream parameter indicates streaming should be used - stream: !!(req.method === 'GET' || req.query.format === 'stream') - }, + options: pipelineOptions, streamCallback: req.method === 'GET' ? (data, done, rawChunk) => { - // Prepare response data - include both the content and raw chunk data if available - const responseData: any = { content: data, done }; - - // If there's tool execution information, add it to the response - if (rawChunk && rawChunk.toolExecution) { - responseData.toolExecution = rawChunk.toolExecution; - } - - // Send the data as a JSON event - res.write(`data: ${JSON.stringify(responseData)}\n\n`); - - if (done) { - res.end(); + try { + // Send a single WebSocket message that contains everything needed + // Only accumulate content that's actually text (not tool execution or thinking info) + if (data) { + messageContent += data; + } + + // Create a message object with all necessary fields + const message: LLMStreamMessage = { + type: 'llm-stream', + sessionId + }; + + // Add content if available - either the new chunk or full content on completion + if (data) { + message.content = data; + } + + // Add thinking info if available in the raw chunk + if (rawChunk?.thinking) { + message.thinking = rawChunk.thinking; + } + + // Add tool execution info if available in the raw chunk + if (rawChunk?.toolExecution) { + message.toolExecution = rawChunk.toolExecution; + } + + // Set done flag explicitly + message.done = done; + + // On final message, include the complete content too + if (done) { + streamFinished = true; + + // Always send the accumulated content with the done=true message + // This ensures the client receives the complete content even if earlier messages were missed + message.content = messageContent; + + log.info(`Stream complete, sending final message with ${messageContent.length} chars of content`); + + // Store the response in the session when done + session.messages.push({ + role: 'assistant', + content: messageContent, + timestamp: new Date() + }); + } + + // Send message to all clients + wsService.default.sendMessageToAllClients(message); + + // Log what was sent (first message and completion) + if (message.thinking || done) { + log.info( + `[WS-SERVER] Sending LLM stream message: sessionId=${sessionId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${done}` + ); + } + + // For GET requests, also send as server-sent events + // Prepare response data for JSON event + const responseData: any = { + content: data, + done + }; + + // Add tool execution if available + if (rawChunk?.toolExecution) { + responseData.toolExecution = rawChunk.toolExecution; + } + + // Send the data as a JSON event + res.write(`data: ${JSON.stringify(responseData)}\n\n`); + + if (done) { + res.end(); + } + } catch (error) { + log.error(`Error in stream callback: ${error}`); + + // Try to send error message + try { + wsService.default.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + error: `Stream error: ${error instanceof Error ? 
error.message : 'Unknown error'}`, + done: true + }); + } catch (e) { + log.error(`Failed to send error message: ${e}`); + } + + // End the response if not already done + try { + if (!streamFinished) { + res.write(`data: ${JSON.stringify({ error: 'Stream error', done: true })}\n\n`); + res.end(); + } + } catch (e) { + log.error(`Failed to end response: ${e}`); + } } } : undefined }; @@ -613,7 +741,7 @@ class RestChatService { // Make a follow-up request with the tool results log.info(`Making follow-up request with ${toolResults.length} tool results`); - const followUpOptions = {...chatOptions, enableTools: iterationCount < MAX_ITERATIONS}; // Enable tools for follow-up but limit iterations + const followUpOptions = { ...chatOptions, enableTools: iterationCount < MAX_ITERATIONS }; // Enable tools for follow-up but limit iterations const followUpResponse = await service.generateChatCompletion(currentMessages, followUpOptions); // Check if the follow-up response has more tool calls @@ -740,7 +868,10 @@ class RestChatService { } /** - * Handle streaming response from LLM + * Handle streaming response via WebSocket + * + * This method processes LLM responses and sends them incrementally via WebSocket + * to the client, supporting both text content and tool execution status updates. */ private async handleStreamingResponse( res: Response, @@ -749,133 +880,211 @@ class RestChatService { service: any, session: ChatSession ) { - // Set streaming headers once - res.setHeader('Content-Type', 'text/event-stream'); - res.setHeader('Cache-Control', 'no-cache'); - res.setHeader('Connection', 'keep-alive'); + // The client receives a success response for their HTTP request, + // but the actual content will be streamed via WebSocket + res.json({ success: true, message: 'Streaming response started' }); - // Flag to indicate we've handled the response directly - // This lets the route handler know not to process the result - (res as any).triliumResponseHandled = true; + // Import the WebSocket service + const wsService = (await import('../../services/ws.js')).default; let messageContent = ''; + const sessionId = session.id; + + // Immediately send an initial message to confirm WebSocket connection is working + // This helps prevent timeouts on the client side + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + thinking: 'Preparing response...' 
+ } as LLMStreamMessage); try { - // Use the correct method name: generateChatCompletion - const response = await service.generateChatCompletion(aiMessages, chatOptions); + // Generate the LLM completion with streaming enabled + const response = await service.generateChatCompletion(aiMessages, { + ...chatOptions, + stream: true + }); - // Check for tool calls in the response + // If the model doesn't support streaming via .stream() method or returns tool calls, + // we'll handle it specially if (response.tool_calls && response.tool_calls.length > 0) { - log.info(`========== STREAMING TOOL CALLS DETECTED ==========`); - log.info(`Response contains ${response.tool_calls.length} tool calls, executing them...`); - log.info(`CRITICAL CHECK: Tool execution is supposed to happen in the pipeline, not directly here.`); - log.info(`If tools are being executed here instead of in the pipeline, this may be a flow issue.`); - log.info(`Response came from provider: ${response.provider || 'unknown'}, model: ${response.model || 'unknown'}`); + // Send thinking state notification via WebSocket + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + thinking: 'Analyzing tools needed for this request...' + } as LLMStreamMessage); try { - log.info(`========== STREAMING TOOL EXECUTION PATH ==========`); - log.info(`About to execute tools in streaming path (this is separate from pipeline tool execution)`); - // Execute the tools const toolResults = await this.executeToolCalls(response); - log.info(`Successfully executed ${toolResults.length} tool calls in streaming path`); - // Make a follow-up request with the tool results + // For each tool execution, send progress update via WebSocket + for (const toolResult of toolResults) { + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + toolExecution: { + action: 'complete', + tool: toolResult.name || 'unknown', + result: toolResult.content.substring(0, 100) + (toolResult.content.length > 100 ? '...' : '') + } + } as LLMStreamMessage); + } + + // Make follow-up request with tool results const toolMessages = [...aiMessages, { role: 'assistant', content: response.text || '', tool_calls: response.tool_calls }, ...toolResults]; - log.info(`Making follow-up request with ${toolResults.length} tool results`); - - // Send partial response to let the client know tools are being processed - if (!res.writableEnded) { - res.write(`data: ${JSON.stringify({ content: "Processing tools... 
" })}\n\n`); - } - // Use non-streaming for the follow-up to get a complete response - const followUpOptions = {...chatOptions, stream: false, enableTools: false}; // Prevent infinite loops + const followUpOptions = { ...chatOptions, stream: false, enableTools: false }; const followUpResponse = await service.generateChatCompletion(toolMessages, followUpOptions); messageContent = followUpResponse.text || ""; - // Send the complete response as a single chunk - if (!res.writableEnded) { - res.write(`data: ${JSON.stringify({ content: messageContent })}\n\n`); - res.write('data: [DONE]\n\n'); - res.end(); - } + // Send the complete response with done flag in the same message + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + content: messageContent, + done: true + } as LLMStreamMessage); - // Store the full response for the session + // Store the response in the session session.messages.push({ role: 'assistant', content: messageContent, timestamp: new Date() }); - return; // Skip the rest of the processing + return; } catch (toolError) { log.error(`Error executing tools: ${toolError}`); - // Continue with normal streaming response as fallback + + // Send error via WebSocket with done flag + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + error: `Error executing tools: ${toolError instanceof Error ? toolError.message : 'Unknown error'}`, + done: true + } as LLMStreamMessage); + + return; } } - // Handle streaming if the response includes a stream method + // Handle standard streaming through the stream() method if (response.stream) { - await response.stream((chunk: { text: string; done: boolean }) => { - if (chunk.text) { - messageContent += chunk.text; - // Only write if the response hasn't finished - if (!res.writableEnded) { - res.write(`data: ${JSON.stringify({ content: chunk.text })}\n\n`); - } - } + log.info(`Provider ${service.getName()} supports streaming via stream() method`); - if (chunk.done) { - // Signal the end of the stream when done, only if not already ended - if (!res.writableEnded) { - res.write('data: [DONE]\n\n'); - res.end(); + try { + await response.stream(async (chunk: StreamChunk) => { + if (chunk.text) { + messageContent += chunk.text; + + // Send the chunk content via WebSocket + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + content: chunk.text, + // Include any raw data from the provider that might contain thinking/tool info + ...(chunk.raw ? 
{ raw: chunk.raw } : {}) + } as LLMStreamMessage); + + // Log the first chunk (useful for debugging) + if (messageContent.length === chunk.text.length) { + log.info(`First stream chunk received from ${service.getName()}`); + } } - } - }); - } else { - // If no streaming available, send the response as a single chunk - messageContent = response.text; - // Only write if the response hasn't finished - if (!res.writableEnded) { - res.write(`data: ${JSON.stringify({ content: messageContent })}\n\n`); - res.write('data: [DONE]\n\n'); - res.end(); + + // If the provider indicates this is "thinking" state, relay that + if (chunk.raw?.thinking) { + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + thinking: chunk.raw.thinking + } as LLMStreamMessage); + } + + // If the provider indicates tool execution, relay that + if (chunk.raw?.toolExecution) { + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + toolExecution: chunk.raw.toolExecution + } as LLMStreamMessage); + } + + // Signal completion when done + if (chunk.done) { + log.info(`Stream completed from ${service.getName()}`); + + // Send the final message with both content and done flag together + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + content: messageContent, // Send the accumulated content + done: true + } as LLMStreamMessage); + } + }); + + log.info(`Streaming from ${service.getName()} completed successfully`); + } catch (streamError) { + log.error(`Error during streaming from ${service.getName()}: ${streamError}`); + + // Report the error to the client + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + error: `Error during streaming: ${streamError instanceof Error ? streamError.message : 'Unknown error'}`, + done: true + } as LLMStreamMessage); + + throw streamError; } + } else { + log.info(`Provider ${service.getName()} does not support streaming via stream() method, falling back to single response`); + + // If streaming isn't available, send the entire response at once + messageContent = response.text || ''; + + // Send via WebSocket - include both content and done flag in same message + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + content: messageContent, + done: true + } as LLMStreamMessage); + + log.info(`Complete response sent for ${service.getName()}`); } - // Store the full response for the session - const aiResponse = messageContent; - - // Store the assistant's response in the session + // Store the full response in the session session.messages.push({ role: 'assistant', - content: aiResponse, + content: messageContent, timestamp: new Date() }); } catch (streamingError: any) { - // If streaming fails and we haven't sent a response yet, throw the error - if (!res.headersSent) { - throw streamingError; - } else { - // If headers were already sent, try to send an error event - try { - if (!res.writableEnded) { - res.write(`data: ${JSON.stringify({ error: streamingError.message })}\n\n`); - res.write('data: [DONE]\n\n'); - res.end(); - } - } catch (e) { - log.error(`Failed to write streaming error: ${e}`); - } - } + log.error(`Streaming error: ${streamingError.message}`); + + // Send error via WebSocket + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + error: `Error generating response: ${streamingError instanceof Error ? 
streamingError.message : 'Unknown error'}` + } as LLMStreamMessage); + + // Signal completion + wsService.sendMessageToAllClients({ + type: 'llm-stream', + sessionId, + done: true + } as LLMStreamMessage); } } diff --git a/src/services/ws.ts b/src/services/ws.ts index 81e4baccb..9bc5d7c72 100644 --- a/src/services/ws.ts +++ b/src/services/ws.ts @@ -56,6 +56,21 @@ interface Message { originEntityId?: string | null; lastModifiedMs?: number; filePath?: string; + + // LLM streaming specific fields + sessionId?: string; + content?: string; + thinking?: string; + toolExecution?: { + action?: string; + tool?: string; + result?: string; + error?: string; + args?: Record; + }; + done?: boolean; + error?: string; + raw?: unknown; } type SessionParser = (req: IncomingMessage, params: {}, cb: () => void) => void; @@ -115,15 +130,25 @@ function sendMessageToAllClients(message: Message) { const jsonStr = JSON.stringify(message); if (webSocketServer) { - if (message.type !== "sync-failed" && message.type !== "api-log-messages") { + // Special logging for LLM streaming messages + if (message.type === "llm-stream") { + log.info(`[WS-SERVER] Sending LLM stream message: sessionId=${message.sessionId}, content=${!!message.content}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}`); + } else if (message.type !== "sync-failed" && message.type !== "api-log-messages") { log.info(`Sending message to all clients: ${jsonStr}`); } + let clientCount = 0; webSocketServer.clients.forEach(function each(client) { if (client.readyState === WebSocket.OPEN) { client.send(jsonStr); + clientCount++; } }); + + // Log WebSocket client count for debugging + if (message.type === "llm-stream") { + log.info(`[WS-SERVER] Sent LLM stream message to ${clientCount} clients`); + } } }
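
The sketches below illustrate, in isolation, the main mechanisms this patch wires together; any name, type, or helper that does not appear in the diff is a hypothetical stand-in rather than project code. First, the streaming contract the provider services now return: a `stream` function that drives a caller-supplied callback with `StreamChunk` objects and resolves with the accumulated text, shown here as a generic adapter over any async iterable of provider chunks.

    // Minimal sketch of the stream-handler contract returned by the providers.
    // StreamChunk mirrors the fields used in the diff; makeStreamHandler is illustrative.
    interface StreamChunk {
        text: string;
        done: boolean;
        raw?: unknown;
    }

    type StreamHandler = (callback: (chunk: StreamChunk) => Promise<void> | void) => Promise<string>;

    function makeStreamHandler<T>(
        source: AsyncIterable<T>,
        toChunk: (item: T) => StreamChunk
    ): StreamHandler {
        return async (callback) => {
            let completeText = '';
            for await (const item of source) {
                const chunk = toChunk(item);
                completeText += chunk.text;
                await callback(chunk);
                if (chunk.done) {
                    break;
                }
            }
            // Mirror the explicit completion signal sent after the loop in the diff.
            await callback({ text: '', done: true });
            return completeText;
        };
    }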
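
Both Ollama streaming paths perform a health check with `client.list()` before opening the stream. A standalone version of that check might look like the following, assuming the `ollama` npm client; the default base URL is only an example value.

    import { Ollama } from 'ollama';

    // Sketch of the pre-stream health check: list available models and surface
    // a connection error before any streaming request is attempted.
    async function assertOllamaReachable(baseUrl = 'http://localhost:11434'): Promise<void> {
        const client = new Ollama({ host: baseUrl });
        try {
            const health = await client.list();
            console.log(`Ollama reachable, models: ${health.models.map(m => m.name).join(', ')}`);
        } catch (err) {
            const msg = err instanceof Error ? err.message : String(err);
            throw new Error(`Unable to connect to Ollama server at ${baseUrl}: ${msg}`);
        }
    }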
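
transformToolCalls normalizes Ollama tool calls into the pipeline's format: it generates an id when one is missing and parses string arguments as JSON, falling back to a `{ raw }` wrapper when parsing fails. A reduced sketch of that per-call normalization:

    // Simplified sketch of the argument normalization in transformToolCalls().
    interface NormalizedToolCall {
        id: string;
        type: 'function';
        function: {
            name: string;
            arguments: Record<string, unknown> | string;
        };
    }

    function normalizeToolCall(toolCall: any, index: number): NormalizedToolCall {
        // Generate a unique id if the provider did not supply one.
        const id = toolCall.id || `tool-call-${Date.now()}-${index}`;

        let args: Record<string, unknown> | string = toolCall.function?.arguments || {};
        if (typeof args === 'string') {
            try {
                args = JSON.parse(args);
            } catch {
                // Keep unparseable arguments so the caller can still inspect them.
                args = { raw: args };
            }
        }

        return {
            id,
            type: 'function',
            function: { name: toolCall.function?.name ?? '', arguments: args }
        };
    }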
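
The OpenAI path iterates the SDK's async stream, treating `delta.content` as the text payload and `finish_reason` as the done signal. A self-contained version of that loop using the official `openai` package; the model name and prompt are placeholders.

    import OpenAI from 'openai';

    // Sketch of the OpenAI streaming loop: accumulate delta content and report
    // each chunk plus a final done signal to the caller.
    async function streamOpenAI(
        apiKey: string,
        onChunk: (text: string, done: boolean) => void
    ): Promise<string> {
        const client = new OpenAI({ apiKey });
        const stream = await client.chat.completions.create({
            model: 'gpt-4o-mini',          // placeholder model
            messages: [{ role: 'user', content: 'Hello' }],
            stream: true
        });

        let completeText = '';
        for await (const chunk of stream) {
            const content = chunk.choices[0]?.delta?.content || '';
            const isDone = !!chunk.choices[0]?.finish_reason;
            if (content) {
                completeText += content;
            }
            onChunk(content, isDone);
            if (isDone) {
                break;
            }
        }
        return completeText;
    }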
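
On the REST side, the GET streaming route now accepts its options from either the query string or the body and requires an explicit `stream=true`. The GET-path parameter handling reduces to roughly:

    import type { Request } from 'express';

    // Sketch of the GET streaming parameter handling in rest_chat_service.ts.
    function parseChatParams(req: Request) {
        const useAdvancedContext =
            req.query.useAdvancedContext === 'true' || req.body?.useAdvancedContext === true;
        const showThinking =
            req.query.showThinking === 'true' || req.body?.showThinking === true;
        const content: string = req.body?.content ?? '';

        // Streaming GET requests must opt in explicitly.
        if (req.method === 'GET' && req.query.stream !== 'true') {
            throw new Error('Stream parameter must be set to true for GET/streaming requests');
        }

        return { useAdvancedContext, showThinking, content };
    }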
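
The reworked handleStreamingResponse answers the HTTP request immediately and pushes all content over WebSocket, starting with a 'Preparing response...' thinking frame so the client does not time out, then re-sending the full accumulated text together with `done: true`. A condensed sketch of that flow; `send` stands in for `wsService.sendMessageToAllClients` and `generate` for the provider call.

    import type { Response } from 'express';

    // Message shape used for WebSocket streaming, as declared in the patch.
    interface LLMStreamMessage {
        type: 'llm-stream';
        sessionId: string;
        content?: string;
        thinking?: string;
        toolExecution?: {
            action?: string;
            tool?: string;
            result?: string;
            error?: string;
            args?: Record<string, unknown>;
        };
        done?: boolean;
        error?: string;
        raw?: unknown;
    }

    // Sketch of the WebSocket-first streaming handoff.
    async function startWebSocketStream(
        res: Response,
        sessionId: string,
        send: (message: LLMStreamMessage) => void,
        generate: (onText: (text: string, done: boolean) => void) => Promise<void>
    ): Promise<string> {
        // Acknowledge the HTTP request right away; the content arrives over WebSocket.
        res.json({ success: true, message: 'Streaming response started' });

        // Early frame so the client knows the socket path is live.
        send({ type: 'llm-stream', sessionId, thinking: 'Preparing response...' });

        let fullText = '';
        try {
            await generate((text, done) => {
                if (text) {
                    fullText += text;
                    send({ type: 'llm-stream', sessionId, content: text });
                }
                if (done) {
                    // The final frame repeats the whole text so missed chunks are recoverable.
                    send({ type: 'llm-stream', sessionId, content: fullText, done: true });
                }
            });
        } catch (err) {
            const msg = err instanceof Error ? err.message : 'Unknown error';
            send({ type: 'llm-stream', sessionId, error: `Error generating response: ${msg}`, done: true });
        }
        return fullText;
    }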
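
For GET requests the same chunks are also mirrored as server-sent events. A small helper along these lines captures that pattern; it assumes the `text/event-stream` headers have already been set on the response.

    import type { Response } from 'express';

    // Sketch of the SSE fallback used in the GET streaming callback:
    // one `data:` line per chunk, closing the response on the final one.
    function writeSseChunk(
        res: Response,
        payload: { content?: string; done: boolean; toolExecution?: unknown }
    ): void {
        if (res.writableEnded) {
            return;
        }
        res.write(`data: ${JSON.stringify(payload)}\n\n`);
        if (payload.done) {
            res.end();
        }
    }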
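
When the first completion comes back with tool calls, the streaming handler executes them, appends the results to the conversation, and makes a second, non-streaming completion with tools disabled to avoid looping. Stripped of the WebSocket notifications, that round trip looks roughly like this; the message shape, service, and executeToolCalls are reduced stand-ins for the project's types.

    // Simplified sketch of the tool-execution follow-up round trip.
    interface ToolMessage {
        role: 'user' | 'assistant' | 'system' | 'tool';
        content: string;
        tool_calls?: unknown[];
    }

    async function completeWithTools(
        service: { generateChatCompletion(msgs: ToolMessage[], opts: Record<string, unknown>): Promise<any> },
        executeToolCalls: (response: any) => Promise<ToolMessage[]>,
        messages: ToolMessage[],
        chatOptions: Record<string, unknown>
    ): Promise<string> {
        const first = await service.generateChatCompletion(messages, { ...chatOptions, stream: true });
        if (!first.tool_calls?.length) {
            return first.text || '';
        }

        const toolResults = await executeToolCalls(first);
        const followUpMessages: ToolMessage[] = [
            ...messages,
            { role: 'assistant', content: first.text || '', tool_calls: first.tool_calls },
            ...toolResults
        ];

        // stream:false and enableTools:false keep the follow-up to a single round.
        const followUp = await service.generateChatCompletion(followUpMessages, {
            ...chatOptions,
            stream: false,
            enableTools: false
        });
        return followUp.text || '';
    }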
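
Finally, ws.ts broadcasts each frame to every connected client and, for `llm-stream` messages, counts recipients for the debug log. A sketch of that loop using the `ws` package, reusing the LLMStreamMessage shape from the earlier sketch:

    import { WebSocketServer, WebSocket } from 'ws';

    // Sketch of sendMessageToAllClients() for llm-stream frames: serialize once,
    // send to every open socket, and report how many clients received it.
    function broadcastLlmStream(wss: WebSocketServer, message: LLMStreamMessage): number {
        const json = JSON.stringify(message);
        let clientCount = 0;

        wss.clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(json);
                clientCount++;
            }
        });

        console.log(`[WS-SERVER] Sent LLM stream message to ${clientCount} clients`);
        return clientCount;
    }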