diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-02-04 12:14:37 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-04 12:14:37 +0000 |
| commit | e8e48a4144d2461666fa08b535c4de37d5db1b2f (patch) | |
| tree | b95ee8a1623a9958ff6dd2a4df81d476b00c2ac0 /apps | |
| parent | 3c838ddb26c1e86d3f201ce71f13c834be705f69 (diff) | |
| download | karakeep-e8e48a4144d2461666fa08b535c4de37d5db1b2f.tar.zst | |
fix: backfill old sessions and do queue backpressure (#2449)
* fix: backfill old sessions and do queue backpressure
* fix typo
Diffstat (limited to 'apps')
| -rw-r--r-- | apps/workers/workers/importWorker.ts | 75 |
1 file changed, 54 insertions, 21 deletions
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts index a717fd9d..e24d79f7 100644 --- a/apps/workers/workers/importWorker.ts +++ b/apps/workers/workers/importWorker.ts @@ -2,6 +2,7 @@ import { and, count, eq, + gt, inArray, isNotNull, isNull, @@ -18,7 +19,8 @@ import { importSessions, importStagingBookmarks, } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; +import { LinkCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server"; +import logger, { throttledLogger } from "@karakeep/shared/logger"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; // Prometheus metrics @@ -55,13 +57,15 @@ const importBatchDurationHistogram = new Histogram({ buckets: [0.1, 0.5, 1, 2, 5, 10, 30], }); +const backpressureLogger = throttledLogger(60_000); + function sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms)); } export class ImportWorker { private running = false; - private pollIntervalMs = 1000; + private pollIntervalMs = 5000; // Backpressure settings private maxInFlight = 50; @@ -88,8 +92,11 @@ export class ImportWorker { const processed = await this.processBatch(); if (processed === 0) { await this.checkAndCompleteIdleSessions(); + await this.updateGauges(); // Nothing to do, wait before polling again await sleep(this.pollIntervalMs); + } else { + await this.updateGauges(); } } catch (error) { logger.error(`[import] Error in polling loop: ${error}`); @@ -104,12 +111,22 @@ export class ImportWorker { } private async processBatch(): Promise<number> { - // 1. Check backpressure - count in-flight + recently completed items + const countPendingItems = await this.countPendingItems(); + importStagingPendingGauge.set(countPendingItems); + if (countPendingItems === 0) { + // Nothing to do, wait before polling again + return 0; + } + + // 1. 
Check backpressure - inflight items + queue sizes const availableCapacity = await this.getAvailableCapacity(); - importStagingInFlightGauge.set(this.maxInFlight - availableCapacity); if (availableCapacity <= 0) { // At capacity, wait before trying again + backpressureLogger( + "info", + `[import] Pending import items: ${countPendingItems}, but current capacity is ${availableCapacity}. Will wait until capacity is available.`, + ); return 0; } @@ -158,19 +175,11 @@ export class ImportWorker { await this.checkAndCompleteEmptySessions(sessionIds); batchTimer(); // Record batch duration - await this.updateGauges(); // Update pending/session gauges return batch.length; } private async updateGauges() { - // Update pending items gauge - const pending = await db - .select({ count: count() }) - .from(importStagingBookmarks) - .where(eq(importStagingBookmarks.status, "pending")); - importStagingPendingGauge.set(pending[0]?.count ?? 0); - // Update active sessions gauge by status const sessions = await db .select({ @@ -213,6 +222,14 @@ export class ImportWorker { await this.checkAndCompleteEmptySessions(sessionIds); } + private async countPendingItems(): Promise<number> { + const res = await db + .select({ count: count() }) + .from(importStagingBookmarks) + .where(eq(importStagingBookmarks.status, "pending")); + return res[0]?.count ?? 0; + } + private async getNextBatchFairly(limit: number): Promise<string[]> { // Query pending item IDs from active sessions, ordered by: // 1. User's last-served timestamp (fairness) @@ -504,18 +521,34 @@ export class ImportWorker { } /** - * Backpressure: Calculate available capacity. - * Counts items currently in "processing" status, which includes both: - * - Items being actively imported - * - Items waiting for downstream crawl/tag to complete + * Backpressure: Calculate available capacity based on number of items in flight and the health of the import queues. 
*/ private async getAvailableCapacity(): Promise<number> { - const processing = await db - .select({ count: count() }) - .from(importStagingBookmarks) - .where(eq(importStagingBookmarks.status, "processing")); + const [processingCount, crawlerQueue, openaiQueue] = await Promise.all([ + db + .select({ count: count() }) + .from(importStagingBookmarks) + .where( + and( + eq(importStagingBookmarks.status, "processing"), + gt( + importStagingBookmarks.processingStartedAt, + new Date(Date.now() - this.staleThresholdMs), + ), + ), + ), + LinkCrawlerQueue.stats(), + OpenAIQueue.stats(), + ]); + + const inFlight = Math.max( + crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry, + openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry, + processingCount[0]?.count ?? 0, + ); + importStagingInFlightGauge.set(inFlight); - return this.maxInFlight - (processing[0]?.count ?? 0); + return this.maxInFlight - inFlight; } /** |
