diff --git a/db/migrations/0230__vector_embeddings.sql b/db/migrations/0230__vector_embeddings.sql index 07c425fd2..e275fc8ff 100644 --- a/db/migrations/0230__vector_embeddings.sql +++ b/db/migrations/0230__vector_embeddings.sql @@ -28,7 +28,8 @@ CREATE TABLE IF NOT EXISTS "embedding_queue" ( "priority" INTEGER NOT NULL DEFAULT 0, "attempts" INTEGER NOT NULL DEFAULT 0, "lastAttempt" TEXT NULL, - "error" TEXT NULL + "error" TEXT NULL, + "failed" INTEGER NOT NULL DEFAULT 0 ); -- Table to store embedding provider configurations diff --git a/db/schema.sql b/db/schema.sql index a4f153042..8451326d9 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -158,7 +158,8 @@ CREATE TABLE IF NOT EXISTS "embedding_queue" ( "priority" INTEGER NOT NULL DEFAULT 0, "attempts" INTEGER NOT NULL DEFAULT 0, "lastAttempt" TEXT NULL, - "error" TEXT NULL + "error" TEXT NULL, + "failed" INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS "embedding_providers" ( diff --git a/src/services/llm/embeddings/queue.ts b/src/services/llm/embeddings/queue.ts index 9ea9fc3a7..102b52ff8 100644 --- a/src/services/llm/embeddings/queue.ts +++ b/src/services/llm/embeddings/queue.ts @@ -17,26 +17,32 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE' const now = dateUtils.localNowDateTime(); const utcNow = dateUtils.utcNowDateTime(); - // Check if note is already in queue - const existing = await sql.getValue( - "SELECT 1 FROM embedding_queue WHERE noteId = ?", + // Check if note is already in queue and whether it's marked as permanently failed + const queueInfo = await sql.getRow( + "SELECT 1 as exists, failed FROM embedding_queue WHERE noteId = ?", [noteId] - ); + ) as {exists: number, failed: number} | null; - if (existing) { - // Update existing queue entry - await sql.execute(` - UPDATE embedding_queue - SET operation = ?, dateQueued = ?, utcDateQueued = ?, attempts = 0, error = NULL - WHERE noteId = ?`, - [operation, now, utcNow, noteId] - ); + if (queueInfo) { + // Only update if not permanently failed + if (queueInfo.failed !== 1) { + // Update existing queue entry but preserve the failed status + await sql.execute(` + UPDATE embedding_queue + SET operation = ?, dateQueued = ?, utcDateQueued = ?, attempts = 0, error = NULL + WHERE noteId = ?`, + [operation, now, utcNow, noteId] + ); + } else { + // Note is marked as permanently failed, don't update + log.info(`Note ${noteId} is marked as permanently failed, skipping automatic re-queue`); + } } else { // Add new queue entry await sql.execute(` INSERT INTO embedding_queue - (noteId, operation, dateQueued, utcDateQueued) - VALUES (?, ?, ?, ?)`, + (noteId, operation, dateQueued, utcDateQueued, failed) + VALUES (?, ?, ?, ?, 0)`, [noteId, operation, now, utcNow] ); } @@ -49,15 +55,15 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE' * @returns List of failed notes with their error information */ export async function getFailedEmbeddingNotes(limit: number = 100): Promise { - // Get notes with failed embedding attempts + // Get notes with failed embedding attempts or permanently failed flag const failedQueueItems = await sql.getRows(` - SELECT noteId, operation, attempts, lastAttempt, error + SELECT noteId, operation, attempts, lastAttempt, error, failed FROM embedding_queue - WHERE attempts > 0 - ORDER BY attempts DESC, lastAttempt DESC + WHERE attempts > 0 OR failed = 1 + ORDER BY failed DESC, attempts DESC, lastAttempt DESC LIMIT ?`, [limit] - ) as {noteId: string, operation: string, attempts: number, lastAttempt: string, error: string}[]; + ) as {noteId: string, operation: string, attempts: number, lastAttempt: string, error: string, failed: number}[]; // Add titles to the failed notes const failedNotesWithTitles = []; @@ -66,16 +72,19 @@ export async function getFailedEmbeddingNotes(limit: number = 100): Promise { - // Check if the note is in the embedding queue with failed attempts - const exists = await sql.getValue( - "SELECT 1 FROM embedding_queue WHERE noteId = ? AND attempts > 0", + const now = dateUtils.localNowDateTime(); + const utcNow = dateUtils.utcNowDateTime(); + + // Check if the note is in the embedding queue and has failed or has attempts + const existsInQueue = await sql.getValue( + "SELECT 1 FROM embedding_queue WHERE noteId = ? AND (failed = 1 OR attempts > 0)", [noteId] ); - if (exists) { + if (existsInQueue) { // Reset the note in the queue - const now = dateUtils.localNowDateTime(); - const utcNow = dateUtils.utcNowDateTime(); - await sql.execute(` UPDATE embedding_queue - SET attempts = 0, error = NULL, dateQueued = ?, utcDateQueued = ? + SET attempts = 0, error = NULL, failed = 0, dateQueued = ?, utcDateQueued = ?, priority = 10 WHERE noteId = ?`, [now, utcNow, noteId] ); @@ -128,20 +134,20 @@ export async function retryFailedEmbedding(noteId: string): Promise { * @returns Number of notes queued for retry */ export async function retryAllFailedEmbeddings(): Promise { - // Get count of failed notes in queue + const now = dateUtils.localNowDateTime(); + const utcNow = dateUtils.utcNowDateTime(); + + // Get count of all failed notes in queue (either with failed=1 or attempts>0) const failedCount = await sql.getValue( - "SELECT COUNT(*) FROM embedding_queue WHERE attempts > 0" + "SELECT COUNT(*) FROM embedding_queue WHERE failed = 1 OR attempts > 0" ) as number; if (failedCount > 0) { // Reset all failed notes in the queue - const now = dateUtils.localNowDateTime(); - const utcNow = dateUtils.utcNowDateTime(); - await sql.execute(` UPDATE embedding_queue - SET attempts = 0, error = NULL, dateQueued = ?, utcDateQueued = ? - WHERE attempts > 0`, + SET attempts = 0, error = NULL, failed = 0, dateQueued = ?, utcDateQueued = ?, priority = 10 + WHERE failed = 1 OR attempts > 0`, [now, utcNow] ); } @@ -174,10 +180,11 @@ export async function processEmbeddingQueue() { return; } - // Get notes from queue + // Get notes from queue (excluding failed ones) const notes = await sql.getRows(` SELECT noteId, operation, attempts FROM embedding_queue + WHERE failed = 0 ORDER BY priority DESC, utcDateQueued ASC LIMIT ?`, [batchSize] @@ -275,9 +282,16 @@ export async function processEmbeddingQueue() { [dateUtils.utcNowDateTime(), "All providers failed to generate embeddings", noteData.noteId] ); - // Remove from queue if too many attempts + // Mark as permanently failed if too many attempts if (noteData.attempts + 1 >= 3) { log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`); + + // Set the failed flag and update the attempts + await sql.execute(` + UPDATE embedding_queue + SET attempts = 999, failed = 1 + WHERE noteId = ? + `, [noteData.noteId]); } } } catch (error: any) { @@ -295,15 +309,14 @@ export async function processEmbeddingQueue() { log.error(`Error processing embedding for note ${noteData.noteId}: ${error.message || 'Unknown error'}`); - // Don't remove from queue even after multiple failures, just mark as failed - // This allows manual retries later + // Mark as permanently failed if too many attempts if (noteData.attempts + 1 >= 3) { log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`); - // Update the attempts to a very high number to indicate permanent failure + // Set the failed flag and update the attempts await sql.execute(` UPDATE embedding_queue - SET attempts = 999 + SET attempts = 999, failed = 1 WHERE noteId = ? `, [noteData.noteId]); }