| author | Mohamed Bassem <me@mbassem.com> | 2026-02-08 02:11:51 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-08 02:11:51 +0000 |
| commit | bbd65fd6123f7d1a93d1f6a68f2b933d53ec3c23 | |
| tree | 1f23b04b4ca8fbccdea2d61e7c2c0a8b991d87d1 | |
| parent | 7d53e2e458cba7153dea27c625ca1bb534952ddf | |
| download | karakeep-bbd65fd6123f7d1a93d1f6a68f2b933d53ec3c23.tar.zst | |
feat: Add separate queue for import link crawling (#2452)
* feat: add separate queue for import link crawling
---------
Co-authored-by: Claude <noreply@anthropic.com>
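The change, reduced to its core: two queues carry the same crawl payload, and the enqueue site picks one based on priority. The sketch below is a simplified stand-in for karakeep's actual queueing types — the two queue names and the `"low"` priority value come from the diff, while the `Queue` interface and the `enqueueCrawl` helper are hypothetical.

```ts
// Simplified stand-in for the queue abstraction; the real one lives in
// @karakeep/shared/queueing and carries retry/priority options.
interface Queue<T> {
  enqueue(payload: T): Promise<void>;
}

interface CrawlRequest {
  bookmarkId: string;
}

// Two queues with identical payloads. Runners are attached to each one
// independently, so a deep import backlog cannot occupy the runners that
// serve user-initiated crawls.
declare const LinkCrawlerQueue: Queue<CrawlRequest>;
declare const LowPriorityCrawlerQueue: Queue<CrawlRequest>;

// Hypothetical helper mirroring the routing added to the bookmarks router.
async function enqueueCrawl(
  bookmarkId: string,
  crawlPriority: "normal" | "low" = "normal",
): Promise<void> {
  const queue =
    crawlPriority === "low" ? LowPriorityCrawlerQueue : LinkCrawlerQueue;
  await queue.enqueue({ bookmarkId });
}
```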
Diffstat
| -rw-r--r-- | apps/workers/index.ts | 7 |
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 75 |
| -rw-r--r-- | apps/workers/workers/importWorker.ts | 4 |
| -rw-r--r-- | packages/shared-server/src/queues.ts | 12 |
| -rw-r--r-- | packages/trpc/routers/bookmarks.ts | 8 |
5 files changed, 70 insertions, 36 deletions
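The most involved hunk below is the CrawlerWorker refactor: `build()` is now called once per queue, so the one-time setup (stealth plugin, adblocker, browser launch, cookie loading) moves behind a memoized promise. A minimal sketch of that pattern, with a hypothetical `expensiveSetup` standing in for the real initialization:

```ts
// Minimal sketch of promise-memoized initialization. `expensiveSetup` is a
// hypothetical stand-in for the real adblocker/browser/cookie setup.
async function expensiveSetup(): Promise<void> {
  console.log("one-time setup running");
}

class CrawlerWorkerSketch {
  private static initPromise: Promise<void> | null = null;

  private static ensureInitialized(): Promise<void> {
    // The first caller creates the promise; every later caller (including
    // concurrent ones) awaits the same promise, so setup runs at most once.
    if (!CrawlerWorkerSketch.initPromise) {
      CrawlerWorkerSketch.initPromise = expensiveSetup();
    }
    return CrawlerWorkerSketch.initPromise;
  }

  static async build(queueName: string): Promise<string> {
    await CrawlerWorkerSketch.ensureInitialized();
    return `runner for ${queueName}`;
  }
}

// Both runners can be built concurrently; "one-time setup running" is
// printed exactly once.
void Promise.all([
  CrawlerWorkerSketch.build("crawler_queue"),
  CrawlerWorkerSketch.build("low_priority_crawler_queue"),
]);
```

One property of this pattern worth noting: a rejected setup promise stays cached, so every subsequent `build()` fails with the same error; whether that is the right behavior depends on how the worker process treats startup failures.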
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 931a505f..c7b9533d 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -10,6 +10,7 @@ import {
   initTracing,
   LinkCrawlerQueue,
   loadAllPlugins,
+  LowPriorityCrawlerQueue,
   OpenAIQueue,
   prepareQueue,
   RuleEngineQueue,
@@ -38,7 +39,11 @@ import { WebhookWorker } from "./workers/webhookWorker";
 const workerBuilders = {
   crawler: async () => {
     await LinkCrawlerQueue.ensureInit();
-    return CrawlerWorker.build();
+    return CrawlerWorker.build(LinkCrawlerQueue);
+  },
+  lowPriorityCrawler: async () => {
+    await LowPriorityCrawlerQueue.ensureInit();
+    return CrawlerWorker.build(LowPriorityCrawlerQueue);
   },
   inference: async () => {
     await OpenAIQueue.ensureInit();
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 48ea5352..5869354f 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -59,7 +59,6 @@ import {
 import {
   AssetPreprocessingQueue,
   getTracer,
-  LinkCrawlerQueue,
   OpenAIQueue,
   QuotaService,
   setSpanAttributes,
@@ -84,8 +83,10 @@ import serverConfig from "@karakeep/shared/config";
 import logger from "@karakeep/shared/logger";
 import {
   DequeuedJob,
+  DequeuedJobError,
   EnqueueOptions,
   getQueueClient,
+  Queue,
   QueueRetryAfterError,
 } from "@karakeep/shared/queueing";
 import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
@@ -302,42 +303,54 @@ async function launchBrowser() {
 }
 
 export class CrawlerWorker {
-  static async build() {
-    chromium.use(StealthPlugin());
-    if (serverConfig.crawler.enableAdblocker) {
-      logger.info("[crawler] Loading adblocker ...");
-      const globalBlockerResult = await tryCatch(
-        PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, {
-          path: path.join(os.tmpdir(), "karakeep_adblocker.bin"),
-          read: fs.readFile,
-          write: fs.writeFile,
-        }),
-      );
-      if (globalBlockerResult.error) {
-        logger.error(
-          `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`,
-        );
-      } else {
-        globalBlocker = globalBlockerResult.data;
-      }
-    }
-    if (!serverConfig.crawler.browserConnectOnDemand) {
-      await launchBrowser();
-    } else {
-      logger.info(
-        "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance",
-      );
+  private static initPromise: Promise<void> | null = null;
+
+  private static ensureInitialized() {
+    if (!CrawlerWorker.initPromise) {
+      CrawlerWorker.initPromise = (async () => {
+        chromium.use(StealthPlugin());
+        if (serverConfig.crawler.enableAdblocker) {
+          logger.info("[crawler] Loading adblocker ...");
+          const globalBlockerResult = await tryCatch(
+            PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, {
+              path: path.join(os.tmpdir(), "karakeep_adblocker.bin"),
+              read: fs.readFile,
+              write: fs.writeFile,
+            }),
+          );
+          if (globalBlockerResult.error) {
+            logger.error(
+              `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`,
+            );
+          } else {
+            globalBlocker = globalBlockerResult.data;
+          }
+        }
+        if (!serverConfig.crawler.browserConnectOnDemand) {
+          await launchBrowser();
+        } else {
+          logger.info(
+            "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance",
+          );
+        }
+        await loadCookiesFromFile();
+      })();
     }
+    return CrawlerWorker.initPromise;
+  }
+
+  static async build(queue: Queue<ZCrawlLinkRequest>) {
+    await CrawlerWorker.ensureInitialized();
     logger.info("Starting crawler worker ...");
-    const worker = (await getQueueClient())!.createRunner<
+    const worker = (await getQueueClient()).createRunner<
       ZCrawlLinkRequest,
       CrawlerRunResult
     >(
-      LinkCrawlerQueue,
+      queue,
       {
         run: withWorkerTracing("crawlerWorker.run", runCrawler),
-        onComplete: async (job) => {
+        onComplete: async (job: DequeuedJob<ZCrawlLinkRequest>) => {
           workerStatsCounter.labels("crawler", "completed").inc();
           const jobId = job.id;
           logger.info(`[Crawler][${jobId}] Completed successfully`);
@@ -351,7 +364,7 @@ export class CrawlerWorker {
             .where(eq(bookmarkLinks.id, bookmarkId));
         }
       },
-      onError: async (job) => {
+      onError: async (job: DequeuedJobError<ZCrawlLinkRequest>) => {
         workerStatsCounter.labels("crawler", "failed").inc();
         if (job.numRetriesLeft == 0) {
           workerStatsCounter.labels("crawler", "failed_permanent").inc();
@@ -402,8 +415,6 @@ export class CrawlerWorker {
       },
     );
 
-    await loadCookiesFromFile();
-
     return worker;
   }
 }
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
index 11a738d7..e5b5c27e 100644
--- a/apps/workers/workers/importWorker.ts
+++ b/apps/workers/workers/importWorker.ts
@@ -20,7 +20,7 @@ import {
   importSessions,
   importStagingBookmarks,
 } from "@karakeep/db/schema";
-import { LinkCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
+import { LowPriorityCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
 import logger, { throttledLogger } from "@karakeep/shared/logger";
 import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -635,7 +635,7 @@ export class ImportWorker {
           ),
         ),
       ),
-      LinkCrawlerQueue.stats(),
+      LowPriorityCrawlerQueue.stats(),
       OpenAIQueue.stats(),
     ]);
diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts
index fd9dac83..4d4a61d6 100644
--- a/packages/shared-server/src/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -96,6 +96,18 @@ export const LinkCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>(
   },
 );
 
+// Separate queue for low priority link crawling (e.g. imports)
+// This prevents low priority crawling from impacting the parallelism of the main queue
+export const LowPriorityCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>(
+  "low_priority_crawler_queue",
+  {
+    defaultJobArgs: {
+      numRetries: 5,
+    },
+    keepFailedJobs: false,
+  },
+);
+
 // Inference Worker
 export const zOpenAIRequestSchema = z.object({
   bookmarkId: z.string(),
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 0bbf4fb7..565558c3 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -19,6 +19,7 @@ import {
 import {
   AssetPreprocessingQueue,
   LinkCrawlerQueue,
+  LowPriorityCrawlerQueue,
   OpenAIQueue,
   QueuePriority,
   QuotaService,
@@ -282,7 +283,12 @@ export const bookmarksAppRouter = router({
       switch (bookmark.content.type) {
         case BookmarkTypes.LINK: {
           // The crawling job triggers openai when it's done
-          await LinkCrawlerQueue.enqueue(
+          // Use a separate queue for low priority crawling to avoid impacting main queue parallelism
+          const crawlerQueue =
+            input.crawlPriority === "low"
+              ? LowPriorityCrawlerQueue
+              : LinkCrawlerQueue;
+          await crawlerQueue.enqueue(
             {
               bookmarkId: bookmark.id,
             },
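Why a second queue rather than a priority field on the existing one: each queue gets its own runner with its own concurrency budget, so an import backlog can never occupy the slots serving user-initiated crawls. The toy model below is entirely hypothetical (it is not the karakeep queue client) but demonstrates the isolation property:

```ts
// Toy model: two in-memory queues, one sequential worker loop each.
type Job = { id: string };

function makeQueue(name: string) {
  const jobs: Job[] = [];
  return {
    name,
    push: (job: Job) => jobs.push(job),
    shift: () => jobs.shift(),
  };
}

async function runWorker(
  queue: ReturnType<typeof makeQueue>,
  handle: (job: Job) => Promise<void>,
): Promise<void> {
  // Concurrency of 1 per loop: jobs within this queue are processed in
  // order, but jobs in the *other* queue are unaffected.
  for (let job = queue.shift(); job !== undefined; job = queue.shift()) {
    await handle(job);
  }
}

const mainQueue = makeQueue("crawler_queue");
const importQueue = makeQueue("low_priority_crawler_queue");

for (let i = 0; i < 1000; i++) importQueue.push({ id: `import-${i}` });
mainQueue.push({ id: "user-click" });

// Even with a 1000-job import backlog, "user-click" is handled on the main
// loop's first iteration, because the two loops never share worker slots.
void Promise.all([
  runWorker(mainQueue, async (job) => console.log(`[main] ${job.id}`)),
  runWorker(importQueue, async (job) => console.log(`[import] ${job.id}`)),
]);
```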
