aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-02-04 12:32:06 +0000
committerMohamed Bassem <me@mbassem.com>2026-02-04 12:32:06 +0000
commitaec21d78bf51a9deb62ae9ca697996d1c10972fb (patch)
tree8b936cbdfad55e9027f5c119be5ec708b2e81434
parentb01be27013a11e1641448f8e38c4554be6718ba3 (diff)
downloadkarakeep-aec21d78bf51a9deb62ae9ca697996d1c10972fb.tar.zst
fix: extra logging for the import worker
-rw-r--r--apps/workers/workers/importWorker.ts52
1 files changed, 39 insertions, 13 deletions
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
index e24d79f7..491d6088 100644
--- a/apps/workers/workers/importWorker.ts
+++ b/apps/workers/workers/importWorker.ts
@@ -130,6 +130,10 @@ export class ImportWorker {
return 0;
}
+ logger.debug(
+ `[import] ${countPendingItems} pending items, available capacity: ${availableCapacity}`,
+ );
+
// 2. Get candidate IDs with fair scheduling across users
const batchLimit = Math.min(this.batchSize, availableCapacity);
const candidateIds = await this.getNextBatchFairly(batchLimit);
@@ -156,6 +160,9 @@ export class ImportWorker {
// 4. Mark session(s) as running (using claimed rows, not candidates)
const sessionIds = [...new Set(batch.map((b) => b.importSessionId))];
+ logger.info(
+ `[import] Claimed batch of ${batch.length} items from ${sessionIds.length} session(s): [${sessionIds.join(", ")}]`,
+ );
await db
.update(importSessions)
.set({ status: "running" })
@@ -167,10 +174,21 @@ export class ImportWorker {
);
// 5. Process in parallel
- await Promise.allSettled(
+ const results = await Promise.allSettled(
batch.map((staged) => this.processOneBookmark(staged)),
);
+ const outcomes: Record<string, number> = {};
+ for (const r of results) {
+ const key = r.status === "fulfilled" ? r.value : "error";
+ outcomes[key] = (outcomes[key] ?? 0) + 1;
+ }
+ logger.debug(
+ `[import] Batch results: ${Object.entries(outcomes)
+ .map(([k, v]) => `${k}=${v}`)
+ .join(", ")}`,
+ );
+
// 6. Check if any sessions are now complete
await this.checkAndCompleteEmptySessions(sessionIds);
@@ -287,7 +305,7 @@ export class ImportWorker {
private async processOneBookmark(
staged: typeof importStagingBookmarks.$inferSelect,
- ) {
+ ): Promise<string> {
const session = await db.query.importSessions.findFirst({
where: eq(importSessions.id, staged.importSessionId),
});
@@ -298,7 +316,7 @@ export class ImportWorker {
.update(importStagingBookmarks)
.set({ status: "pending" })
.where(eq(importStagingBookmarks.id, staged.id));
- return;
+ return "reset";
}
try {
@@ -340,9 +358,6 @@ export class ImportWorker {
};
} else {
// asset type - skip for now as it needs special handling
- logger.warn(
- `[import] Asset bookmarks not yet supported in import worker: ${staged.id}`,
- );
await db
.update(importStagingBookmarks)
.set({
@@ -353,7 +368,7 @@ export class ImportWorker {
})
.where(eq(importStagingBookmarks.id, staged.id));
await this.updateSessionLastProcessedAt(staged.importSessionId);
- return;
+ return "unsupported";
}
const result = await caller.bookmarks.createBookmark(bookmarkRequest);
@@ -383,7 +398,7 @@ export class ImportWorker {
importStagingProcessedCounter.inc({ result: "skipped_duplicate" });
await this.attachBookmarkToLists(caller, session, staged, result.id);
await this.updateSessionLastProcessedAt(staged.importSessionId);
- return;
+ return "duplicate";
}
// Mark as accepted but keep in "processing" until crawl/tag is done
@@ -399,6 +414,7 @@ export class ImportWorker {
await this.attachBookmarkToLists(caller, session, staged, result.id);
await this.updateSessionLastProcessedAt(staged.importSessionId);
+ return "accepted";
} catch (error) {
logger.error(
`[import] Error processing staged item ${staged.id}: ${error}`,
@@ -415,6 +431,7 @@ export class ImportWorker {
importStagingProcessedCounter.inc({ result: "rejected" });
await this.updateSessionLastProcessedAt(staged.importSessionId);
+ return "failed";
}
}
@@ -438,6 +455,9 @@ export class ImportWorker {
);
if (remaining[0]?.count === 0) {
+ logger.info(
+ `[import] Session ${sessionId} completed, all items processed`,
+ );
await db
.update(importSessions)
.set({ status: "completed" })
@@ -491,6 +511,10 @@ export class ImportWorker {
return 0;
}
+ logger.debug(
+ `[import] ${completedItems.length} item(s) finished downstream processing, marking as completed`,
+ );
+
// Mark them as completed
await db
.update(importStagingBookmarks)
@@ -541,11 +565,13 @@ export class ImportWorker {
OpenAIQueue.stats(),
]);
- const inFlight = Math.max(
- crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry,
- openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry,
- processingCount[0]?.count ?? 0,
- );
+ const crawlerTotal =
+ crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry;
+ const openaiTotal =
+ openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry;
+ const processingTotal = processingCount[0]?.count ?? 0;
+
+ const inFlight = Math.max(crawlerTotal, openaiTotal, processingTotal);
importStagingInFlightGauge.set(inFlight);
return this.maxInFlight - inFlight;