mirror of
https://github.com/TriliumNext/Notes.git
synced 2025-08-31 19:51:36 +08:00
Make sure not to retry chunks if they fail
This commit is contained in:
parent
f47b070f0f
commit
1f661e4c90
@ -588,10 +588,13 @@ export async function getFailedEmbeddingNotes(limit: number = 100): Promise<any[
|
|||||||
for (const item of failedQueueItems) {
|
for (const item of failedQueueItems) {
|
||||||
const note = becca.getNote(item.noteId);
|
const note = becca.getNote(item.noteId);
|
||||||
if (note) {
|
if (note) {
|
||||||
|
// Check if this is a chunking error (the error message contains "chunk", case-insensitive)
|
||||||
|
const isChunkFailure = item.error && item.error.toLowerCase().includes('chunk');
|
||||||
|
|
||||||
failedNotesWithTitles.push({
|
failedNotesWithTitles.push({
|
||||||
...item,
|
...item,
|
||||||
title: note.title,
|
title: note.title,
|
||||||
failureType: 'full' // This indicates a complete embedding failure
|
failureType: isChunkFailure ? 'chunks' : 'full'
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
failedNotesWithTitles.push({
|
failedNotesWithTitles.push({
|
||||||
@ -601,56 +604,6 @@ export async function getFailedEmbeddingNotes(limit: number = 100): Promise<any[
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now get notes with failed chunks
|
|
||||||
// We need to search for labels that contain failed chunks data
|
|
||||||
const notes = await sql.getRows(`
|
|
||||||
SELECT noteId, name, value
|
|
||||||
FROM attributes
|
|
||||||
WHERE type = 'label' AND name LIKE '%FailedChunks'
|
|
||||||
`) as {noteId: string, name: string, value: string}[];
|
|
||||||
|
|
||||||
// Process notes with failed chunks
|
|
||||||
for (const item of notes) {
|
|
||||||
try {
|
|
||||||
const noteId = item.noteId;
|
|
||||||
const note = becca.getNote(noteId);
|
|
||||||
if (!note) continue;
|
|
||||||
|
|
||||||
// Parse the failed chunks data
|
|
||||||
const failedChunks = JSON.parse(item.value) as Record<string, {attempts: number, lastAttempt: string, error: string}>;
|
|
||||||
const chunkCount = Object.keys(failedChunks).length;
|
|
||||||
if (chunkCount === 0) continue;
|
|
||||||
|
|
||||||
// Get the most recent failed chunk
|
|
||||||
let latestAttempt = '';
|
|
||||||
let totalAttempts = 0;
|
|
||||||
let errorExample = '';
|
|
||||||
|
|
||||||
for (const chunkId in failedChunks) {
|
|
||||||
const chunk = failedChunks[chunkId];
|
|
||||||
totalAttempts += chunk.attempts;
|
|
||||||
|
|
||||||
if (!latestAttempt || chunk.lastAttempt > latestAttempt) {
|
|
||||||
latestAttempt = chunk.lastAttempt;
|
|
||||||
errorExample = chunk.error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add this to our list of failed notes
|
|
||||||
failedNotesWithTitles.push({
|
|
||||||
noteId,
|
|
||||||
title: note.title,
|
|
||||||
failureType: 'chunks',
|
|
||||||
chunks: chunkCount,
|
|
||||||
attempts: totalAttempts,
|
|
||||||
lastAttempt: latestAttempt,
|
|
||||||
error: `${chunkCount} chunks failed: ${errorExample}`
|
|
||||||
});
|
|
||||||
} catch (error) {
|
|
||||||
console.error("Error processing note with failed chunks:", error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sort by latest attempt
|
// Sort by latest attempt
|
||||||
failedNotesWithTitles.sort((a, b) => {
|
failedNotesWithTitles.sort((a, b) => {
|
||||||
if (a.lastAttempt && b.lastAttempt) {
|
if (a.lastAttempt && b.lastAttempt) {
|
||||||
@ -670,9 +623,7 @@ export async function getFailedEmbeddingNotes(limit: number = 100): Promise<any[
|
|||||||
* @returns Success flag
|
* @returns Success flag
|
||||||
*/
|
*/
|
||||||
export async function retryFailedEmbedding(noteId: string): Promise<boolean> {
|
export async function retryFailedEmbedding(noteId: string): Promise<boolean> {
|
||||||
let success = false;
|
// Check if the note is in the embedding queue with failed attempts
|
||||||
|
|
||||||
// First, check if the note is in the embedding queue with failed attempts
|
|
||||||
const exists = await sql.getValue(
|
const exists = await sql.getValue(
|
||||||
"SELECT 1 FROM embedding_queue WHERE noteId = ? AND attempts > 0",
|
"SELECT 1 FROM embedding_queue WHERE noteId = ? AND attempts > 0",
|
||||||
[noteId]
|
[noteId]
|
||||||
@ -689,29 +640,10 @@ export async function retryFailedEmbedding(noteId: string): Promise<boolean> {
|
|||||||
WHERE noteId = ?`,
|
WHERE noteId = ?`,
|
||||||
[now, utcNow, noteId]
|
[now, utcNow, noteId]
|
||||||
);
|
);
|
||||||
success = true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next, check for failed chunks in labels
|
return false;
|
||||||
const note = becca.getNote(noteId);
|
|
||||||
if (note) {
|
|
||||||
// Look for any provider-specific failed chunks
|
|
||||||
const labels = note.getLabels();
|
|
||||||
const failedChunksLabels = labels.filter(label => label.name.endsWith('FailedChunks'));
|
|
||||||
|
|
||||||
for (const label of failedChunksLabels) {
|
|
||||||
// Remove the label - this will cause all chunks to be retried
|
|
||||||
await note.removeLabel(label.name);
|
|
||||||
success = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we had chunk failures but no queue entry, we need to add one
|
|
||||||
if (failedChunksLabels.length > 0 && !exists) {
|
|
||||||
await queueNoteForEmbedding(noteId, 'UPDATE');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return success;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -720,8 +652,6 @@ export async function retryFailedEmbedding(noteId: string): Promise<boolean> {
|
|||||||
* @returns Number of notes queued for retry
|
* @returns Number of notes queued for retry
|
||||||
*/
|
*/
|
||||||
export async function retryAllFailedEmbeddings(): Promise<number> {
|
export async function retryAllFailedEmbeddings(): Promise<number> {
|
||||||
let totalRetried = 0;
|
|
||||||
|
|
||||||
// Get count of failed notes in queue
|
// Get count of failed notes in queue
|
||||||
const failedCount = await sql.getValue(
|
const failedCount = await sql.getValue(
|
||||||
"SELECT COUNT(*) FROM embedding_queue WHERE attempts > 0"
|
"SELECT COUNT(*) FROM embedding_queue WHERE attempts > 0"
|
||||||
@ -738,39 +668,9 @@ export async function retryAllFailedEmbeddings(): Promise<number> {
|
|||||||
WHERE attempts > 0`,
|
WHERE attempts > 0`,
|
||||||
[now, utcNow]
|
[now, utcNow]
|
||||||
);
|
);
|
||||||
|
|
||||||
totalRetried += failedCount;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now find notes with failed chunks
|
return failedCount;
|
||||||
const notesWithFailedChunks = await sql.getRows(`
|
|
||||||
SELECT DISTINCT noteId
|
|
||||||
FROM attributes
|
|
||||||
WHERE type = 'label' AND name LIKE '%FailedChunks'
|
|
||||||
`) as {noteId: string}[];
|
|
||||||
|
|
||||||
// Process each note with failed chunks
|
|
||||||
for (const item of notesWithFailedChunks) {
|
|
||||||
const noteId = item.noteId;
|
|
||||||
const note = becca.getNote(noteId);
|
|
||||||
|
|
||||||
if (note) {
|
|
||||||
// Get all failed chunks labels
|
|
||||||
const labels = note.getLabels();
|
|
||||||
const failedChunksLabels = labels.filter(label => label.name.endsWith('FailedChunks'));
|
|
||||||
|
|
||||||
for (const label of failedChunksLabels) {
|
|
||||||
// Remove the label - this will cause all chunks to be retried
|
|
||||||
await note.removeLabel(label.name);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure the note is in the queue
|
|
||||||
await queueNoteForEmbedding(noteId, 'UPDATE');
|
|
||||||
totalRetried++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return totalRetried;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -830,15 +730,17 @@ export async function processEmbeddingQueue() {
|
|||||||
// Check if we should use chunking for large content
|
// Check if we should use chunking for large content
|
||||||
const useChunking = context.content.length > 5000;
|
const useChunking = context.content.length > 5000;
|
||||||
|
|
||||||
// Track if all providers failed
|
// Track provider successes and failures
|
||||||
let allProvidersFailed = true;
|
let allProvidersFailed = true;
|
||||||
|
let allProvidersSucceeded = true;
|
||||||
|
|
||||||
// Process with each enabled provider
|
// Process with each enabled provider
|
||||||
for (const provider of enabledProviders) {
|
for (const provider of enabledProviders) {
|
||||||
try {
|
try {
|
||||||
if (useChunking) {
|
if (useChunking) {
|
||||||
// Enhanced approach: Process large notes using chunking
|
// Process large notes using chunking
|
||||||
await processNoteWithChunking(noteData.noteId, provider, context);
|
await processNoteWithChunking(noteData.noteId, provider, context);
|
||||||
|
allProvidersFailed = false;
|
||||||
} else {
|
} else {
|
||||||
// Standard approach: Generate a single embedding for the whole note
|
// Standard approach: Generate a single embedding for the whole note
|
||||||
const embedding = await provider.generateNoteEmbeddings(context);
|
const embedding = await provider.generateNoteEmbeddings(context);
|
||||||
@ -851,16 +753,19 @@ export async function processEmbeddingQueue() {
|
|||||||
config.model,
|
config.model,
|
||||||
embedding
|
embedding
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// At least one provider succeeded
|
||||||
|
allProvidersFailed = false;
|
||||||
}
|
}
|
||||||
// At least one provider succeeded
|
|
||||||
allProvidersFailed = false;
|
|
||||||
} catch (providerError: any) {
|
} catch (providerError: any) {
|
||||||
|
// This provider failed
|
||||||
|
allProvidersSucceeded = false;
|
||||||
log.error(`Error generating embedding with provider ${provider.name} for note ${noteData.noteId}: ${providerError.message || 'Unknown error'}`);
|
log.error(`Error generating embedding with provider ${provider.name} for note ${noteData.noteId}: ${providerError.message || 'Unknown error'}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only remove from queue on success if at least one provider succeeded
|
|
||||||
if (!allProvidersFailed) {
|
if (!allProvidersFailed) {
|
||||||
|
// At least one provider succeeded, remove from queue
|
||||||
await sql.execute(
|
await sql.execute(
|
||||||
"DELETE FROM embedding_queue WHERE noteId = ?",
|
"DELETE FROM embedding_queue WHERE noteId = ?",
|
||||||
[noteData.noteId]
|
[noteData.noteId]
|
||||||
@ -906,7 +811,7 @@ export async function processEmbeddingQueue() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up event listeners for embedding-related events
|
* Set up event listeners for embedding-related events
|
||||||
*/
|
*/
|
||||||
export function setupEmbeddingEventListeners() {
|
export function setupEmbeddingEventListeners() {
|
||||||
// Listen for note content changes
|
// Listen for note content changes
|
||||||
@ -1083,27 +988,17 @@ async function processNoteWithChunking(
|
|||||||
// Delete existing embeddings first to avoid duplicates
|
// Delete existing embeddings first to avoid duplicates
|
||||||
await deleteNoteEmbeddings(noteId, provider.name, config.model);
|
await deleteNoteEmbeddings(noteId, provider.name, config.model);
|
||||||
|
|
||||||
// Track successful and failed chunks
|
// Track successful and failed chunks in memory during this processing run
|
||||||
let successfulChunks = 0;
|
let successfulChunks = 0;
|
||||||
let failedChunks = 0;
|
let failedChunks = 0;
|
||||||
const totalChunks = chunks.length;
|
const totalChunks = chunks.length;
|
||||||
|
const failedChunkDetails: {index: number, error: string}[] = [];
|
||||||
// Get existing chunk failure data from the database
|
|
||||||
// We'll store this in a special attribute on the note to track per-chunk failures
|
|
||||||
const failedChunksData = await getFailedChunksData(noteId, provider.name);
|
|
||||||
|
|
||||||
// Process each chunk with a slight delay to avoid rate limits
|
// Process each chunk with a slight delay to avoid rate limits
|
||||||
for (let i = 0; i < chunks.length; i++) {
|
for (let i = 0; i < chunks.length; i++) {
|
||||||
const chunk = chunks[i];
|
const chunk = chunks[i];
|
||||||
const chunkId = `chunk_${i + 1}_of_${chunks.length}`;
|
const chunkId = `chunk_${i + 1}_of_${chunks.length}`;
|
||||||
|
|
||||||
// Skip chunks that have failed multiple times
|
|
||||||
if (failedChunksData[chunkId] && failedChunksData[chunkId].attempts >= 3) {
|
|
||||||
log.info(`Skipping chunk ${chunkId} for note ${noteId} after ${failedChunksData[chunkId].attempts} failed attempts`);
|
|
||||||
failedChunks++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Create a modified context object with just this chunk's content
|
// Create a modified context object with just this chunk's content
|
||||||
const chunkContext: NoteEmbeddingContext = {
|
const chunkContext: NoteEmbeddingContext = {
|
||||||
@ -1124,12 +1019,6 @@ async function processNoteWithChunking(
|
|||||||
|
|
||||||
successfulChunks++;
|
successfulChunks++;
|
||||||
|
|
||||||
// Remove this chunk from failed chunks if it was previously failed
|
|
||||||
if (failedChunksData[chunkId]) {
|
|
||||||
delete failedChunksData[chunkId];
|
|
||||||
await updateFailedChunksData(noteId, provider.name, failedChunksData);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Small delay between chunks to avoid rate limits
|
// Small delay between chunks to avoid rate limits
|
||||||
if (i < chunks.length - 1) {
|
if (i < chunks.length - 1) {
|
||||||
await new Promise(resolve => setTimeout(resolve, 100));
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
@ -1137,21 +1026,10 @@ async function processNoteWithChunking(
|
|||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
// Track the failure for this specific chunk
|
// Track the failure for this specific chunk
|
||||||
failedChunks++;
|
failedChunks++;
|
||||||
|
failedChunkDetails.push({
|
||||||
if (!failedChunksData[chunkId]) {
|
index: i + 1,
|
||||||
failedChunksData[chunkId] = {
|
error: error.message || 'Unknown error'
|
||||||
attempts: 1,
|
});
|
||||||
lastAttempt: dateUtils.utcNowDateTime(),
|
|
||||||
error: error.message || 'Unknown error'
|
|
||||||
};
|
|
||||||
} else {
|
|
||||||
failedChunksData[chunkId].attempts++;
|
|
||||||
failedChunksData[chunkId].lastAttempt = dateUtils.utcNowDateTime();
|
|
||||||
failedChunksData[chunkId].error = error.message || 'Unknown error';
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the failed chunks data in the database
|
|
||||||
await updateFailedChunksData(noteId, provider.name, failedChunksData);
|
|
||||||
|
|
||||||
log.error(`Error processing chunk ${chunkId} for note ${noteId}: ${error.message || 'Unknown error'}`);
|
log.error(`Error processing chunk ${chunkId} for note ${noteId}: ${error.message || 'Unknown error'}`);
|
||||||
}
|
}
|
||||||
@ -1166,68 +1044,34 @@ async function processNoteWithChunking(
|
|||||||
log.info(`Failed to generate ${failedChunks} chunk embeddings for note ${noteId}`);
|
log.info(`Failed to generate ${failedChunks} chunk embeddings for note ${noteId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If all chunks failed, throw an error so the note will be marked as failed
|
// If no chunks were successfully processed, throw an error
|
||||||
|
// This will keep the note in the queue for another attempt
|
||||||
if (successfulChunks === 0 && failedChunks > 0) {
|
if (successfulChunks === 0 && failedChunks > 0) {
|
||||||
throw new Error(`All ${failedChunks} chunks failed for note ${noteId}`);
|
throw new Error(`All ${failedChunks} chunks failed for note ${noteId}. First error: ${failedChunkDetails[0]?.error}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If some chunks failed but others succeeded, log a warning but consider the processing complete
|
||||||
|
// The note will be removed from the queue, but we'll store error information
|
||||||
|
if (failedChunks > 0 && successfulChunks > 0) {
|
||||||
|
const errorSummary = `Note processed partially: ${successfulChunks}/${totalChunks} chunks succeeded, ${failedChunks}/${totalChunks} failed`;
|
||||||
|
log.info(errorSummary);
|
||||||
|
|
||||||
|
// Store a summary in the error field of embedding_queue
|
||||||
|
// This is just for informational purposes - the note will be removed from the queue
|
||||||
|
const now = dateUtils.utcNowDateTime();
|
||||||
|
await sql.execute(`
|
||||||
|
UPDATE embedding_queue
|
||||||
|
SET error = ?, lastAttempt = ?
|
||||||
|
WHERE noteId = ?
|
||||||
|
`, [errorSummary, now, noteId]);
|
||||||
|
}
|
||||||
|
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
log.error(`Error in chunked embedding process for note ${noteId}: ${error.message || 'Unknown error'}`);
|
log.error(`Error in chunked embedding process for note ${noteId}: ${error.message || 'Unknown error'}`);
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieve the stored failed chunk data for a note
|
|
||||||
* This is stored in a special attribute on the note so we can track per-chunk failures
|
|
||||||
*/
|
|
||||||
async function getFailedChunksData(noteId: string, providerId: string): Promise<Record<string, {attempts: number, lastAttempt: string, error: string}>> {
|
|
||||||
try {
|
|
||||||
const attributeName = `${providerId}FailedChunks`;
|
|
||||||
const note = becca.getNote(noteId);
|
|
||||||
|
|
||||||
if (!note) {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
const attr = note.getLabels().find(attr => attr.name === attributeName);
|
|
||||||
|
|
||||||
if (!attr || !attr.value) {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
return JSON.parse(attr.value);
|
|
||||||
} catch (e) {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Update failed chunk data for a note
|
|
||||||
*/
|
|
||||||
async function updateFailedChunksData(noteId: string, providerId: string, data: Record<string, {attempts: number, lastAttempt: string, error: string}>): Promise<void> {
|
|
||||||
try {
|
|
||||||
const attributeName = `${providerId}FailedChunks`;
|
|
||||||
const note = becca.getNote(noteId);
|
|
||||||
|
|
||||||
if (!note) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only store if there are failed chunks
|
|
||||||
if (Object.keys(data).length > 0) {
|
|
||||||
await note.setLabel(attributeName, JSON.stringify(data));
|
|
||||||
} else {
|
|
||||||
// If no failed chunks, remove the attribute if it exists
|
|
||||||
const attr = note.getLabels().find(attr => attr.name === attributeName);
|
|
||||||
if (attr) {
|
|
||||||
await note.removeLabel(attributeName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
log.error(`Error updating failed chunks data for note ${noteId}: ${e}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export function cleanupEmbeddings() {
|
export function cleanupEmbeddings() {
|
||||||
// Cleanup function implementation
|
// Cleanup function implementation
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user