diff --git a/src/routes/api/embeddings.ts b/src/routes/api/embeddings.ts index 178074c8d..6a6ec7d6d 100644 --- a/src/routes/api/embeddings.ts +++ b/src/routes/api/embeddings.ts @@ -3,6 +3,8 @@ import vectorStore from "../../services/llm/embeddings/vector_store.js"; import providerManager from "../../services/llm/embeddings/providers.js"; import becca from "../../becca/becca.js"; import type { Request, Response } from "express"; +import log from "../../services/log.js"; +import sql from "../../services/sql.js"; /** * Get similar notes based on note ID @@ -172,18 +174,34 @@ async function updateProvider(req: Request, res: Response) { * Manually trigger a reprocessing of all notes */ async function reprocessAllNotes(req: Request, res: Response) { + // Wrap in a try-catch to handle errors try { - await vectorStore.reprocessAllNotes(); - - return res.send({ + // Start the reprocessing operation in the background + // and immediately respond to the client + res.send({ success: true, - message: "Notes queued for reprocessing" + message: "Embedding reprocessing started in the background" }); + + // Continue processing in the background after sending the response + setTimeout(async () => { + try { + await vectorStore.reprocessAllNotes(); + log.info("Embedding reprocessing completed successfully"); + } catch (error: any) { + log.error(`Error during background embedding reprocessing: ${error.message || "Unknown error"}`); + } + }, 0); } catch (error: any) { - return res.status(500).send({ - success: false, - message: error.message || "Unknown error" - }); + // Only catch errors that happen before we send the response + log.error(`Error initiating embedding reprocessing: ${error.message || "Unknown error"}`); + + if (!res.headersSent) { + res.status(500).send({ + success: false, + message: error.message || "Unknown error" + }); + } } } @@ -192,9 +210,7 @@ async function reprocessAllNotes(req: Request, res: Response) { */ async function getQueueStatus(req: Request, res: Response) { try { - // Use sql directly instead of becca.sqliteDB - const sql = require("../../services/sql.js").default; - + // Use the imported sql instead of requiring it const queueCount = await sql.getValue( "SELECT COUNT(*) FROM embedding_queue" ); diff --git a/src/services/llm/embeddings/vector_store.ts b/src/services/llm/embeddings/vector_store.ts index 067ff13f1..f02434194 100644 --- a/src/services/llm/embeddings/vector_store.ts +++ b/src/services/llm/embeddings/vector_store.ts @@ -6,6 +6,7 @@ import log from "../../log.js"; import becca from "../../../becca/becca.js"; import type { NoteEmbeddingContext } from "./embeddings_interface.js"; import { getEmbeddingProviders, getEnabledEmbeddingProviders } from "./providers.js"; +import eventService from "../../events.js"; // Type definition for embedding result interface EmbeddingResult { @@ -382,37 +383,42 @@ export async function processEmbeddingQueue() { } /** - * Setup note event listeners to keep embeddings up to date + * Set up event listeners for embedding-related events */ export function setupEmbeddingEventListeners() { - require("../../../becca/entity_events.js").subscribe({ - entityName: "notes", - eventType: "created", - handler: (note: { noteId: string }) => queueNoteForEmbedding(note.noteId, 'CREATE') + // Listen for note content changes + eventService.subscribe(eventService.NOTE_CONTENT_CHANGE, ({ entity }) => { + if (entity && entity.noteId) { + queueNoteForEmbedding(entity.noteId); + } }); - require("../../../becca/entity_events.js").subscribe({ - entityName: "notes", - eventType: "updated", - handler: ({entity}: { entity: { noteId: string } }) => queueNoteForEmbedding(entity.noteId, 'UPDATE') + // Listen for new notes + eventService.subscribe(eventService.ENTITY_CREATED, ({ entityName, entity }) => { + if (entityName === "notes" && entity && entity.noteId) { + queueNoteForEmbedding(entity.noteId); + } }); - require("../../../becca/entity_events.js").subscribe({ - entityName: "notes", - eventType: "deleted", - handler: (note: { noteId: string }) => queueNoteForEmbedding(note.noteId, 'DELETE') + // Listen for note title changes + eventService.subscribe(eventService.NOTE_TITLE_CHANGED, ({ noteId }) => { + if (noteId) { + queueNoteForEmbedding(noteId); + } }); - require("../../../becca/entity_events.js").subscribe({ - entityName: "attributes", - eventType: ["created", "updated", "deleted"], - handler: ({entity}: { entity: { noteId: string } }) => queueNoteForEmbedding(entity.noteId, 'UPDATE') + // Listen for note deletions + eventService.subscribe(eventService.ENTITY_DELETED, ({ entityName, entityId }) => { + if (entityName === "notes" && entityId) { + queueNoteForEmbedding(entityId, 'DELETE'); + } }); - require("../../../becca/entity_events.js").subscribe({ - entityName: "branches", - eventType: ["created", "updated", "deleted"], - handler: ({entity}: { entity: { noteId: string } }) => queueNoteForEmbedding(entity.noteId, 'UPDATE') + // Listen for attribute changes that might affect context + eventService.subscribe(eventService.ENTITY_CHANGED, ({ entityName, entity }) => { + if (entityName === "attributes" && entity && entity.noteId) { + queueNoteForEmbedding(entity.noteId); + } }); }