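Do a better job of handling failed note embeddings: track permanent failures with a `failed` flag on `embedding_queue` instead of relying on the attempt count alone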

This commit is contained in:
perf3ct 2025-03-12 21:04:06 +00:00
parent 39d265a9fa
commit c914aaa4a8
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
3 changed files with 63 additions and 48 deletions

View File

@ -28,7 +28,8 @@ CREATE TABLE IF NOT EXISTS "embedding_queue" (
"priority" INTEGER NOT NULL DEFAULT 0, "priority" INTEGER NOT NULL DEFAULT 0,
"attempts" INTEGER NOT NULL DEFAULT 0, "attempts" INTEGER NOT NULL DEFAULT 0,
"lastAttempt" TEXT NULL, "lastAttempt" TEXT NULL,
"error" TEXT NULL "error" TEXT NULL,
"failed" INTEGER NOT NULL DEFAULT 0
); );
-- Table to store embedding provider configurations -- Table to store embedding provider configurations

View File

@ -158,7 +158,8 @@ CREATE TABLE IF NOT EXISTS "embedding_queue" (
"priority" INTEGER NOT NULL DEFAULT 0, "priority" INTEGER NOT NULL DEFAULT 0,
"attempts" INTEGER NOT NULL DEFAULT 0, "attempts" INTEGER NOT NULL DEFAULT 0,
"lastAttempt" TEXT NULL, "lastAttempt" TEXT NULL,
"error" TEXT NULL "error" TEXT NULL,
"failed" INTEGER NOT NULL DEFAULT 0
); );
CREATE TABLE IF NOT EXISTS "embedding_providers" ( CREATE TABLE IF NOT EXISTS "embedding_providers" (

View File

@ -17,26 +17,32 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE'
const now = dateUtils.localNowDateTime(); const now = dateUtils.localNowDateTime();
const utcNow = dateUtils.utcNowDateTime(); const utcNow = dateUtils.utcNowDateTime();
// Check if note is already in queue // Check if note is already in queue and whether it's marked as permanently failed
const existing = await sql.getValue( const queueInfo = await sql.getRow(
"SELECT 1 FROM embedding_queue WHERE noteId = ?", "SELECT 1 as exists, failed FROM embedding_queue WHERE noteId = ?",
[noteId] [noteId]
); ) as {exists: number, failed: number} | null;
if (existing) { if (queueInfo) {
// Update existing queue entry // Only update if not permanently failed
await sql.execute(` if (queueInfo.failed !== 1) {
UPDATE embedding_queue // Update existing queue entry but preserve the failed status
SET operation = ?, dateQueued = ?, utcDateQueued = ?, attempts = 0, error = NULL await sql.execute(`
WHERE noteId = ?`, UPDATE embedding_queue
[operation, now, utcNow, noteId] SET operation = ?, dateQueued = ?, utcDateQueued = ?, attempts = 0, error = NULL
); WHERE noteId = ?`,
[operation, now, utcNow, noteId]
);
} else {
// Note is marked as permanently failed, don't update
log.info(`Note ${noteId} is marked as permanently failed, skipping automatic re-queue`);
}
} else { } else {
// Add new queue entry // Add new queue entry
await sql.execute(` await sql.execute(`
INSERT INTO embedding_queue INSERT INTO embedding_queue
(noteId, operation, dateQueued, utcDateQueued) (noteId, operation, dateQueued, utcDateQueued, failed)
VALUES (?, ?, ?, ?)`, VALUES (?, ?, ?, ?, 0)`,
[noteId, operation, now, utcNow] [noteId, operation, now, utcNow]
); );
} }
@ -49,15 +55,15 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE'
* @returns List of failed notes with their error information * @returns List of failed notes with their error information
*/ */
export async function getFailedEmbeddingNotes(limit: number = 100): Promise<any[]> { export async function getFailedEmbeddingNotes(limit: number = 100): Promise<any[]> {
// Get notes with failed embedding attempts // Get notes with failed embedding attempts or permanently failed flag
const failedQueueItems = await sql.getRows(` const failedQueueItems = await sql.getRows(`
SELECT noteId, operation, attempts, lastAttempt, error SELECT noteId, operation, attempts, lastAttempt, error, failed
FROM embedding_queue FROM embedding_queue
WHERE attempts > 0 WHERE attempts > 0 OR failed = 1
ORDER BY attempts DESC, lastAttempt DESC ORDER BY failed DESC, attempts DESC, lastAttempt DESC
LIMIT ?`, LIMIT ?`,
[limit] [limit]
) as {noteId: string, operation: string, attempts: number, lastAttempt: string, error: string}[]; ) as {noteId: string, operation: string, attempts: number, lastAttempt: string, error: string, failed: number}[];
// Add titles to the failed notes // Add titles to the failed notes
const failedNotesWithTitles = []; const failedNotesWithTitles = [];
@ -66,16 +72,19 @@ export async function getFailedEmbeddingNotes(limit: number = 100): Promise<any[
if (note) { if (note) {
// Check if this is a chunking error (contains the word "chunks") // Check if this is a chunking error (contains the word "chunks")
const isChunkFailure = item.error && item.error.toLowerCase().includes('chunk'); const isChunkFailure = item.error && item.error.toLowerCase().includes('chunk');
const isPermanentFailure = item.failed === 1;
failedNotesWithTitles.push({ failedNotesWithTitles.push({
...item, ...item,
title: note.title, title: note.title,
failureType: isChunkFailure ? 'chunks' : 'full' failureType: isChunkFailure ? 'chunks' : 'full',
isPermanent: isPermanentFailure
}); });
} else { } else {
failedNotesWithTitles.push({ failedNotesWithTitles.push({
...item, ...item,
failureType: 'full' failureType: 'full',
isPermanent: item.failed === 1
}); });
} }
} }
@ -93,26 +102,23 @@ export async function getFailedEmbeddingNotes(limit: number = 100): Promise<any[
} }
/** /**
* Retry embedding generation for a specific failed note * Retry a specific failed note embedding
*
* @param noteId - ID of the note to retry
* @returns Success flag
*/ */
export async function retryFailedEmbedding(noteId: string): Promise<boolean> { export async function retryFailedEmbedding(noteId: string): Promise<boolean> {
// Check if the note is in the embedding queue with failed attempts const now = dateUtils.localNowDateTime();
const exists = await sql.getValue( const utcNow = dateUtils.utcNowDateTime();
"SELECT 1 FROM embedding_queue WHERE noteId = ? AND attempts > 0",
// Check if the note is in the embedding queue and has failed or has attempts
const existsInQueue = await sql.getValue(
"SELECT 1 FROM embedding_queue WHERE noteId = ? AND (failed = 1 OR attempts > 0)",
[noteId] [noteId]
); );
if (exists) { if (existsInQueue) {
// Reset the note in the queue // Reset the note in the queue
const now = dateUtils.localNowDateTime();
const utcNow = dateUtils.utcNowDateTime();
await sql.execute(` await sql.execute(`
UPDATE embedding_queue UPDATE embedding_queue
SET attempts = 0, error = NULL, dateQueued = ?, utcDateQueued = ? SET attempts = 0, error = NULL, failed = 0, dateQueued = ?, utcDateQueued = ?, priority = 10
WHERE noteId = ?`, WHERE noteId = ?`,
[now, utcNow, noteId] [now, utcNow, noteId]
); );
@ -128,20 +134,20 @@ export async function retryFailedEmbedding(noteId: string): Promise<boolean> {
* @returns Number of notes queued for retry * @returns Number of notes queued for retry
*/ */
export async function retryAllFailedEmbeddings(): Promise<number> { export async function retryAllFailedEmbeddings(): Promise<number> {
// Get count of failed notes in queue const now = dateUtils.localNowDateTime();
const utcNow = dateUtils.utcNowDateTime();
// Get count of all failed notes in queue (either with failed=1 or attempts>0)
const failedCount = await sql.getValue( const failedCount = await sql.getValue(
"SELECT COUNT(*) FROM embedding_queue WHERE attempts > 0" "SELECT COUNT(*) FROM embedding_queue WHERE failed = 1 OR attempts > 0"
) as number; ) as number;
if (failedCount > 0) { if (failedCount > 0) {
// Reset all failed notes in the queue // Reset all failed notes in the queue
const now = dateUtils.localNowDateTime();
const utcNow = dateUtils.utcNowDateTime();
await sql.execute(` await sql.execute(`
UPDATE embedding_queue UPDATE embedding_queue
SET attempts = 0, error = NULL, dateQueued = ?, utcDateQueued = ? SET attempts = 0, error = NULL, failed = 0, dateQueued = ?, utcDateQueued = ?, priority = 10
WHERE attempts > 0`, WHERE failed = 1 OR attempts > 0`,
[now, utcNow] [now, utcNow]
); );
} }
@ -174,10 +180,11 @@ export async function processEmbeddingQueue() {
return; return;
} }
// Get notes from queue // Get notes from queue (excluding failed ones)
const notes = await sql.getRows(` const notes = await sql.getRows(`
SELECT noteId, operation, attempts SELECT noteId, operation, attempts
FROM embedding_queue FROM embedding_queue
WHERE failed = 0
ORDER BY priority DESC, utcDateQueued ASC ORDER BY priority DESC, utcDateQueued ASC
LIMIT ?`, LIMIT ?`,
[batchSize] [batchSize]
@ -275,9 +282,16 @@ export async function processEmbeddingQueue() {
[dateUtils.utcNowDateTime(), "All providers failed to generate embeddings", noteData.noteId] [dateUtils.utcNowDateTime(), "All providers failed to generate embeddings", noteData.noteId]
); );
// Remove from queue if too many attempts // Mark as permanently failed if too many attempts
if (noteData.attempts + 1 >= 3) { if (noteData.attempts + 1 >= 3) {
log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`); log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`);
// Set the failed flag and update the attempts
await sql.execute(`
UPDATE embedding_queue
SET attempts = 999, failed = 1
WHERE noteId = ?
`, [noteData.noteId]);
} }
} }
} catch (error: any) { } catch (error: any) {
@ -295,15 +309,14 @@ export async function processEmbeddingQueue() {
log.error(`Error processing embedding for note ${noteData.noteId}: ${error.message || 'Unknown error'}`); log.error(`Error processing embedding for note ${noteData.noteId}: ${error.message || 'Unknown error'}`);
// Don't remove from queue even after multiple failures, just mark as failed // Mark as permanently failed if too many attempts
// This allows manual retries later
if (noteData.attempts + 1 >= 3) { if (noteData.attempts + 1 >= 3) {
log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`); log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`);
// Update the attempts to a very high number to indicate permanent failure // Set the failed flag and update the attempts
await sql.execute(` await sql.execute(`
UPDATE embedding_queue UPDATE embedding_queue
SET attempts = 999 SET attempts = 999, failed = 1
WHERE noteId = ? WHERE noteId = ?
`, [noteData.noteId]); `, [noteData.noteId]);
} }