diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-02-04 12:32:06 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2026-02-04 12:32:06 +0000 |
| commit | aec21d78bf51a9deb62ae9ca697996d1c10972fb (patch) | |
| tree | 8b936cbdfad55e9027f5c119be5ec708b2e81434 | |
| parent | b01be27013a11e1641448f8e38c4554be6718ba3 (diff) | |
| download | karakeep-aec21d78bf51a9deb62ae9ca697996d1c10972fb.tar.zst | |
fix: extra logging for the import worker
| -rw-r--r-- | apps/workers/workers/importWorker.ts | 52 |
1 file changed, 39 insertions, 13 deletions
```diff
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
index e24d79f7..491d6088 100644
--- a/apps/workers/workers/importWorker.ts
+++ b/apps/workers/workers/importWorker.ts
@@ -130,6 +130,10 @@ export class ImportWorker {
       return 0;
     }
 
+    logger.debug(
+      `[import] ${countPendingItems} pending items, available capacity: ${availableCapacity}`,
+    );
+
     // 2. Get candidate IDs with fair scheduling across users
     const batchLimit = Math.min(this.batchSize, availableCapacity);
     const candidateIds = await this.getNextBatchFairly(batchLimit);
@@ -156,6 +160,9 @@ export class ImportWorker {
 
     // 4. Mark session(s) as running (using claimed rows, not candidates)
     const sessionIds = [...new Set(batch.map((b) => b.importSessionId))];
+    logger.info(
+      `[import] Claimed batch of ${batch.length} items from ${sessionIds.length} session(s): [${sessionIds.join(", ")}]`,
+    );
     await db
       .update(importSessions)
       .set({ status: "running" })
@@ -167,10 +174,21 @@ export class ImportWorker {
       );
 
     // 5. Process in parallel
-    await Promise.allSettled(
+    const results = await Promise.allSettled(
       batch.map((staged) => this.processOneBookmark(staged)),
     );
 
+    const outcomes: Record<string, number> = {};
+    for (const r of results) {
+      const key = r.status === "fulfilled" ? r.value : "error";
+      outcomes[key] = (outcomes[key] ?? 0) + 1;
+    }
+    logger.debug(
+      `[import] Batch results: ${Object.entries(outcomes)
+        .map(([k, v]) => `${k}=${v}`)
+        .join(", ")}`,
+    );
+
     // 6. Check if any sessions are now complete
     await this.checkAndCompleteEmptySessions(sessionIds);
 
@@ -287,7 +305,7 @@ export class ImportWorker {
 
   private async processOneBookmark(
     staged: typeof importStagingBookmarks.$inferSelect,
-  ) {
+  ): Promise<string> {
     const session = await db.query.importSessions.findFirst({
       where: eq(importSessions.id, staged.importSessionId),
     });
@@ -298,7 +316,7 @@ export class ImportWorker {
         .update(importStagingBookmarks)
         .set({ status: "pending" })
         .where(eq(importStagingBookmarks.id, staged.id));
-      return;
+      return "reset";
     }
 
     try {
@@ -340,9 +358,6 @@ export class ImportWorker {
         };
       } else {
         // asset type - skip for now as it needs special handling
-        logger.warn(
-          `[import] Asset bookmarks not yet supported in import worker: ${staged.id}`,
-        );
         await db
           .update(importStagingBookmarks)
           .set({
@@ -353,7 +368,7 @@ export class ImportWorker {
           })
           .where(eq(importStagingBookmarks.id, staged.id));
         await this.updateSessionLastProcessedAt(staged.importSessionId);
-        return;
+        return "unsupported";
       }
 
       const result = await caller.bookmarks.createBookmark(bookmarkRequest);
@@ -383,7 +398,7 @@ export class ImportWorker {
         importStagingProcessedCounter.inc({ result: "skipped_duplicate" });
         await this.attachBookmarkToLists(caller, session, staged, result.id);
         await this.updateSessionLastProcessedAt(staged.importSessionId);
-        return;
+        return "duplicate";
       }
 
       // Mark as accepted but keep in "processing" until crawl/tag is done
@@ -399,6 +414,7 @@ export class ImportWorker {
 
       await this.attachBookmarkToLists(caller, session, staged, result.id);
       await this.updateSessionLastProcessedAt(staged.importSessionId);
+      return "accepted";
     } catch (error) {
       logger.error(
         `[import] Error processing staged item ${staged.id}: ${error}`,
@@ -415,6 +431,7 @@ export class ImportWorker {
 
       importStagingProcessedCounter.inc({ result: "rejected" });
       await this.updateSessionLastProcessedAt(staged.importSessionId);
+      return "failed";
     }
   }
 
@@ -438,6 +455,9 @@ export class ImportWorker {
       );
 
       if (remaining[0]?.count === 0) {
+        logger.info(
+          `[import] Session ${sessionId} completed, all items processed`,
+        );
         await db
           .update(importSessions)
           .set({ status: "completed" })
@@ -491,6 +511,10 @@ export class ImportWorker {
       return 0;
     }
 
+    logger.debug(
+      `[import] ${completedItems.length} item(s) finished downstream processing, marking as completed`,
+    );
+
     // Mark them as completed
     await db
       .update(importStagingBookmarks)
@@ -541,11 +565,13 @@ export class ImportWorker {
       OpenAIQueue.stats(),
     ]);
 
-    const inFlight = Math.max(
-      crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry,
-      openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry,
-      processingCount[0]?.count ?? 0,
-    );
+    const crawlerTotal =
+      crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry;
+    const openaiTotal =
+      openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry;
+    const processingTotal = processingCount[0]?.count ?? 0;
+
+    const inFlight = Math.max(crawlerTotal, openaiTotal, processingTotal);
     importStagingInFlightGauge.set(inFlight);
 
     return this.maxInFlight - inFlight;
```
