From 1f661e4c90fe04ab870caef48b63c700cbf7f197 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Tue, 11 Mar 2025 20:38:40 +0000 Subject: [PATCH] make sure to not retry chunks if they fail or something else --- src/services/llm/embeddings/vector_store.ts | 246 ++++---------------- 1 file changed, 45 insertions(+), 201 deletions(-) diff --git a/src/services/llm/embeddings/vector_store.ts b/src/services/llm/embeddings/vector_store.ts index 168066ca9..433e597f4 100644 --- a/src/services/llm/embeddings/vector_store.ts +++ b/src/services/llm/embeddings/vector_store.ts @@ -588,10 +588,13 @@ export async function getFailedEmbeddingNotes(limit: number = 100): Promise; - const chunkCount = Object.keys(failedChunks).length; - if (chunkCount === 0) continue; - - // Get the most recent failed chunk - let latestAttempt = ''; - let totalAttempts = 0; - let errorExample = ''; - - for (const chunkId in failedChunks) { - const chunk = failedChunks[chunkId]; - totalAttempts += chunk.attempts; - - if (!latestAttempt || chunk.lastAttempt > latestAttempt) { - latestAttempt = chunk.lastAttempt; - errorExample = chunk.error; - } - } - - // Add this to our list of failed notes - failedNotesWithTitles.push({ - noteId, - title: note.title, - failureType: 'chunks', - chunks: chunkCount, - attempts: totalAttempts, - lastAttempt: latestAttempt, - error: `${chunkCount} chunks failed: ${errorExample}` - }); - } catch (error) { - console.error("Error processing note with failed chunks:", error); - } - } - // Sort by latest attempt failedNotesWithTitles.sort((a, b) => { if (a.lastAttempt && b.lastAttempt) { @@ -670,9 +623,7 @@ export async function getFailedEmbeddingNotes(limit: number = 100): Promise { - let success = false; - - // First, check if the note is in the embedding queue with failed attempts + // Check if the note is in the embedding queue with failed attempts const exists = await sql.getValue( "SELECT 1 FROM embedding_queue WHERE noteId = ? AND attempts > 0", [noteId] @@ -689,29 +640,10 @@ export async function retryFailedEmbedding(noteId: string): Promise { WHERE noteId = ?`, [now, utcNow, noteId] ); - success = true; + return true; } - // Next, check for failed chunks in labels - const note = becca.getNote(noteId); - if (note) { - // Look for any provider-specific failed chunks - const labels = note.getLabels(); - const failedChunksLabels = labels.filter(label => label.name.endsWith('FailedChunks')); - - for (const label of failedChunksLabels) { - // Remove the label - this will cause all chunks to be retried - await note.removeLabel(label.name); - success = true; - } - - // If we had chunk failures but no queue entry, we need to add one - if (failedChunksLabels.length > 0 && !exists) { - await queueNoteForEmbedding(noteId, 'UPDATE'); - } - } - - return success; + return false; } /** @@ -720,8 +652,6 @@ export async function retryFailedEmbedding(noteId: string): Promise { * @returns Number of notes queued for retry */ export async function retryAllFailedEmbeddings(): Promise { - let totalRetried = 0; - // Get count of failed notes in queue const failedCount = await sql.getValue( "SELECT COUNT(*) FROM embedding_queue WHERE attempts > 0" @@ -738,39 +668,9 @@ export async function retryAllFailedEmbeddings(): Promise { WHERE attempts > 0`, [now, utcNow] ); - - totalRetried += failedCount; } - // Now find notes with failed chunks - const notesWithFailedChunks = await sql.getRows(` - SELECT DISTINCT noteId - FROM attributes - WHERE type = 'label' AND name LIKE '%FailedChunks' - `) as {noteId: string}[]; - - // Process each note with failed chunks - for (const item of notesWithFailedChunks) { - const noteId = item.noteId; - const note = becca.getNote(noteId); - - if (note) { - // Get all failed chunks labels - const labels = note.getLabels(); - const failedChunksLabels = labels.filter(label => label.name.endsWith('FailedChunks')); - - for (const label of failedChunksLabels) { - // Remove the label - this will cause all chunks to be retried - await note.removeLabel(label.name); - } - - // Make sure the note is in the queue - await queueNoteForEmbedding(noteId, 'UPDATE'); - totalRetried++; - } - } - - return totalRetried; + return failedCount; } /** @@ -830,15 +730,17 @@ export async function processEmbeddingQueue() { // Check if we should use chunking for large content const useChunking = context.content.length > 5000; - // Track if all providers failed + // Track provider successes and failures let allProvidersFailed = true; + let allProvidersSucceeded = true; // Process with each enabled provider for (const provider of enabledProviders) { try { if (useChunking) { - // Enhanced approach: Process large notes using chunking + // Process large notes using chunking await processNoteWithChunking(noteData.noteId, provider, context); + allProvidersFailed = false; } else { // Standard approach: Generate a single embedding for the whole note const embedding = await provider.generateNoteEmbeddings(context); @@ -851,16 +753,19 @@ export async function processEmbeddingQueue() { config.model, embedding ); + + // At least one provider succeeded + allProvidersFailed = false; } - // At least one provider succeeded - allProvidersFailed = false; } catch (providerError: any) { + // This provider failed + allProvidersSucceeded = false; log.error(`Error generating embedding with provider ${provider.name} for note ${noteData.noteId}: ${providerError.message || 'Unknown error'}`); } } - // Only remove from queue on success if at least one provider succeeded if (!allProvidersFailed) { + // At least one provider succeeded, remove from queue await sql.execute( "DELETE FROM embedding_queue WHERE noteId = ?", [noteData.noteId] @@ -906,7 +811,7 @@ export async function processEmbeddingQueue() { } /** - * Set up event listeners for embedding-related events + * Setup event listeners for embedding-related events */ export function setupEmbeddingEventListeners() { // Listen for note content changes @@ -1083,27 +988,17 @@ async function processNoteWithChunking( // Delete existing embeddings first to avoid duplicates await deleteNoteEmbeddings(noteId, provider.name, config.model); - // Track successful and failed chunks + // Track successful and failed chunks in memory during this processing run let successfulChunks = 0; let failedChunks = 0; const totalChunks = chunks.length; - - // Get existing chunk failure data from the database - // We'll store this in a special attribute on the note to track per-chunk failures - const failedChunksData = await getFailedChunksData(noteId, provider.name); + const failedChunkDetails: {index: number, error: string}[] = []; // Process each chunk with a slight delay to avoid rate limits for (let i = 0; i < chunks.length; i++) { const chunk = chunks[i]; const chunkId = `chunk_${i + 1}_of_${chunks.length}`; - // Skip chunks that have failed multiple times - if (failedChunksData[chunkId] && failedChunksData[chunkId].attempts >= 3) { - log.info(`Skipping chunk ${chunkId} for note ${noteId} after ${failedChunksData[chunkId].attempts} failed attempts`); - failedChunks++; - continue; - } - try { // Create a modified context object with just this chunk's content const chunkContext: NoteEmbeddingContext = { @@ -1124,12 +1019,6 @@ async function processNoteWithChunking( successfulChunks++; - // Remove this chunk from failed chunks if it was previously failed - if (failedChunksData[chunkId]) { - delete failedChunksData[chunkId]; - await updateFailedChunksData(noteId, provider.name, failedChunksData); - } - // Small delay between chunks to avoid rate limits if (i < chunks.length - 1) { await new Promise(resolve => setTimeout(resolve, 100)); @@ -1137,21 +1026,10 @@ async function processNoteWithChunking( } catch (error: any) { // Track the failure for this specific chunk failedChunks++; - - if (!failedChunksData[chunkId]) { - failedChunksData[chunkId] = { - attempts: 1, - lastAttempt: dateUtils.utcNowDateTime(), - error: error.message || 'Unknown error' - }; - } else { - failedChunksData[chunkId].attempts++; - failedChunksData[chunkId].lastAttempt = dateUtils.utcNowDateTime(); - failedChunksData[chunkId].error = error.message || 'Unknown error'; - } - - // Update the failed chunks data in the database - await updateFailedChunksData(noteId, provider.name, failedChunksData); + failedChunkDetails.push({ + index: i + 1, + error: error.message || 'Unknown error' + }); log.error(`Error processing chunk ${chunkId} for note ${noteId}: ${error.message || 'Unknown error'}`); } @@ -1166,68 +1044,34 @@ async function processNoteWithChunking( log.info(`Failed to generate ${failedChunks} chunk embeddings for note ${noteId}`); } - // If all chunks failed, throw an error so the note will be marked as failed + // If no chunks were successfully processed, throw an error + // This will keep the note in the queue for another attempt if (successfulChunks === 0 && failedChunks > 0) { - throw new Error(`All ${failedChunks} chunks failed for note ${noteId}`); + throw new Error(`All ${failedChunks} chunks failed for note ${noteId}. First error: ${failedChunkDetails[0]?.error}`); } + + // If some chunks failed but others succeeded, log a warning but consider the processing complete + // The note will be removed from the queue, but we'll store error information + if (failedChunks > 0 && successfulChunks > 0) { + const errorSummary = `Note processed partially: ${successfulChunks}/${totalChunks} chunks succeeded, ${failedChunks}/${totalChunks} failed`; + log.info(errorSummary); + + // Store a summary in the error field of embedding_queue + // This is just for informational purposes - the note will be removed from the queue + const now = dateUtils.utcNowDateTime(); + await sql.execute(` + UPDATE embedding_queue + SET error = ?, lastAttempt = ? + WHERE noteId = ? + `, [errorSummary, now, noteId]); + } + } catch (error: any) { log.error(`Error in chunked embedding process for note ${noteId}: ${error.message || 'Unknown error'}`); throw error; } } -/** - * Store failed chunk data for a note - * This is stored in a special attribute on the note so we can track per-chunk failures - */ -async function getFailedChunksData(noteId: string, providerId: string): Promise> { - try { - const attributeName = `${providerId}FailedChunks`; - const note = becca.getNote(noteId); - - if (!note) { - return {}; - } - - const attr = note.getLabels().find(attr => attr.name === attributeName); - - if (!attr || !attr.value) { - return {}; - } - - return JSON.parse(attr.value); - } catch (e) { - return {}; - } -} - -/** - * Update failed chunk data for a note - */ -async function updateFailedChunksData(noteId: string, providerId: string, data: Record): Promise { - try { - const attributeName = `${providerId}FailedChunks`; - const note = becca.getNote(noteId); - - if (!note) { - return; - } - - // Only store if there are failed chunks - if (Object.keys(data).length > 0) { - await note.setLabel(attributeName, JSON.stringify(data)); - } else { - // If no failed chunks, remove the attribute if it exists - const attr = note.getLabels().find(attr => attr.name === attributeName); - if (attr) { - await note.removeLabel(attributeName); - } - } - } catch (e) { - log.error(`Error updating failed chunks data for note ${noteId}: ${e}`); - } -} - export function cleanupEmbeddings() { // Cleanup function implementation }