aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-02-04 12:14:37 +0000
committerGitHub <noreply@github.com>2026-02-04 12:14:37 +0000
commite8e48a4144d2461666fa08b535c4de37d5db1b2f (patch)
treeb95ee8a1623a9958ff6dd2a4df81d476b00c2ac0 /apps
parent3c838ddb26c1e86d3f201ce71f13c834be705f69 (diff)
downloadkarakeep-e8e48a4144d2461666fa08b535c4de37d5db1b2f.tar.zst
fix: backfill old sessions and do queue backpressure (#2449)
* fix: backfill old sessions and do queue backpressure * fix typo
Diffstat (limited to 'apps')
-rw-r--r--apps/workers/workers/importWorker.ts75
1 files changed, 54 insertions, 21 deletions
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
index a717fd9d..e24d79f7 100644
--- a/apps/workers/workers/importWorker.ts
+++ b/apps/workers/workers/importWorker.ts
@@ -2,6 +2,7 @@ import {
and,
count,
eq,
+ gt,
inArray,
isNotNull,
isNull,
@@ -18,7 +19,8 @@ import {
importSessions,
importStagingBookmarks,
} from "@karakeep/db/schema";
-import logger from "@karakeep/shared/logger";
+import { LinkCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
+import logger, { throttledLogger } from "@karakeep/shared/logger";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
// Prometheus metrics
@@ -55,13 +57,15 @@ const importBatchDurationHistogram = new Histogram({
buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
});
+const backpressureLogger = throttledLogger(60_000);
+
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export class ImportWorker {
private running = false;
- private pollIntervalMs = 1000;
+ private pollIntervalMs = 5000;
// Backpressure settings
private maxInFlight = 50;
@@ -88,8 +92,11 @@ export class ImportWorker {
const processed = await this.processBatch();
if (processed === 0) {
await this.checkAndCompleteIdleSessions();
+ await this.updateGauges();
// Nothing to do, wait before polling again
await sleep(this.pollIntervalMs);
+ } else {
+ await this.updateGauges();
}
} catch (error) {
logger.error(`[import] Error in polling loop: ${error}`);
@@ -104,12 +111,22 @@ export class ImportWorker {
}
private async processBatch(): Promise<number> {
- // 1. Check backpressure - count in-flight + recently completed items
+ const countPendingItems = await this.countPendingItems();
+ importStagingPendingGauge.set(countPendingItems);
+ if (countPendingItems === 0) {
+ // Nothing to do, wait before polling again
+ return 0;
+ }
+
+ // 1. Check backpressure - inflight items + queue sizes
const availableCapacity = await this.getAvailableCapacity();
- importStagingInFlightGauge.set(this.maxInFlight - availableCapacity);
if (availableCapacity <= 0) {
// At capacity, wait before trying again
+ backpressureLogger(
+ "info",
+ `[import] Pending import items: ${countPendingItems}, but current capacity is ${availableCapacity}. Will wait until capacity is available.`,
+ );
return 0;
}
@@ -158,19 +175,11 @@ export class ImportWorker {
await this.checkAndCompleteEmptySessions(sessionIds);
batchTimer(); // Record batch duration
- await this.updateGauges(); // Update pending/session gauges
return batch.length;
}
private async updateGauges() {
- // Update pending items gauge
- const pending = await db
- .select({ count: count() })
- .from(importStagingBookmarks)
- .where(eq(importStagingBookmarks.status, "pending"));
- importStagingPendingGauge.set(pending[0]?.count ?? 0);
-
// Update active sessions gauge by status
const sessions = await db
.select({
@@ -213,6 +222,14 @@ export class ImportWorker {
await this.checkAndCompleteEmptySessions(sessionIds);
}
+ private async countPendingItems(): Promise<number> {
+ const res = await db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(eq(importStagingBookmarks.status, "pending"));
+ return res[0]?.count ?? 0;
+ }
+
private async getNextBatchFairly(limit: number): Promise<string[]> {
// Query pending item IDs from active sessions, ordered by:
// 1. User's last-served timestamp (fairness)
@@ -504,18 +521,34 @@ export class ImportWorker {
}
/**
- * Backpressure: Calculate available capacity.
- * Counts items currently in "processing" status, which includes both:
- * - Items being actively imported
- * - Items waiting for downstream crawl/tag to complete
+ * Backpressure: Calculate available capacity based on number of items in flight and the health of the import queues.
*/
private async getAvailableCapacity(): Promise<number> {
- const processing = await db
- .select({ count: count() })
- .from(importStagingBookmarks)
- .where(eq(importStagingBookmarks.status, "processing"));
+ const [processingCount, crawlerQueue, openaiQueue] = await Promise.all([
+ db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "processing"),
+ gt(
+ importStagingBookmarks.processingStartedAt,
+ new Date(Date.now() - this.staleThresholdMs),
+ ),
+ ),
+ ),
+ LinkCrawlerQueue.stats(),
+ OpenAIQueue.stats(),
+ ]);
+
+ const inFlight = Math.max(
+ crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry,
+ openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry,
+ processingCount[0]?.count ?? 0,
+ );
+ importStagingInFlightGauge.set(inFlight);
- return this.maxInFlight - (processing[0]?.count ?? 0);
+ return this.maxInFlight - inFlight;
}
/**