about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Mohamed Bassem <me@mbassem.com> 2026-02-04 12:14:37 +0000
committer GitHub <noreply@github.com> 2026-02-04 12:14:37 +0000
commit e8e48a4144d2461666fa08b535c4de37d5db1b2f (patch)
tree b95ee8a1623a9958ff6dd2a4df81d476b00c2ac0
parent 3c838ddb26c1e86d3f201ce71f13c834be705f69 (diff)
download karakeep-e8e48a4144d2461666fa08b535c4de37d5db1b2f.tar.zst
fix: backfill old sessions and do queue backpressure (#2449)
* fix: backfill old sessions and do queue backpressure
* fix typo
-rw-r--r-- apps/workers/workers/importWorker.ts 75
-rw-r--r-- packages/db/drizzle/0077_import_listpaths_to_listids.sql 53
-rw-r--r-- packages/shared/logger.ts 12
3 files changed, 118 insertions, 22 deletions
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
index a717fd9d..e24d79f7 100644
--- a/apps/workers/workers/importWorker.ts
+++ b/apps/workers/workers/importWorker.ts
@@ -2,6 +2,7 @@ import {
and,
count,
eq,
+ gt,
inArray,
isNotNull,
isNull,
@@ -18,7 +19,8 @@ import {
importSessions,
importStagingBookmarks,
} from "@karakeep/db/schema";
-import logger from "@karakeep/shared/logger";
+import { LinkCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
+import logger, { throttledLogger } from "@karakeep/shared/logger";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
// Prometheus metrics
@@ -55,13 +57,15 @@ const importBatchDurationHistogram = new Histogram({
buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
});
+const backpressureLogger = throttledLogger(60_000); // Throttled to at most one emitted message per 60 000 ms, so repeated backpressure polls do not flood the log.
+
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export class ImportWorker {
private running = false;
- private pollIntervalMs = 1000;
+ private pollIntervalMs = 5000;
// Backpressure settings
private maxInFlight = 50;
@@ -88,8 +92,11 @@ export class ImportWorker {
const processed = await this.processBatch();
if (processed === 0) {
await this.checkAndCompleteIdleSessions();
+ await this.updateGauges();
// Nothing to do, wait before polling again
await sleep(this.pollIntervalMs);
+ } else {
+ await this.updateGauges();
}
} catch (error) {
logger.error(`[import] Error in polling loop: ${error}`);
@@ -104,12 +111,22 @@ export class ImportWorker {
}
private async processBatch(): Promise<number> {
- // 1. Check backpressure - count in-flight + recently completed items
+ const countPendingItems = await this.countPendingItems();
+ importStagingPendingGauge.set(countPendingItems);
+ if (countPendingItems === 0) {
+ // Nothing to do, wait before polling again
+ return 0;
+ }
+
+ // 1. Check backpressure - inflight items + queue sizes
const availableCapacity = await this.getAvailableCapacity();
- importStagingInFlightGauge.set(this.maxInFlight - availableCapacity);
if (availableCapacity <= 0) {
// At capacity, wait before trying again
+ backpressureLogger(
+ "info",
+ `[import] Pending import items: ${countPendingItems}, but current capacity is ${availableCapacity}. Will wait until capacity is available.`,
+ );
return 0;
}
@@ -158,19 +175,11 @@ export class ImportWorker {
await this.checkAndCompleteEmptySessions(sessionIds);
batchTimer(); // Record batch duration
- await this.updateGauges(); // Update pending/session gauges
return batch.length;
}
private async updateGauges() {
- // Update pending items gauge
- const pending = await db
- .select({ count: count() })
- .from(importStagingBookmarks)
- .where(eq(importStagingBookmarks.status, "pending"));
- importStagingPendingGauge.set(pending[0]?.count ?? 0);
-
// Update active sessions gauge by status
const sessions = await db
.select({
@@ -213,6 +222,14 @@ export class ImportWorker {
await this.checkAndCompleteEmptySessions(sessionIds);
}
+ private async countPendingItems(): Promise<number> { // Counts importStagingBookmarks rows whose status is "pending"; the filter is on status only, not on session.
+ const res = await db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(eq(importStagingBookmarks.status, "pending"));
+ return res[0]?.count ?? 0; // Falls back to 0 when the query yields no row.
+ }
+
private async getNextBatchFairly(limit: number): Promise<string[]> {
// Query pending item IDs from active sessions, ordered by:
// 1. User's last-served timestamp (fairness)
@@ -504,18 +521,34 @@ export class ImportWorker {
}
/**
- * Backpressure: Calculate available capacity.
- * Counts items currently in "processing" status, which includes both:
- * - Items being actively imported
- * - Items waiting for downstream crawl/tag to complete
+ * Backpressure: Calculate available capacity based on number of items in flight and the health of the import queues.
*/
private async getAvailableCapacity(): Promise<number> {
- const processing = await db
- .select({ count: count() })
- .from(importStagingBookmarks)
- .where(eq(importStagingBookmarks.status, "processing"));
+ const [processingCount, crawlerQueue, openaiQueue] = await Promise.all([ // The DB count and both queue stats lookups are awaited together.
+ db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "processing"),
+ gt( // Only rows whose processingStartedAt is newer than (now - staleThresholdMs) are counted.
+ importStagingBookmarks.processingStartedAt,
+ new Date(Date.now() - this.staleThresholdMs),
+ ),
+ ),
+ ),
+ LinkCrawlerQueue.stats(),
+ OpenAIQueue.stats(),
+ ]);
+
+ const inFlight = Math.max( // In-flight is the largest of the three backlogs below, not their sum.
+ crawlerQueue.pending + crawlerQueue.running + crawlerQueue.pending_retry,
+ openaiQueue.pending + openaiQueue.running + openaiQueue.pending_retry,
+ processingCount[0]?.count ?? 0,
+ );
+ importStagingInFlightGauge.set(inFlight);
- return this.maxInFlight - (processing[0]?.count ?? 0);
+ return this.maxInFlight - inFlight; // Zero or negative when the largest backlog meets or exceeds maxInFlight; processBatch treats <= 0 as "at capacity".
}
/**
diff --git a/packages/db/drizzle/0077_import_listpaths_to_listids.sql b/packages/db/drizzle/0077_import_listpaths_to_listids.sql
index f1c4f883..0a28ea20 100644
--- a/packages/db/drizzle/0077_import_listpaths_to_listids.sql
+++ b/packages/db/drizzle/0077_import_listpaths_to_listids.sql
@@ -23,4 +23,55 @@ CREATE TABLE `importStagingBookmarks` (
CREATE INDEX `importStaging_session_status_idx` ON `importStagingBookmarks` (`importSessionId`,`status`);--> statement-breakpoint
CREATE INDEX `importStaging_completedAt_idx` ON `importStagingBookmarks` (`completedAt`);--> statement-breakpoint
ALTER TABLE `importSessions` ADD `status` text DEFAULT 'staging' NOT NULL;--> statement-breakpoint
-ALTER TABLE `importSessions` ADD `lastProcessedAt` integer;
\ No newline at end of file
+ALTER TABLE `importSessions` ADD `lastProcessedAt` integer;--> statement-breakpoint
+-- Migrate legacy importSessionBookmarks into importStagingBookmarks.
+-- Reuses the same ID from the old table.
+-- Calculates status based on actual downstream crawl/tagging state.
+INSERT INTO importStagingBookmarks (
+ id, importSessionId, type, url,
+ status, processingStartedAt, result, resultBookmarkId, createdAt, completedAt
+)
+SELECT
+ isb.id,
+ isb.importSessionId,
+ b.type,
+ bl.url,
+ CASE
+ WHEN (bl.crawlStatus IS NULL OR bl.crawlStatus IN ('success', 'failure'))
+ AND (b.taggingStatus IS NULL OR b.taggingStatus IN ('success', 'failure'))
+ THEN 'completed'
+ ELSE 'processing'
+ END,
+ isb.createdAt,
+ 'accepted',
+ isb.bookmarkId,
+ isb.createdAt,
+ CASE
+ WHEN (bl.crawlStatus IS NULL OR bl.crawlStatus IN ('success', 'failure'))
+ AND (b.taggingStatus IS NULL OR b.taggingStatus IN ('success', 'failure'))
+ THEN isb.createdAt
+ ELSE NULL
+ END
+FROM importSessionBookmarks isb
+JOIN bookmarks b ON b.id = isb.bookmarkId
+LEFT JOIN bookmarkLinks bl ON bl.id = isb.bookmarkId
+WHERE NOT EXISTS (
+ SELECT 1 FROM importStagingBookmarks stg
+ WHERE stg.importSessionId = isb.importSessionId
+);
+--> statement-breakpoint
+-- Move legacy sessions out of staging:
+-- - Running if any items are still processing downstream
+-- - Completed otherwise (including sessions with no remaining items)
+UPDATE importSessions
+SET status = CASE
+ WHEN EXISTS (
+ SELECT 1
+ FROM importStagingBookmarks stg
+ WHERE stg.importSessionId = importSessions.id
+ AND stg.status = 'processing'
+ )
+ THEN 'running'
+ ELSE 'completed'
+END
+WHERE status = 'staging';
diff --git a/packages/shared/logger.ts b/packages/shared/logger.ts
index efe78ff3..f3c5d45d 100644
--- a/packages/shared/logger.ts
+++ b/packages/shared/logger.ts
@@ -14,4 +14,16 @@ const logger = winston.createLogger({
transports: [new winston.transports.Console()],
});
+export function throttledLogger(periodMs: number) { // Returns a (level, message) log function that forwards to logger.log at most once per periodMs milliseconds.
+ let lastLogTime = 0; // Date.now() value of the last forwarded message; one value shared by every level and message passed to the returned function.
+
+ return (level: string, message: string) => {
+ const now = Date.now();
+ if (now - lastLogTime >= periodMs) { // Calls that arrive inside the window are dropped, not queued or counted.
+ lastLogTime = now;
+ logger.log(level, message);
+ }
+ };
+}
+
export default logger;