From 781a2506f0b7e442240c04dcbd97d20fd20858b3 Mon Sep 17 00:00:00 2001 From: perf3ct Date: Sun, 16 Mar 2025 18:55:53 +0000 Subject: [PATCH] fix embeddings w/ cls.init() --- src/routes/api/embeddings.ts | 10 ++- .../embeddings/chunking/chunking_processor.ts | 72 ++++++++++--------- src/services/llm/embeddings/events.ts | 14 ++-- src/services/llm/embeddings/stats.ts | 8 ++- 4 files changed, 65 insertions(+), 39 deletions(-) diff --git a/src/routes/api/embeddings.ts b/src/routes/api/embeddings.ts index d15a6b9c9..88bee65b8 100644 --- a/src/routes/api/embeddings.ts +++ b/src/routes/api/embeddings.ts @@ -148,11 +148,17 @@ async function updateProvider(req: Request, res: Response) { * Manually trigger a reprocessing of all notes */ async function reprocessAllNotes(req: Request, res: Response) { + // Import cls + const cls = (await import("../../services/cls.js")).default; + // Start the reprocessing operation in the background setTimeout(async () => { try { - await vectorStore.reprocessAllNotes(); - log.info("Embedding reprocessing completed successfully"); + // Wrap the operation in cls.init to ensure proper context + cls.init(async () => { + await vectorStore.reprocessAllNotes(); + log.info("Embedding reprocessing completed successfully"); + }); } catch (error: any) { log.error(`Error during background embedding reprocessing: ${error.message || "Unknown error"}`); } diff --git a/src/services/llm/embeddings/chunking/chunking_processor.ts b/src/services/llm/embeddings/chunking/chunking_processor.ts index 6dd1ab5dc..c5aac27c8 100644 --- a/src/services/llm/embeddings/chunking/chunking_processor.ts +++ b/src/services/llm/embeddings/chunking/chunking_processor.ts @@ -2,6 +2,7 @@ import log from "../../../log.js"; import dateUtils from "../../../date_utils.js"; import sql from "../../../sql.js"; import becca from "../../../../becca/becca.js"; +import cls from "../../../../services/cls.js"; import type { NoteEmbeddingContext } from "../types.js"; // Remove static imports that cause circular dependencies // import { storeNoteEmbedding, deleteNoteEmbeddings } from "./storage.js"; @@ -126,12 +127,14 @@ export async function processNoteWithChunking( if (!chunks || chunks.length === 0) { // Fall back to single embedding if chunking fails - const embedding = await provider.generateEmbeddings(context.content); - const config = provider.getConfig(); + await cls.init(async () => { + const embedding = await provider.generateEmbeddings(context.content); + const config = provider.getConfig(); - // Use dynamic import instead of static import - const storage = await import('../storage.js'); - await storage.storeNoteEmbedding(noteId, provider.name, config.model, embedding); + // Use dynamic import instead of static import + const storage = await import('../storage.js'); + await storage.storeNoteEmbedding(noteId, provider.name, config.model, embedding); + }); log.info(`Generated single embedding for note ${noteId} (${note.title}) since chunking failed`); return; @@ -187,20 +190,22 @@ export async function processNoteWithChunking( const chunk = chunks[i]; try { // Generate embedding for this chunk's content with a timeout - const embedding = await processChunkWithTimeout( - provider, - chunk, - MAX_CHUNK_PROCESSING_TIME - ); + await cls.init(async () => { + const embedding = await processChunkWithTimeout( + provider, + chunk, + MAX_CHUNK_PROCESSING_TIME + ); - // Store with chunk information in a unique ID format - const chunkIdSuffix = `${i + 1}_of_${chunks.length}`; - await storage.storeNoteEmbedding( - noteId, - provider.name, - config.model, - embedding - ); + // Store with chunk information in a unique ID format + const chunkIdSuffix = `${i + 1}_of_${chunks.length}`; + await storage.storeNoteEmbedding( + noteId, + provider.name, + config.model, + embedding + ); + }); successfulChunks++; @@ -264,21 +269,24 @@ export async function processNoteWithChunking( // Wait longer for retries with exponential backoff await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(1.5, j))); - // Retry the embedding with timeout - const embedding = await processChunkWithTimeout( - provider, - item.chunk, - MAX_CHUNK_PROCESSING_TIME - ); + // Retry the embedding with timeout using cls.init + await cls.init(async () => { + const embedding = await processChunkWithTimeout( + provider, + item.chunk, + MAX_CHUNK_PROCESSING_TIME + ); - // Store with unique ID that indicates it was a retry - const chunkIdSuffix = `${item.index + 1}_of_${chunks.length}`; - await storage.storeNoteEmbedding( - noteId, - provider.name, - config.model, - embedding - ); + // Store with unique ID that indicates it was a retry + const chunkIdSuffix = `${item.index + 1}_of_${chunks.length}`; + const storage = await import('../storage.js'); + await storage.storeNoteEmbedding( + noteId, + provider.name, + config.model, + embedding + ); + }); // Update counters successfulChunks++; diff --git a/src/services/llm/embeddings/events.ts b/src/services/llm/embeddings/events.ts index 6e8276a0c..220da4046 100644 --- a/src/services/llm/embeddings/events.ts +++ b/src/services/llm/embeddings/events.ts @@ -1,7 +1,10 @@ -import eventService from "../../../services/events.js"; -import options from "../../../services/options.js"; +import sql from "../../../services/sql.js"; import log from "../../../services/log.js"; -import { queueNoteForEmbedding, processEmbeddingQueue } from "./queue.js"; +import options from "../../../services/options.js"; +import cls from "../../../services/cls.js"; +import { processEmbeddingQueue, queueNoteForEmbedding } from "./queue.js"; +import eventService from "../../../services/events.js"; +import becca from "../../../becca/becca.js"; /** * Setup event listeners for embedding-related events @@ -51,7 +54,10 @@ export async function setupEmbeddingBackgroundProcessing() { setInterval(async () => { try { - await processEmbeddingQueue(); + // Wrap in cls.init to ensure proper context + cls.init(async () => { + await processEmbeddingQueue(); + }); } catch (error: any) { log.error(`Error in background embedding processing: ${error.message || 'Unknown error'}`); } diff --git a/src/services/llm/embeddings/stats.ts b/src/services/llm/embeddings/stats.ts index ecc2ef6a8..6154da368 100644 --- a/src/services/llm/embeddings/stats.ts +++ b/src/services/llm/embeddings/stats.ts @@ -1,5 +1,6 @@ import sql from "../../../services/sql.js"; import log from "../../../services/log.js"; +import cls from "../../../services/cls.js"; import { queueNoteForEmbedding } from "./queue.js"; /** @@ -8,14 +9,19 @@ import { queueNoteForEmbedding } from "./queue.js"; export async function reprocessAllNotes() { log.info("Queueing all notes for embedding updates"); + // Get all non-deleted note IDs const noteIds = await sql.getColumn( "SELECT noteId FROM notes WHERE isDeleted = 0" ); log.info(`Adding ${noteIds.length} notes to embedding queue`); + // Process each note ID within a cls context for (const noteId of noteIds) { - await queueNoteForEmbedding(noteId as string, 'UPDATE'); + // Use cls.init to ensure proper context for each operation + await cls.init(async () => { + await queueNoteForEmbedding(noteId as string, 'UPDATE'); + }); } }