Mirror of https://github.com/TriliumNext/Notes.git (synced 2025-07-29 19:12:27 +08:00)

Commit message: getting closer to streaming? even closer? closer streaming... this is darn close

This commit is contained in:
parent: b68ff88840
commit: 451e5ea31f
@@ -127,6 +127,49 @@ async function handleMessage(event: MessageEvent<any>) {
appContext.triggerEvent("apiLogMessages", { noteId: message.noteId, messages: message.messages });
} else if (message.type === "toast") {
toastService.showMessage(message.message);
} else if (message.type === "llm-stream") {
// ENHANCED LOGGING FOR DEBUGGING
console.log(`[WS-CLIENT] >>> RECEIVED LLM STREAM MESSAGE <<<`);
console.log(`[WS-CLIENT] Message details: sessionId=${message.sessionId}, hasContent=${!!message.content}, contentLength=${message.content ? message.content.length : 0}, hasThinking=${!!message.thinking}, hasToolExecution=${!!message.toolExecution}, isDone=${!!message.done}`);

if (message.content) {
console.log(`[WS-CLIENT] CONTENT PREVIEW: "${message.content.substring(0, 50)}..."`);
}

// Create the event with detailed logging
console.log(`[WS-CLIENT] Creating CustomEvent 'llm-stream-message'`);
const llmStreamEvent = new CustomEvent('llm-stream-message', { detail: message });

// Dispatch to multiple targets to ensure delivery
try {
console.log(`[WS-CLIENT] Dispatching event to window`);
window.dispatchEvent(llmStreamEvent);
console.log(`[WS-CLIENT] Event dispatched to window`);

// Also try document for completeness
console.log(`[WS-CLIENT] Dispatching event to document`);
document.dispatchEvent(new CustomEvent('llm-stream-message', { detail: message }));
console.log(`[WS-CLIENT] Event dispatched to document`);
} catch (err) {
console.error(`[WS-CLIENT] Error dispatching event:`, err);
}

// Debug current listeners (though we can't directly check for specific event listeners)
console.log(`[WS-CLIENT] Active event listeners should receive this message now`);

// Detailed logging based on message type
if (message.content) {
console.log(`[WS-CLIENT] Content message: ${message.content.length} chars`);
} else if (message.thinking) {
console.log(`[WS-CLIENT] Thinking update: "${message.thinking}"`);
} else if (message.toolExecution) {
console.log(`[WS-CLIENT] Tool execution: action=${message.toolExecution.action}, tool=${message.toolExecution.tool || 'unknown'}`);
if (message.toolExecution.result) {
console.log(`[WS-CLIENT] Tool result preview: "${String(message.toolExecution.result).substring(0, 50)}..."`);
}
} else if (message.done) {
console.log(`[WS-CLIENT] Completion signal received`);
}
} else if (message.type === "execute-script") {
// TODO: Remove after porting the file
// @ts-ignore
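The block above re-dispatches every `llm-stream` WebSocket message as an `llm-stream-message` CustomEvent on both `window` and `document`. A minimal sketch of how a consumer could subscribe to that event follows; the `LlmStreamMessage` shape and the `subscribeToLlmStream` helper are illustrative only (the fields mirror what the handler logs), not part of this commit.

```ts
// Illustrative sketch: subscribing to the re-dispatched CustomEvent.
interface LlmStreamMessage {
    sessionId: string;
    content?: string;
    thinking?: string;
    toolExecution?: { action?: string; tool?: string; result?: unknown };
    done?: boolean;
}

function subscribeToLlmStream(sessionId: string, onMessage: (m: LlmStreamMessage) => void): () => void {
    const listener = (event: Event) => {
        const message = (event as CustomEvent<LlmStreamMessage>).detail;
        // Ignore messages that belong to other chat sessions.
        if (message?.sessionId === sessionId) {
            onMessage(message);
        }
    };
    window.addEventListener('llm-stream-message', listener);
    // Return an unsubscribe function so the caller can clean up, as the panel's cleanup() intends.
    return () => window.removeEventListener('llm-stream-message', listener);
}
```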
@@ -7,6 +7,7 @@ import { t } from "../services/i18n.js";
|
||||
import libraryLoader from "../services/library_loader.js";
|
||||
import { applySyntaxHighlight } from "../services/syntax_highlight.js";
|
||||
import options from "../services/options.js";
|
||||
import ws from "../services/ws.js";
|
||||
import { marked } from "marked";
|
||||
|
||||
// Import the LLM Chat CSS
|
||||
@@ -105,6 +106,8 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
private validationWarning!: HTMLElement;
|
||||
private sessionId: string | null = null;
|
||||
private currentNoteId: string | null = null;
|
||||
private _messageHandlerId: number | null = null;
|
||||
private _messageHandler: any = null;
|
||||
|
||||
// Callbacks for data persistence
|
||||
private onSaveData: ((data: any) => Promise<void>) | null = null;
|
||||
@@ -178,6 +181,15 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
return this.$widget;
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
console.log(`LlmChatPanel cleanup called, removing any active WebSocket subscriptions`);
|
||||
|
||||
// No need to manually clean up the event listeners, as they will be garbage collected
|
||||
// when the component is destroyed. We only need to clean up references.
|
||||
this._messageHandler = null;
|
||||
this._messageHandlerId = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the callbacks for data persistence
|
||||
*/
|
||||
@@ -375,16 +387,15 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
// Create the message parameters
|
||||
const messageParams = {
|
||||
content,
|
||||
contextNoteId: this.currentNoteId,
|
||||
useAdvancedContext,
|
||||
showThinking
|
||||
};
|
||||
|
||||
// First try to use streaming (preferred method)
|
||||
// Try websocket streaming (preferred method)
|
||||
try {
|
||||
await this.setupStreamingResponse(messageParams);
|
||||
} catch (streamingError) {
|
||||
console.warn("Streaming request failed, falling back to direct response:", streamingError);
|
||||
console.warn("WebSocket streaming failed, falling back to direct response:", streamingError);
|
||||
|
||||
// If streaming fails, fall back to direct response
|
||||
const handled = await this.handleDirectResponse(messageParams);
|
||||
@@ -424,12 +435,14 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
*/
|
||||
private async handleDirectResponse(messageParams: any): Promise<boolean> {
|
||||
try {
|
||||
// Add format parameter to maintain consistency with the streaming GET request
|
||||
// Create a copy of the params without any streaming flags
|
||||
const postParams = {
|
||||
...messageParams,
|
||||
format: 'stream' // Match the format parameter used in the GET streaming request
|
||||
stream: false // Explicitly set to false to ensure we get a direct response
|
||||
};
|
||||
|
||||
console.log(`Sending direct POST request for session ${this.sessionId}`);
|
||||
|
||||
// Send the message via POST request with the updated params
|
||||
const postResponse = await server.post<any>(`llm/sessions/${this.sessionId}/messages`, postParams);
|
||||
|
||||
@@ -474,184 +487,318 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up streaming response from the server
|
||||
* Set up streaming response via WebSocket
|
||||
*/
|
||||
private async setupStreamingResponse(messageParams: any): Promise<void> {
|
||||
const content = messageParams.content || '';
|
||||
const useAdvancedContext = messageParams.useAdvancedContext;
|
||||
const showThinking = messageParams.showThinking;
|
||||
|
||||
// Set up streaming via EventSource - explicitly add stream=true parameter to ensure consistency
|
||||
const streamUrl = `./api/llm/sessions/${this.sessionId}/messages?format=stream&stream=true&useAdvancedContext=${useAdvancedContext}&showThinking=${showThinking}`;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const source = new EventSource(streamUrl);
|
||||
let assistantResponse = '';
|
||||
let receivedAnyContent = false;
|
||||
let timeoutId: number | null = null;
|
||||
let initialTimeoutId: number | null = null;
|
||||
let receivedAnyMessage = false;
|
||||
let eventListener: ((event: Event) => void) | null = null;
|
||||
|
||||
// Set up timeout for streaming response
|
||||
timeoutId = this.setupStreamingTimeout(source);
|
||||
// Create a unique identifier for this response process
|
||||
const responseId = `llm-stream-${Date.now()}-${Math.floor(Math.random() * 1000)}`;
|
||||
console.log(`[${responseId}] Setting up WebSocket streaming for session ${this.sessionId}`);
|
||||
|
||||
// Handle streaming response
|
||||
source.onmessage = (event) => {
|
||||
try {
|
||||
if (event.data === '[DONE]') {
|
||||
// Stream completed successfully
|
||||
this.handleStreamingComplete(source, timeoutId, receivedAnyContent, assistantResponse);
|
||||
resolve();
|
||||
return;
|
||||
// Create a message handler for CustomEvents
|
||||
eventListener = (event: Event) => {
|
||||
const customEvent = event as CustomEvent;
|
||||
const message = customEvent.detail;
|
||||
|
||||
// Only process messages for our session
|
||||
if (!message || message.sessionId !== this.sessionId) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[${responseId}] LLM Stream message received via CustomEvent: session=${this.sessionId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}`);
|
||||
|
||||
// Mark first message received
|
||||
if (!receivedAnyMessage) {
|
||||
receivedAnyMessage = true;
|
||||
console.log(`[${responseId}] First message received for session ${this.sessionId}`);
|
||||
|
||||
// Clear the initial timeout since we've received a message
|
||||
if (initialTimeoutId !== null) {
|
||||
window.clearTimeout(initialTimeoutId);
|
||||
initialTimeoutId = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle content updates
|
||||
if (message.content) {
|
||||
receivedAnyContent = true;
|
||||
assistantResponse += message.content;
|
||||
|
||||
// Update the UI immediately
|
||||
this.updateStreamingUI(assistantResponse);
|
||||
|
||||
// Reset timeout since we got content
|
||||
if (timeoutId !== null) {
|
||||
window.clearTimeout(timeoutId);
|
||||
}
|
||||
|
||||
const data = JSON.parse(event.data);
|
||||
console.log("Received streaming data:", data);
|
||||
// Set new timeout
|
||||
timeoutId = window.setTimeout(() => {
|
||||
console.warn(`[${responseId}] Stream timeout for session ${this.sessionId}`);
|
||||
|
||||
// Handle both content and error cases
|
||||
if (data.content) {
|
||||
receivedAnyContent = true;
|
||||
assistantResponse += data.content;
|
||||
|
||||
// Update the UI with the accumulated response
|
||||
this.updateStreamingUI(assistantResponse);
|
||||
} else if (data.toolExecution) {
|
||||
// Handle tool execution info
|
||||
this.showToolExecutionInfo(data.toolExecution);
|
||||
// When tool execution info is received, also show the loading indicator
|
||||
// in case it's not already visible
|
||||
this.loadingIndicator.style.display = 'flex';
|
||||
} else if (data.error) {
|
||||
// Handle error message
|
||||
this.hideLoadingIndicator();
|
||||
this.addMessageToChat('assistant', `Error: ${data.error}`);
|
||||
|
||||
if (timeoutId !== null) {
|
||||
window.clearTimeout(timeoutId);
|
||||
// Save what we have
|
||||
if (assistantResponse) {
|
||||
console.log(`[${responseId}] Saving partial response due to timeout (${assistantResponse.length} chars)`);
|
||||
this.messages.push({
|
||||
role: 'assistant',
|
||||
content: assistantResponse,
|
||||
timestamp: new Date()
|
||||
});
|
||||
this.saveCurrentData().catch(err => {
|
||||
console.error(`[${responseId}] Failed to save partial response:`, err);
|
||||
});
|
||||
}
|
||||
|
||||
source.close();
|
||||
reject(new Error(data.error));
|
||||
return;
|
||||
// Clean up
|
||||
this.cleanupEventListener(eventListener);
|
||||
this.hideLoadingIndicator();
|
||||
reject(new Error('Stream timeout'));
|
||||
}, 30000);
|
||||
}
|
||||
|
||||
// Handle tool execution updates
|
||||
if (message.toolExecution) {
|
||||
console.log(`[${responseId}] Received tool execution update: action=${message.toolExecution.action || 'unknown'}`);
|
||||
this.showToolExecutionInfo(message.toolExecution);
|
||||
this.loadingIndicator.style.display = 'flex';
|
||||
}
|
||||
|
||||
// Handle thinking state updates
|
||||
if (message.thinking) {
|
||||
console.log(`[${responseId}] Received thinking update: ${message.thinking.substring(0, 50)}...`);
|
||||
this.showThinkingState(message.thinking);
|
||||
this.loadingIndicator.style.display = 'flex';
|
||||
}
|
||||
|
||||
// Handle completion
|
||||
if (message.done) {
|
||||
console.log(`[${responseId}] Stream completed for session ${this.sessionId}, has content: ${!!message.content}, content length: ${message.content?.length || 0}, current response: ${assistantResponse.length} chars`);
|
||||
|
||||
// Dump message content to console for debugging
|
||||
if (message.content) {
|
||||
console.log(`[${responseId}] CONTENT IN DONE MESSAGE (first 200 chars): "${message.content.substring(0, 200)}..."`);
|
||||
}
|
||||
|
||||
// Scroll to the bottom
|
||||
this.chatContainer.scrollTop = this.chatContainer.scrollHeight;
|
||||
} catch (e) {
|
||||
console.error('Error parsing SSE message:', e, 'Raw data:', event.data);
|
||||
reject(e);
|
||||
}
|
||||
};
|
||||
// Clear timeout if set
|
||||
if (timeoutId !== null) {
|
||||
window.clearTimeout(timeoutId);
|
||||
timeoutId = null;
|
||||
}
|
||||
|
||||
// Handle streaming errors
|
||||
source.onerror = (err) => {
|
||||
console.error("EventSource error:", err);
|
||||
source.close();
|
||||
this.hideLoadingIndicator();
|
||||
// Check if we have content in the done message
|
||||
// This is particularly important for Ollama which often sends the entire response in one message
|
||||
if (message.content) {
|
||||
console.log(`[${responseId}] Processing content in done message: ${message.content.length} chars`);
|
||||
receivedAnyContent = true;
|
||||
|
||||
// Clear the timeout if there was an error
|
||||
if (timeoutId !== null) {
|
||||
window.clearTimeout(timeoutId);
|
||||
}
|
||||
// Replace current response if we didn't have content before or if it's empty
|
||||
if (assistantResponse.length === 0) {
|
||||
console.log(`[${responseId}] Using content from done message as full response`);
|
||||
assistantResponse = message.content;
|
||||
}
|
||||
// Otherwise append it if it's different
|
||||
else if (message.content !== assistantResponse) {
|
||||
console.log(`[${responseId}] Appending content from done message to existing response`);
|
||||
assistantResponse += message.content;
|
||||
}
|
||||
else {
|
||||
console.log(`[${responseId}] Content in done message is identical to existing response, not appending`);
|
||||
}
|
||||
|
||||
// Only reject if we haven't received any content yet
|
||||
if (!receivedAnyContent) {
|
||||
reject(new Error('Error connecting to the LLM streaming service'));
|
||||
} else {
|
||||
// If we've already received some content, consider it a successful but incomplete response
|
||||
this.handleStreamingComplete(source, timeoutId, receivedAnyContent, assistantResponse);
|
||||
this.updateStreamingUI(assistantResponse);
|
||||
}
|
||||
|
||||
// Save the final response
|
||||
if (assistantResponse) {
|
||||
console.log(`[${responseId}] Saving final response of ${assistantResponse.length} chars`);
|
||||
this.messages.push({
|
||||
role: 'assistant',
|
||||
content: assistantResponse,
|
||||
timestamp: new Date()
|
||||
});
|
||||
|
||||
this.saveCurrentData().catch(err => {
|
||||
console.error(`[${responseId}] Failed to save final response:`, err);
|
||||
});
|
||||
} else {
|
||||
// If we didn't receive any content at all, show a generic message
|
||||
console.log(`[${responseId}] No content received for session ${this.sessionId}`);
|
||||
const defaultMessage = 'I processed your request, but I don\'t have any specific information to share at the moment.';
|
||||
this.processAssistantResponse(defaultMessage);
|
||||
}
|
||||
|
||||
// Clean up and resolve
|
||||
this.cleanupEventListener(eventListener);
|
||||
this.hideLoadingIndicator();
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
|
||||
// Register event listener for the custom event
|
||||
try {
|
||||
window.addEventListener('llm-stream-message', eventListener);
|
||||
console.log(`[${responseId}] Event listener added for llm-stream-message events`);
|
||||
} catch (err) {
|
||||
console.error(`[${responseId}] Error setting up event listener:`, err);
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
// Set initial timeout for receiving any message
|
||||
initialTimeoutId = window.setTimeout(() => {
|
||||
console.warn(`[${responseId}] No messages received for initial period in session ${this.sessionId}`);
|
||||
if (!receivedAnyMessage) {
|
||||
console.error(`[${responseId}] WebSocket connection not established for session ${this.sessionId}`);
|
||||
|
||||
if (timeoutId !== null) {
|
||||
window.clearTimeout(timeoutId);
|
||||
}
|
||||
|
||||
// Clean up
|
||||
this.cleanupEventListener(eventListener);
|
||||
this.hideLoadingIndicator();
|
||||
|
||||
// Show error message to user
|
||||
const errorMessage = 'Connection error: Unable to establish WebSocket streaming.';
|
||||
this.processAssistantResponse(errorMessage);
|
||||
reject(new Error('WebSocket connection not established'));
|
||||
}
|
||||
}, 10000);
|
||||
|
||||
// Send the streaming request to start the process
|
||||
console.log(`[${responseId}] Sending HTTP POST request to initiate streaming: /llm/sessions/${this.sessionId}/messages/stream`);
|
||||
server.post(`llm/sessions/${this.sessionId}/messages/stream`, {
|
||||
content,
|
||||
useAdvancedContext,
|
||||
showThinking,
|
||||
stream: true // Explicitly indicate this is a streaming request
|
||||
}).catch(err => {
|
||||
console.error(`[${responseId}] HTTP error sending streaming request for session ${this.sessionId}:`, err);
|
||||
|
||||
// Clean up timeouts
|
||||
if (initialTimeoutId !== null) {
|
||||
window.clearTimeout(initialTimeoutId);
|
||||
initialTimeoutId = null;
|
||||
}
|
||||
|
||||
if (timeoutId !== null) {
|
||||
window.clearTimeout(timeoutId);
|
||||
timeoutId = null;
|
||||
}
|
||||
|
||||
// Clean up event listener
|
||||
this.cleanupEventListener(eventListener);
|
||||
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up timeout for streaming response
|
||||
* @returns Timeout ID for the created timeout
|
||||
* Clean up an event listener
|
||||
*/
|
||||
private setupStreamingTimeout(source: EventSource): number {
|
||||
// Set a timeout to handle case where streaming doesn't work properly
|
||||
return window.setTimeout(() => {
|
||||
// If we haven't received any content after a reasonable timeout (10 seconds),
|
||||
// add a fallback message and close the stream
|
||||
this.hideLoadingIndicator();
|
||||
const errorMessage = 'I\'m having trouble generating a response right now. Please try again later.';
|
||||
this.processAssistantResponse(errorMessage);
|
||||
source.close();
|
||||
}, 10000);
|
||||
private cleanupEventListener(listener: ((event: Event) => void) | null): void {
|
||||
if (listener) {
|
||||
try {
|
||||
window.removeEventListener('llm-stream-message', listener);
|
||||
console.log(`Successfully removed event listener`);
|
||||
} catch (err) {
|
||||
console.error(`Error removing event listener:`, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the UI with streaming content as it arrives
|
||||
*/
|
||||
private updateStreamingUI(assistantResponse: string) {
|
||||
const logId = `ui-update-${Date.now()}`;
|
||||
console.log(`[${logId}] Updating UI with response text: ${assistantResponse.length} chars`);
|
||||
|
||||
if (!this.noteContextChatMessages) {
|
||||
console.error(`[${logId}] noteContextChatMessages element not available`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if we already have an assistant message element to update
|
||||
const assistantElement = this.noteContextChatMessages.querySelector('.assistant-message:last-child .message-content');
|
||||
|
||||
if (assistantElement) {
|
||||
assistantElement.innerHTML = this.formatMarkdown(assistantResponse);
|
||||
// Apply syntax highlighting to any code blocks in the updated content
|
||||
applySyntaxHighlight($(assistantElement as HTMLElement));
|
||||
console.log(`[${logId}] Found existing assistant message element, updating content`);
|
||||
try {
|
||||
// Format markdown and update the element
|
||||
const formattedContent = this.formatMarkdown(assistantResponse);
|
||||
|
||||
// Ensure content is properly formatted
|
||||
if (!formattedContent || formattedContent.trim() === '') {
|
||||
console.warn(`[${logId}] Formatted content is empty, using original content`);
|
||||
assistantElement.textContent = assistantResponse;
|
||||
} else {
|
||||
assistantElement.innerHTML = formattedContent;
|
||||
}
|
||||
|
||||
// Apply syntax highlighting to any code blocks in the updated content
|
||||
applySyntaxHighlight($(assistantElement as HTMLElement));
|
||||
|
||||
console.log(`[${logId}] Successfully updated existing element with ${formattedContent.length} chars of HTML`);
|
||||
} catch (err) {
|
||||
console.error(`[${logId}] Error updating existing element:`, err);
|
||||
// Fallback to text content if HTML update fails
|
||||
try {
|
||||
assistantElement.textContent = assistantResponse;
|
||||
console.log(`[${logId}] Fallback to text content successful`);
|
||||
} catch (fallbackErr) {
|
||||
console.error(`[${logId}] Even fallback update failed:`, fallbackErr);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
this.addMessageToChat('assistant', assistantResponse);
|
||||
}
|
||||
}
|
||||
console.log(`[${logId}] No existing assistant message element found, creating new one`);
|
||||
try {
|
||||
this.addMessageToChat('assistant', assistantResponse);
|
||||
console.log(`[${logId}] Successfully added new assistant message`);
|
||||
} catch (err) {
|
||||
console.error(`[${logId}] Error adding new message:`, err);
|
||||
|
||||
/**
|
||||
* Handle completion of streaming response
|
||||
*/
|
||||
private handleStreamingComplete(
|
||||
source: EventSource,
|
||||
timeoutId: number | null,
|
||||
receivedAnyContent: boolean,
|
||||
assistantResponse: string
|
||||
) {
|
||||
// Stream completed
|
||||
source.close();
|
||||
this.hideLoadingIndicator();
|
||||
|
||||
// Clear the timeout since we're done
|
||||
if (timeoutId !== null) {
|
||||
window.clearTimeout(timeoutId);
|
||||
// Last resort emergency approach - create element directly
|
||||
try {
|
||||
console.log(`[${logId}] Attempting emergency DOM update`);
|
||||
const emergencyElement = document.createElement('div');
|
||||
emergencyElement.className = 'chat-message assistant-message mb-3 d-flex';
|
||||
emergencyElement.innerHTML = `
|
||||
<div class="message-avatar d-flex align-items-center justify-content-center me-2 assistant-avatar">
|
||||
<i class="bx bx-bot"></i>
|
||||
</div>
|
||||
<div class="message-content p-3 rounded flex-grow-1 assistant-content">
|
||||
${assistantResponse}
|
||||
</div>
|
||||
`;
|
||||
this.noteContextChatMessages.appendChild(emergencyElement);
|
||||
console.log(`[${logId}] Emergency DOM update successful`);
|
||||
} catch (emergencyErr) {
|
||||
console.error(`[${logId}] Emergency DOM update failed:`, emergencyErr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we didn't receive any content but the stream completed normally,
|
||||
// display a message to the user
|
||||
if (!receivedAnyContent) {
|
||||
const defaultMessage = 'I processed your request, but I don\'t have any specific information to share at the moment.';
|
||||
this.processAssistantResponse(defaultMessage);
|
||||
} else if (assistantResponse) {
|
||||
// Save the completed streaming response to the message array
|
||||
this.messages.push({
|
||||
role: 'assistant',
|
||||
content: assistantResponse,
|
||||
timestamp: new Date()
|
||||
});
|
||||
|
||||
// Save to note
|
||||
this.saveCurrentData().catch(err => {
|
||||
console.error("Failed to save assistant response to note:", err);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle errors during streaming response
|
||||
*/
|
||||
private handleStreamingError(
|
||||
source: EventSource,
|
||||
timeoutId: number | null,
|
||||
receivedAnyContent: boolean
|
||||
) {
|
||||
source.close();
|
||||
this.hideLoadingIndicator();
|
||||
|
||||
// Clear the timeout if there was an error
|
||||
if (timeoutId !== null) {
|
||||
window.clearTimeout(timeoutId);
|
||||
}
|
||||
|
||||
// Only show error message if we haven't received any content yet
|
||||
if (!receivedAnyContent) {
|
||||
// Instead of automatically showing the error message in the chat,
|
||||
// throw an error so the parent function can handle the fallback
|
||||
throw new Error('Error connecting to the LLM streaming service');
|
||||
// Always try to scroll to the latest content
|
||||
try {
|
||||
if (this.chatContainer) {
|
||||
this.chatContainer.scrollTop = this.chatContainer.scrollHeight;
|
||||
console.log(`[${logId}] Scrolled to latest content`);
|
||||
}
|
||||
} catch (scrollErr) {
|
||||
console.error(`[${logId}] Error scrolling to latest content:`, scrollErr);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -755,32 +902,111 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
}
|
||||
|
||||
private showLoadingIndicator() {
|
||||
this.loadingIndicator.style.display = 'flex';
|
||||
// Reset the tool execution area when starting a new request, but keep it visible
|
||||
// We'll make it visible when we get our first tool execution event
|
||||
this.toolExecutionInfo.style.display = 'none';
|
||||
this.toolExecutionSteps.innerHTML = '';
|
||||
const logId = `ui-${Date.now()}`;
|
||||
console.log(`[${logId}] Showing loading indicator and preparing tool execution display`);
|
||||
|
||||
// Ensure elements exist before trying to modify them
|
||||
if (!this.loadingIndicator || !this.toolExecutionInfo || !this.toolExecutionSteps) {
|
||||
console.error(`[${logId}] UI elements not properly initialized`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Force display of loading indicator
|
||||
try {
|
||||
this.loadingIndicator.style.display = 'flex';
|
||||
|
||||
// Make sure tool execution info area is always visible even before we get the first event
|
||||
// This helps avoid the UI getting stuck in "Processing..." state
|
||||
this.toolExecutionInfo.style.display = 'block';
|
||||
|
||||
// Clear previous tool steps but add a placeholder
|
||||
this.toolExecutionSteps.innerHTML = `
|
||||
<div class="tool-step my-1">
|
||||
<div class="d-flex align-items-center">
|
||||
<i class="bx bx-loader-alt bx-spin text-primary me-1"></i>
|
||||
<span>Initializing...</span>
|
||||
</div>
|
||||
</div>
|
||||
`;
|
||||
|
||||
// Force a UI update by accessing element properties
|
||||
const forceUpdate = this.loadingIndicator.offsetHeight;
|
||||
|
||||
// Verify display states
|
||||
console.log(`[${logId}] Loading indicator display state: ${this.loadingIndicator.style.display}`);
|
||||
console.log(`[${logId}] Tool execution info display state: ${this.toolExecutionInfo.style.display}`);
|
||||
|
||||
console.log(`[${logId}] Loading indicator and tool execution area initialized`);
|
||||
} catch (err) {
|
||||
console.error(`[${logId}] Error showing loading indicator:`, err);
|
||||
}
|
||||
}
|
||||
|
||||
private hideLoadingIndicator() {
|
||||
this.loadingIndicator.style.display = 'none';
|
||||
this.toolExecutionInfo.style.display = 'none';
|
||||
const logId = `ui-${Date.now()}`;
|
||||
console.log(`[${logId}] Hiding loading indicator and tool execution area`);
|
||||
|
||||
// Ensure elements exist before trying to modify them
|
||||
if (!this.loadingIndicator || !this.toolExecutionInfo) {
|
||||
console.error(`[${logId}] UI elements not properly initialized`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Properly reset DOM elements
|
||||
try {
|
||||
// First hide the tool execution info area
|
||||
this.toolExecutionInfo.style.display = 'none';
|
||||
|
||||
// Force a UI update by accessing element properties
|
||||
const forceUpdate1 = this.toolExecutionInfo.offsetHeight;
|
||||
|
||||
// Then hide the loading indicator
|
||||
this.loadingIndicator.style.display = 'none';
|
||||
|
||||
// Force another UI update
|
||||
const forceUpdate2 = this.loadingIndicator.offsetHeight;
|
||||
|
||||
// Verify display states immediately
|
||||
console.log(`[${logId}] Loading indicator display state: ${this.loadingIndicator.style.display}`);
|
||||
console.log(`[${logId}] Tool execution info display state: ${this.toolExecutionInfo.style.display}`);
|
||||
|
||||
// Add a delay to double-check that UI updates are complete
|
||||
setTimeout(() => {
|
||||
console.log(`[${logId}] Verification after hide timeout: loading indicator display=${this.loadingIndicator.style.display}, tool execution info display=${this.toolExecutionInfo.style.display}`);
|
||||
|
||||
// Force display none again in case something changed it
|
||||
if (this.loadingIndicator.style.display !== 'none') {
|
||||
console.log(`[${logId}] Loading indicator still visible after timeout, forcing hidden`);
|
||||
this.loadingIndicator.style.display = 'none';
|
||||
}
|
||||
|
||||
if (this.toolExecutionInfo.style.display !== 'none') {
|
||||
console.log(`[${logId}] Tool execution info still visible after timeout, forcing hidden`);
|
||||
this.toolExecutionInfo.style.display = 'none';
|
||||
}
|
||||
}, 100);
|
||||
} catch (err) {
|
||||
console.error(`[${logId}] Error hiding loading indicator:`, err);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Show tool execution information in the UI
|
||||
*/
|
||||
private showToolExecutionInfo(toolExecutionData: any) {
|
||||
console.log(`Showing tool execution info: ${JSON.stringify(toolExecutionData)}`);
|
||||
|
||||
// Make sure tool execution info section is visible
|
||||
this.toolExecutionInfo.style.display = 'block';
|
||||
|
||||
this.loadingIndicator.style.display = 'flex'; // Ensure loading indicator is shown during tool execution
|
||||
|
||||
// Create a new step element to show the tool being executed
|
||||
const stepElement = document.createElement('div');
|
||||
stepElement.className = 'tool-step my-1';
|
||||
|
||||
|
||||
// Basic styling for the step
|
||||
let stepHtml = '';
|
||||
|
||||
|
||||
if (toolExecutionData.action === 'start') {
|
||||
// Tool execution starting
|
||||
stepHtml = `
|
||||
@@ -814,20 +1040,26 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
</div>
|
||||
`;
|
||||
}
|
||||
|
||||
stepElement.innerHTML = stepHtml;
|
||||
this.toolExecutionSteps.appendChild(stepElement);
|
||||
|
||||
// Scroll to bottom of tool execution steps
|
||||
this.toolExecutionSteps.scrollTop = this.toolExecutionSteps.scrollHeight;
|
||||
|
||||
if (stepHtml) {
|
||||
stepElement.innerHTML = stepHtml;
|
||||
this.toolExecutionSteps.appendChild(stepElement);
|
||||
|
||||
// Scroll to bottom of tool execution steps
|
||||
this.toolExecutionSteps.scrollTop = this.toolExecutionSteps.scrollHeight;
|
||||
|
||||
console.log(`Added new tool execution step to UI`);
|
||||
} else {
|
||||
console.log(`No HTML generated for tool execution data:`, toolExecutionData);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Format tool arguments for display
|
||||
*/
|
||||
private formatToolArgs(args: any): string {
|
||||
if (!args || typeof args !== 'object') return '';
|
||||
|
||||
|
||||
return Object.entries(args)
|
||||
.map(([key, value]) => {
|
||||
// Format the value based on its type
|
||||
@@ -843,25 +1075,25 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
} else {
|
||||
displayValue = String(value);
|
||||
}
|
||||
|
||||
|
||||
return `<span class="text-primary">${this.escapeHtml(key)}</span>: ${this.escapeHtml(displayValue)}`;
|
||||
})
|
||||
.join(', ');
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Format tool results for display
|
||||
*/
|
||||
private formatToolResult(result: any): string {
|
||||
if (result === undefined || result === null) return '';
|
||||
|
||||
|
||||
// Try to format as JSON if it's an object
|
||||
if (typeof result === 'object') {
|
||||
try {
|
||||
// Get a preview of structured data
|
||||
const entries = Object.entries(result);
|
||||
if (entries.length === 0) return 'Empty result';
|
||||
|
||||
|
||||
// Just show first 2 key-value pairs if there are many
|
||||
const preview = entries.slice(0, 2).map(([key, val]) => {
|
||||
let valPreview;
|
||||
@@ -876,22 +1108,22 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
}
|
||||
return `${key}: ${valPreview}`;
|
||||
}).join(', ');
|
||||
|
||||
|
||||
return entries.length > 2 ? `${preview}, ... (${entries.length} properties)` : preview;
|
||||
} catch (e) {
|
||||
return String(result).substring(0, 100) + (String(result).length > 100 ? '...' : '');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// For string results
|
||||
if (typeof result === 'string') {
|
||||
return result.length > 100 ? result.substring(0, 97) + '...' : result;
|
||||
}
|
||||
|
||||
|
||||
// Default formatting
|
||||
return String(result).substring(0, 100) + (String(result).length > 100 ? '...' : '');
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Simple HTML escaping for safer content display
|
||||
*/
|
||||
@@ -899,7 +1131,7 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
if (typeof text !== 'string') {
|
||||
text = String(text || '');
|
||||
}
|
||||
|
||||
|
||||
return text
|
||||
.replace(/&/g, '&')
|
||||
.replace(/</g, '<')
|
||||
@@ -968,6 +1200,26 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
return processedContent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Show thinking state in the UI
|
||||
*/
|
||||
private showThinkingState(thinkingData: string) {
|
||||
// Update the UI to show thinking indicator
|
||||
const thinking = typeof thinkingData === 'string' ? thinkingData : 'Thinking...';
|
||||
const toolExecutionStep = document.createElement('div');
|
||||
toolExecutionStep.className = 'tool-step my-1';
|
||||
toolExecutionStep.innerHTML = `
|
||||
<div class="d-flex align-items-center">
|
||||
<i class="bx bx-bulb text-warning me-1"></i>
|
||||
<span>${this.escapeHtml(thinking)}</span>
|
||||
</div>
|
||||
`;
|
||||
|
||||
this.toolExecutionInfo.style.display = 'block';
|
||||
this.toolExecutionSteps.appendChild(toolExecutionStep);
|
||||
this.toolExecutionSteps.scrollTop = this.toolExecutionSteps.scrollHeight;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate embedding providers configuration
|
||||
* Check if there are issues with the embedding providers that might affect LLM functionality
|
||||
@@ -1067,4 +1319,4 @@ export default class LlmChatPanel extends BasicWidget {
|
||||
this.validationWarning.style.display = 'none';
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -791,6 +791,163 @@ async function indexNote(req: Request, res: Response) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @swagger
|
||||
* /api/llm/sessions/{sessionId}/messages/stream:
|
||||
* post:
|
||||
* summary: Start a streaming response session via WebSockets
|
||||
* operationId: llm-stream-message
|
||||
* parameters:
|
||||
* - name: sessionId
|
||||
* in: path
|
||||
* required: true
|
||||
* schema:
|
||||
* type: string
|
||||
* requestBody:
|
||||
* required: true
|
||||
* content:
|
||||
* application/json:
|
||||
* schema:
|
||||
* type: object
|
||||
* properties:
|
||||
* content:
|
||||
* type: string
|
||||
* description: The user message to send to the LLM
|
||||
* useAdvancedContext:
|
||||
* type: boolean
|
||||
* description: Whether to use advanced context extraction
|
||||
* showThinking:
|
||||
* type: boolean
|
||||
* description: Whether to show thinking process in the response
|
||||
* responses:
|
||||
* '200':
|
||||
* description: Streaming started successfully
|
||||
* '404':
|
||||
* description: Session not found
|
||||
* '500':
|
||||
* description: Error processing request
|
||||
* security:
|
||||
* - session: []
|
||||
* tags: ["llm"]
|
||||
*/
|
||||
async function streamMessage(req: Request, res: Response) {
|
||||
log.info("=== Starting streamMessage ===");
|
||||
try {
|
||||
const sessionId = req.params.sessionId;
|
||||
const { content, useAdvancedContext, showThinking } = req.body;
|
||||
|
||||
if (!content || typeof content !== 'string' || content.trim().length === 0) {
|
||||
throw new Error('Content cannot be empty');
|
||||
}
|
||||
|
||||
// Check if session exists
|
||||
const session = restChatService.getSessions().get(sessionId);
|
||||
if (!session) {
|
||||
throw new Error('Session not found');
|
||||
}
|
||||
|
||||
// Update last active timestamp
|
||||
session.lastActive = new Date();
|
||||
|
||||
// Add user message to the session
|
||||
session.messages.push({
|
||||
role: 'user',
|
||||
content,
|
||||
timestamp: new Date()
|
||||
});
|
||||
|
||||
// Create request parameters for the pipeline
|
||||
const requestParams = {
|
||||
sessionId,
|
||||
content,
|
||||
useAdvancedContext: useAdvancedContext === true,
|
||||
showThinking: showThinking === true,
|
||||
stream: true // Always stream for this endpoint
|
||||
};
|
||||
|
||||
// Create a fake request/response pair to pass to the handler
|
||||
const fakeReq = {
|
||||
...req,
|
||||
method: 'GET', // Set to GET to indicate streaming
|
||||
query: {
|
||||
stream: 'true', // Set stream param - don't use format: 'stream' to avoid confusion
|
||||
useAdvancedContext: String(useAdvancedContext === true),
|
||||
showThinking: String(showThinking === true)
|
||||
},
|
||||
params: {
|
||||
sessionId
|
||||
},
|
||||
// Make sure the original content is available to the handler
|
||||
body: {
|
||||
content,
|
||||
useAdvancedContext: useAdvancedContext === true,
|
||||
showThinking: showThinking === true
|
||||
}
|
||||
} as unknown as Request;
|
||||
|
||||
// Log to verify correct parameters
|
||||
log.info(`WebSocket stream settings - useAdvancedContext=${useAdvancedContext === true}, in query=${fakeReq.query.useAdvancedContext}, in body=${fakeReq.body.useAdvancedContext}`);
|
||||
// Extra safety to ensure the parameters are passed correctly
|
||||
if (useAdvancedContext === true) {
|
||||
log.info(`Enhanced context IS enabled for this request`);
|
||||
} else {
|
||||
log.info(`Enhanced context is NOT enabled for this request`);
|
||||
}
|
||||
|
||||
// Process the request in the background
|
||||
Promise.resolve().then(async () => {
|
||||
try {
|
||||
await restChatService.handleSendMessage(fakeReq, res);
|
||||
} catch (error) {
|
||||
log.error(`Background message processing error: ${error}`);
|
||||
|
||||
// Import the WebSocket service
|
||||
const wsService = (await import('../../services/ws.js')).default;
|
||||
|
||||
// Define LLMStreamMessage interface
|
||||
interface LLMStreamMessage {
|
||||
type: 'llm-stream';
|
||||
sessionId: string;
|
||||
content?: string;
|
||||
thinking?: string;
|
||||
toolExecution?: any;
|
||||
done?: boolean;
|
||||
error?: string;
|
||||
raw?: unknown;
|
||||
}
|
||||
|
||||
// Send error to client via WebSocket
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
error: `Error processing message: ${error}`,
|
||||
done: true
|
||||
} as LLMStreamMessage);
|
||||
}
|
||||
});
|
||||
|
||||
// Import the WebSocket service
|
||||
const wsService = (await import('../../services/ws.js')).default;
|
||||
|
||||
// Let the client know streaming has started via WebSocket (helps client confirm connection is working)
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
thinking: 'Initializing streaming LLM response...'
|
||||
});
|
||||
|
||||
// Let the client know streaming has started via HTTP response
|
||||
return {
|
||||
success: true,
|
||||
message: 'Streaming started',
|
||||
sessionId
|
||||
};
|
||||
} catch (error: any) {
|
||||
log.error(`Error starting message stream: ${error.message}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export default {
|
||||
// Chat session management
|
||||
createSession,
|
||||
@@ -799,6 +956,7 @@ export default {
|
||||
listSessions,
|
||||
deleteSession,
|
||||
sendMessage,
|
||||
streamMessage, // Add new streaming endpoint
|
||||
|
||||
// Knowledge base index management
|
||||
getIndexStats,
|
||||
|
@@ -400,6 +400,7 @@ function register(app: express.Application) {
apiRoute(DEL, "/api/llm/sessions/:sessionId", llmRoute.deleteSession);
apiRoute(PST, "/api/llm/sessions/:sessionId/messages", llmRoute.sendMessage);
apiRoute(GET, "/api/llm/sessions/:sessionId/messages", llmRoute.sendMessage);
apiRoute(PST, "/api/llm/sessions/:sessionId/messages/stream", llmRoute.streamMessage);

// LLM index management endpoints - reorganized for REST principles
apiRoute(GET, "/api/llm/indexes/stats", llmRoute.getIndexStats);
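Taken together with the `streamMessage` handler added earlier in this commit, a client kicks streaming off with a plain POST to the newly registered route and then consumes the chunks over the `llm-stream` WebSocket messages. A rough usage sketch, assuming the endpoint shape documented in the swagger block above (the `startLlmStream` helper and the use of plain `fetch` are hypothetical, not part of the commit):

```ts
// Hypothetical client-side sketch: start the stream over HTTP, then receive
// chunks via the 'llm-stream' WebSocket messages emitted by streamMessage().
async function startLlmStream(sessionId: string, content: string): Promise<void> {
    const response = await fetch(`./api/llm/sessions/${sessionId}/messages/stream`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
            content,
            useAdvancedContext: true,
            showThinking: false,
            stream: true // the endpoint always streams; this mirrors the widget's request body
        })
    });

    if (!response.ok) {
        throw new Error(`Streaming request failed: ${response.status}`);
    }
    // The HTTP body only acknowledges the start ({ success, message, sessionId });
    // the actual content arrives through the WebSocket 'llm-stream' messages.
}
```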
@@ -1,12 +1,16 @@
import type { ToolCall } from './tools/tool_interfaces.js';
import type { ModelMetadata } from './providers/provider_options.js';

/**
 * Interface for chat messages between client and LLM models
 */
export interface Message {
role: 'user' | 'assistant' | 'system' | 'tool';
content: string;
name?: string;
tool_call_id?: string;
tool_calls?: ToolCall[] | any[];
sessionId?: string; // Optional session ID for WebSocket communication
}

/**
@@ -32,6 +36,12 @@ export interface StreamChunk {
completionTokens?: number;
totalTokens?: number;
};

/**
 * Raw provider-specific data from the original response chunk
 * This can include thinking state, tool execution info, etc.
 */
raw?: any;
}

/**
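The new optional `raw` field on `StreamChunk` lets providers forward their original chunk payload (thinking state, tool execution info, and so on) alongside the accumulated text. A minimal sketch of a callback that consumes it is below; the import path and the `collectStream` helper are illustrative, not part of the commit.

```ts
// Illustrative only: a stream callback that uses the new `raw` field.
import type { StreamChunk } from './ai_interface.js'; // path is indicative

async function collectStream(
    stream: (cb: (chunk: StreamChunk) => Promise<void>) => Promise<string>
): Promise<string> {
    let text = '';
    await stream(async (chunk) => {
        text += chunk.text;
        if (chunk.raw) {
            // Provider-specific payload, e.g. tool calls or thinking updates.
            console.log('raw provider data:', chunk.raw);
        }
        if (chunk.done) {
            console.log(`stream finished with ${text.length} chars`);
        }
    });
    return text;
}
```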
@@ -127,6 +127,10 @@ export class ChatPipeline {
// Determine if we should use tools or semantic context
const useTools = modelSelection.options.enableTools === true;
const useEnhancedContext = input.options?.useAdvancedContext === true;

// Log details about the advanced context parameter
log.info(`Enhanced context option check: input.options=${JSON.stringify(input.options || {})}`);
log.info(`Enhanced context decision: useEnhancedContext=${useEnhancedContext}, hasQuery=${!!input.query}`);

// Early return if we don't have a query or enhanced context is disabled
if (!input.query || !useEnhancedContext) {
@@ -431,8 +435,8 @@ export class ChatPipeline {
...modelSelection.options,
// Ensure tool support is still enabled for follow-up requests
enableTools: true,
// Disable streaming during tool execution follow-ups
stream: false,
// Preserve original streaming setting for tool execution follow-ups
stream: modelSelection.options.stream,
// Add tool execution status for Ollama provider
...(currentResponse.provider === 'Ollama' ? { toolExecutionStatus } : {})
}
@@ -498,6 +502,8 @@ export class ChatPipeline {
messages: currentMessages,
options: {
...modelSelection.options,
// Preserve streaming for error follow-up
stream: modelSelection.options.stream,
// For Ollama, include tool execution status
...(currentResponse.provider === 'Ollama' ? { toolExecutionStatus } : {})
}
@@ -547,6 +553,8 @@ export class ChatPipeline {
options: {
...modelSelection.options,
enableTools: false, // Disable tools for the final response
// Preserve streaming setting for max iterations response
stream: modelSelection.options.stream,
// For Ollama, include tool execution status
...(currentResponse.provider === 'Ollama' ? { toolExecutionStatus } : {})
}
@@ -1,12 +1,12 @@
|
||||
import { BasePipelineStage } from '../pipeline_stage.js';
|
||||
import type { LLMCompletionInput } from '../interfaces.js';
|
||||
import type { ChatCompletionOptions, ChatResponse } from '../../ai_interface.js';
|
||||
import type { ChatCompletionOptions, ChatResponse, StreamChunk } from '../../ai_interface.js';
|
||||
import aiServiceManager from '../../ai_service_manager.js';
|
||||
import toolRegistry from '../../tools/tool_registry.js';
|
||||
import log from '../../../log.js';
|
||||
|
||||
/**
|
||||
* Pipeline stage for LLM completion
|
||||
* Pipeline stage for LLM completion with enhanced streaming support
|
||||
*/
|
||||
export class LLMCompletionStage extends BasePipelineStage<LLMCompletionInput, { response: ChatResponse }> {
|
||||
constructor() {
|
||||
@@ -15,88 +15,124 @@ export class LLMCompletionStage extends BasePipelineStage<LLMCompletionInput, {
|
||||
|
||||
/**
|
||||
* Generate LLM completion using the AI service
|
||||
*
|
||||
* This enhanced version supports better streaming by forwarding raw provider data
|
||||
* and ensuring consistent handling of stream options.
|
||||
*/
|
||||
protected async process(input: LLMCompletionInput): Promise<{ response: ChatResponse }> {
|
||||
const { messages, options, provider } = input;
|
||||
|
||||
// Log input options, particularly focusing on the stream option
|
||||
// Log input options
|
||||
log.info(`[LLMCompletionStage] Input options: ${JSON.stringify({
|
||||
model: options.model,
|
||||
provider,
|
||||
stream: options.stream,
|
||||
enableTools: options.enableTools
|
||||
})}`);
|
||||
log.info(`[LLMCompletionStage] Stream option in input: ${options.stream}, type: ${typeof options.stream}`);
|
||||
|
||||
// Create a deep copy of options to avoid modifying the original
|
||||
const updatedOptions: ChatCompletionOptions = JSON.parse(JSON.stringify(options));
|
||||
|
||||
// IMPORTANT: Handle stream option carefully:
|
||||
// 1. If it's undefined, leave it undefined (provider will use defaults)
|
||||
// 2. If explicitly set to true/false, ensure it's a proper boolean
|
||||
// Handle stream option explicitly
|
||||
if (options.stream !== undefined) {
|
||||
updatedOptions.stream = options.stream === true;
|
||||
log.info(`[LLMCompletionStage] Stream explicitly provided in options, set to: ${updatedOptions.stream}`);
|
||||
} else {
|
||||
// If undefined, leave it undefined so provider can use its default behavior
|
||||
log.info(`[LLMCompletionStage] Stream option not explicitly set, leaving as undefined`);
|
||||
log.info(`[LLMCompletionStage] Stream explicitly set to: ${updatedOptions.stream}`);
|
||||
}
|
||||
|
||||
// If this is a direct (non-stream) call to Ollama but has the stream flag,
|
||||
// ensure we set additional metadata to maintain proper state
|
||||
if (updatedOptions.stream && !provider && updatedOptions.providerMetadata?.provider === 'ollama') {
|
||||
log.info(`[LLMCompletionStage] This is an Ollama request with stream=true, ensuring provider config is consistent`);
|
||||
// Add capture of raw provider data for streaming
|
||||
if (updatedOptions.stream) {
|
||||
// Add a function to capture raw provider data in stream chunks
|
||||
const originalStreamCallback = updatedOptions.streamCallback;
|
||||
updatedOptions.streamCallback = async (text, done, rawProviderData) => {
|
||||
// Create an enhanced chunk with the raw provider data
|
||||
const enhancedChunk = {
|
||||
text,
|
||||
done,
|
||||
// Include raw provider data if available
|
||||
raw: rawProviderData
|
||||
};
|
||||
|
||||
// Call the original callback if provided
|
||||
if (originalStreamCallback) {
|
||||
return originalStreamCallback(text, done, enhancedChunk);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
log.info(`[LLMCompletionStage] Copied options: ${JSON.stringify({
|
||||
model: updatedOptions.model,
|
||||
stream: updatedOptions.stream,
|
||||
enableTools: updatedOptions.enableTools
|
||||
})}`);
|
||||
|
||||
// Check if tools should be enabled
|
||||
if (updatedOptions.enableTools !== false) {
|
||||
// Get all available tools from the registry
|
||||
const toolDefinitions = toolRegistry.getAllToolDefinitions();
|
||||
|
||||
if (toolDefinitions.length > 0) {
|
||||
// Enable tools and add them to the options
|
||||
updatedOptions.enableTools = true;
|
||||
updatedOptions.tools = toolDefinitions;
|
||||
log.info(`Adding ${toolDefinitions.length} tools to LLM request`);
|
||||
}
|
||||
}
|
||||
|
||||
// Determine which provider to use - prioritize in this order:
|
||||
// 1. Explicit provider parameter (legacy approach)
|
||||
// 2. Provider from metadata
|
||||
// 3. Auto-selection
|
||||
// Determine which provider to use
|
||||
let selectedProvider = provider;
|
||||
|
||||
// If no explicit provider is specified, check for provider metadata
|
||||
if (!selectedProvider && updatedOptions.providerMetadata?.provider) {
|
||||
selectedProvider = updatedOptions.providerMetadata.provider;
|
||||
log.info(`Using provider ${selectedProvider} from metadata for model ${updatedOptions.model}`);
|
||||
}
|
||||
|
||||
log.info(`Generating LLM completion, provider: ${selectedProvider || 'auto'}, model: ${updatedOptions?.model || 'default'}`);
|
||||
log.info(`[LLMCompletionStage] Options before service call: ${JSON.stringify({
|
||||
model: updatedOptions.model,
|
||||
stream: updatedOptions.stream,
|
||||
enableTools: updatedOptions.enableTools
|
||||
})}`);
|
||||
|
||||
// If provider is specified (either explicit or from metadata), use that specific provider
|
||||
// Use specific provider if available
|
||||
if (selectedProvider && aiServiceManager.isProviderAvailable(selectedProvider)) {
|
||||
const service = aiServiceManager.getService(selectedProvider);
|
||||
log.info(`[LLMCompletionStage] Using specific service for ${selectedProvider}, stream option: ${updatedOptions.stream}`);
|
||||
log.info(`[LLMCompletionStage] Using specific service for ${selectedProvider}`);
|
||||
|
||||
// Generate completion and wrap with enhanced stream handling
|
||||
const response = await service.generateChatCompletion(messages, updatedOptions);
|
||||
|
||||
// If streaming is enabled, enhance the stream method
|
||||
if (response.stream && typeof response.stream === 'function' && updatedOptions.stream) {
|
||||
const originalStream = response.stream;
|
||||
|
||||
// Replace the stream method with an enhanced version that captures and forwards raw data
|
||||
response.stream = async (callback) => {
|
||||
return originalStream(async (chunk) => {
|
||||
// Forward the chunk with any additional provider-specific data
|
||||
// Create an enhanced chunk with provider info
|
||||
const enhancedChunk: StreamChunk = {
|
||||
...chunk,
|
||||
// If the provider didn't include raw data, add minimal info
|
||||
raw: chunk.raw || {
|
||||
provider: selectedProvider,
|
||||
model: response.model
|
||||
}
|
||||
};
|
||||
return callback(enhancedChunk);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
return { response };
|
||||
}
|
||||
|
||||
// Otherwise use the service manager to select an available provider
|
||||
log.info(`[LLMCompletionStage] Using auto-selected service, stream option: ${updatedOptions.stream}`);
|
||||
// Use auto-selection if no specific provider
|
||||
log.info(`[LLMCompletionStage] Using auto-selected service`);
|
||||
const response = await aiServiceManager.generateChatCompletion(messages, updatedOptions);
|
||||
|
||||
// Add similar stream enhancement for auto-selected provider
|
||||
if (response.stream && typeof response.stream === 'function' && updatedOptions.stream) {
|
||||
const originalStream = response.stream;
|
||||
response.stream = async (callback) => {
|
||||
return originalStream(async (chunk) => {
|
||||
// Create an enhanced chunk with provider info
|
||||
const enhancedChunk: StreamChunk = {
|
||||
...chunk,
|
||||
raw: chunk.raw || {
|
||||
provider: response.provider,
|
||||
model: response.model
|
||||
}
|
||||
};
|
||||
return callback(enhancedChunk);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
return { response };
|
||||
}
|
||||
}
|
||||
|
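The completion stage above wraps `response.stream` so that every chunk reaching the caller carries at least `{ provider, model }` in `raw` when the provider attached nothing of its own. The same wrapping idea, reduced to a standalone sketch with hypothetical type names:

```ts
// Standalone illustration of the stream-wrapping pattern (hypothetical names).
type Chunk = { text: string; done: boolean; raw?: unknown };
type StreamFn = (cb: (chunk: Chunk) => Promise<void> | void) => Promise<string>;

function withProviderInfo(stream: StreamFn, provider: string, model: string): StreamFn {
    return (callback) =>
        stream((chunk) =>
            // Forward every chunk, filling in minimal provider metadata only when
            // the provider did not supply its own raw payload.
            callback({ ...chunk, raw: chunk.raw ?? { provider, model } })
        );
}
```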
@@ -112,6 +112,8 @@ export class AnthropicService extends BaseAIService {
|
||||
|
||||
/**
|
||||
* Handle streaming response from Anthropic
|
||||
*
|
||||
* Simplified implementation that leverages the Anthropic SDK's streaming capabilities
|
||||
*/
|
||||
private async handleStreamingResponse(
|
||||
client: any,
|
||||
@ -119,26 +121,29 @@ export class AnthropicService extends BaseAIService {
|
||||
opts: ChatCompletionOptions,
|
||||
providerOptions: AnthropicOptions
|
||||
): Promise<ChatResponse> {
|
||||
let completeText = '';
|
||||
|
||||
// Create a function that will return a Promise that resolves with the final text
|
||||
// Create a stream handler function that processes the SDK's stream
|
||||
const streamHandler = async (callback: (chunk: StreamChunk) => Promise<void> | void): Promise<string> => {
|
||||
let completeText = '';
|
||||
|
||||
try {
|
||||
// Request a streaming response from Anthropic
|
||||
const streamResponse = await client.messages.create({
|
||||
...params,
|
||||
stream: true
|
||||
});
|
||||
|
||||
// Process each chunk in the stream
|
||||
for await (const chunk of streamResponse) {
|
||||
// Only process text content deltas
|
||||
if (chunk.type === 'content_block_delta' && chunk.delta?.type === 'text_delta') {
|
||||
const text = chunk.delta.text || '';
|
||||
completeText += text;
|
||||
|
||||
// Call the callback with the chunk
|
||||
// Send the chunk to the caller
|
||||
await callback({
|
||||
text,
|
||||
done: false,
|
||||
usage: {} // Usage stats not available in chunks
|
||||
raw: chunk // Include the raw chunk for advanced processing
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -146,11 +151,7 @@ export class AnthropicService extends BaseAIService {
|
||||
// Signal completion
|
||||
await callback({
|
||||
text: '',
|
||||
done: true,
|
||||
usage: {
|
||||
// We don't have token usage information in streaming mode from the chunks
|
||||
totalTokens: completeText.length / 4 // Rough estimate
|
||||
}
|
||||
done: true
|
||||
});
|
||||
|
||||
return completeText;
|
||||
@ -160,25 +161,12 @@ export class AnthropicService extends BaseAIService {
|
||||
}
|
||||
};
|
||||
|
||||
// If a stream callback was provided in the options, set up immediate streaming
|
||||
if (opts.streamCallback) {
|
||||
// Start streaming in the background
|
||||
void streamHandler(async (chunk) => {
|
||||
if (opts.streamCallback) {
|
||||
await opts.streamCallback(chunk.text, chunk.done);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Return a response object with the stream handler
|
||||
return {
|
||||
text: completeText, // This will be empty initially until streaming completes
|
||||
text: '', // Initial text is empty, will be populated during streaming
|
||||
model: providerOptions.model,
|
||||
provider: this.getName(),
|
||||
stream: streamHandler,
|
||||
usage: {
|
||||
// We don't have token counts initially with streaming
|
||||
totalTokens: 0
|
||||
}
|
||||
stream: streamHandler
|
||||
};
|
||||
}
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
import options from '../../options.js';
|
||||
import { BaseAIService } from '../base_ai_service.js';
|
||||
import type { Message, ChatCompletionOptions, ChatResponse } from '../ai_interface.js';
|
||||
import type { Message, ChatCompletionOptions, ChatResponse, StreamChunk } from '../ai_interface.js';
|
||||
import { OllamaMessageFormatter } from '../formatters/ollama_formatter.js';
|
||||
import log from '../../log.js';
|
||||
import type { ToolCall } from '../tools/tool_interfaces.js';
|
||||
@@ -37,7 +37,37 @@ export class OllamaService extends BaseAIService {
|
||||
if (!baseUrl) {
|
||||
throw new Error('Ollama base URL is not configured');
|
||||
}
|
||||
this.client = new Ollama({ host: baseUrl });
|
||||
|
||||
log.info(`Creating new Ollama client with base URL: ${baseUrl}`);
|
||||
|
||||
// Create client with debug options
|
||||
try {
|
||||
this.client = new Ollama({
|
||||
host: baseUrl,
|
||||
fetch: (url, init) => {
|
||||
log.info(`Ollama API request to: ${url}`);
|
||||
log.info(`Ollama API request method: ${init?.method || 'GET'}`);
|
||||
log.info(`Ollama API request headers: ${JSON.stringify(init?.headers || {})}`);
|
||||
|
||||
// Call the actual fetch
|
||||
return fetch(url, init).then(response => {
|
||||
log.info(`Ollama API response status: ${response.status}`);
|
||||
if (!response.ok) {
|
||||
log.error(`Ollama API error response: ${response.statusText}`);
|
||||
}
|
||||
return response;
|
||||
}).catch(error => {
|
||||
log.error(`Ollama API fetch error: ${error.message}`);
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
log.info(`Ollama client successfully created`);
|
||||
} catch (error) {
|
||||
log.error(`Error creating Ollama client: ${error}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
return this.client;
|
||||
}
|
||||
@ -88,11 +118,6 @@ export class OllamaService extends BaseAIService {
|
||||
log.info(`Sending to Ollama with formatted messages: ${messagesToSend.length}`);
|
||||
}
|
||||
|
||||
// Log request details
|
||||
log.info(`========== OLLAMA API REQUEST ==========`);
|
||||
log.info(`Model: ${providerOptions.model}, Messages: ${messagesToSend.length}`);
|
||||
log.info(`Stream: ${opts.streamCallback ? true : false}`);
|
||||
|
||||
// Get tools if enabled
|
||||
let tools = [];
|
||||
if (providerOptions.enableTools !== false) {
|
||||
@ -119,48 +144,18 @@ export class OllamaService extends BaseAIService {
|
||||
}
|
||||
}
|
||||
|
||||
// Check message structure and log detailed information about each message
|
||||
messagesToSend.forEach((msg: any, index: number) => {
|
||||
const keys = Object.keys(msg);
|
||||
log.info(`Message ${index}, Role: ${msg.role}, Keys: ${keys.join(', ')}`);
|
||||
|
||||
// Log message content preview
|
||||
if (msg.content && typeof msg.content === 'string') {
|
||||
const contentPreview = msg.content.length > 200
|
||||
? `${msg.content.substring(0, 200)}...`
|
||||
: msg.content;
|
||||
log.info(`Message ${index} content: ${contentPreview}`);
|
||||
}
|
||||
|
||||
// Log tool-related details
|
||||
if (keys.includes('tool_calls')) {
|
||||
log.info(`Message ${index} has ${msg.tool_calls.length} tool calls`);
|
||||
}
|
||||
|
||||
if (keys.includes('tool_call_id')) {
|
||||
log.info(`Message ${index} is a tool response for tool call ID: ${msg.tool_call_id}`);
|
||||
}
|
||||
|
||||
if (keys.includes('name') && msg.role === 'tool') {
|
||||
log.info(`Message ${index} is from tool: ${msg.name}`);
|
||||
}
|
||||
});
|
||||
|
||||
// Get client instance
|
||||
const client = this.getClient();
|
||||
|
||||
// Convert our message format to Ollama's format
|
||||
const convertedMessages = messagesToSend.map(msg => {
|
||||
const converted: any = {
|
||||
role: msg.role,
|
||||
content: msg.content
|
||||
};
|
||||
|
||||
|
||||
if (msg.tool_calls) {
|
||||
converted.tool_calls = msg.tool_calls.map(tc => {
|
||||
// For Ollama, arguments must be an object, not a string
|
||||
let processedArgs = tc.function.arguments;
|
||||
|
||||
|
||||
// If arguments is a string, try to parse it as JSON
|
||||
if (typeof processedArgs === 'string') {
|
||||
try {
|
||||
@ -171,7 +166,7 @@ export class OllamaService extends BaseAIService {
|
||||
processedArgs = { raw: processedArgs };
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return {
|
||||
id: tc.id,
|
||||
function: {
|
||||
@ -181,18 +176,18 @@ export class OllamaService extends BaseAIService {
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
if (msg.tool_call_id) {
|
||||
converted.tool_call_id = msg.tool_call_id;
|
||||
}
|
||||
|
||||
|
||||
if (msg.name) {
|
||||
converted.name = msg.name;
|
||||
}
|
||||
|
||||
|
||||
return converted;
|
||||
});
|
||||
|
||||
|
||||
// Prepare base request options
|
||||
const baseRequestOptions = {
|
||||
model: providerOptions.model,
|
||||
@ -202,85 +197,29 @@ export class OllamaService extends BaseAIService {
|
||||
tools: tools.length > 0 ? tools : undefined
|
||||
};
|
||||
|
||||
// Get client instance
|
||||
const client = this.getClient();
|
||||
|
||||
// Handle streaming
|
||||
if (opts.streamCallback) {
|
||||
let responseText = '';
|
||||
let responseToolCalls: any[] = [];
|
||||
|
||||
log.info(`Using streaming mode with Ollama client`);
|
||||
|
||||
let streamResponse: OllamaChatResponse | null = null;
|
||||
|
||||
// Create streaming request
|
||||
const streamingRequest = {
|
||||
...baseRequestOptions,
|
||||
stream: true as const // Use const assertion to fix the type
|
||||
};
|
||||
|
||||
// Get the async iterator
|
||||
const streamIterator = await client.chat(streamingRequest);
|
||||
|
||||
// Process each chunk
|
||||
for await (const chunk of streamIterator) {
|
||||
// Save the last chunk for final stats
|
||||
streamResponse = chunk;
|
||||
|
||||
// Accumulate text
|
||||
if (chunk.message?.content) {
|
||||
responseText += chunk.message.content;
|
||||
}
|
||||
|
||||
// Check for tool calls
|
||||
if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
|
||||
responseToolCalls = [...chunk.message.tool_calls];
|
||||
}
|
||||
|
||||
// Call the callback with the current chunk content
|
||||
if (opts.streamCallback) {
|
||||
// Original callback expects text content, isDone flag, and optional original chunk
|
||||
opts.streamCallback(
|
||||
chunk.message?.content || '',
|
||||
!!chunk.done,
|
||||
chunk
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Create the final response after streaming is complete
|
||||
return {
|
||||
text: responseText,
|
||||
model: providerOptions.model,
|
||||
provider: this.getName(),
|
||||
tool_calls: this.transformToolCalls(responseToolCalls),
|
||||
usage: {
|
||||
promptTokens: streamResponse?.prompt_eval_count || 0,
|
||||
completionTokens: streamResponse?.eval_count || 0,
|
||||
totalTokens: (streamResponse?.prompt_eval_count || 0) + (streamResponse?.eval_count || 0)
|
||||
}
|
||||
};
|
||||
if (opts.stream || opts.streamCallback) {
|
||||
return this.handleStreamingResponse(client, baseRequestOptions, opts, providerOptions);
|
||||
} else {
|
||||
// Non-streaming request
|
||||
log.info(`Using non-streaming mode with Ollama client`);
|
||||
|
||||
|
||||
// Create non-streaming request
|
||||
const nonStreamingRequest = {
|
||||
...baseRequestOptions,
|
||||
stream: false as const // Use const assertion for type safety
|
||||
};
|
||||
|
||||
|
||||
const response = await client.chat(nonStreamingRequest);
|
||||
|
||||
|
||||
// Log response details
|
||||
log.info(`========== OLLAMA API RESPONSE ==========`);
|
||||
log.info(`Model: ${response.model}, Content length: ${response.message?.content?.length || 0} chars`);
|
||||
log.info(`Tokens: ${response.prompt_eval_count || 0} prompt, ${response.eval_count || 0} completion, ${(response.prompt_eval_count || 0) + (response.eval_count || 0)} total`);
|
||||
|
||||
// Log content preview
|
||||
const contentPreview = response.message?.content && response.message.content.length > 300
|
||||
? `${response.message.content.substring(0, 300)}...`
|
||||
: response.message?.content || '';
|
||||
log.info(`Response content: ${contentPreview}`);
|
||||
|
||||
|
||||
// Handle the response and extract tool calls if present
|
||||
const chatResponse: ChatResponse = {
|
||||
text: response.message?.content || '',
|
||||
@ -292,15 +231,13 @@ export class OllamaService extends BaseAIService {
|
||||
totalTokens: (response.prompt_eval_count || 0) + (response.eval_count || 0)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Add tool calls if present
|
||||
if (response.message?.tool_calls && response.message.tool_calls.length > 0) {
|
||||
log.info(`Ollama response includes ${response.message.tool_calls.length} tool calls`);
|
||||
chatResponse.tool_calls = this.transformToolCalls(response.message.tool_calls);
|
||||
log.info(`Transformed tool calls: ${JSON.stringify(chatResponse.tool_calls)}`);
|
||||
}
|
||||
|
||||
log.info(`========== END OLLAMA RESPONSE ==========`);
|
||||
|
||||
return chatResponse;
|
||||
}
|
||||
} catch (error: any) {
|
||||
@ -315,6 +252,303 @@ export class OllamaService extends BaseAIService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle streaming response from Ollama
|
||||
*
|
||||
* Simplified implementation that leverages the Ollama SDK's streaming capabilities
|
||||
*/
|
||||
private async handleStreamingResponse(
|
||||
client: Ollama,
|
||||
requestOptions: any,
|
||||
opts: ChatCompletionOptions,
|
||||
providerOptions: OllamaOptions
|
||||
): Promise<ChatResponse> {
|
||||
log.info(`Using streaming mode with Ollama client`);
|
||||
|
||||
// Log detailed information about the streaming setup
|
||||
log.info(`Ollama streaming details: model=${providerOptions.model}, streamCallback=${opts.streamCallback ? 'provided' : 'not provided'}`);
|
||||
|
||||
// Create a stream handler function that processes the SDK's stream
|
||||
const streamHandler = async (callback: (chunk: StreamChunk) => Promise<void> | void): Promise<string> => {
|
||||
let completeText = '';
|
||||
let responseToolCalls: any[] = [];
|
||||
let chunkCount = 0;
|
||||
|
||||
try {
|
||||
// Create streaming request
|
||||
const streamingRequest = {
|
||||
...requestOptions,
|
||||
stream: true as const // Use const assertion to fix the type
|
||||
};
|
||||
|
||||
log.info(`Creating Ollama streaming request with options: model=${streamingRequest.model}, stream=${streamingRequest.stream}, tools=${streamingRequest.tools ? streamingRequest.tools.length : 0}`);
|
||||
|
||||
// Get the async iterator
|
||||
log.info(`Calling Ollama chat API with streaming enabled`);
|
||||
let streamIterator;
|
||||
try {
|
||||
log.info(`About to call client.chat with streaming request to ${options.getOption('ollamaBaseUrl')}`);
|
||||
log.info(`Stream request: model=${streamingRequest.model}, messages count=${streamingRequest.messages?.length || 0}`);
|
||||
|
||||
// Check if we can connect to Ollama by getting available models
|
||||
try {
|
||||
log.info(`Performing Ollama health check...`);
|
||||
const healthCheck = await client.list();
|
||||
log.info(`Ollama health check successful. Available models: ${healthCheck.models.map(m => m.name).join(', ')}`);
|
||||
} catch (healthError) {
|
||||
log.error(`Ollama health check failed: ${healthError instanceof Error ? healthError.message : String(healthError)}`);
|
||||
log.error(`This indicates a connection issue to the Ollama server at ${options.getOption('ollamaBaseUrl')}`);
|
||||
throw new Error(`Unable to connect to Ollama server: ${healthError instanceof Error ? healthError.message : String(healthError)}`);
|
||||
}
|
||||
|
||||
// Make the streaming request
|
||||
log.info(`Proceeding with Ollama streaming request after successful health check`);
|
||||
streamIterator = await client.chat(streamingRequest);
|
||||
|
||||
log.info(`Successfully obtained Ollama stream iterator`);
|
||||
|
||||
if (!streamIterator || typeof streamIterator[Symbol.asyncIterator] !== 'function') {
|
||||
log.error(`Invalid stream iterator returned: ${JSON.stringify(streamIterator)}`);
|
||||
throw new Error('Stream iterator is not valid');
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(`Error getting stream iterator: ${error instanceof Error ? error.message : String(error)}`);
|
||||
log.error(`Error stack: ${error instanceof Error ? error.stack : 'No stack trace'}`);
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Process each chunk
|
||||
try {
|
||||
log.info(`About to start processing stream chunks`);
|
||||
for await (const chunk of streamIterator) {
|
||||
chunkCount++;
|
||||
|
||||
// Log first chunk and then periodic updates
|
||||
if (chunkCount === 1 || chunkCount % 10 === 0) {
|
||||
log.info(`Processing Ollama stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`);
|
||||
}
|
||||
|
||||
// Accumulate text
|
||||
if (chunk.message?.content) {
|
||||
const newContent = chunk.message.content;
|
||||
completeText += newContent;
|
||||
|
||||
if (chunkCount === 1) {
|
||||
log.info(`First content chunk received: "${newContent.substring(0, 50)}${newContent.length > 50 ? '...' : ''}"`);
|
||||
}
|
||||
}
|
||||
|
||||
// Check for tool calls
|
||||
if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
|
||||
responseToolCalls = [...chunk.message.tool_calls];
|
||||
log.info(`Received tool calls in stream: ${chunk.message.tool_calls.length} tools`);
|
||||
}
|
||||
|
||||
// Send the chunk to the caller
|
||||
await callback({
|
||||
text: chunk.message?.content || '',
|
||||
done: !!chunk.done,
|
||||
raw: chunk // Include the raw chunk for advanced processing
|
||||
});
|
||||
|
||||
// If this is the done chunk, log it
|
||||
if (chunk.done) {
|
||||
log.info(`Reached final chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
|
||||
}
|
||||
}
|
||||
|
||||
log.info(`Completed streaming from Ollama: processed ${chunkCount} chunks, total content: ${completeText.length} chars`);
|
||||
|
||||
// Signal completion
|
||||
await callback({
|
||||
text: '',
|
||||
done: true
|
||||
});
|
||||
} catch (streamProcessError) {
|
||||
log.error(`Error processing Ollama stream: ${streamProcessError instanceof Error ? streamProcessError.message : String(streamProcessError)}`);
|
||||
log.error(`Stream process error stack: ${streamProcessError instanceof Error ? streamProcessError.stack : 'No stack trace'}`);
|
||||
|
||||
// Try to signal completion with error
|
||||
try {
|
||||
await callback({
|
||||
text: '',
|
||||
done: true,
|
||||
raw: { error: streamProcessError instanceof Error ? streamProcessError.message : String(streamProcessError) }
|
||||
});
|
||||
} catch (finalError) {
|
||||
log.error(`Error sending final error chunk: ${finalError}`);
|
||||
}
|
||||
|
||||
throw streamProcessError;
|
||||
}
|
||||
|
||||
return completeText;
|
||||
} catch (error) {
|
||||
log.error(`Error in Ollama streaming: ${error}`);
|
||||
log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
// Handle direct streamCallback if provided
|
||||
if (opts.streamCallback) {
|
||||
let completeText = '';
|
||||
let responseToolCalls: any[] = [];
|
||||
let finalChunk: OllamaChatResponse | null = null;
|
||||
let chunkCount = 0;
|
||||
|
||||
try {
|
||||
// Create streaming request
|
||||
const streamingRequest = {
|
||||
...requestOptions,
|
||||
stream: true as const
|
||||
};
|
||||
|
||||
log.info(`Starting Ollama direct streamCallback processing with model ${providerOptions.model}`);
|
||||
|
||||
// Get the async iterator
|
||||
log.info(`Calling Ollama chat API for direct streaming`);
|
||||
let streamIterator;
|
||||
try {
|
||||
log.info(`About to call client.chat with streaming request to ${options.getOption('ollamaBaseUrl')}`);
|
||||
log.info(`Model: ${streamingRequest.model}, Stream: ${streamingRequest.stream}`);
|
||||
log.info(`Messages count: ${streamingRequest.messages.length}`);
|
||||
log.info(`First message: role=${streamingRequest.messages[0].role}, content preview=${streamingRequest.messages[0].content?.substring(0, 50) || 'empty'}`);
|
||||
|
||||
// Perform health check before streaming
|
||||
try {
|
||||
log.info(`Performing Ollama health check before direct streaming...`);
|
||||
const healthCheck = await client.list();
|
||||
log.info(`Ollama health check successful. Available models: ${healthCheck.models.map(m => m.name).join(', ')}`);
|
||||
} catch (healthError) {
|
||||
log.error(`Ollama health check failed: ${healthError instanceof Error ? healthError.message : String(healthError)}`);
|
||||
log.error(`This indicates a connection issue to the Ollama server at ${options.getOption('ollamaBaseUrl')}`);
|
||||
throw new Error(`Unable to connect to Ollama server: ${healthError instanceof Error ? healthError.message : String(healthError)}`);
|
||||
}
|
||||
|
||||
// Proceed with streaming after successful health check
|
||||
log.info(`Making Ollama streaming request after successful health check`);
|
||||
streamIterator = await client.chat(streamingRequest);
|
||||
|
||||
log.info(`Successfully obtained Ollama stream iterator for direct callback`);
|
||||
|
||||
// Check if the stream iterator is valid
|
||||
if (!streamIterator || typeof streamIterator[Symbol.asyncIterator] !== 'function') {
|
||||
log.error(`Invalid stream iterator returned from Ollama: ${JSON.stringify(streamIterator)}`);
|
||||
throw new Error('Invalid stream iterator returned from Ollama');
|
||||
}
|
||||
|
||||
log.info(`Stream iterator is valid, beginning processing`);
|
||||
} catch (error) {
|
||||
log.error(`Error getting stream iterator from Ollama: ${error instanceof Error ? error.message : String(error)}`);
|
||||
log.error(`Error stack: ${error instanceof Error ? error.stack : 'No stack trace'}`);
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Process each chunk
|
||||
try {
|
||||
log.info(`Starting to iterate through stream chunks`);
|
||||
for await (const chunk of streamIterator) {
|
||||
chunkCount++;
|
||||
finalChunk = chunk;
|
||||
|
||||
// Log first chunk and periodic updates
|
||||
if (chunkCount === 1 || chunkCount % 10 === 0) {
|
||||
log.info(`Processing Ollama direct stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}`);
|
||||
}
|
||||
|
||||
// Accumulate text
|
||||
if (chunk.message?.content) {
|
||||
const newContent = chunk.message.content;
|
||||
completeText += newContent;
|
||||
|
||||
if (chunkCount === 1) {
|
||||
log.info(`First direct content chunk: "${newContent.substring(0, 50)}${newContent.length > 50 ? '...' : ''}"`);
|
||||
}
|
||||
}
|
||||
|
||||
// Check for tool calls
|
||||
if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
|
||||
responseToolCalls = [...chunk.message.tool_calls];
|
||||
log.info(`Received tool calls in direct stream: ${chunk.message.tool_calls.length} tools`);
|
||||
}
|
||||
|
||||
// Call the callback with the current chunk content
|
||||
if (opts.streamCallback) {
|
||||
try {
|
||||
// For the final chunk, make sure to send the complete text with done=true
|
||||
if (chunk.done) {
|
||||
log.info(`Sending final callback with done=true and complete content (${completeText.length} chars)`);
|
||||
await opts.streamCallback(
|
||||
completeText, // Send the full accumulated content for the final chunk
|
||||
true,
|
||||
{ ...chunk, message: { ...chunk.message, content: completeText } }
|
||||
);
|
||||
} else if (chunk.message?.content) {
|
||||
// For content chunks, send them as they come
|
||||
await opts.streamCallback(
|
||||
chunk.message.content,
|
||||
!!chunk.done,
|
||||
chunk
|
||||
);
|
||||
} else if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
|
||||
// For tool call chunks, send an empty content string but include the tool calls
|
||||
await opts.streamCallback(
|
||||
'',
|
||||
!!chunk.done,
|
||||
chunk
|
||||
);
|
||||
}
|
||||
|
||||
if (chunkCount === 1) {
|
||||
log.info(`Successfully called streamCallback with first chunk`);
|
||||
}
|
||||
} catch (callbackError) {
|
||||
log.error(`Error in streamCallback: ${callbackError}`);
|
||||
}
|
||||
}
|
||||
|
||||
// If this is the done chunk, log it
|
||||
if (chunk.done) {
|
||||
log.info(`Reached final direct chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
|
||||
}
|
||||
}
|
||||
|
||||
log.info(`Completed direct streaming from Ollama: processed ${chunkCount} chunks, final content: ${completeText.length} chars`);
|
||||
} catch (iterationError) {
|
||||
log.error(`Error iterating through Ollama stream chunks: ${iterationError instanceof Error ? iterationError.message : String(iterationError)}`);
|
||||
log.error(`Iteration error stack: ${iterationError instanceof Error ? iterationError.stack : 'No stack trace'}`);
|
||||
throw iterationError;
|
||||
}
|
||||
|
||||
// Create the final response after streaming is complete
|
||||
return {
|
||||
text: completeText,
|
||||
model: providerOptions.model,
|
||||
provider: this.getName(),
|
||||
tool_calls: this.transformToolCalls(responseToolCalls),
|
||||
usage: {
|
||||
promptTokens: finalChunk?.prompt_eval_count || 0,
|
||||
completionTokens: finalChunk?.eval_count || 0,
|
||||
totalTokens: (finalChunk?.prompt_eval_count || 0) + (finalChunk?.eval_count || 0)
|
||||
}
|
||||
};
|
||||
} catch (error) {
|
||||
log.error(`Error in Ollama streaming with callback: ${error}`);
|
||||
log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Return a response object with the stream handler
|
||||
return {
|
||||
text: '', // Initial text is empty, will be populated during streaming
|
||||
model: providerOptions.model,
|
||||
provider: this.getName(),
|
||||
stream: streamHandler
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform Ollama tool calls to the standard format expected by the pipeline
|
||||
*/
|
||||
@ -322,14 +556,14 @@ export class OllamaService extends BaseAIService {
|
||||
if (!toolCalls || !Array.isArray(toolCalls) || toolCalls.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
|
||||
return toolCalls.map((toolCall, index) => {
|
||||
// Generate a unique ID if none is provided
|
||||
const id = toolCall.id || `tool-call-${Date.now()}-${index}`;
|
||||
|
||||
|
||||
// Handle arguments based on their type
|
||||
let processedArguments: Record<string, any> | string = toolCall.function?.arguments || {};
|
||||
|
||||
|
||||
if (typeof processedArguments === 'string') {
|
||||
try {
|
||||
processedArguments = JSON.parse(processedArguments);
|
||||
@ -339,7 +573,7 @@ export class OllamaService extends BaseAIService {
|
||||
processedArguments = { raw: processedArguments };
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return {
|
||||
id,
|
||||
type: 'function',
|
||||
|
@ -70,103 +70,61 @@ export class OpenAIService extends BaseAIService {
|
||||
if (providerOptions.stream) {
|
||||
params.stream = true;
|
||||
|
||||
// Get stream from OpenAI SDK
|
||||
const stream = await client.chat.completions.create(params);
|
||||
let fullText = '';
|
||||
|
||||
// If a direct callback is provided, use it
|
||||
if (providerOptions.streamCallback) {
|
||||
// Process the stream with the callback
|
||||
try {
|
||||
// The stream is an AsyncIterable
|
||||
if (Symbol.asyncIterator in stream) {
|
||||
for await (const chunk of stream as AsyncIterable<OpenAI.Chat.ChatCompletionChunk>) {
|
||||
const content = chunk.choices[0]?.delta?.content || '';
|
||||
if (content) {
|
||||
fullText += content;
|
||||
await providerOptions.streamCallback(content, false, chunk);
|
||||
}
|
||||
|
||||
// If this is the last chunk
|
||||
if (chunk.choices[0]?.finish_reason) {
|
||||
await providerOptions.streamCallback('', true, chunk);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
console.error('Stream is not iterable, falling back to non-streaming response');
|
||||
|
||||
// If we get a non-streaming response somehow
|
||||
if ('choices' in stream) {
|
||||
const content = stream.choices[0]?.message?.content || '';
|
||||
fullText = content;
|
||||
if (providerOptions.streamCallback) {
|
||||
await providerOptions.streamCallback(content, true, stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error processing stream:', error);
|
||||
throw error;
|
||||
}
|
||||
|
||||
return {
|
||||
text: fullText,
|
||||
model: params.model,
|
||||
provider: this.getName(),
|
||||
usage: {} // Usage stats aren't available with streaming
|
||||
};
|
||||
} else {
|
||||
// Use the more flexible stream interface
|
||||
return {
|
||||
text: '', // Initial empty text, will be filled by stream processing
|
||||
model: params.model,
|
||||
provider: this.getName(),
|
||||
usage: {}, // Usage stats aren't available with streaming
|
||||
stream: async (callback) => {
|
||||
let completeText = '';
|
||||
|
||||
try {
|
||||
// The stream is an AsyncIterable
|
||||
if (Symbol.asyncIterator in stream) {
|
||||
for await (const chunk of stream as AsyncIterable<OpenAI.Chat.ChatCompletionChunk>) {
|
||||
const content = chunk.choices[0]?.delta?.content || '';
|
||||
const isDone = !!chunk.choices[0]?.finish_reason;
|
||||
|
||||
if (content) {
|
||||
completeText += content;
|
||||
}
|
||||
|
||||
// Call the provided callback with the StreamChunk interface
|
||||
await callback({
|
||||
text: content,
|
||||
done: isDone
|
||||
});
|
||||
|
||||
if (isDone) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
console.warn('Stream is not iterable, falling back to non-streaming response');
|
||||
// Return a response with the stream handler
|
||||
return {
|
||||
text: '', // Initial empty text, will be populated during streaming
|
||||
model: params.model,
|
||||
provider: this.getName(),
|
||||
stream: async (callback) => {
|
||||
let completeText = '';
|
||||
|
||||
try {
|
||||
// Process the stream
|
||||
if (Symbol.asyncIterator in stream) {
|
||||
for await (const chunk of stream as AsyncIterable<OpenAI.Chat.ChatCompletionChunk>) {
|
||||
const content = chunk.choices[0]?.delta?.content || '';
|
||||
const isDone = !!chunk.choices[0]?.finish_reason;
|
||||
|
||||
// If we get a non-streaming response somehow
|
||||
if ('choices' in stream) {
|
||||
const content = stream.choices[0]?.message?.content || '';
|
||||
completeText = content;
|
||||
await callback({
|
||||
text: content,
|
||||
done: true
|
||||
});
|
||||
if (content) {
|
||||
completeText += content;
|
||||
}
|
||||
|
||||
// Send the chunk to the caller with raw data
|
||||
await callback({
|
||||
text: content,
|
||||
done: isDone,
|
||||
raw: chunk // Include the raw chunk for advanced processing
|
||||
});
|
||||
|
||||
if (isDone) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error processing stream:', error);
|
||||
throw error;
|
||||
} else {
|
||||
// Fallback for non-iterable response
|
||||
console.warn('Stream is not iterable, falling back to non-streaming response');
|
||||
|
||||
if ('choices' in stream) {
|
||||
const content = stream.choices[0]?.message?.content || '';
|
||||
completeText = content;
|
||||
await callback({
|
||||
text: content,
|
||||
done: true,
|
||||
raw: stream
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return completeText;
|
||||
} catch (error) {
|
||||
console.error('Error processing stream:', error);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return completeText;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// Non-streaming response
|
||||
params.stream = false;
|
||||
|
@ -1,6 +1,26 @@
|
||||
import log from "../log.js";
|
||||
import type { Request, Response } from "express";
|
||||
import type { Message, ChatCompletionOptions } from "./ai_interface.js";
|
||||
import type { Message, ChatCompletionOptions, ChatResponse, StreamChunk } from "./ai_interface.js";
|
||||
|
||||
/**
|
||||
* Interface for WebSocket LLM streaming messages
|
||||
*/
|
||||
interface LLMStreamMessage {
|
||||
type: 'llm-stream';
|
||||
sessionId: string;
|
||||
content?: string;
|
||||
thinking?: string;
|
||||
toolExecution?: {
|
||||
action?: string;
|
||||
tool?: string;
|
||||
result?: string;
|
||||
error?: string;
|
||||
args?: Record<string, unknown>;
|
||||
};
|
||||
done?: boolean;
|
||||
error?: string;
|
||||
raw?: unknown;
|
||||
}
|
||||
import contextService from "./context/services/context_service.js";
|
||||
import { LLM_CONSTANTS } from './constants/provider_constants.js';
|
||||
import { ERROR_PROMPTS } from './constants/llm_prompt_constants.js';
|
||||
@ -290,22 +310,24 @@ class RestChatService {
|
||||
// Add logging for POST requests
|
||||
log.info(`LLM POST message: sessionId=${req.params.sessionId}, useAdvancedContext=${useAdvancedContext}, showThinking=${showThinking}, contentLength=${content ? content.length : 0}`);
|
||||
} else if (req.method === 'GET') {
|
||||
// For GET (streaming) requests, get format from query params
|
||||
// The content should have been sent in a previous POST request
|
||||
useAdvancedContext = req.query.useAdvancedContext === 'true';
|
||||
showThinking = req.query.showThinking === 'true';
|
||||
content = ''; // We don't need content for GET requests
|
||||
// For GET (streaming) requests, get parameters from query params and body
|
||||
// For streaming requests, we need the content from the body
|
||||
useAdvancedContext = req.query.useAdvancedContext === 'true' || (req.body && req.body.useAdvancedContext === true);
|
||||
showThinking = req.query.showThinking === 'true' || (req.body && req.body.showThinking === true);
|
||||
content = req.body && req.body.content ? req.body.content : '';
|
||||
|
||||
// Add logging for GET requests
|
||||
// Add detailed logging for GET requests
|
||||
log.info(`LLM GET stream: sessionId=${req.params.sessionId}, useAdvancedContext=${useAdvancedContext}, showThinking=${showThinking}`);
|
||||
log.info(`Parameters from query: useAdvancedContext=${req.query.useAdvancedContext}, showThinking=${req.query.showThinking}`);
|
||||
log.info(`Parameters from body: useAdvancedContext=${req.body?.useAdvancedContext}, showThinking=${req.body?.showThinking}, content=${content ? `${content.substring(0, 20)}...` : 'none'}`);
|
||||
}
|
||||
|
||||
// Get sessionId from URL params since it's part of the route
|
||||
sessionId = req.params.sessionId;
|
||||
|
||||
// For GET requests, ensure we have the format=stream parameter
|
||||
if (req.method === 'GET' && (!req.query.format || req.query.format !== 'stream')) {
|
||||
throw new Error('Stream format parameter is required for GET requests');
|
||||
// For GET requests, ensure we have the stream parameter
|
||||
if (req.method === 'GET' && req.query.stream !== 'true') {
|
||||
throw new Error('Stream parameter must be set to true for GET/streaming requests');
|
||||
}
|
||||
|
||||
// For POST requests, validate the content
|
||||
@ -443,6 +465,33 @@ class RestChatService {
|
||||
|
||||
log.info("Executing chat pipeline...");
|
||||
|
||||
// Create options object for better tracking
|
||||
const pipelineOptions = {
|
||||
// Force useAdvancedContext to be a boolean, no matter what
|
||||
useAdvancedContext: useAdvancedContext === true,
|
||||
systemPrompt: session.messages.find(m => m.role === 'system')?.content,
|
||||
temperature: session.metadata.temperature,
|
||||
maxTokens: session.metadata.maxTokens,
|
||||
model: session.metadata.model,
|
||||
// Set stream based on request type, but ensure it's explicitly a boolean value
|
||||
// GET requests or format=stream parameter indicates streaming should be used
|
||||
stream: !!(req.method === 'GET' || req.query.format === 'stream' || req.query.stream === 'true')
|
||||
};
|
||||
|
||||
// Log the options to verify what's being sent to the pipeline
|
||||
log.info(`Pipeline input options: ${JSON.stringify({
|
||||
useAdvancedContext: pipelineOptions.useAdvancedContext,
|
||||
stream: pipelineOptions.stream
|
||||
})}`);
|
||||
|
||||
// Import the WebSocket service for direct access
|
||||
const wsService = await import('../../services/ws.js');
|
||||
|
||||
// Create a stream callback wrapper
|
||||
// This will ensure we properly handle all streaming messages
|
||||
let messageContent = '';
|
||||
let streamFinished = false;
|
||||
|
||||
// Prepare the pipeline input
|
||||
const pipelineInput: ChatPipelineInput = {
|
||||
messages: session.messages.map(msg => ({
|
||||
@ -452,30 +501,109 @@ class RestChatService {
|
||||
query: content,
|
||||
noteId: session.noteContext ?? undefined,
|
||||
showThinking: showThinking,
|
||||
options: {
|
||||
useAdvancedContext: useAdvancedContext,
|
||||
systemPrompt: session.messages.find(m => m.role === 'system')?.content,
|
||||
temperature: session.metadata.temperature,
|
||||
maxTokens: session.metadata.maxTokens,
|
||||
model: session.metadata.model,
|
||||
// Set stream based on request type, but ensure it's explicitly a boolean value
|
||||
// GET requests or format=stream parameter indicates streaming should be used
|
||||
stream: !!(req.method === 'GET' || req.query.format === 'stream')
|
||||
},
|
||||
options: pipelineOptions,
|
||||
streamCallback: req.method === 'GET' ? (data, done, rawChunk) => {
|
||||
// Prepare response data - include both the content and raw chunk data if available
|
||||
const responseData: any = { content: data, done };
|
||||
|
||||
// If there's tool execution information, add it to the response
|
||||
if (rawChunk && rawChunk.toolExecution) {
|
||||
responseData.toolExecution = rawChunk.toolExecution;
|
||||
}
|
||||
|
||||
// Send the data as a JSON event
|
||||
res.write(`data: ${JSON.stringify(responseData)}\n\n`);
|
||||
|
||||
if (done) {
|
||||
res.end();
|
||||
try {
|
||||
// Send a single WebSocket message that contains everything needed
|
||||
// Only accumulate content that's actually text (not tool execution or thinking info)
|
||||
if (data) {
|
||||
messageContent += data;
|
||||
}
|
||||
|
||||
// Create a message object with all necessary fields
|
||||
const message: LLMStreamMessage = {
|
||||
type: 'llm-stream',
|
||||
sessionId
|
||||
};
|
||||
|
||||
// Add content if available - either the new chunk or full content on completion
|
||||
if (data) {
|
||||
message.content = data;
|
||||
}
|
||||
|
||||
// Add thinking info if available in the raw chunk
|
||||
if (rawChunk?.thinking) {
|
||||
message.thinking = rawChunk.thinking;
|
||||
}
|
||||
|
||||
// Add tool execution info if available in the raw chunk
|
||||
if (rawChunk?.toolExecution) {
|
||||
message.toolExecution = rawChunk.toolExecution;
|
||||
}
|
||||
|
||||
// Set done flag explicitly
|
||||
message.done = done;
|
||||
|
||||
// On final message, include the complete content too
|
||||
if (done) {
|
||||
streamFinished = true;
|
||||
|
||||
// Always send the accumulated content with the done=true message
|
||||
// This ensures the client receives the complete content even if earlier messages were missed
|
||||
message.content = messageContent;
|
||||
|
||||
log.info(`Stream complete, sending final message with ${messageContent.length} chars of content`);
|
||||
|
||||
// Store the response in the session when done
|
||||
session.messages.push({
|
||||
role: 'assistant',
|
||||
content: messageContent,
|
||||
timestamp: new Date()
|
||||
});
|
||||
}
|
||||
|
||||
// Send message to all clients
|
||||
wsService.default.sendMessageToAllClients(message);
|
||||
|
||||
// Log what was sent (first message and completion)
|
||||
if (message.thinking || done) {
|
||||
log.info(
|
||||
`[WS-SERVER] Sending LLM stream message: sessionId=${sessionId}, content=${!!message.content}, contentLength=${message.content?.length || 0}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${done}`
|
||||
);
|
||||
}
|
||||
|
||||
// For GET requests, also send as server-sent events
|
||||
// Prepare response data for JSON event
|
||||
const responseData: any = {
|
||||
content: data,
|
||||
done
|
||||
};
|
||||
|
||||
// Add tool execution if available
|
||||
if (rawChunk?.toolExecution) {
|
||||
responseData.toolExecution = rawChunk.toolExecution;
|
||||
}
|
||||
|
||||
// Send the data as a JSON event
|
||||
res.write(`data: ${JSON.stringify(responseData)}\n\n`);
|
||||
|
||||
if (done) {
|
||||
res.end();
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(`Error in stream callback: ${error}`);
|
||||
|
||||
// Try to send error message
|
||||
try {
|
||||
wsService.default.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
error: `Stream error: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
done: true
|
||||
});
|
||||
} catch (e) {
|
||||
log.error(`Failed to send error message: ${e}`);
|
||||
}
|
||||
|
||||
// End the response if not already done
|
||||
try {
|
||||
if (!streamFinished) {
|
||||
res.write(`data: ${JSON.stringify({ error: 'Stream error', done: true })}\n\n`);
|
||||
res.end();
|
||||
}
|
||||
} catch (e) {
|
||||
log.error(`Failed to end response: ${e}`);
|
||||
}
|
||||
}
|
||||
} : undefined
|
||||
};
|
||||
@ -613,7 +741,7 @@ class RestChatService {
|
||||
|
||||
// Make a follow-up request with the tool results
|
||||
log.info(`Making follow-up request with ${toolResults.length} tool results`);
|
||||
const followUpOptions = {...chatOptions, enableTools: iterationCount < MAX_ITERATIONS}; // Enable tools for follow-up but limit iterations
|
||||
const followUpOptions = { ...chatOptions, enableTools: iterationCount < MAX_ITERATIONS }; // Enable tools for follow-up but limit iterations
|
||||
const followUpResponse = await service.generateChatCompletion(currentMessages, followUpOptions);
|
||||
|
||||
// Check if the follow-up response has more tool calls
|
||||
@ -740,7 +868,10 @@ class RestChatService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle streaming response from LLM
|
||||
* Handle streaming response via WebSocket
|
||||
*
|
||||
* This method processes LLM responses and sends them incrementally via WebSocket
|
||||
* to the client, supporting both text content and tool execution status updates.
|
||||
*/
|
||||
private async handleStreamingResponse(
|
||||
res: Response,
|
||||
@ -749,133 +880,211 @@ class RestChatService {
|
||||
service: any,
|
||||
session: ChatSession
|
||||
) {
|
||||
// Set streaming headers once
|
||||
res.setHeader('Content-Type', 'text/event-stream');
|
||||
res.setHeader('Cache-Control', 'no-cache');
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
// The client receives a success response for their HTTP request,
|
||||
// but the actual content will be streamed via WebSocket
|
||||
res.json({ success: true, message: 'Streaming response started' });
|
||||
|
||||
// Flag to indicate we've handled the response directly
|
||||
// This lets the route handler know not to process the result
|
||||
(res as any).triliumResponseHandled = true;
|
||||
// Import the WebSocket service
|
||||
const wsService = (await import('../../services/ws.js')).default;
|
||||
|
||||
let messageContent = '';
|
||||
const sessionId = session.id;
|
||||
|
||||
// Immediately send an initial message to confirm WebSocket connection is working
|
||||
// This helps prevent timeouts on the client side
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
thinking: 'Preparing response...'
|
||||
} as LLMStreamMessage);
|
||||
|
||||
try {
|
||||
// Use the correct method name: generateChatCompletion
|
||||
const response = await service.generateChatCompletion(aiMessages, chatOptions);
|
||||
// Generate the LLM completion with streaming enabled
|
||||
const response = await service.generateChatCompletion(aiMessages, {
|
||||
...chatOptions,
|
||||
stream: true
|
||||
});
|
||||
|
||||
// Check for tool calls in the response
|
||||
// If the model doesn't support streaming via .stream() method or returns tool calls,
|
||||
// we'll handle it specially
|
||||
if (response.tool_calls && response.tool_calls.length > 0) {
|
||||
log.info(`========== STREAMING TOOL CALLS DETECTED ==========`);
|
||||
log.info(`Response contains ${response.tool_calls.length} tool calls, executing them...`);
|
||||
log.info(`CRITICAL CHECK: Tool execution is supposed to happen in the pipeline, not directly here.`);
|
||||
log.info(`If tools are being executed here instead of in the pipeline, this may be a flow issue.`);
|
||||
log.info(`Response came from provider: ${response.provider || 'unknown'}, model: ${response.model || 'unknown'}`);
|
||||
// Send thinking state notification via WebSocket
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
thinking: 'Analyzing tools needed for this request...'
|
||||
} as LLMStreamMessage);
|
||||
|
||||
try {
|
||||
log.info(`========== STREAMING TOOL EXECUTION PATH ==========`);
|
||||
log.info(`About to execute tools in streaming path (this is separate from pipeline tool execution)`);
|
||||
|
||||
// Execute the tools
|
||||
const toolResults = await this.executeToolCalls(response);
|
||||
log.info(`Successfully executed ${toolResults.length} tool calls in streaming path`);
|
||||
|
||||
// Make a follow-up request with the tool results
|
||||
// For each tool execution, send progress update via WebSocket
|
||||
for (const toolResult of toolResults) {
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
toolExecution: {
|
||||
action: 'complete',
|
||||
tool: toolResult.name || 'unknown',
|
||||
result: toolResult.content.substring(0, 100) + (toolResult.content.length > 100 ? '...' : '')
|
||||
}
|
||||
} as LLMStreamMessage);
|
||||
}
|
||||
|
||||
// Make follow-up request with tool results
|
||||
const toolMessages = [...aiMessages, {
|
||||
role: 'assistant',
|
||||
content: response.text || '',
|
||||
tool_calls: response.tool_calls
|
||||
}, ...toolResults];
|
||||
|
||||
log.info(`Making follow-up request with ${toolResults.length} tool results`);
|
||||
|
||||
// Send partial response to let the client know tools are being processed
|
||||
if (!res.writableEnded) {
|
||||
res.write(`data: ${JSON.stringify({ content: "Processing tools... " })}\n\n`);
|
||||
}
|
||||
|
||||
// Use non-streaming for the follow-up to get a complete response
|
||||
const followUpOptions = {...chatOptions, stream: false, enableTools: false}; // Prevent infinite loops
|
||||
const followUpOptions = { ...chatOptions, stream: false, enableTools: false };
|
||||
const followUpResponse = await service.generateChatCompletion(toolMessages, followUpOptions);
|
||||
|
||||
messageContent = followUpResponse.text || "";
|
||||
|
||||
// Send the complete response as a single chunk
|
||||
if (!res.writableEnded) {
|
||||
res.write(`data: ${JSON.stringify({ content: messageContent })}\n\n`);
|
||||
res.write('data: [DONE]\n\n');
|
||||
res.end();
|
||||
}
|
||||
// Send the complete response with done flag in the same message
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
content: messageContent,
|
||||
done: true
|
||||
} as LLMStreamMessage);
|
||||
|
||||
// Store the full response for the session
|
||||
// Store the response in the session
|
||||
session.messages.push({
|
||||
role: 'assistant',
|
||||
content: messageContent,
|
||||
timestamp: new Date()
|
||||
});
|
||||
|
||||
return; // Skip the rest of the processing
|
||||
return;
|
||||
} catch (toolError) {
|
||||
log.error(`Error executing tools: ${toolError}`);
|
||||
// Continue with normal streaming response as fallback
|
||||
|
||||
// Send error via WebSocket with done flag
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
error: `Error executing tools: ${toolError instanceof Error ? toolError.message : 'Unknown error'}`,
|
||||
done: true
|
||||
} as LLMStreamMessage);
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle streaming if the response includes a stream method
|
||||
// Handle standard streaming through the stream() method
|
||||
if (response.stream) {
|
||||
await response.stream((chunk: { text: string; done: boolean }) => {
|
||||
if (chunk.text) {
|
||||
messageContent += chunk.text;
|
||||
// Only write if the response hasn't finished
|
||||
if (!res.writableEnded) {
|
||||
res.write(`data: ${JSON.stringify({ content: chunk.text })}\n\n`);
|
||||
}
|
||||
}
|
||||
log.info(`Provider ${service.getName()} supports streaming via stream() method`);
|
||||
|
||||
if (chunk.done) {
|
||||
// Signal the end of the stream when done, only if not already ended
|
||||
if (!res.writableEnded) {
|
||||
res.write('data: [DONE]\n\n');
|
||||
res.end();
|
||||
try {
|
||||
await response.stream(async (chunk: StreamChunk) => {
|
||||
if (chunk.text) {
|
||||
messageContent += chunk.text;
|
||||
|
||||
// Send the chunk content via WebSocket
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
content: chunk.text,
|
||||
// Include any raw data from the provider that might contain thinking/tool info
|
||||
...(chunk.raw ? { raw: chunk.raw } : {})
|
||||
} as LLMStreamMessage);
|
||||
|
||||
// Log the first chunk (useful for debugging)
|
||||
if (messageContent.length === chunk.text.length) {
|
||||
log.info(`First stream chunk received from ${service.getName()}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// If no streaming available, send the response as a single chunk
|
||||
messageContent = response.text;
|
||||
// Only write if the response hasn't finished
|
||||
if (!res.writableEnded) {
|
||||
res.write(`data: ${JSON.stringify({ content: messageContent })}\n\n`);
|
||||
res.write('data: [DONE]\n\n');
|
||||
res.end();
|
||||
|
||||
// If the provider indicates this is "thinking" state, relay that
|
||||
if (chunk.raw?.thinking) {
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
thinking: chunk.raw.thinking
|
||||
} as LLMStreamMessage);
|
||||
}
|
||||
|
||||
// If the provider indicates tool execution, relay that
|
||||
if (chunk.raw?.toolExecution) {
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
toolExecution: chunk.raw.toolExecution
|
||||
} as LLMStreamMessage);
|
||||
}
|
||||
|
||||
// Signal completion when done
|
||||
if (chunk.done) {
|
||||
log.info(`Stream completed from ${service.getName()}`);
|
||||
|
||||
// Send the final message with both content and done flag together
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
content: messageContent, // Send the accumulated content
|
||||
done: true
|
||||
} as LLMStreamMessage);
|
||||
}
|
||||
});
|
||||
|
||||
log.info(`Streaming from ${service.getName()} completed successfully`);
|
||||
} catch (streamError) {
|
||||
log.error(`Error during streaming from ${service.getName()}: ${streamError}`);
|
||||
|
||||
// Report the error to the client
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
error: `Error during streaming: ${streamError instanceof Error ? streamError.message : 'Unknown error'}`,
|
||||
done: true
|
||||
} as LLMStreamMessage);
|
||||
|
||||
throw streamError;
|
||||
}
|
||||
} else {
|
||||
log.info(`Provider ${service.getName()} does not support streaming via stream() method, falling back to single response`);
|
||||
|
||||
// If streaming isn't available, send the entire response at once
|
||||
messageContent = response.text || '';
|
||||
|
||||
// Send via WebSocket - include both content and done flag in same message
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
content: messageContent,
|
||||
done: true
|
||||
} as LLMStreamMessage);
|
||||
|
||||
log.info(`Complete response sent for ${service.getName()}`);
|
||||
}
|
||||
|
||||
// Store the full response for the session
|
||||
const aiResponse = messageContent;
|
||||
|
||||
// Store the assistant's response in the session
|
||||
// Store the full response in the session
|
||||
session.messages.push({
|
||||
role: 'assistant',
|
||||
content: aiResponse,
|
||||
content: messageContent,
|
||||
timestamp: new Date()
|
||||
});
|
||||
} catch (streamingError: any) {
|
||||
// If streaming fails and we haven't sent a response yet, throw the error
|
||||
if (!res.headersSent) {
|
||||
throw streamingError;
|
||||
} else {
|
||||
// If headers were already sent, try to send an error event
|
||||
try {
|
||||
if (!res.writableEnded) {
|
||||
res.write(`data: ${JSON.stringify({ error: streamingError.message })}\n\n`);
|
||||
res.write('data: [DONE]\n\n');
|
||||
res.end();
|
||||
}
|
||||
} catch (e) {
|
||||
log.error(`Failed to write streaming error: ${e}`);
|
||||
}
|
||||
}
|
||||
log.error(`Streaming error: ${streamingError.message}`);
|
||||
|
||||
// Send error via WebSocket
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
error: `Error generating response: ${streamingError instanceof Error ? streamingError.message : 'Unknown error'}`
|
||||
} as LLMStreamMessage);
|
||||
|
||||
// Signal completion
|
||||
wsService.sendMessageToAllClients({
|
||||
type: 'llm-stream',
|
||||
sessionId,
|
||||
done: true
|
||||
} as LLMStreamMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -56,6 +56,21 @@ interface Message {
|
||||
originEntityId?: string | null;
|
||||
lastModifiedMs?: number;
|
||||
filePath?: string;
|
||||
|
||||
// LLM streaming specific fields
|
||||
sessionId?: string;
|
||||
content?: string;
|
||||
thinking?: string;
|
||||
toolExecution?: {
|
||||
action?: string;
|
||||
tool?: string;
|
||||
result?: string;
|
||||
error?: string;
|
||||
args?: Record<string, unknown>;
|
||||
};
|
||||
done?: boolean;
|
||||
error?: string;
|
||||
raw?: unknown;
|
||||
}
|
||||
|
||||
type SessionParser = (req: IncomingMessage, params: {}, cb: () => void) => void;
|
||||
@ -115,15 +130,25 @@ function sendMessageToAllClients(message: Message) {
|
||||
const jsonStr = JSON.stringify(message);
|
||||
|
||||
if (webSocketServer) {
|
||||
if (message.type !== "sync-failed" && message.type !== "api-log-messages") {
|
||||
// Special logging for LLM streaming messages
|
||||
if (message.type === "llm-stream") {
|
||||
log.info(`[WS-SERVER] Sending LLM stream message: sessionId=${message.sessionId}, content=${!!message.content}, thinking=${!!message.thinking}, toolExecution=${!!message.toolExecution}, done=${!!message.done}`);
|
||||
} else if (message.type !== "sync-failed" && message.type !== "api-log-messages") {
|
||||
log.info(`Sending message to all clients: ${jsonStr}`);
|
||||
}
|
||||
|
||||
let clientCount = 0;
|
||||
webSocketServer.clients.forEach(function each(client) {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(jsonStr);
|
||||
clientCount++;
|
||||
}
|
||||
});
|
||||
|
||||
// Log WebSocket client count for debugging
|
||||
if (message.type === "llm-stream") {
|
||||
log.info(`[WS-SERVER] Sent LLM stream message to ${clientCount} clients`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user