nearly able to process embeddings

This commit is contained in:
perf3ct 2025-03-08 23:08:25 +00:00
parent dc439b21b0
commit 6ace4d5692
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
2 changed files with 54 additions and 32 deletions

View File

@ -3,6 +3,8 @@ import vectorStore from "../../services/llm/embeddings/vector_store.js";
import providerManager from "../../services/llm/embeddings/providers.js"; import providerManager from "../../services/llm/embeddings/providers.js";
import becca from "../../becca/becca.js"; import becca from "../../becca/becca.js";
import type { Request, Response } from "express"; import type { Request, Response } from "express";
import log from "../../services/log.js";
import sql from "../../services/sql.js";
/** /**
* Get similar notes based on note ID * Get similar notes based on note ID
@ -172,18 +174,34 @@ async function updateProvider(req: Request, res: Response) {
* Manually trigger a reprocessing of all notes * Manually trigger a reprocessing of all notes
*/ */
async function reprocessAllNotes(req: Request, res: Response) { async function reprocessAllNotes(req: Request, res: Response) {
// Wrap in a try-catch to handle errors
try { try {
await vectorStore.reprocessAllNotes(); // Start the reprocessing operation in the background
// and immediately respond to the client
return res.send({ res.send({
success: true, success: true,
message: "Notes queued for reprocessing" message: "Embedding reprocessing started in the background"
}); });
// Continue processing in the background after sending the response
setTimeout(async () => {
try {
await vectorStore.reprocessAllNotes();
log.info("Embedding reprocessing completed successfully");
} catch (error: any) {
log.error(`Error during background embedding reprocessing: ${error.message || "Unknown error"}`);
}
}, 0);
} catch (error: any) { } catch (error: any) {
return res.status(500).send({ // Only catch errors that happen before we send the response
success: false, log.error(`Error initiating embedding reprocessing: ${error.message || "Unknown error"}`);
message: error.message || "Unknown error"
}); if (!res.headersSent) {
res.status(500).send({
success: false,
message: error.message || "Unknown error"
});
}
} }
} }
@ -192,9 +210,7 @@ async function reprocessAllNotes(req: Request, res: Response) {
*/ */
async function getQueueStatus(req: Request, res: Response) { async function getQueueStatus(req: Request, res: Response) {
try { try {
// Use sql directly instead of becca.sqliteDB // Use the imported sql instead of requiring it
const sql = require("../../services/sql.js").default;
const queueCount = await sql.getValue( const queueCount = await sql.getValue(
"SELECT COUNT(*) FROM embedding_queue" "SELECT COUNT(*) FROM embedding_queue"
); );

View File

@ -6,6 +6,7 @@ import log from "../../log.js";
import becca from "../../../becca/becca.js"; import becca from "../../../becca/becca.js";
import type { NoteEmbeddingContext } from "./embeddings_interface.js"; import type { NoteEmbeddingContext } from "./embeddings_interface.js";
import { getEmbeddingProviders, getEnabledEmbeddingProviders } from "./providers.js"; import { getEmbeddingProviders, getEnabledEmbeddingProviders } from "./providers.js";
import eventService from "../../events.js";
// Type definition for embedding result // Type definition for embedding result
interface EmbeddingResult { interface EmbeddingResult {
@ -382,37 +383,42 @@ export async function processEmbeddingQueue() {
} }
/** /**
* Setup note event listeners to keep embeddings up to date * Set up event listeners for embedding-related events
*/ */
export function setupEmbeddingEventListeners() { export function setupEmbeddingEventListeners() {
require("../../../becca/entity_events.js").subscribe({ // Listen for note content changes
entityName: "notes", eventService.subscribe(eventService.NOTE_CONTENT_CHANGE, ({ entity }) => {
eventType: "created", if (entity && entity.noteId) {
handler: (note: { noteId: string }) => queueNoteForEmbedding(note.noteId, 'CREATE') queueNoteForEmbedding(entity.noteId);
}
}); });
require("../../../becca/entity_events.js").subscribe({ // Listen for new notes
entityName: "notes", eventService.subscribe(eventService.ENTITY_CREATED, ({ entityName, entity }) => {
eventType: "updated", if (entityName === "notes" && entity && entity.noteId) {
handler: ({entity}: { entity: { noteId: string } }) => queueNoteForEmbedding(entity.noteId, 'UPDATE') queueNoteForEmbedding(entity.noteId);
}
}); });
require("../../../becca/entity_events.js").subscribe({ // Listen for note title changes
entityName: "notes", eventService.subscribe(eventService.NOTE_TITLE_CHANGED, ({ noteId }) => {
eventType: "deleted", if (noteId) {
handler: (note: { noteId: string }) => queueNoteForEmbedding(note.noteId, 'DELETE') queueNoteForEmbedding(noteId);
}
}); });
require("../../../becca/entity_events.js").subscribe({ // Listen for note deletions
entityName: "attributes", eventService.subscribe(eventService.ENTITY_DELETED, ({ entityName, entityId }) => {
eventType: ["created", "updated", "deleted"], if (entityName === "notes" && entityId) {
handler: ({entity}: { entity: { noteId: string } }) => queueNoteForEmbedding(entity.noteId, 'UPDATE') queueNoteForEmbedding(entityId, 'DELETE');
}
}); });
require("../../../becca/entity_events.js").subscribe({ // Listen for attribute changes that might affect context
entityName: "branches", eventService.subscribe(eventService.ENTITY_CHANGED, ({ entityName, entity }) => {
eventType: ["created", "updated", "deleted"], if (entityName === "attributes" && entity && entity.noteId) {
handler: ({entity}: { entity: { noteId: string } }) => queueNoteForEmbedding(entity.noteId, 'UPDATE') queueNoteForEmbedding(entity.noteId);
}
}); });
} }