fix embeddings w/ cls.init()

This commit is contained in:
perf3ct 2025-03-16 18:55:53 +00:00
parent 0081e6f1d0
commit 781a2506f0
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
4 changed files with 65 additions and 39 deletions

View File

@ -148,11 +148,17 @@ async function updateProvider(req: Request, res: Response) {
* Manually trigger a reprocessing of all notes
*/
async function reprocessAllNotes(req: Request, res: Response) {
// Import cls
const cls = (await import("../../services/cls.js")).default;
// Start the reprocessing operation in the background
setTimeout(async () => {
try {
await vectorStore.reprocessAllNotes();
log.info("Embedding reprocessing completed successfully");
// Wrap the operation in cls.init to ensure proper context
cls.init(async () => {
await vectorStore.reprocessAllNotes();
log.info("Embedding reprocessing completed successfully");
});
} catch (error: any) {
log.error(`Error during background embedding reprocessing: ${error.message || "Unknown error"}`);
}

View File

@ -2,6 +2,7 @@ import log from "../../../log.js";
import dateUtils from "../../../date_utils.js";
import sql from "../../../sql.js";
import becca from "../../../../becca/becca.js";
import cls from "../../../../services/cls.js";
import type { NoteEmbeddingContext } from "../types.js";
// Remove static imports that cause circular dependencies
// import { storeNoteEmbedding, deleteNoteEmbeddings } from "./storage.js";
@ -126,12 +127,14 @@ export async function processNoteWithChunking(
if (!chunks || chunks.length === 0) {
// Fall back to single embedding if chunking fails
const embedding = await provider.generateEmbeddings(context.content);
const config = provider.getConfig();
await cls.init(async () => {
const embedding = await provider.generateEmbeddings(context.content);
const config = provider.getConfig();
// Use dynamic import instead of static import
const storage = await import('../storage.js');
await storage.storeNoteEmbedding(noteId, provider.name, config.model, embedding);
// Use dynamic import instead of static import
const storage = await import('../storage.js');
await storage.storeNoteEmbedding(noteId, provider.name, config.model, embedding);
});
log.info(`Generated single embedding for note ${noteId} (${note.title}) since chunking failed`);
return;
@ -187,20 +190,22 @@ export async function processNoteWithChunking(
const chunk = chunks[i];
try {
// Generate embedding for this chunk's content with a timeout
const embedding = await processChunkWithTimeout(
provider,
chunk,
MAX_CHUNK_PROCESSING_TIME
);
await cls.init(async () => {
const embedding = await processChunkWithTimeout(
provider,
chunk,
MAX_CHUNK_PROCESSING_TIME
);
// Store with chunk information in a unique ID format
const chunkIdSuffix = `${i + 1}_of_${chunks.length}`;
await storage.storeNoteEmbedding(
noteId,
provider.name,
config.model,
embedding
);
// Store with chunk information in a unique ID format
const chunkIdSuffix = `${i + 1}_of_${chunks.length}`;
await storage.storeNoteEmbedding(
noteId,
provider.name,
config.model,
embedding
);
});
successfulChunks++;
@ -264,21 +269,24 @@ export async function processNoteWithChunking(
// Wait longer for retries with exponential backoff
await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(1.5, j)));
// Retry the embedding with timeout
const embedding = await processChunkWithTimeout(
provider,
item.chunk,
MAX_CHUNK_PROCESSING_TIME
);
// Retry the embedding with timeout using cls.init
await cls.init(async () => {
const embedding = await processChunkWithTimeout(
provider,
item.chunk,
MAX_CHUNK_PROCESSING_TIME
);
// Store with unique ID that indicates it was a retry
const chunkIdSuffix = `${item.index + 1}_of_${chunks.length}`;
await storage.storeNoteEmbedding(
noteId,
provider.name,
config.model,
embedding
);
// Store with unique ID that indicates it was a retry
const chunkIdSuffix = `${item.index + 1}_of_${chunks.length}`;
const storage = await import('../storage.js');
await storage.storeNoteEmbedding(
noteId,
provider.name,
config.model,
embedding
);
});
// Update counters
successfulChunks++;

View File

@ -1,7 +1,10 @@
import eventService from "../../../services/events.js";
import options from "../../../services/options.js";
import sql from "../../../services/sql.js";
import log from "../../../services/log.js";
import { queueNoteForEmbedding, processEmbeddingQueue } from "./queue.js";
import options from "../../../services/options.js";
import cls from "../../../services/cls.js";
import { processEmbeddingQueue, queueNoteForEmbedding } from "./queue.js";
import eventService from "../../../services/events.js";
import becca from "../../../becca/becca.js";
/**
* Setup event listeners for embedding-related events
@ -51,7 +54,10 @@ export async function setupEmbeddingBackgroundProcessing() {
setInterval(async () => {
try {
await processEmbeddingQueue();
// Wrap in cls.init to ensure proper context
cls.init(async () => {
await processEmbeddingQueue();
});
} catch (error: any) {
log.error(`Error in background embedding processing: ${error.message || 'Unknown error'}`);
}

View File

@ -1,5 +1,6 @@
import sql from "../../../services/sql.js";
import log from "../../../services/log.js";
import cls from "../../../services/cls.js";
import { queueNoteForEmbedding } from "./queue.js";
/**
@ -8,14 +9,19 @@ import { queueNoteForEmbedding } from "./queue.js";
export async function reprocessAllNotes() {
log.info("Queueing all notes for embedding updates");
// Get all non-deleted note IDs
const noteIds = await sql.getColumn(
"SELECT noteId FROM notes WHERE isDeleted = 0"
);
log.info(`Adding ${noteIds.length} notes to embedding queue`);
// Process each note ID within a cls context
for (const noteId of noteIds) {
await queueNoteForEmbedding(noteId as string, 'UPDATE');
// Use cls.init to ensure proper context for each operation
await cls.init(async () => {
await queueNoteForEmbedding(noteId as string, 'UPDATE');
});
}
}