diff --git a/src/services/llm/embeddings/chunking.ts b/src/services/llm/embeddings/chunking.ts
index 01773ae61..a0540f0dc 100644
--- a/src/services/llm/embeddings/chunking.ts
+++ b/src/services/llm/embeddings/chunking.ts
@@ -26,6 +26,15 @@ const ERROR_CATEGORIES = {
     }
 };
 
+// Maximum time (in milliseconds) allowed for the entire chunking process
+const MAX_TOTAL_PROCESSING_TIME = 5 * 60 * 1000; // 5 minutes
+
+// Maximum number of retry attempts per chunk
+const MAX_CHUNK_RETRY_ATTEMPTS = 2;
+
+// Maximum time per chunk processing (to prevent individual chunks from hanging)
+const MAX_CHUNK_PROCESSING_TIME = 60 * 1000; // 1 minute
+
 /**
  * Categorize an error as temporary or permanent based on its message
  * @param errorMessage - The error message to categorize
@@ -52,6 +61,28 @@ function categorizeError(errorMessage: string): 'temporary' | 'permanent' | 'unk
     return 'unknown';
 }
 
+/**
+ * Process a chunk with a timeout to prevent hanging
+ * @param provider - The embedding provider
+ * @param chunk - The chunk to process
+ * @param timeoutMs - Timeout in milliseconds
+ * @returns The generated embedding
+ */
+async function processChunkWithTimeout(provider: any, chunk: any, timeoutMs: number): Promise<any> {
+    // Create a promise that rejects after the timeout
+    const timeoutPromise = new Promise((_, reject) => {
+        setTimeout(() => {
+            reject(new Error(`Chunk processing timed out after ${timeoutMs}ms`));
+        }, timeoutMs);
+    });
+
+    // Create the actual processing promise
+    const processingPromise = provider.generateEmbeddings(chunk.content);
+
+    // Race the two promises - whichever completes/rejects first wins
+    return Promise.race([processingPromise, timeoutPromise]);
+}
+
 /**
  * Process a large note by breaking it into chunks and creating embeddings for each chunk
  * This provides more detailed and focused embeddings for different parts of large notes
@@ -65,6 +96,9 @@ export async function processNoteWithChunking(
     provider: any,
     context: NoteEmbeddingContext
 ): Promise<void> {
+    // Track the overall start time
+    const startTime = Date.now();
+
     try {
         // Get the context extractor dynamically to avoid circular dependencies
         const { ContextExtractor } = await import('../context/index.js');
@@ -127,17 +161,37 @@ export async function processNoteWithChunking(
             attempts: number
         }[] = [];
 
-        // Maximum number of retry attempts per chunk
-        const MAX_CHUNK_RETRY_ATTEMPTS = 2;
-
         log.info(`Processing ${chunks.length} chunks for note ${noteId} (${note.title})`);
 
         // Process each chunk with a delay based on provider to avoid rate limits
         for (let i = 0; i < chunks.length; i++) {
+            // Check if we've exceeded the overall time limit
+            if (Date.now() - startTime > MAX_TOTAL_PROCESSING_TIME) {
+                log.info(`Exceeded maximum processing time (${MAX_TOTAL_PROCESSING_TIME}ms) for note ${noteId}, stopping after ${i} chunks`);
+
+                // Mark remaining chunks as failed due to timeout
+                for (let j = i; j < chunks.length; j++) {
+                    failedChunks++;
+                    failedChunkDetails.push({
+                        index: j + 1,
+                        error: "Processing timeout - exceeded total allowed time",
+                        category: 'temporary',
+                        attempts: 1
+                    });
+                }
+
+                // Break the loop, we'll handle this as partial success if some chunks succeeded
+                break;
+            }
+
             const chunk = chunks[i];
             try {
-                // Generate embedding for this chunk's content
-                const embedding = await provider.generateEmbeddings(chunk.content);
+                // Generate embedding for this chunk's content with a timeout
+                const embedding = await processChunkWithTimeout(
+                    provider,
+                    chunk,
+                    MAX_CHUNK_PROCESSING_TIME
+                );
 
                 // Store with chunk information in a unique ID format
                 const chunkIdSuffix = `${i + 1}_of_${chunks.length}`;
@@ -183,11 +237,21 @@ export async function processNoteWithChunking(
             }
         }
 
+        // Set a time limit for the retry phase
+        const retryStartTime = Date.now();
+        const MAX_RETRY_TIME = 2 * 60 * 1000; // 2 minutes for all retries
+
         // Retry failed chunks with exponential backoff, but only those that aren't permanent errors
         if (retryQueue.length > 0 && retryQueue.length < chunks.length) {
             log.info(`Retrying ${retryQueue.length} failed chunks for note ${noteId}`);
 
             for (let j = 0; j < retryQueue.length; j++) {
+                // Check if we've exceeded the retry time limit
+                if (Date.now() - retryStartTime > MAX_RETRY_TIME) {
+                    log.info(`Exceeded maximum retry time (${MAX_RETRY_TIME}ms) for note ${noteId}, stopping after ${j} retries`);
+                    break;
+                }
+
                 const item = retryQueue[j];
 
                 // Skip if we've already reached the max retry attempts for this chunk
@@ -200,8 +264,12 @@ export async function processNoteWithChunking(
                     // Wait longer for retries with exponential backoff
                     await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(1.5, j)));
 
-                    // Retry the embedding
-                    const embedding = await provider.generateEmbeddings(item.chunk.content);
+                    // Retry the embedding with timeout
+                    const embedding = await processChunkWithTimeout(
+                        provider,
+                        item.chunk,
+                        MAX_CHUNK_PROCESSING_TIME
+                    );
 
                     // Store with unique ID that indicates it was a retry
                     const chunkIdSuffix = `${item.index + 1}_of_${chunks.length}`;
@@ -237,8 +305,15 @@ export async function processNoteWithChunking(
 
                     log.error(`Retry failed for chunk ${item.index + 1} of note ${noteId} (${errorCategory} error): ${errorMessage}`);
 
+                    // For timeout errors, mark as permanent to avoid further retries
+                    if (errorMessage.includes('timed out')) {
+                        if (detailIndex >= 0) {
+                            failedChunkDetails[detailIndex].category = 'permanent';
+                        }
+                        log.info(`Chunk ${item.index + 1} for note ${noteId} timed out, marking as permanent failure`);
+                    }
                     // Add to retry queue again only if it's not a permanent error and hasn't reached the max attempts
-                    if (errorCategory !== 'permanent' && item.attempts + 1 < MAX_CHUNK_RETRY_ATTEMPTS) {
+                    else if (errorCategory !== 'permanent' && item.attempts + 1 < MAX_CHUNK_RETRY_ATTEMPTS) {
                         // If we're still below MAX_CHUNK_RETRY_ATTEMPTS, we'll try again in the next cycle
                         item.attempts++;
                     } else if (errorCategory === 'permanent') {
@@ -265,17 +340,26 @@ export async function processNoteWithChunking(
                 `Permanent: ${permanentErrors}, Temporary: ${temporaryErrors}, Unknown: ${unknownErrors}`);
         }
 
-        // If no chunks were successfully processed, throw an error
-        // This will keep the note in the queue for another attempt
-        if (successfulChunks === 0 && failedChunks > 0) {
+        // Calculate the failure ratio
+        const failureRatio = failedChunks / totalChunks;
+
+        // If no chunks were successfully processed, or if more than 50% failed, mark the entire note as failed
+        if (successfulChunks === 0 || failureRatio > 0.5) {
             // Check if all failures are permanent
             const allPermanent = failedChunkDetails.every(d => d.category === 'permanent');
+            const errorType = allPermanent ? 'permanent' : (failureRatio > 0.5 ? 'too_many_failures' : 'all_failed');
 
-            if (allPermanent) {
-                throw new Error(`All ${failedChunks} chunks failed with permanent errors for note ${noteId}. First error: ${failedChunkDetails[0]?.error}`);
-            } else {
-                throw new Error(`All ${failedChunks} chunks failed for note ${noteId}. First error: ${failedChunkDetails[0]?.error}`);
-            }
+            // Mark this note as failed in the embedding_queue table with a permanent error status
+            const now = dateUtils.utcNowDateTime();
+            const errorSummary = `Note embedding failed: ${failedChunks}/${totalChunks} chunks failed (${errorType}). First error: ${failedChunkDetails[0]?.error}`;
+
+            await sql.execute(`
+                UPDATE embedding_queue
+                SET error = ?, lastAttempt = ?, attempts = 999
+                WHERE noteId = ?
+            `, [errorSummary, now, noteId]);
+
+            throw new Error(errorSummary);
         }
 
         // If some chunks failed but others succeeded, log a warning but consider the processing complete
@@ -300,6 +384,10 @@ export async function processNoteWithChunking(
             `, [errorSummary, now, noteId]);
         }
 
+        // Track total processing time
+        const totalTime = Date.now() - startTime;
+        log.info(`Total processing time for note ${noteId}: ${totalTime}ms`);
+
     } catch (error: any) {
         log.error(`Error in chunked embedding process for note ${noteId}: ${error.message || 'Unknown error'}`);
         throw error;
diff --git a/src/services/llm/embeddings/queue.ts b/src/services/llm/embeddings/queue.ts
index 5218f06dd..ff7ffcfff 100644
--- a/src/services/llm/embeddings/queue.ts
+++ b/src/services/llm/embeddings/queue.ts
@@ -289,6 +289,13 @@ export async function processEmbeddingQueue() {
             // This allows manual retries later
             if (noteData.attempts + 1 >= 3) {
                 log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`);
+
+                // Update the attempts to a very high number to indicate permanent failure
+                await sql.execute(`
+                    UPDATE embedding_queue
+                    SET attempts = 999
+                    WHERE noteId = ?
+                `, [noteData.noteId]);
             }
         }
     }
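
Reviewer note: the core mechanism this patch introduces is the Promise.race timeout guard in processChunkWithTimeout. Below is a minimal standalone sketch of that same pattern, under the assumption that a helper of this shape is all that is needed; the names withTimeout and slowEmbed are illustrative placeholders, not part of the patch or the codebase.

```ts
// Minimal sketch of the timeout-race pattern (hypothetical helper names).
function withTimeout<T>(work: Promise<T>, timeoutMs: number, label: string): Promise<T> {
    // A promise that never resolves and rejects once the timeout elapses.
    const timeout = new Promise<never>((_, reject) => {
        setTimeout(() => reject(new Error(`${label} timed out after ${timeoutMs}ms`)), timeoutMs);
    });
    // Whichever settles first wins; the slower promise is ignored, not cancelled.
    return Promise.race([work, timeout]);
}

// Stand-in for provider.generateEmbeddings: resolves after a simulated delay.
async function slowEmbed(text: string): Promise<number[]> {
    await new Promise(resolve => setTimeout(resolve, 2000));
    return [text.length]; // placeholder "embedding"
}

// Usage: a 1s budget against a 2s operation rejects with the timeout error.
withTimeout(slowEmbed("hello world"), 1000, "chunk 1/3")
    .then(vec => console.log("embedded:", vec))
    .catch(err => console.error(err.message));
```

One caveat of this pattern: Promise.race only unblocks the caller; the losing generateEmbeddings call keeps running in the background, which is presumably why the patch also bounds total processing time and retry time rather than relying on the per-chunk timeout alone.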