Mirror of https://github.com/TriliumNext/Notes.git
update chunking management
commit 46a6533e57
parent e5afbc6ddc
@@ -26,6 +26,15 @@ const ERROR_CATEGORIES = {
     }
 };
 
+// Maximum time (in milliseconds) allowed for the entire chunking process
+const MAX_TOTAL_PROCESSING_TIME = 5 * 60 * 1000; // 5 minutes
+
+// Maximum number of retry attempts per chunk
+const MAX_CHUNK_RETRY_ATTEMPTS = 2;
+
+// Maximum time per chunk processing (to prevent individual chunks from hanging)
+const MAX_CHUNK_PROCESSING_TIME = 60 * 1000; // 1 minute
+
 /**
  * Categorize an error as temporary or permanent based on its message
  * @param errorMessage - The error message to categorize
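Taken together, these limits bound the worst case: with a one-minute cap per chunk and a five-minute overall budget, only a handful of timed-out chunks can even be attempted before the whole note is cut off. A small standalone sketch of that arithmetic (not part of the diff):

    // Standalone sketch: how many worst-case (timed-out) chunks fit in the total budget.
    const MAX_TOTAL_PROCESSING_TIME = 5 * 60 * 1000; // 5 minutes
    const MAX_CHUNK_PROCESSING_TIME = 60 * 1000;     // 1 minute

    // Each chunk can consume at most MAX_CHUNK_PROCESSING_TIME before its timeout fires,
    // so in the worst case only this many chunks are attempted per note:
    const worstCaseChunksAttempted = Math.floor(MAX_TOTAL_PROCESSING_TIME / MAX_CHUNK_PROCESSING_TIME);
    console.log(worstCaseChunksAttempted); // 5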
@@ -52,6 +61,28 @@ function categorizeError(errorMessage: string): 'temporary' | 'permanent' | 'unknown' {
     return 'unknown';
 }
 
+/**
+ * Process a chunk with a timeout to prevent hanging
+ * @param provider - The embedding provider
+ * @param chunk - The chunk to process
+ * @param timeoutMs - Timeout in milliseconds
+ * @returns The generated embedding
+ */
+async function processChunkWithTimeout(provider: any, chunk: any, timeoutMs: number): Promise<any> {
+    // Create a promise that rejects after the timeout
+    const timeoutPromise = new Promise((_, reject) => {
+        setTimeout(() => {
+            reject(new Error(`Chunk processing timed out after ${timeoutMs}ms`));
+        }, timeoutMs);
+    });
+
+    // Create the actual processing promise
+    const processingPromise = provider.generateEmbeddings(chunk.content);
+
+    // Race the two promises - whichever completes/rejects first wins
+    return Promise.race([processingPromise, timeoutPromise]);
+}
+
 /**
  * Process a large note by breaking it into chunks and creating embeddings for each chunk
  * This provides more detailed and focused embeddings for different parts of large notes
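processChunkWithTimeout races the provider call against a timer. Worth noting: Promise.race does not cancel the losing promise, and the timer is never cleared, so the handle stays alive for up to timeoutMs even after a fast success (harmless here, but it can keep a Node process alive in tests). A minimal standalone sketch of the same pattern that also clears the timer; withTimeout and slowOperation are illustrative names, not from the codebase:

    // Standalone sketch of the same Promise.race timeout pattern.
    // `slowOperation` stands in for provider.generateEmbeddings(chunk.content).
    async function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
        let timer: ReturnType<typeof setTimeout> | undefined;

        const timeout = new Promise<never>((_, reject) => {
            timer = setTimeout(
                () => reject(new Error(`Timed out after ${timeoutMs}ms`)),
                timeoutMs
            );
        });

        try {
            // Whichever settles first wins; the slower promise is not cancelled,
            // it is simply ignored (same behaviour as the diff above).
            return await Promise.race([work, timeout]);
        } finally {
            clearTimeout(timer); // unlike the diff, release the timer handle
        }
    }

    // Example usage with a stand-in operation:
    const slowOperation = new Promise<string>(resolve => setTimeout(() => resolve("embedding"), 500));
    withTimeout(slowOperation, 1000).then(console.log, console.error);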
@@ -65,6 +96,9 @@ export async function processNoteWithChunking(
     provider: any,
     context: NoteEmbeddingContext
 ): Promise<void> {
+    // Track the overall start time
+    const startTime = Date.now();
+
     try {
         // Get the context extractor dynamically to avoid circular dependencies
         const { ContextExtractor } = await import('../context/index.js');
@@ -127,17 +161,37 @@ export async function processNoteWithChunking(
             attempts: number
         }[] = [];
 
-        // Maximum number of retry attempts per chunk
-        const MAX_CHUNK_RETRY_ATTEMPTS = 2;
-
         log.info(`Processing ${chunks.length} chunks for note ${noteId} (${note.title})`);
 
         // Process each chunk with a delay based on provider to avoid rate limits
         for (let i = 0; i < chunks.length; i++) {
+            // Check if we've exceeded the overall time limit
+            if (Date.now() - startTime > MAX_TOTAL_PROCESSING_TIME) {
+                log.info(`Exceeded maximum processing time (${MAX_TOTAL_PROCESSING_TIME}ms) for note ${noteId}, stopping after ${i} chunks`);
+
+                // Mark remaining chunks as failed due to timeout
+                for (let j = i; j < chunks.length; j++) {
+                    failedChunks++;
+                    failedChunkDetails.push({
+                        index: j + 1,
+                        error: "Processing timeout - exceeded total allowed time",
+                        category: 'temporary',
+                        attempts: 1
+                    });
+                }
+
+                // Break the loop, we'll handle this as partial success if some chunks succeeded
+                break;
+            }
+
             const chunk = chunks[i];
             try {
-                // Generate embedding for this chunk's content
-                const embedding = await provider.generateEmbeddings(chunk.content);
+                // Generate embedding for this chunk's content with a timeout
+                const embedding = await processChunkWithTimeout(
+                    provider,
+                    chunk,
+                    MAX_CHUNK_PROCESSING_TIME
+                );
 
                 // Store with chunk information in a unique ID format
                 const chunkIdSuffix = `${i + 1}_of_${chunks.length}`;
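The loop now checks the elapsed wall-clock time before each chunk; on overrun it records the untouched remainder as temporary failures and breaks, so chunks that already succeeded still count as partial progress. A self-contained sketch of this time-budget pattern, with stand-in names (processWithBudget, Item, processItem) rather than the real ones:

    // Standalone sketch of the time-budget loop: process items until a wall-clock
    // budget is exhausted, recording the untouched remainder as temporary failures.
    type Failure = { index: number; error: string; category: 'temporary' | 'permanent' | 'unknown' };

    async function processWithBudget<Item>(
        items: Item[],
        budgetMs: number,
        processItem: (item: Item) => Promise<void>
    ): Promise<Failure[]> {
        const failures: Failure[] = [];
        const start = Date.now();

        for (let i = 0; i < items.length; i++) {
            // The budget is checked before each item, so one slow item can overshoot
            // the budget, but the next one will not start.
            if (Date.now() - start > budgetMs) {
                for (let j = i; j < items.length; j++) {
                    failures.push({ index: j + 1, error: 'Processing timeout - exceeded total allowed time', category: 'temporary' });
                }
                break;
            }

            try {
                await processItem(items[i]);
            } catch (e: any) {
                failures.push({ index: i + 1, error: e?.message ?? 'Unknown error', category: 'unknown' });
            }
        }

        return failures;
    }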
@@ -183,11 +237,21 @@ export async function processNoteWithChunking(
             }
         }
 
+        // Set a time limit for the retry phase
+        const retryStartTime = Date.now();
+        const MAX_RETRY_TIME = 2 * 60 * 1000; // 2 minutes for all retries
+
         // Retry failed chunks with exponential backoff, but only those that aren't permanent errors
         if (retryQueue.length > 0 && retryQueue.length < chunks.length) {
             log.info(`Retrying ${retryQueue.length} failed chunks for note ${noteId}`);
 
             for (let j = 0; j < retryQueue.length; j++) {
+                // Check if we've exceeded the retry time limit
+                if (Date.now() - retryStartTime > MAX_RETRY_TIME) {
+                    log.info(`Exceeded maximum retry time (${MAX_RETRY_TIME}ms) for note ${noteId}, stopping after ${j} retries`);
+                    break;
+                }
+
                 const item = retryQueue[j];
 
                 // Skip if we've already reached the max retry attempts for this chunk
@@ -200,8 +264,12 @@ export async function processNoteWithChunking(
                 // Wait longer for retries with exponential backoff
                 await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(1.5, j)));
 
-                // Retry the embedding
-                const embedding = await provider.generateEmbeddings(item.chunk.content);
+                // Retry the embedding with timeout
+                const embedding = await processChunkWithTimeout(
+                    provider,
+                    item.chunk,
+                    MAX_CHUNK_PROCESSING_TIME
+                );
 
                 // Store with unique ID that indicates it was a retry
                 const chunkIdSuffix = `${item.index + 1}_of_${chunks.length}`;
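Retries keep the existing exponential backoff (1000 * 1.5^j ms, so roughly 1 s, 1.5 s, 2.25 s, ...) and now also go through the per-chunk timeout. A small worked example of that schedule; backoffDelay is an illustrative helper, not from the codebase:

    // Worked example of the backoff schedule used above: delay = 1000 * 1.5^j ms.
    const delaysMs = [0, 1, 2, 3, 4].map(j => 1000 * Math.pow(1.5, j));
    console.log(delaysMs); // [1000, 1500, 2250, 3375, 5062.5]

    // Reusable helper in the same spirit (illustrative only):
    const backoffDelay = (attempt: number, baseMs = 1000, factor = 1.5) =>
        new Promise<void>(resolve => setTimeout(resolve, baseMs * Math.pow(factor, attempt)));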
@@ -237,8 +305,15 @@ export async function processNoteWithChunking(
 
                 log.error(`Retry failed for chunk ${item.index + 1} of note ${noteId} (${errorCategory} error): ${errorMessage}`);
 
+                // For timeout errors, mark as permanent to avoid further retries
+                if (errorMessage.includes('timed out')) {
+                    if (detailIndex >= 0) {
+                        failedChunkDetails[detailIndex].category = 'permanent';
+                    }
+                    log.info(`Chunk ${item.index + 1} for note ${noteId} timed out, marking as permanent failure`);
+                }
                 // Add to retry queue again only if it's not a permanent error and hasn't reached the max attempts
-                if (errorCategory !== 'permanent' && item.attempts + 1 < MAX_CHUNK_RETRY_ATTEMPTS) {
+                else if (errorCategory !== 'permanent' && item.attempts + 1 < MAX_CHUNK_RETRY_ATTEMPTS) {
                     // If we're still below MAX_CHUNK_RETRY_ATTEMPTS, we'll try again in the next cycle
                     item.attempts++;
                 } else if (errorCategory === 'permanent') {
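Timeouts are recognised by matching the 'timed out' substring of the message thrown by processChunkWithTimeout, and such chunks are reclassified as permanent so they are not retried again. A hedged alternative sketch using a dedicated error class instead of message matching; ChunkTimeoutError and isTimeout are illustrative, not part of the change:

    // Alternative sketch: detect timeouts via a dedicated error class instead of
    // matching the message text.
    class ChunkTimeoutError extends Error {
        constructor(timeoutMs: number) {
            super(`Chunk processing timed out after ${timeoutMs}ms`);
            this.name = 'ChunkTimeoutError';
        }
    }

    function isTimeout(error: unknown): boolean {
        return error instanceof ChunkTimeoutError
            // fall back to the message check used in the diff, for errors raised elsewhere
            || (error instanceof Error && error.message.includes('timed out'));
    }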
@@ -265,17 +340,26 @@ export async function processNoteWithChunking(
                 `Permanent: ${permanentErrors}, Temporary: ${temporaryErrors}, Unknown: ${unknownErrors}`);
         }
 
-        // If no chunks were successfully processed, throw an error
-        // This will keep the note in the queue for another attempt
-        if (successfulChunks === 0 && failedChunks > 0) {
+        // Calculate the failure ratio
+        const failureRatio = failedChunks / totalChunks;
+
+        // If no chunks were successfully processed, or if more than 50% failed, mark the entire note as failed
+        if (successfulChunks === 0 || failureRatio > 0.5) {
             // Check if all failures are permanent
             const allPermanent = failedChunkDetails.every(d => d.category === 'permanent');
+            const errorType = allPermanent ? 'permanent' : (failureRatio > 0.5 ? 'too_many_failures' : 'all_failed');
 
-            if (allPermanent) {
-                throw new Error(`All ${failedChunks} chunks failed with permanent errors for note ${noteId}. First error: ${failedChunkDetails[0]?.error}`);
-            } else {
-                throw new Error(`All ${failedChunks} chunks failed for note ${noteId}. First error: ${failedChunkDetails[0]?.error}`);
-            }
+            // Mark this note as failed in the embedding_queue table with a permanent error status
+            const now = dateUtils.utcNowDateTime();
+            const errorSummary = `Note embedding failed: ${failedChunks}/${totalChunks} chunks failed (${errorType}). First error: ${failedChunkDetails[0]?.error}`;
+
+            await sql.execute(`
+                UPDATE embedding_queue
+                SET error = ?, lastAttempt = ?, attempts = 999
+                WHERE noteId = ?
+            `, [errorSummary, now, noteId]);
+
+            throw new Error(errorSummary);
         }
 
         // If some chunks failed but others succeeded, log a warning but consider the processing complete
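The note is now abandoned either when nothing succeeded or when more than half of its chunks failed, and the queue row is stamped with attempts = 999 as the "permanently failed" sentinel before the error is rethrown. The decision rule in isolation, as a standalone sketch (shouldFailNote is an illustrative name):

    // Standalone sketch of the give-up rule: fail the whole note when nothing
    // succeeded or when more than half of the chunks failed.
    function shouldFailNote(successfulChunks: number, failedChunks: number, totalChunks: number): boolean {
        const failureRatio = failedChunks / totalChunks;
        return successfulChunks === 0 || failureRatio > 0.5;
    }

    // Examples:
    console.log(shouldFailNote(0, 4, 4));  // true  - nothing succeeded
    console.log(shouldFailNote(3, 5, 8));  // true  - 62.5% failed
    console.log(shouldFailNote(6, 2, 8));  // false - 25% failed, treated as partial success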
@@ -300,6 +384,10 @@ export async function processNoteWithChunking(
             `, [errorSummary, now, noteId]);
         }
 
+        // Track total processing time
+        const totalTime = Date.now() - startTime;
+        log.info(`Total processing time for note ${noteId}: ${totalTime}ms`);
+
     } catch (error: any) {
         log.error(`Error in chunked embedding process for note ${noteId}: ${error.message || 'Unknown error'}`);
         throw error;
@@ -289,6 +289,13 @@ export async function processEmbeddingQueue() {
                 // This allows manual retries later
                 if (noteData.attempts + 1 >= 3) {
                     log.error(`Marked note ${noteData.noteId} as permanently failed after multiple embedding attempts`);
+
+                    // Update the attempts to a very high number to indicate permanent failure
+                    await sql.execute(`
+                        UPDATE embedding_queue
+                        SET attempts = 999
+                        WHERE noteId = ?
+                    `, [noteData.noteId]);
                 }
             }
         }
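In the queue processor the same sentinel is applied after three failed attempts: the row is kept (so it can be retried manually later) but its attempts counter is set to 999 so automatic retries skip it. A standalone sketch of that convention with illustrative names and types (MAX_AUTOMATIC_ATTEMPTS, QueueRow and the helpers are assumptions, not from the codebase):

    // Standalone sketch of the sentinel convention: attempts = 999 means
    // "permanently failed, skip during automatic retries, keep for manual retry".
    const MAX_AUTOMATIC_ATTEMPTS = 3;
    const PERMANENT_FAILURE_ATTEMPTS = 999;

    interface QueueRow { noteId: string; attempts: number; error?: string; }

    function isEligibleForAutomaticRetry(row: QueueRow): boolean {
        return row.attempts < MAX_AUTOMATIC_ATTEMPTS; // rows set to 999 fall out automatically
    }

    function markPermanentlyFailed(row: QueueRow, error: string): QueueRow {
        return { ...row, attempts: PERMANENT_FAILURE_ATTEMPTS, error };
    }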