streaming works for ollama :)

perf3ct 2025-04-11 20:26:47 +00:00
parent 451e5ea31f
commit 681e8bb1ce
2 changed files with 26 additions and 33 deletions


@@ -127,7 +127,7 @@ export class ChatPipeline {
// Determine if we should use tools or semantic context
const useTools = modelSelection.options.enableTools === true;
const useEnhancedContext = input.options?.useAdvancedContext === true;
// Log details about the advanced context parameter
log.info(`Enhanced context option check: input.options=${JSON.stringify(input.options || {})}`);
log.info(`Enhanced context decision: useEnhancedContext=${useEnhancedContext}, hasQuery=${!!input.query}`);
@@ -236,17 +236,17 @@ export class ChatPipeline {
const streamFormatRequested = input.format === 'stream';
const streamRequestedInOptions = modelSelection.options.stream === true;
const streamCallbackAvailable = typeof streamCallback === 'function';
log.info(`[ChatPipeline] Request type info - Format: ${input.format || 'not specified'}, Options from pipelineInput: ${JSON.stringify({stream: input.options?.stream})}`);
log.info(`[ChatPipeline] Stream settings - config.enableStreaming: ${streamEnabledInConfig}, format parameter: ${input.format}, modelSelection.options.stream: ${modelSelection.options.stream}, streamCallback available: ${streamCallbackAvailable}`);
// IMPORTANT: Respect the existing stream option but with special handling for callbacks:
// 1. If a stream callback is available, streaming MUST be enabled for it to work
// 2. Otherwise, preserve the original stream setting from input options
// First, determine what the stream value should be based on various factors:
let shouldEnableStream = modelSelection.options.stream;
if (streamCallbackAvailable) {
// If we have a stream callback, we NEED to enable streaming
// This is critical for GET requests with EventSource
@@ -265,12 +265,12 @@ export class ChatPipeline {
log.info(`[ChatPipeline] No explicit stream settings, using config default: ${streamEnabledInConfig}`);
shouldEnableStream = streamEnabledInConfig;
}
// Set the final stream option
modelSelection.options.stream = shouldEnableStream;
log.info(`[ChatPipeline] Final streaming decision: stream=${shouldEnableStream}, will stream to client=${streamCallbackAvailable && shouldEnableStream}`);
// STAGE 5 & 6: Handle LLM completion and tool execution loop
log.info(`========== STAGE 5: LLM COMPLETION ==========`);
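Editorial note: the streaming decision in the hunks above reduces to a small precedence rule. The sketch below is illustrative only and not part of the diff; resolveStreamFlag and its standalone parameters are hypothetical stand-ins for modelSelection.options.stream, input.format, config.enableStreaming and the streamCallback check.

// Illustrative TypeScript sketch of the precedence applied above.
// resolveStreamFlag and its parameters are hypothetical stand-ins.
function resolveStreamFlag(
    hasCallback: boolean,                // typeof streamCallback === 'function'
    optionStream: boolean | undefined,   // modelSelection.options.stream
    formatParam: string | undefined,     // input.format
    configDefault: boolean               // config.enableStreaming
): boolean {
    // 1. A stream callback (e.g. a GET request served via EventSource) forces
    //    streaming on, otherwise the chunks would have nowhere to go.
    if (hasCallback) return true;
    // 2. An explicit stream option or format === 'stream' wins next.
    if (optionStream !== undefined) return optionStream;
    if (formatParam === 'stream') return true;
    // 3. With no explicit setting, fall back to the config default.
    return configDefault;
}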
@@ -282,7 +282,7 @@ export class ChatPipeline {
this.updateStageMetrics('llmCompletion', llmStartTime);
log.info(`Received LLM response from model: ${completion.response.model}, provider: ${completion.response.provider}`);
// Handle streaming if enabled and available
// Use shouldEnableStream variable which contains our streaming decision
if (shouldEnableStream && completion.response.stream && streamCallback) {
// Setup stream handler that passes chunks through response processing
@@ -344,7 +344,8 @@ export class ChatPipeline {
// If streaming was enabled, send an update to the user
if (isStreaming && streamCallback) {
streamingPaused = true;
await streamCallback('', true); // Signal pause in streaming
// IMPORTANT: Don't send done:true here, as it causes the client to stop processing messages
// Instead, send a marker message that indicates tools will be executed
await streamCallback('\n\n[Executing tools...]\n\n', false);
}
@@ -566,8 +567,15 @@ export class ChatPipeline {
// If streaming was paused for tool execution, resume it now with the final response
if (isStreaming && streamCallback && streamingPaused) {
// First log for debugging
log.info(`Resuming streaming with final response: ${currentResponse.text.length} chars`);
// Resume streaming with the final response text
// This is where we send the definitive done:true signal with the complete content
await streamCallback(currentResponse.text, true);
// Log confirmation
log.info(`Sent final response with done=true signal`);
}
} else if (toolsEnabled) {
log.info(`========== NO TOOL CALLS DETECTED ==========`);
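Editorial note: the two hunks above change how the pipeline signals around tool execution. Streaming is "paused" with a marker chunk that keeps done=false, and the single done=true is only sent with the complete final response. Below is a minimal sketch of that protocol, not part of the diff; StreamFn and runTools are hypothetical stand-ins for streamCallback and the tool-execution loop.

// Illustrative sketch of the pause/resume protocol described above.
type StreamFn = (text: string, done: boolean) => Promise<void>;

async function streamAroundTools(
    stream: StreamFn,
    runTools: () => Promise<string>  // resolves to the follow-up response text
): Promise<void> {
    // Pause: a visible marker, but done stays false so the client keeps
    // listening instead of treating the stream as finished.
    await stream('\n\n[Executing tools...]\n\n', false);

    const finalText = await runTools();

    // Resume: exactly one done=true, carrying the complete final text.
    await stream(finalText, true);
}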


@@ -476,29 +476,14 @@ export class OllamaService extends BaseAIService {
// Call the callback with the current chunk content
if (opts.streamCallback) {
try {
// For the final chunk, make sure to send the complete text with done=true
if (chunk.done) {
log.info(`Sending final callback with done=true and complete content (${completeText.length} chars)`);
await opts.streamCallback(
completeText, // Send the full accumulated content for the final chunk
true,
{ ...chunk, message: { ...chunk.message, content: completeText } }
);
} else if (chunk.message?.content) {
// For content chunks, send them as they come
await opts.streamCallback(
chunk.message.content,
!!chunk.done,
chunk
);
} else if (chunk.message?.tool_calls && chunk.message.tool_calls.length > 0) {
// For tool call chunks, send an empty content string but include the tool calls
await opts.streamCallback(
'',
!!chunk.done,
chunk
);
}
// Don't send done:true when tool calls are present to avoid premature completion
const shouldMarkAsDone = !!chunk.done && !responseToolCalls.length;
await opts.streamCallback(
chunk.message?.content || '',
shouldMarkAsDone,
chunk
);
if (chunkCount === 1) {
log.info(`Successfully called streamCallback with first chunk`);
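Editorial note: the OllamaService change above collapses the three per-chunk branches into a single callback invocation and defers the done signal while tool calls are pending. A sketch of that rule, not part of the diff; OllamaChunk and forwardChunk are hypothetical names, and the chunk shape is assumed from the fields the diff reads.

// Illustrative sketch of the unified per-chunk callback shown above.
interface OllamaChunk {
    done?: boolean;
    message?: { content?: string; tool_calls?: unknown[] };
}

async function forwardChunk(
    chunk: OllamaChunk,
    pendingToolCalls: unknown[],   // the tool calls accumulated so far
    callback: (text: string, done: boolean, raw: OllamaChunk) => Promise<void>
): Promise<void> {
    // Only report done when Ollama says the stream is finished AND no tool
    // calls are pending; otherwise the client would stop listening before
    // the tool results and the follow-up response arrive.
    const shouldMarkAsDone = !!chunk.done && pendingToolCalls.length === 0;
    await callback(chunk.message?.content || '', shouldMarkAsDone, chunk);
}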