diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-02-04 12:14:37 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-04 12:14:37 +0000 |
| commit | e8e48a4144d2461666fa08b535c4de37d5db1b2f (patch) | |
| tree | b95ee8a1623a9958ff6dd2a4df81d476b00c2ac0 | |
| parent | 3c838ddb26c1e86d3f201ce71f13c834be705f69 (diff) | |
| download | karakeep-e8e48a4144d2461666fa08b535c4de37d5db1b2f.tar.zst | |
fix: backfill old sessions and do queue backpressure (#2449)
* fix: backfill old sessions and do queue backpressure
* fix typo
| -rw-r--r-- | apps/workers/workers/importWorker.ts | 75 | ||||
| -rw-r--r-- | packages/db/drizzle/0077_import_listpaths_to_listids.sql | 53 | ||||
| -rw-r--r-- | packages/shared/logger.ts | 12 |
3 files changed, 118 insertions, 22 deletions
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts index a717fd9d..e24d79f7 100644 --- a/apps/workers/workers/importWorker.ts +++ b/apps/workers/workers/importWorker.ts @@ -2,6 +2,7 @@ import { and, count, eq, + gt, inArray, isNotNull, isNull, @@ -18,7 +19,8 @@ import { importSessions, importStagingBookmarks, } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; +import { LinkCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server"; +import logger, { throttledLogger } from "@karakeep/shared/logger"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; // Prometheus metrics @@ -55,13 +57,15 @@ const importBatchDurationHistogram = new Histogram({ buckets: [0.1, 0.5, 1, 2, 5, 10, 30], }); +const backpressureLogger = throttledLogger(60_000); + function sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms)); } export class ImportWorker { private running = false; - private pollIntervalMs = 1000; + private pollIntervalMs = 5000; // Backpressure settings private maxInFlight = 50; @@ -88,8 +92,11 @@ export class ImportWorker { const processed = await this.processBatch(); if (processed === 0) { await this.checkAndCompleteIdleSessions(); + await this.updateGauges(); // Nothing to do, wait before polling again await sleep(this.pollIntervalMs); + } else { + await this.updateGauges(); } } catch (error) { logger.error(`[import] Error in polling loop: ${error}`); @@ -104,12 +111,22 @@ export class ImportWorker { } private async processBatch(): Promise<number> { - // 1. Check backpressure - count in-flight + recently completed items + const countPendingItems = await this.countPendingItems(); + importStagingPendingGauge.set(countPendingItems); + if (countPendingItems === 0) { + // Nothing to do, wait before polling again + return 0; + } + + // 1. Check backpressure - inflight items + queue sizes const availableCapacity = await this.getAvailableCapacity(); - importStagingInFlightGauge.set(this.maxInFlight - availableCapacity); if (availableCapacity <= 0) { // At capacity, wait before trying again + backpressureLogger( + "info", + `[import] Pending import items: ${countPendingItems}, but current capacity is ${availableCapacity}. Will wait until capacity is available.`, + ); return 0; } @@ -158,19 +175,11 @@ export class ImportWorker { await this.checkAndCompleteEmptySessions(sessionIds); batchTimer(); // Record batch duration - await this.updateGauges(); // Update pending/session gauges return batch.length; } private async updateGauges() { - // Update pending items gauge - const pending = await db - .select({ count: count() }) - .from(importStagingBookmarks) - .where(eq(importStagingBookmarks.status, "pending")); - importStagingPendingGauge.set(pending[0]?.count ?? 0); - // Update active sessions gauge by status const sessions = await db .select({ @@ -213,6 +222,14 @@ export class ImportWorker { await this.checkAndCompleteEmptySessions(sessionIds); } + private async countPendingItems(): Promise<number> { + const res = await db + .select({ count: count() }) + .from(importStagingBookmarks) + .where(eq(importStagingBookmarks.status, "pending")); + return res[0]?.count ?? 0; + } + private async getNextBatchFairly(limit: number): Promise<string[]> { // Query pending item IDs from active sessions, ordered by: // 1. User's last-served timestamp (fairness) @@ -504,18 +521,34 @@ export class ImportWorker { } /** - * Backpressure: Calculate available capacity. - * Counts items currently in "processing" status, which includes both: - * - Items being actively imported - * - Items waiting for downstream crawl/tag to complete + * Backpressure: Calculate available capacity based on number of items in flight and the health of the import queues. */ private async getAvailableCapacity(): Promise<number> { - const processing = await db - .select({ count: count() }) - .from(importStagingBookmarks) - .where(eq(importStagingBookmarks.status, "processing")); + const [processingCount, crawlerQueue, openaiQueue] = await Promise.all([ + db + .select({ count: count() }) + .from(importStagingBookmarks) + .where( + and( + eq(importStagingBookmarks.status, "processing"), + gt( + importStagingBookmarks.processingStartedAt, + new Date(Date.now() - this.staleThresholdMs), + ), + ), + ), + LinkCrawlerQueue.stats(), + OpenAIQueue.stats(), + ]); + + const inFlight = Math.max( + crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry, + openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry, + processingCount[0]?.count ?? 0, + ); + importStagingInFlightGauge.set(inFlight); - return this.maxInFlight - (processing[0]?.count ?? 0); + return this.maxInFlight - inFlight; } /** diff --git a/packages/db/drizzle/0077_import_listpaths_to_listids.sql b/packages/db/drizzle/0077_import_listpaths_to_listids.sql index f1c4f883..0a28ea20 100644 --- a/packages/db/drizzle/0077_import_listpaths_to_listids.sql +++ b/packages/db/drizzle/0077_import_listpaths_to_listids.sql @@ -23,4 +23,55 @@ CREATE TABLE `importStagingBookmarks` ( CREATE INDEX `importStaging_session_status_idx` ON `importStagingBookmarks` (`importSessionId`,`status`);--> statement-breakpoint CREATE INDEX `importStaging_completedAt_idx` ON `importStagingBookmarks` (`completedAt`);--> statement-breakpoint ALTER TABLE `importSessions` ADD `status` text DEFAULT 'staging' NOT NULL;--> statement-breakpoint -ALTER TABLE `importSessions` ADD `lastProcessedAt` integer;
\ No newline at end of file +ALTER TABLE `importSessions` ADD `lastProcessedAt` integer;--> statement-breakpoint +-- Migrate legacy importSessionBookmarks into importStagingBookmarks. +-- Reuses the same ID from the old table. +-- Calculates status based on actual downstream crawl/tagging state. +INSERT INTO importStagingBookmarks ( + id, importSessionId, type, url, + status, processingStartedAt, result, resultBookmarkId, createdAt, completedAt +) +SELECT + isb.id, + isb.importSessionId, + b.type, + bl.url, + CASE + WHEN (bl.crawlStatus IS NULL OR bl.crawlStatus IN ('success', 'failure')) + AND (b.taggingStatus IS NULL OR b.taggingStatus IN ('success', 'failure')) + THEN 'completed' + ELSE 'processing' + END, + isb.createdAt, + 'accepted', + isb.bookmarkId, + isb.createdAt, + CASE + WHEN (bl.crawlStatus IS NULL OR bl.crawlStatus IN ('success', 'failure')) + AND (b.taggingStatus IS NULL OR b.taggingStatus IN ('success', 'failure')) + THEN isb.createdAt + ELSE NULL + END +FROM importSessionBookmarks isb +JOIN bookmarks b ON b.id = isb.bookmarkId +LEFT JOIN bookmarkLinks bl ON bl.id = isb.bookmarkId +WHERE NOT EXISTS ( + SELECT 1 FROM importStagingBookmarks stg + WHERE stg.importSessionId = isb.importSessionId +); +--> statement-breakpoint +-- Move legacy sessions out of staging: +-- - Running if any items are still processing downstream +-- - Completed otherwise (including sessions with no remaining items) +UPDATE importSessions +SET status = CASE + WHEN EXISTS ( + SELECT 1 + FROM importStagingBookmarks stg + WHERE stg.importSessionId = importSessions.id + AND stg.status = 'processing' + ) + THEN 'running' + ELSE 'completed' +END +WHERE status = 'staging'; diff --git a/packages/shared/logger.ts b/packages/shared/logger.ts index efe78ff3..f3c5d45d 100644 --- a/packages/shared/logger.ts +++ b/packages/shared/logger.ts @@ -14,4 +14,16 @@ const logger = winston.createLogger({ transports: [new winston.transports.Console()], }); +export function throttledLogger(periodMs: number) { + let lastLogTime = 0; + + return (level: string, message: string) => { + const now = Date.now(); + if (now - lastLogTime >= periodMs) { + lastLogTime = now; + logger.log(level, message); + } + }; +} + export default logger; |
