diff --git a/src/services/entity_changes.ts b/src/services/entity_changes.ts index f3abfba5d..e3efc73f3 100644 --- a/src/services/entity_changes.ts +++ b/src/services/entity_changes.ts @@ -188,6 +188,7 @@ function fillAllEntityChanges() { fillEntityChanges("attributes", "attributeId"); fillEntityChanges("etapi_tokens", "etapiTokenId"); fillEntityChanges("options", "name", "WHERE isSynced = 1"); + fillEntityChanges("note_embeddings", "embedId"); }); } diff --git a/src/services/erase.ts b/src/services/erase.ts index e07cfaa4c..15dd0cb6b 100644 --- a/src/services/erase.ts +++ b/src/services/erase.ts @@ -28,6 +28,11 @@ function eraseNotes(noteIdsToErase: string[]) { eraseRevisions(revisionIdsToErase); + // Erase embeddings related to the deleted notes + const embeddingIdsToErase = sql.getManyRows<{ embedId: string }>(`SELECT embedId FROM note_embeddings WHERE noteId IN (???)`, noteIdsToErase).map((row) => row.embedId); + + eraseEmbeddings(embeddingIdsToErase); + log.info(`Erased notes: ${JSON.stringify(noteIdsToErase)}`); } @@ -151,6 +156,13 @@ function eraseNotesWithDeleteId(deleteId: string) { const attachmentIdsToErase = sql.getColumn("SELECT attachmentId FROM attachments WHERE isDeleted = 1 AND deleteId = ?", [deleteId]); eraseAttachments(attachmentIdsToErase); + // Find and erase embeddings for deleted notes + const deletedNoteIds = sql.getColumn("SELECT noteId FROM notes WHERE isDeleted = 1 AND deleteId = ?", [deleteId]); + if (deletedNoteIds.length > 0) { + const embeddingIdsToErase = sql.getColumn("SELECT embedId FROM note_embeddings WHERE noteId IN (???)", deletedNoteIds); + eraseEmbeddings(embeddingIdsToErase); + } + eraseUnusedBlobs(); } @@ -173,6 +185,17 @@ function eraseScheduledAttachments(eraseUnusedAttachmentsAfterSeconds: number | eraseAttachments(attachmentIdsToErase); } +function eraseEmbeddings(embedIdsToErase: string[]) { + if (embedIdsToErase.length === 0) { + return; + } + + sql.executeMany(`DELETE FROM note_embeddings WHERE embedId IN (???)`, embedIdsToErase); + setEntityChangesAsErased(sql.getManyRows(`SELECT * FROM entity_changes WHERE entityName = 'note_embeddings' AND entityId IN (???)`, embedIdsToErase)); + + log.info(`Erased embeddings: ${JSON.stringify(embedIdsToErase)}`); +} + export function startScheduledCleanup() { sqlInit.dbReady.then(() => { // first cleanup kickoff 5 minutes after startup diff --git a/src/services/llm/embeddings/storage.ts b/src/services/llm/embeddings/storage.ts index fc43d58bf..06d74f2fe 100644 --- a/src/services/llm/embeddings/storage.ts +++ b/src/services/llm/embeddings/storage.ts @@ -4,6 +4,8 @@ import dateUtils from "../../../services/date_utils.js"; import log from "../../../services/log.js"; import { embeddingToBuffer, bufferToEmbedding, cosineSimilarity } from "./vector_utils.js"; import type { EmbeddingResult } from "./types.js"; +import entityChangesService from "../../../services/entity_changes.js"; +import type { EntityChange } from "../../../services/entity_changes_interface.js"; /** * Creates or updates an embedding for a note @@ -21,20 +23,21 @@ export async function storeNoteEmbedding( // Check if an embedding already exists for this note and provider/model const existingEmbed = await getEmbeddingForNote(noteId, providerId, modelId); + let embedId; if (existingEmbed) { // Update existing embedding + embedId = existingEmbed.embedId; await sql.execute(` UPDATE note_embeddings SET embedding = ?, dimension = ?, version = version + 1, dateModified = ?, utcDateModified = ? WHERE embedId = ?`, - [embeddingBlob, dimension, now, utcNow, existingEmbed.embedId] + [embeddingBlob, dimension, now, utcNow, embedId] ); - return existingEmbed.embedId; } else { // Create new embedding - const embedId = randomString(16); + embedId = randomString(16); await sql.execute(` INSERT INTO note_embeddings (embedId, noteId, providerId, modelId, dimension, embedding, @@ -43,8 +46,45 @@ export async function storeNoteEmbedding( [embedId, noteId, providerId, modelId, dimension, embeddingBlob, now, utcNow, now, utcNow] ); - return embedId; } + + // Create entity change record for syncing + interface EmbeddingRow { + embedId: string; + noteId: string; + providerId: string; + modelId: string; + dimension: number; + version: number; + dateCreated: string; + utcDateCreated: string; + dateModified: string; + utcDateModified: string; + } + + const row = await sql.getRow(` + SELECT embedId, noteId, providerId, modelId, dimension, version, + dateCreated, utcDateCreated, dateModified, utcDateModified + FROM note_embeddings + WHERE embedId = ?`, + [embedId] + ); + + if (row) { + // Skip the actual embedding data for the hash since it's large + const ec: EntityChange = { + entityName: "note_embeddings", + entityId: embedId, + hash: `${row.noteId}|${row.providerId}|${row.modelId}|${row.dimension}|${row.version}|${row.utcDateModified}`, + utcDateChanged: row.utcDateModified, + isSynced: true, + isErased: false + }; + + entityChangesService.putEntityChange(ec); + } + + return embedId; } /** diff --git a/src/services/sync_update.ts b/src/services/sync_update.ts index fae50dd70..0e7f7bc64 100644 --- a/src/services/sync_update.ts +++ b/src/services/sync_update.ts @@ -52,7 +52,11 @@ function updateEntity(remoteEC: EntityChange, remoteEntityRow: EntityRow | undef return; // can be undefined for options with isSynced=false } - const updated = remoteEC.entityName === "note_reordering" ? updateNoteReordering(remoteEC, remoteEntityRow, instanceId) : updateNormalEntity(remoteEC, remoteEntityRow, instanceId, updateContext); + const updated = remoteEC.entityName === "note_reordering" + ? updateNoteReordering(remoteEC, remoteEntityRow, instanceId) + : (remoteEC.entityName === "note_embeddings" + ? updateNoteEmbedding(remoteEC, remoteEntityRow, instanceId, updateContext) + : updateNormalEntity(remoteEC, remoteEntityRow, instanceId, updateContext)); if (updated) { if (remoteEntityRow?.isDeleted) { @@ -141,10 +145,73 @@ function updateNoteReordering(remoteEC: EntityChange, remoteEntityRow: EntityRow return true; } +function updateNoteEmbedding(remoteEC: EntityChange, remoteEntityRow: EntityRow | undefined, instanceId: string, updateContext: UpdateContext) { + if (remoteEC.isErased) { + eraseEntity(remoteEC); + updateContext.erased++; + return true; + } + + if (!remoteEntityRow) { + log.error(`Entity ${remoteEC.entityName} ${remoteEC.entityId} not found in sync update.`); + return false; + } + + interface NoteEmbeddingRow { + embedId: string; + noteId: string; + providerId: string; + modelId: string; + dimension: number; + embedding: Buffer; + version: number; + dateCreated: string; + utcDateCreated: string; + dateModified: string; + utcDateModified: string; + } + + // Cast remoteEntityRow to include required embedding properties + const typedRemoteEntityRow = remoteEntityRow as unknown as NoteEmbeddingRow; + + const localEntityRow = sql.getRow(`SELECT * FROM note_embeddings WHERE embedId = ?`, [remoteEC.entityId]); + + if (localEntityRow) { + // We already have this embedding, check if we need to update it + if (localEntityRow.utcDateModified >= typedRemoteEntityRow.utcDateModified) { + // Local is newer or same, no need to update + entityChangesService.putEntityChangeWithInstanceId(remoteEC, instanceId); + return true; + } else { + // Remote is newer, update local + sql.replace("note_embeddings", remoteEntityRow); + + if (!updateContext.updated[remoteEC.entityName]) { + updateContext.updated[remoteEC.entityName] = []; + } + updateContext.updated[remoteEC.entityName].push(remoteEC.entityId); + + entityChangesService.putEntityChangeWithInstanceId(remoteEC, instanceId); + return true; + } + } else { + // We don't have this embedding, insert it + sql.replace("note_embeddings", remoteEntityRow); + + if (!updateContext.updated[remoteEC.entityName]) { + updateContext.updated[remoteEC.entityName] = []; + } + updateContext.updated[remoteEC.entityName].push(remoteEC.entityId); + + entityChangesService.putEntityChangeWithInstanceId(remoteEC, instanceId); + return true; + } +} + function eraseEntity(entityChange: EntityChange) { const { entityName, entityId } = entityChange; - const entityNames = ["notes", "branches", "attributes", "revisions", "attachments", "blobs"]; + const entityNames = ["notes", "branches", "attributes", "revisions", "attachments", "blobs", "note_embeddings"]; if (!entityNames.includes(entityName)) { log.error(`Cannot erase ${entityName} '${entityId}'.`); diff --git a/src/services/ws.ts b/src/services/ws.ts index f2674dd20..b858489d8 100644 --- a/src/services/ws.ts +++ b/src/services/ws.ts @@ -207,7 +207,8 @@ const ORDERING: Record = { revisions: 2, attachments: 3, notes: 1, - options: 0 + options: 0, + note_embeddings: 3 }; function sendPing(client: WebSocket, entityChangeIds = []) {