diff --git a/src/services/llm/embeddings/queue.ts b/src/services/llm/embeddings/queue.ts index 5ee386905..87541051b 100644 --- a/src/services/llm/embeddings/queue.ts +++ b/src/services/llm/embeddings/queue.ts @@ -20,40 +20,52 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE' const now = dateUtils.localNowDateTime(); const utcNow = dateUtils.utcNowDateTime(); - // Check if note is already in queue and whether it's marked as permanently failed - const queueInfo = await sql.getRow( - "SELECT 1 as exists_flag, failed, isProcessing FROM embedding_queue WHERE noteId = ?", - [noteId] - ) as {exists_flag: number, failed: number, isProcessing: number} | null; + try { + // Check if note is already in queue and whether it's marked as permanently failed + const queueInfo = await sql.getRow( + "SELECT 1 as exists_flag, failed, isProcessing FROM embedding_queue WHERE noteId = ?", + [noteId] + ) as {exists_flag: number, failed: number, isProcessing: number} | null; - if (queueInfo) { - // If the note is currently being processed, don't change its status - if (queueInfo.isProcessing === 1) { - log.info(`Note ${noteId} is currently being processed, skipping queue update`); + if (queueInfo) { + // If the note is currently being processed, don't change its status + if (queueInfo.isProcessing === 1) { + log.info(`Note ${noteId} is currently being processed, skipping queue update`); + return; + } + + // Only update if not permanently failed + if (queueInfo.failed !== 1) { + // Update existing queue entry but preserve the failed status + await sql.execute(` + UPDATE embedding_queue + SET operation = ?, dateQueued = ?, utcDateQueued = ?, attempts = 0, error = NULL + WHERE noteId = ?`, + [operation, now, utcNow, noteId] + ); + } else { + // Note is marked as permanently failed, don't update + log.info(`Note ${noteId} is marked as permanently failed, skipping automatic re-queue`); + } + } else { + // Add new queue entry + await sql.execute(` + INSERT INTO embedding_queue + (noteId, operation, dateQueued, utcDateQueued, failed, isProcessing) + VALUES (?, ?, ?, ?, 0, 0)`, + [noteId, operation, now, utcNow] + ); + } + } catch (error: any) { + // If there's a race condition where multiple events try to queue the same note simultaneously, + // one of them will succeed and others will fail with UNIQUE constraint violation. + // We can safely ignore this specific error since the note is already queued. + if (error.code === 'SQLITE_CONSTRAINT_PRIMARYKEY' && error.message.includes('UNIQUE constraint failed: embedding_queue.noteId')) { + log.info(`Note ${noteId} was already queued by another process, ignoring duplicate queue request`); return; } - - // Only update if not permanently failed - if (queueInfo.failed !== 1) { - // Update existing queue entry but preserve the failed status - await sql.execute(` - UPDATE embedding_queue - SET operation = ?, dateQueued = ?, utcDateQueued = ?, attempts = 0, error = NULL - WHERE noteId = ?`, - [operation, now, utcNow, noteId] - ); - } else { - // Note is marked as permanently failed, don't update - log.info(`Note ${noteId} is marked as permanently failed, skipping automatic re-queue`); - } - } else { - // Add new queue entry - await sql.execute(` - INSERT INTO embedding_queue - (noteId, operation, dateQueued, utcDateQueued, failed, isProcessing) - VALUES (?, ?, ?, ?, 0, 0)`, - [noteId, operation, now, utcNow] - ); + // Rethrow any other errors + throw error; } }