From d2072c2a6f591f9c1b25ff005a2b9d556293f5ce Mon Sep 17 00:00:00 2001 From: perf3ct Date: Sun, 16 Mar 2025 20:36:47 +0000 Subject: [PATCH] "lock" notes that are having their embeddings created --- db/migrations/0230__vector_embeddings.sql | 3 +- db/schema.sql | 3 +- .../embeddings/chunking/chunking_processor.ts | 20 ++-- src/services/llm/embeddings/events.ts | 14 +++ .../llm/embeddings/providers/ollama.ts | 109 +++++++++++------- src/services/llm/embeddings/queue.ts | 87 +++++++++----- 6 files changed, 160 insertions(+), 76 deletions(-) diff --git a/db/migrations/0230__vector_embeddings.sql b/db/migrations/0230__vector_embeddings.sql index 1522682e8..742204a16 100644 --- a/db/migrations/0230__vector_embeddings.sql +++ b/db/migrations/0230__vector_embeddings.sql @@ -29,7 +29,8 @@ CREATE TABLE IF NOT EXISTS "embedding_queue" ( "attempts" INTEGER NOT NULL DEFAULT 0, "lastAttempt" TEXT NULL, "error" TEXT NULL, - "failed" INTEGER NOT NULL DEFAULT 0 + "failed" INTEGER NOT NULL DEFAULT 0, + "isProcessing" INTEGER NOT NULL DEFAULT 0 ); -- Table to store embedding provider configurations diff --git a/db/schema.sql b/db/schema.sql index 8451326d9..f22abdb3f 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -159,7 +159,8 @@ CREATE TABLE IF NOT EXISTS "embedding_queue" ( "attempts" INTEGER NOT NULL DEFAULT 0, "lastAttempt" TEXT NULL, "error" TEXT NULL, - "failed" INTEGER NOT NULL DEFAULT 0 + "failed" INTEGER NOT NULL DEFAULT 0, + "isProcessing" INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS "embedding_providers" ( diff --git a/src/services/llm/embeddings/chunking/chunking_processor.ts b/src/services/llm/embeddings/chunking/chunking_processor.ts index c5aac27c8..fccf17fc5 100644 --- a/src/services/llm/embeddings/chunking/chunking_processor.ts +++ b/src/services/llm/embeddings/chunking/chunking_processor.ts @@ -34,7 +34,8 @@ const MAX_TOTAL_PROCESSING_TIME = 5 * 60 * 1000; // 5 minutes const MAX_CHUNK_RETRY_ATTEMPTS = 2; // Maximum time per chunk processing (to prevent individual chunks from hanging) -const MAX_CHUNK_PROCESSING_TIME = 60 * 1000; // 1 minute +const DEFAULT_MAX_CHUNK_PROCESSING_TIME = 60 * 1000; // 1 minute +const OLLAMA_MAX_CHUNK_PROCESSING_TIME = 120 * 1000; // 2 minutes /** * Categorize an error as temporary or permanent based on its message @@ -166,6 +167,11 @@ export async function processNoteWithChunking( log.info(`Processing ${chunks.length} chunks for note ${noteId} (${note.title})`); + // Get the current time to prevent duplicate processing from timeouts + const processingStartTime = Date.now(); + const processingId = `${noteId}-${processingStartTime}`; + log.info(`Starting processing run ${processingId}`); + // Process each chunk with a delay based on provider to avoid rate limits for (let i = 0; i < chunks.length; i++) { // Check if we've exceeded the overall time limit @@ -194,7 +200,7 @@ export async function processNoteWithChunking( const embedding = await processChunkWithTimeout( provider, chunk, - MAX_CHUNK_PROCESSING_TIME + provider.name === 'ollama' ? OLLAMA_MAX_CHUNK_PROCESSING_TIME : DEFAULT_MAX_CHUNK_PROCESSING_TIME ); // Store with chunk information in a unique ID format @@ -212,7 +218,7 @@ export async function processNoteWithChunking( // Small delay between chunks to avoid rate limits - longer for Ollama if (i < chunks.length - 1) { await new Promise(resolve => setTimeout(resolve, - provider.name === 'ollama' ? 500 : 100)); + provider.name === 'ollama' ? 2000 : 100)); } } catch (error: any) { const errorMessage = error.message || 'Unknown error'; @@ -274,7 +280,7 @@ export async function processNoteWithChunking( const embedding = await processChunkWithTimeout( provider, item.chunk, - MAX_CHUNK_PROCESSING_TIME + provider.name === 'ollama' ? OLLAMA_MAX_CHUNK_PROCESSING_TIME : DEFAULT_MAX_CHUNK_PROCESSING_TIME ); // Store with unique ID that indicates it was a retry @@ -335,7 +341,7 @@ export async function processNoteWithChunking( // Log information about the processed chunks if (successfulChunks > 0) { - log.info(`Generated ${successfulChunks} chunk embeddings for note ${noteId} (${note.title})`); + log.info(`[${processingId}] Generated ${successfulChunks} chunk embeddings for note ${noteId} (${note.title})`); } if (failedChunks > 0) { @@ -344,7 +350,7 @@ export async function processNoteWithChunking( const temporaryErrors = failedChunkDetails.filter(d => d.category === 'temporary').length; const unknownErrors = failedChunkDetails.filter(d => d.category === 'unknown').length; - log.info(`Failed to generate ${failedChunks} chunk embeddings for note ${noteId} (${note.title}). ` + + log.info(`[${processingId}] Failed to generate ${failedChunks} chunk embeddings for note ${noteId} (${note.title}). ` + `Permanent: ${permanentErrors}, Temporary: ${temporaryErrors}, Unknown: ${unknownErrors}`); } @@ -394,7 +400,7 @@ export async function processNoteWithChunking( // Track total processing time const totalTime = Date.now() - startTime; - log.info(`Total processing time for note ${noteId}: ${totalTime}ms`); + log.info(`[${processingId}] Total processing time for note ${noteId}: ${totalTime}ms`); } catch (error: any) { log.error(`Error in chunked embedding process for note ${noteId}: ${error.message || 'Unknown error'}`); diff --git a/src/services/llm/embeddings/events.ts b/src/services/llm/embeddings/events.ts index 220da4046..5c449e846 100644 --- a/src/services/llm/embeddings/events.ts +++ b/src/services/llm/embeddings/events.ts @@ -6,6 +6,9 @@ import { processEmbeddingQueue, queueNoteForEmbedding } from "./queue.js"; import eventService from "../../../services/events.js"; import becca from "../../../becca/becca.js"; +// Add mutex to prevent concurrent processing +let isProcessingEmbeddings = false; + /** * Setup event listeners for embedding-related events */ @@ -54,12 +57,23 @@ export async function setupEmbeddingBackgroundProcessing() { setInterval(async () => { try { + // Skip if already processing + if (isProcessingEmbeddings) { + return; + } + + // Set mutex + isProcessingEmbeddings = true; + // Wrap in cls.init to ensure proper context cls.init(async () => { await processEmbeddingQueue(); }); } catch (error: any) { log.error(`Error in background embedding processing: ${error.message || 'Unknown error'}`); + } finally { + // Always release the mutex + isProcessingEmbeddings = false; } }, interval); } diff --git a/src/services/llm/embeddings/providers/ollama.ts b/src/services/llm/embeddings/providers/ollama.ts index 43bdf6d8a..ba125b068 100644 --- a/src/services/llm/embeddings/providers/ollama.ts +++ b/src/services/llm/embeddings/providers/ollama.ts @@ -173,46 +173,77 @@ export class OllamaEmbeddingProvider extends BaseEmbeddingProvider { * Generate embeddings for a single text */ async generateEmbeddings(text: string): Promise { - try { - if (!text.trim()) { - return new Float32Array(this.config.dimension); - } - - const modelName = this.config.model || "llama3"; - - // Ensure we have model info - const modelInfo = await this.getModelInfo(modelName); - - // Trim text if it might exceed context window (rough character estimate) - // This is a simplistic approach - ideally we'd count tokens properly - const charLimit = modelInfo.contextWindow * 4; // Rough estimate: avg 4 chars per token - const trimmedText = text.length > charLimit ? text.substring(0, charLimit) : text; - - const response = await axios.post( - `${this.baseUrl}/api/embeddings`, - { - model: modelName, - prompt: trimmedText, - format: "json" - }, - { - headers: { - "Content-Type": "application/json" - }, - timeout: 30000 // Longer timeout for larger texts - } - ); - - if (response.data && Array.isArray(response.data.embedding)) { - return new Float32Array(response.data.embedding); - } else { - throw new Error("Unexpected response structure from Ollama API"); - } - } catch (error: any) { - const errorMessage = error.response?.data?.error?.message || error.message || "Unknown error"; - log.error(`Ollama embedding error: ${errorMessage}`); - throw new Error(`Ollama embedding error: ${errorMessage}`); + // Handle empty text + if (!text.trim()) { + return new Float32Array(this.config.dimension); } + + // Configuration for retries + const maxRetries = 3; + let retryCount = 0; + let lastError: any = null; + + while (retryCount <= maxRetries) { + try { + const modelName = this.config.model || "llama3"; + + // Ensure we have model info + const modelInfo = await this.getModelInfo(modelName); + + // Trim text if it might exceed context window (rough character estimate) + // This is a simplistic approach - ideally we'd count tokens properly + const charLimit = modelInfo.contextWindow * 4; // Rough estimate: avg 4 chars per token + const trimmedText = text.length > charLimit ? text.substring(0, charLimit) : text; + + const response = await axios.post( + `${this.baseUrl}/api/embeddings`, + { + model: modelName, + prompt: trimmedText, + format: "json" + }, + { + headers: { + "Content-Type": "application/json" + }, + timeout: 60000 // Increased timeout for larger texts (60 seconds) + } + ); + + if (response.data && Array.isArray(response.data.embedding)) { + // Success! Return the embedding + return new Float32Array(response.data.embedding); + } else { + throw new Error("Unexpected response structure from Ollama API"); + } + } catch (error: any) { + lastError = error; + // Only retry on timeout or connection errors + const errorMessage = error.response?.data?.error?.message || error.message || "Unknown error"; + const isTimeoutError = errorMessage.includes('timeout') || + errorMessage.includes('socket hang up') || + errorMessage.includes('ECONNREFUSED') || + errorMessage.includes('ECONNRESET'); + + if (isTimeoutError && retryCount < maxRetries) { + // Exponential backoff with jitter + const delay = Math.min(Math.pow(2, retryCount) * 1000 + Math.random() * 1000, 15000); + log.info(`Ollama embedding timeout, retrying in ${Math.round(delay/1000)}s (attempt ${retryCount + 1}/${maxRetries})`); + await new Promise(resolve => setTimeout(resolve, delay)); + retryCount++; + } else { + // Non-retryable error or max retries exceeded + const errorMessage = error.response?.data?.error?.message || error.message || "Unknown error"; + log.error(`Ollama embedding error: ${errorMessage}`); + throw new Error(`Ollama embedding error: ${errorMessage}`); + } + } + } + + // If we get here, we've exceeded our retry limit + const errorMessage = lastError.response?.data?.error?.message || lastError.message || "Unknown error"; + log.error(`Ollama embedding error after ${maxRetries} retries: ${errorMessage}`); + throw new Error(`Ollama embedding error after ${maxRetries} retries: ${errorMessage}`); } /** diff --git a/src/services/llm/embeddings/queue.ts b/src/services/llm/embeddings/queue.ts index 9d7637e86..31617dfb3 100644 --- a/src/services/llm/embeddings/queue.ts +++ b/src/services/llm/embeddings/queue.ts @@ -10,6 +10,9 @@ import type { QueueItem } from "./types.js"; import { getChunkingOperations } from "./chunking/chunking_interface.js"; import indexService from '../index_service.js'; +// Track which notes are currently being processed +const notesInProcess = new Set(); + /** * Queues a note for embedding update */ @@ -19,11 +22,17 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE' // Check if note is already in queue and whether it's marked as permanently failed const queueInfo = await sql.getRow( - "SELECT 1 as exists_flag, failed FROM embedding_queue WHERE noteId = ?", + "SELECT 1 as exists_flag, failed, isProcessing FROM embedding_queue WHERE noteId = ?", [noteId] - ) as {exists_flag: number, failed: number} | null; + ) as {exists_flag: number, failed: number, isProcessing: number} | null; if (queueInfo) { + // If the note is currently being processed, don't change its status + if (queueInfo.isProcessing === 1) { + log.info(`Note ${noteId} is currently being processed, skipping queue update`); + return; + } + // Only update if not permanently failed if (queueInfo.failed !== 1) { // Update existing queue entry but preserve the failed status @@ -41,8 +50,8 @@ export async function queueNoteForEmbedding(noteId: string, operation = 'UPDATE' // Add new queue entry await sql.execute(` INSERT INTO embedding_queue - (noteId, operation, dateQueued, utcDateQueued, failed) - VALUES (?, ?, ?, ?, 0)`, + (noteId, operation, dateQueued, utcDateQueued, failed, isProcessing) + VALUES (?, ?, ?, ?, 0, 0)`, [noteId, operation, now, utcNow] ); } @@ -180,11 +189,11 @@ export async function processEmbeddingQueue() { return; } - // Get notes from queue (excluding failed ones) + // Get notes from queue (excluding failed ones and those being processed) const notes = await sql.getRows(` SELECT noteId, operation, attempts FROM embedding_queue - WHERE failed = 0 + WHERE failed = 0 AND isProcessing = 0 ORDER BY priority DESC, utcDateQueued ASC LIMIT ?`, [batchSize] @@ -198,30 +207,47 @@ export async function processEmbeddingQueue() { let processedCount = 0; for (const note of notes) { + const noteData = note as unknown as QueueItem; + const noteId = noteData.noteId; + + // Double-check that this note isn't already being processed + if (notesInProcess.has(noteId)) { + log.info(`Note ${noteId} is already being processed by another thread, skipping`); + continue; + } + try { - const noteData = note as unknown as QueueItem; + // Mark the note as being processed + notesInProcess.add(noteId); + await sql.execute( + "UPDATE embedding_queue SET isProcessing = 1 WHERE noteId = ?", + [noteId] + ); // Skip if note no longer exists - if (!becca.getNote(noteData.noteId)) { + if (!becca.getNote(noteId)) { await sql.execute( "DELETE FROM embedding_queue WHERE noteId = ?", - [noteData.noteId] + [noteId] ); - await deleteNoteEmbeddings(noteData.noteId); + await deleteNoteEmbeddings(noteId); continue; } if (noteData.operation === 'DELETE') { - await deleteNoteEmbeddings(noteData.noteId); + await deleteNoteEmbeddings(noteId); await sql.execute( "DELETE FROM embedding_queue WHERE noteId = ?", - [noteData.noteId] + [noteId] ); continue; } + // Log that we're starting to process this note + log.info(`Starting embedding generation for note ${noteId}`); + // Get note context for embedding - const context = await getNoteEmbeddingContext(noteData.noteId); + const context = await getNoteEmbeddingContext(noteId); // Check if we should use chunking for large content const useChunking = context.content.length > 5000; @@ -236,7 +262,7 @@ export async function processEmbeddingQueue() { if (useChunking) { // Process large notes using chunking const chunkingOps = await getChunkingOperations(); - await chunkingOps.processNoteWithChunking(noteData.noteId, provider, context); + await chunkingOps.processNoteWithChunking(noteId, provider, context); allProvidersFailed = false; } else { // Standard approach: Generate a single embedding for the whole note @@ -246,7 +272,7 @@ export async function processEmbeddingQueue() { const config = provider.getConfig(); await import('./storage.js').then(storage => { return storage.storeNoteEmbedding( - noteData.noteId, + noteId, provider.name, config.model, embedding @@ -259,7 +285,7 @@ export async function processEmbeddingQueue() { } catch (providerError: any) { // This provider failed allProvidersSucceeded = false; - log.error(`Error generating embedding with provider ${provider.name} for note ${noteData.noteId}: ${providerError.message || 'Unknown error'}`); + log.error(`Error generating embedding with provider ${provider.name} for note ${noteId}: ${providerError.message || 'Unknown error'}`); } } @@ -267,8 +293,10 @@ export async function processEmbeddingQueue() { // At least one provider succeeded, remove from queue await sql.execute( "DELETE FROM embedding_queue WHERE noteId = ?", - [noteData.noteId] + [noteId] ); + log.info(`Successfully completed embedding processing for note ${noteId}`); + // Count as successfully processed processedCount++; } else { @@ -277,49 +305,52 @@ export async function processEmbeddingQueue() { UPDATE embedding_queue SET attempts = attempts + 1, lastAttempt = ?, - error = ? + error = ?, + isProcessing = 0 WHERE noteId = ?`, - [dateUtils.utcNowDateTime(), "All providers failed to generate embeddings", noteData.noteId] + [dateUtils.utcNowDateTime(), "All providers failed to generate embeddings", noteId] ); // Mark as permanently failed if too many attempts if (noteData.attempts + 1 >= 3) { - log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`); + log.error(`Marked note ${noteId} as permanently failed after multiple embedding attempts`); // Set the failed flag but keep the actual attempts count await sql.execute(` UPDATE embedding_queue SET failed = 1 WHERE noteId = ? - `, [noteData.noteId]); + `, [noteId]); } } } catch (error: any) { - const noteData = note as unknown as QueueItem; - // Update attempt count and log error await sql.execute(` UPDATE embedding_queue SET attempts = attempts + 1, lastAttempt = ?, - error = ? + error = ?, + isProcessing = 0 WHERE noteId = ?`, - [dateUtils.utcNowDateTime(), error.message || 'Unknown error', noteData.noteId] + [dateUtils.utcNowDateTime(), error.message || 'Unknown error', noteId] ); - log.error(`Error processing embedding for note ${noteData.noteId}: ${error.message || 'Unknown error'}`); + log.error(`Error processing embedding for note ${noteId}: ${error.message || 'Unknown error'}`); // Mark as permanently failed if too many attempts if (noteData.attempts + 1 >= 3) { - log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`); + log.error(`Marked note ${noteId} as permanently failed after multiple embedding attempts`); // Set the failed flag but keep the actual attempts count await sql.execute(` UPDATE embedding_queue SET failed = 1 WHERE noteId = ? - `, [noteData.noteId]); + `, [noteId]); } + } finally { + // Always clean up the processing status in the in-memory set + notesInProcess.delete(noteId); } }