diff --git a/apps/server/src/routes/api/llm.spec.ts b/apps/server/src/routes/api/llm.spec.ts
index c19df825f..0fc9cd4bf 100644
--- a/apps/server/src/routes/api/llm.spec.ts
+++ b/apps/server/src/routes/api/llm.spec.ts
@@ -14,7 +14,9 @@ vi.mock("../csrf_protection.js", () => ({
 // Mock WebSocket service
 vi.mock("../../services/ws.js", () => ({
     default: {
-        sendMessageToAllClients: vi.fn()
+        sendMessageToAllClients: vi.fn(),
+        sendTransactionEntityChangesToAllClients: vi.fn(),
+        setLastSyncedPush: vi.fn()
     }
 }));
 
@@ -65,7 +67,11 @@ vi.mock("../../services/llm/config/configuration_helpers.js", () => ({
 // Mock options service
 vi.mock("../../services/options.js", () => ({
     default: {
-        getOptionBool: vi.fn()
+        getOptionBool: vi.fn(() => false),
+        getOptionMap: vi.fn(() => new Map()),
+        createOption: vi.fn(),
+        getOption: vi.fn(() => '0'),
+        getOptionOrNull: vi.fn(() => null)
     }
 }));
 
diff --git a/apps/server/src/services/llm/providers/integration/streaming.spec.ts b/apps/server/src/services/llm/providers/integration/streaming.spec.ts
index 068f725bd..407e13075 100644
--- a/apps/server/src/services/llm/providers/integration/streaming.spec.ts
+++ b/apps/server/src/services/llm/providers/integration/streaming.spec.ts
@@ -79,7 +79,8 @@ describe('Provider Streaming Integration Tests', () => {
         it('should handle OpenAI tool calls', async () => {
             const openAIWithTools = [
                 {
-                    choices: [{ delta: { content: 'Let me calculate that' } }]
+                    choices: [{ delta: { content: 'Let me calculate that' } }],
+                    model: 'gpt-3.5-turbo'
                 },
                 {
                     choices: [{
@@ -93,12 +94,18 @@
                                 }
                             }]
                         }
-                    }]
+                    }],
+                    model: 'gpt-3.5-turbo'
                 },
                 {
-                    choices: [{ delta: { content: 'The answer is 4' } }]
+                    choices: [{ delta: { content: 'The answer is 4' } }],
+                    model: 'gpt-3.5-turbo'
                 },
-                { done: true }
+                {
+                    choices: [{ finish_reason: 'stop' }],
+                    model: 'gpt-3.5-turbo',
+                    done: true
+                }
             ];
 
             const mockIterator = {
@@ -314,7 +321,7 @@
             expect(result.completeText).toBe('Based on my analysis, the answer is 42.');
 
             // Verify thinking states were captured
-            const thinkingChunks = receivedChunks.filter(c => c.chunk?.thinking);
+            const thinkingChunks = receivedChunks.filter(c => c.chunk?.message?.thinking);
             expect(thinkingChunks.length).toBe(2);
         });
     });
diff --git a/apps/server/src/services/llm/providers/stream_handler.ts b/apps/server/src/services/llm/providers/stream_handler.ts
index d84404009..21dd7d079 100644
--- a/apps/server/src/services/llm/providers/stream_handler.ts
+++ b/apps/server/src/services/llm/providers/stream_handler.ts
@@ -24,6 +24,24 @@ export interface StreamProcessingOptions {
     modelName: string;
 }
 
+/**
+ * Helper function to extract content from a chunk based on provider's response format
+ * Different providers may have different chunk structures
+ */
+function getChunkContentProperty(chunk: any): string | null {
+    // Check common content locations in different provider responses
+    if (chunk.message?.content && typeof chunk.message.content === 'string') {
+        return chunk.message.content;
+    }
+    if (chunk.content && typeof chunk.content === 'string') {
+        return chunk.content;
+    }
+    if (chunk.choices?.[0]?.delta?.content && typeof chunk.choices[0].delta.content === 'string') {
+        return chunk.choices[0].delta.content;
+    }
+    return null;
+}
+
 /**
  * Stream processor that handles common streaming operations
  */
@@ -42,23 +60,27 @@ export class StreamProcessor {
 
         // Enhanced logging for content chunks and completion status
         if (chunkCount === 1 || chunkCount % 10 === 0 || chunk.done) {
-            log.info(`Processing ${options.providerName} stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!chunk.message?.content}, content length=${chunk.message?.content?.length || 0}`);
+            const contentProp = getChunkContentProperty(chunk);
+            log.info(`Processing ${options.providerName} stream chunk #${chunkCount}, done=${!!chunk.done}, has content=${!!contentProp}, content length=${contentProp?.length || 0}`);
             logged = true;
         }
 
-        // Extract content if available
-        if (chunk.message?.content) {
-            textToAdd = chunk.message.content;
+        // Extract content if available via the provider-agnostic helper
+        const contentProperty = getChunkContentProperty(chunk);
+        if (contentProperty) {
+            textToAdd = contentProperty;
             const newCompleteText = completeText + textToAdd;
 
             if (chunkCount === 1) {
                 // Log the first chunk more verbosely for debugging
-                log.info(`First content chunk [${chunk.message.content.length} chars]: "${textToAdd.substring(0, 100)}${textToAdd.length > 100 ? '...' : ''}"`);
+                const textPreview = textToAdd.substring(0, 100);
+                const textSuffix = textToAdd.length > 100 ? '...' : '';
+                log.info(`First content chunk [${contentProperty.length} chars]: "${textPreview}${textSuffix}"`);
             }
 
             // For final chunks with done=true, log more information
             if (chunk.done) {
-                log.info(`Final content chunk received with done=true flag. Length: ${chunk.message.content.length}`);
+                log.info(`Final content chunk received with done=true flag. Length: ${contentProperty.length}`);
             }
 
             return { completeText: newCompleteText, logged };
@@ -103,7 +125,13 @@
                 log.info(`Successfully called streamCallback with done=true flag`);
             }
         } catch (callbackError) {
-            log.error(`Error in streamCallback: ${callbackError}`);
+            try {
+                log.error(`Error in streamCallback: ${callbackError}`);
+            } catch (loggingError) {
+                // If logging fails, there's not much we can do - just continue
+                // We don't want to break the stream processing because of logging issues
+            }
+            // Note: We don't re-throw callback errors to avoid breaking the stream
         }
     }
 
@@ -128,7 +156,12 @@
 
             log.info(`Final callback sent successfully with done=true flag`);
         } catch (finalCallbackError) {
-            log.error(`Error in final streamCallback: ${finalCallbackError}`);
+            try {
+                log.error(`Error in final streamCallback: ${finalCallbackError}`);
+            } catch (loggingError) {
+                // If logging fails, there's not much we can do - just continue
+            }
+            // Note: We don't re-throw final callback errors to avoid breaking the stream
         }
     }
 
@@ -136,6 +169,7 @@
      * Detect and extract tool calls from a response chunk
      */
    static extractToolCalls(chunk: any): any[] {
+        // Check message.tool_calls first (common format)
         if (chunk.message?.tool_calls &&
             Array.isArray(chunk.message.tool_calls) &&
             chunk.message.tool_calls.length > 0) {
@@ -144,6 +178,15 @@
             return [...chunk.message.tool_calls];
         }
 
+        // Check OpenAI format: choices[0].delta.tool_calls
+        if (chunk.choices?.[0]?.delta?.tool_calls &&
+            Array.isArray(chunk.choices[0].delta.tool_calls) &&
+            chunk.choices[0].delta.tool_calls.length > 0) {
+
+            log.info(`Detected ${chunk.choices[0].delta.tool_calls.length} OpenAI tool calls in stream chunk`);
+            return [...chunk.choices[0].delta.tool_calls];
+        }
+
         return [];
     }
 
@@ -274,6 +317,7 @@ export async function processProviderStream(
     let responseToolCalls: any[] = [];
     let finalChunk: any | null = null;
     let chunkCount = 0;
+    let streamComplete = false; // Track if done=true has been received
 
     try {
         log.info(`Starting ${options.providerName} stream processing with model ${options.modelName}`);
@@ -286,9 +330,20 @@
 
         // Process each chunk
         for await (const chunk of streamIterator) {
+            // Skip null/undefined chunks to handle malformed responses
+            if (chunk === null || chunk === undefined) {
+                chunkCount++;
+                continue;
+            }
+
             chunkCount++;
             finalChunk = chunk;
 
+            // If we've already received done=true, ignore subsequent chunks but still count them
+            if (streamComplete) {
+                continue;
+            }
+
             // Process chunk with StreamProcessor
             const result = await StreamProcessor.processChunk(
                 chunk,
@@ -309,7 +364,9 @@
             if (streamCallback) {
                 // For chunks with content, send the content directly
                 const contentProperty = getChunkContentProperty(chunk);
-                if (contentProperty) {
+                const hasRealContent = contentProperty && contentProperty.trim().length > 0;
+
+                if (hasRealContent) {
                     await StreamProcessor.sendChunkToCallback(
                         streamCallback,
                         contentProperty,
@@ -335,12 +392,24 @@
                         chunk,
                         chunkCount
                     );
+                } else if (chunk.message?.thinking || chunk.thinking) {
+                    // Send callback for thinking chunks (Anthropic format)
+                    await StreamProcessor.sendChunkToCallback(
+                        streamCallback,
+                        '',
+                        !!chunk.done,
+                        chunk,
+                        chunkCount
+                    );
                 }
             }
 
-            // Log final chunk
-            if (chunk.done && !result.logged) {
-                log.info(`Reached final chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
+            // Mark stream as complete if done=true is received
+            if (chunk.done) {
+                streamComplete = true;
+                if (!result.logged) {
+                    log.info(`Reached final chunk (done=true) after ${chunkCount} chunks, total content length: ${completeText.length}`);
+                }
             }
         }
 
@@ -359,30 +428,19 @@
         chunkCount
     };
 } catch (error) {
-    log.error(`Error in ${options.providerName} stream processing: ${error instanceof Error ? error.message : String(error)}`);
-    log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
+    // Log the failure without letting a logging error mask the original one
+    try {
+        log.error(`Error in ${options.providerName} stream processing: ${error instanceof Error ? error.message : String(error)}`);
+        log.error(`Error details: ${error instanceof Error ? error.stack : 'No stack trace available'}`);
+    } catch (loggingError) {
+        // Swallow logging failures; they must not break stream error handling
+    }
+
+    // Always rethrow the original error, never the logging error
     throw error;
 }
 }
 
-/**
- * Helper function to extract content from a chunk based on provider's response format
- * Different providers may have different chunk structures
- */
-function getChunkContentProperty(chunk: any): string | null {
-    // Check common content locations in different provider responses
-    if (chunk.message?.content) {
-        return chunk.message.content;
-    }
-    if (chunk.content) {
-        return chunk.content;
-    }
-    if (chunk.choices?.[0]?.delta?.content) {
-        return chunk.choices[0].delta.content;
-    }
-    return null;
-}
-
 /**
  * Extract usage statistics from the final chunk based on provider format
  */
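
A note on the chunk-format normalization above: getChunkContentProperty checks the three chunk shapes this change cares about, in order. The standalone TypeScript sketch below mirrors that logic under the same truthiness-plus-typeof guards; the sample chunks are illustrative stand-ins, not captured provider traffic:

    // normalization_sketch.ts - mirrors the helper's checks; samples are hypothetical
    function getChunkContent(chunk: any): string | null {
        if (chunk.message?.content && typeof chunk.message.content === 'string') {
            return chunk.message.content; // Ollama-style: message.content
        }
        if (chunk.content && typeof chunk.content === 'string') {
            return chunk.content; // plain top-level content
        }
        if (chunk.choices?.[0]?.delta?.content && typeof chunk.choices[0].delta.content === 'string') {
            return chunk.choices[0].delta.content; // OpenAI-style: choices[0].delta.content
        }
        return null; // no recognizable content (e.g. a bare done chunk)
    }

    console.log(getChunkContent({ message: { content: 'Hi' } }));              // "Hi"
    console.log(getChunkContent({ content: 'Hi' }));                           // "Hi"
    console.log(getChunkContent({ choices: [{ delta: { content: 'Hi' } }] })); // "Hi"
    console.log(getChunkContent({ choices: [{ finish_reason: 'stop' }], done: true })); // null

Because of the truthiness guard, an empty-string chunk already normalizes to null; the hasRealContent check in processProviderStream additionally filters whitespace-only chunks before invoking the callback.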
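
Similarly, the done=true bookkeeping can be sketched in isolation. In this minimal sketch a hypothetical async generator stands in for a real provider stream; late and malformed chunks are still counted, so chunkCount reflects everything the provider sent, but they are excluded from the accumulated text:

    // done_guard_sketch.ts - assumes hypothetical chunks, not a real provider stream
    async function* chunks(): AsyncGenerator<any> {
        yield { message: { content: 'Hello' } };
        yield null;                                           // malformed chunk
        yield { message: { content: ' world' }, done: true };
        yield { message: { content: 'late chunk' } };         // arrives after done=true
    }

    async function consume(): Promise<void> {
        let completeText = '';
        let chunkCount = 0;
        let streamComplete = false;

        for await (const chunk of chunks()) {
            if (chunk === null || chunk === undefined) {
                chunkCount++;                                 // count it, but skip processing
                continue;
            }
            chunkCount++;
            if (streamComplete) {
                continue;                                     // ignore chunks after done=true
            }
            if (chunk.message?.content) {
                completeText += chunk.message.content;
            }
            if (chunk.done) {
                streamComplete = true;
            }
        }
        console.log(completeText, chunkCount);                // "Hello world" 4
    }

    consume();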