synchronize embeddings

This commit is contained in:
perf3ct 2025-03-12 18:22:05 +00:00
parent b6df3a721c
commit a930b79cf5
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
5 changed files with 139 additions and 7 deletions

View File

@ -188,6 +188,7 @@ function fillAllEntityChanges() {
fillEntityChanges("attributes", "attributeId"); fillEntityChanges("attributes", "attributeId");
fillEntityChanges("etapi_tokens", "etapiTokenId"); fillEntityChanges("etapi_tokens", "etapiTokenId");
fillEntityChanges("options", "name", "WHERE isSynced = 1"); fillEntityChanges("options", "name", "WHERE isSynced = 1");
fillEntityChanges("note_embeddings", "embedId");
}); });
} }

View File

@ -28,6 +28,11 @@ function eraseNotes(noteIdsToErase: string[]) {
eraseRevisions(revisionIdsToErase); eraseRevisions(revisionIdsToErase);
// Erase embeddings related to the deleted notes
const embeddingIdsToErase = sql.getManyRows<{ embedId: string }>(`SELECT embedId FROM note_embeddings WHERE noteId IN (???)`, noteIdsToErase).map((row) => row.embedId);
eraseEmbeddings(embeddingIdsToErase);
log.info(`Erased notes: ${JSON.stringify(noteIdsToErase)}`); log.info(`Erased notes: ${JSON.stringify(noteIdsToErase)}`);
} }
@ -151,6 +156,13 @@ function eraseNotesWithDeleteId(deleteId: string) {
const attachmentIdsToErase = sql.getColumn<string>("SELECT attachmentId FROM attachments WHERE isDeleted = 1 AND deleteId = ?", [deleteId]); const attachmentIdsToErase = sql.getColumn<string>("SELECT attachmentId FROM attachments WHERE isDeleted = 1 AND deleteId = ?", [deleteId]);
eraseAttachments(attachmentIdsToErase); eraseAttachments(attachmentIdsToErase);
// Find and erase embeddings for deleted notes
const deletedNoteIds = sql.getColumn<string>("SELECT noteId FROM notes WHERE isDeleted = 1 AND deleteId = ?", [deleteId]);
if (deletedNoteIds.length > 0) {
const embeddingIdsToErase = sql.getColumn<string>("SELECT embedId FROM note_embeddings WHERE noteId IN (???)", deletedNoteIds);
eraseEmbeddings(embeddingIdsToErase);
}
eraseUnusedBlobs(); eraseUnusedBlobs();
} }
@ -173,6 +185,17 @@ function eraseScheduledAttachments(eraseUnusedAttachmentsAfterSeconds: number |
eraseAttachments(attachmentIdsToErase); eraseAttachments(attachmentIdsToErase);
} }
/**
 * Deletes the given rows from `note_embeddings` and hands the matching
 * `entity_changes` rows to `setEntityChangesAsErased`.
 *
 * An empty list is a no-op: no SQL is issued and nothing is logged.
 *
 * @param embedIdsToErase - `embedId` values of the embedding rows to delete.
 */
function eraseEmbeddings(embedIdsToErase: string[]) {
    if (embedIdsToErase.length > 0) {
        // Remove the embedding rows themselves.
        sql.executeMany(`DELETE FROM note_embeddings WHERE embedId IN (???)`, embedIdsToErase);

        // Look up the entity change records for the same ids and pass them on to be marked as erased.
        setEntityChangesAsErased(
            sql.getManyRows(`SELECT * FROM entity_changes WHERE entityName = 'note_embeddings' AND entityId IN (???)`, embedIdsToErase)
        );

        log.info(`Erased embeddings: ${JSON.stringify(embedIdsToErase)}`);
    }
}
export function startScheduledCleanup() { export function startScheduledCleanup() {
sqlInit.dbReady.then(() => { sqlInit.dbReady.then(() => {
// first cleanup kickoff 5 minutes after startup // first cleanup kickoff 5 minutes after startup

View File

@ -4,6 +4,8 @@ import dateUtils from "../../../services/date_utils.js";
import log from "../../../services/log.js"; import log from "../../../services/log.js";
import { embeddingToBuffer, bufferToEmbedding, cosineSimilarity } from "./vector_utils.js"; import { embeddingToBuffer, bufferToEmbedding, cosineSimilarity } from "./vector_utils.js";
import type { EmbeddingResult } from "./types.js"; import type { EmbeddingResult } from "./types.js";
import entityChangesService from "../../../services/entity_changes.js";
import type { EntityChange } from "../../../services/entity_changes_interface.js";
/** /**
* Creates or updates an embedding for a note * Creates or updates an embedding for a note
@ -21,20 +23,21 @@ export async function storeNoteEmbedding(
// Check if an embedding already exists for this note and provider/model // Check if an embedding already exists for this note and provider/model
const existingEmbed = await getEmbeddingForNote(noteId, providerId, modelId); const existingEmbed = await getEmbeddingForNote(noteId, providerId, modelId);
let embedId;
if (existingEmbed) { if (existingEmbed) {
// Update existing embedding // Update existing embedding
embedId = existingEmbed.embedId;
await sql.execute(` await sql.execute(`
UPDATE note_embeddings UPDATE note_embeddings
SET embedding = ?, dimension = ?, version = version + 1, SET embedding = ?, dimension = ?, version = version + 1,
dateModified = ?, utcDateModified = ? dateModified = ?, utcDateModified = ?
WHERE embedId = ?`, WHERE embedId = ?`,
[embeddingBlob, dimension, now, utcNow, existingEmbed.embedId] [embeddingBlob, dimension, now, utcNow, embedId]
); );
return existingEmbed.embedId;
} else { } else {
// Create new embedding // Create new embedding
const embedId = randomString(16); embedId = randomString(16);
await sql.execute(` await sql.execute(`
INSERT INTO note_embeddings INSERT INTO note_embeddings
(embedId, noteId, providerId, modelId, dimension, embedding, (embedId, noteId, providerId, modelId, dimension, embedding,
@ -43,8 +46,45 @@ export async function storeNoteEmbedding(
[embedId, noteId, providerId, modelId, dimension, embeddingBlob, [embedId, noteId, providerId, modelId, dimension, embeddingBlob,
now, utcNow, now, utcNow] now, utcNow, now, utcNow]
); );
return embedId;
} }
// Create entity change record for syncing
interface EmbeddingRow {
embedId: string;
noteId: string;
providerId: string;
modelId: string;
dimension: number;
version: number;
dateCreated: string;
utcDateCreated: string;
dateModified: string;
utcDateModified: string;
}
const row = await sql.getRow<EmbeddingRow>(`
SELECT embedId, noteId, providerId, modelId, dimension, version,
dateCreated, utcDateCreated, dateModified, utcDateModified
FROM note_embeddings
WHERE embedId = ?`,
[embedId]
);
if (row) {
// Skip the actual embedding data for the hash since it's large
const ec: EntityChange = {
entityName: "note_embeddings",
entityId: embedId,
hash: `${row.noteId}|${row.providerId}|${row.modelId}|${row.dimension}|${row.version}|${row.utcDateModified}`,
utcDateChanged: row.utcDateModified,
isSynced: true,
isErased: false
};
entityChangesService.putEntityChange(ec);
}
return embedId;
} }
/** /**

View File

@ -52,7 +52,11 @@ function updateEntity(remoteEC: EntityChange, remoteEntityRow: EntityRow | undef
return; // can be undefined for options with isSynced=false return; // can be undefined for options with isSynced=false
} }
const updated = remoteEC.entityName === "note_reordering" ? updateNoteReordering(remoteEC, remoteEntityRow, instanceId) : updateNormalEntity(remoteEC, remoteEntityRow, instanceId, updateContext); const updated = remoteEC.entityName === "note_reordering"
? updateNoteReordering(remoteEC, remoteEntityRow, instanceId)
: (remoteEC.entityName === "note_embeddings"
? updateNoteEmbedding(remoteEC, remoteEntityRow, instanceId, updateContext)
: updateNormalEntity(remoteEC, remoteEntityRow, instanceId, updateContext));
if (updated) { if (updated) {
if (remoteEntityRow?.isDeleted) { if (remoteEntityRow?.isDeleted) {
@ -141,10 +145,73 @@ function updateNoteReordering(remoteEC: EntityChange, remoteEntityRow: EntityRow
return true; return true;
} }
/**
 * Applies a remote `note_embeddings` entity change to the local database.
 *
 * - An erased change is passed to `eraseEntity` and counted in `updateContext.erased`.
 * - A non-erased change without a row payload is logged as an error and rejected.
 * - Otherwise the remote row replaces the local one unless a local row exists whose
 *   `utcDateModified` is greater than or equal to the remote value. In both cases the
 *   remote entity change is stored via `putEntityChangeWithInstanceId`.
 *
 * @param remoteEC - entity change received from the other side.
 * @param remoteEntityRow - row payload accompanying the change, if any.
 * @param instanceId - instance id forwarded to `putEntityChangeWithInstanceId`.
 * @param updateContext - accumulator for the erased counter and updated entity ids.
 * @returns `false` only when a non-erased change arrives without a row; `true` otherwise.
 */
function updateNoteEmbedding(remoteEC: EntityChange, remoteEntityRow: EntityRow | undefined, instanceId: string, updateContext: UpdateContext) {
    interface NoteEmbeddingRow {
        embedId: string;
        noteId: string;
        providerId: string;
        modelId: string;
        dimension: number;
        embedding: Buffer;
        version: number;
        dateCreated: string;
        utcDateCreated: string;
        dateModified: string;
        utcDateModified: string;
    }

    // Erasures are handled first, before the row payload is inspected.
    if (remoteEC.isErased) {
        eraseEntity(remoteEC);
        updateContext.erased++;
        return true;
    }

    if (!remoteEntityRow) {
        log.error(`Entity ${remoteEC.entityName} ${remoteEC.entityId} not found in sync update.`);
        return false;
    }

    // View the generic remote row through the embedding-specific shape to read its timestamp.
    const incomingRow = remoteEntityRow as unknown as NoteEmbeddingRow;
    const storedRow = sql.getRow<NoteEmbeddingRow>(`SELECT * FROM note_embeddings WHERE embedId = ?`, [remoteEC.entityId]);

    // The local copy wins only if it exists and is at least as recent as the remote one.
    const localIsUpToDate = !!storedRow && storedRow.utcDateModified >= incomingRow.utcDateModified;

    if (!localIsUpToDate) {
        // Missing locally or remote is newer: take the remote row and record the id as updated.
        sql.replace("note_embeddings", remoteEntityRow);

        const updatedIds = updateContext.updated[remoteEC.entityName] || (updateContext.updated[remoteEC.entityName] = []);
        updatedIds.push(remoteEC.entityId);
    }

    entityChangesService.putEntityChangeWithInstanceId(remoteEC, instanceId);
    return true;
}
function eraseEntity(entityChange: EntityChange) { function eraseEntity(entityChange: EntityChange) {
const { entityName, entityId } = entityChange; const { entityName, entityId } = entityChange;
const entityNames = ["notes", "branches", "attributes", "revisions", "attachments", "blobs"]; const entityNames = ["notes", "branches", "attributes", "revisions", "attachments", "blobs", "note_embeddings"];
if (!entityNames.includes(entityName)) { if (!entityNames.includes(entityName)) {
log.error(`Cannot erase ${entityName} '${entityId}'.`); log.error(`Cannot erase ${entityName} '${entityId}'.`);

View File

@ -207,7 +207,8 @@ const ORDERING: Record<string, number> = {
revisions: 2, revisions: 2,
attachments: 3, attachments: 3,
notes: 1, notes: 1,
options: 0 options: 0,
note_embeddings: 3
}; };
function sendPing(client: WebSocket, entityChangeIds = []) { function sendPing(client: WebSocket, entityChangeIds = []) {